Python - Lightweight database application layer - A quick and lightweight solution for prototypes - NO need to set up any DBMS, and you still get a NoSQL-style database

Do you want to implement a quick prototype application without spending time setting up your choice of DBMS, because the database aspect of your prototype is not the main focus?

If yes, then you can easily emulate the concept of a NoSQL DBMS using just plain flat files, and the storage details can be easily abstracted away from your application. For example, all your application knows is that it is saving a customer record somewhere or saving an order record somewhere, and it can easily retrieve the information by passing the unique id for that table. You just need to specify the unique ids and the table name, and everything else just works. That's all.
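
To make the abstraction idea concrete, here is a minimal sketch of application code that only talks to two small wrapper functions. The names save_customer and get_customer and the in-memory DictStore stand-in are assumptions made up for this illustration; they are not part of the class shown later in this post. The only thing the wrappers rely on is a storage object with save_info and get_info methods that take the same arguments as that class.

```python
class DictStore(object):
    """Hypothetical in-memory stand-in offering the same two methods as the flat-file class."""

    def __init__(self):
        self.records = {}

    def save_info(self, table_name, customer_id, table_unique_id, data):
        self.records[(table_name, customer_id, table_unique_id)] = data
        return True

    def get_info(self, table_name, customer_id, table_unique_id):
        # "-1" signals "not found", matching the convention used in this post
        return self.records.get((table_name, customer_id, table_unique_id), "-1")


def save_customer(db, customer_id, customer_json):
    # The application only passes a table name and an id, never a file name
    return db.save_info("customer", customer_id, "", customer_json)


def get_customer(db, customer_id):
    return db.get_info("customer", customer_id, "")


db = DictStore()
save_customer(db, "100003", '{"customer_id": "100003"}')
print(get_customer(db, "100003"))
print(get_customer(db, "999999"))
```

Because the wrappers depend only on those two methods, swapping the stand-in for the flat-file class (or later for a real DBMS) should not require changes in the calling code.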

Below is a Python class that contains what you need. You just need to import this file in your code, and you will be able to save information into your simple database and get information back from it. The main() function contains a few use cases where I am doing the following:


  • getting a specific "customer" record from database
  • saving a specific customer record to database
  • getting a specific "order" record from database
  • saving a specific "order" record to database

If you look at the signatures of my functions, I am assuming that the customer id is something you would have in all your tables. That's why, for the "order" table, I am passing both the customer id and the order id into the function.
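
As a rough sketch of how the ids end up in the file name, the helper below joins the table name, the customer id and, when given, the table's own id with underscores. The function name build_file_prefix is hypothetical and only mirrors the naming logic inside get_info and save_info; the timestamp and the .json extension are appended after this prefix.

```python
def build_file_prefix(table_name, customer_id, table_unique_id=""):
    # Hypothetical helper: table name + customer id + optional table-specific id
    parts = [str(table_name), str(customer_id)]
    if table_unique_id != "":
        parts.append(str(table_unique_id))
    return "_".join(parts)


print(build_file_prefix("customer", "100003"))          # customer_100003
print(build_file_prefix("order", "100003", "1000001"))  # order_100003_1000001
```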

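The class has a member variable, if_save_history (set through the second constructor argument), that allows you to specify whether you want to keep the history of old records, just in case. When it is True, the previous file is renamed with an ".old" suffix instead of being deleted.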
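
The effect of that switch can be tried in isolation with the small sketch below. It assumes a hypothetical helper, retire_old_file, that does what save_info in this post does to the previous file of a record: rename it with an ".old" suffix when history is kept, delete it otherwise. The file names and the temporary folder are made up for the demonstration.

```python
import os
import tempfile


def retire_old_file(old_file_name, keep_history):
    # Hypothetical helper: keep the old version as "<name>.old" or delete it
    if keep_history:
        os.rename(old_file_name, old_file_name + ".old")
    else:
        os.remove(old_file_name)


folder = tempfile.mkdtemp()
for keep_history in (True, False):
    # One demo file per setting
    path = os.path.join(folder, "customer_100003_demo_%s.json" % keep_history)
    with open(path, "w") as demo_file:
        demo_file.write('{"customer_id": "100003"}\n')
    retire_old_file(path, keep_history)

# Only the file retired with history enabled is still there, now with ".old"
print(sorted(os.listdir(folder)))
```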

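After running this Python code a few times, my database folder contains one JSON file per record. Each file name starts with the table name and the ids and ends with the timestamp of the save, following the patterns customer_<customer id>_<timestamp>.json and order_<customer id>_<order id>_<timestamp>.json.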

NOTE:
This is something that I put together very quickly. If you like the idea, you can clean it up and use it for your prototyping.

#!/usr/bin/python
import os
import datetime


############################################################
def main():
    my_db_instance = AMDatabase("./am_database", False)

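    # Try 1: look up a customer; get_info returns "-1" when nothing is found
    result = my_db_instance.get_info("customer", "100001", "")
    if result == "-1":
        print("*** NOT found customer")
    else:
        print("*** customer found")

    # Try 2: save the customer back (insert if new, update if it already exists)
    customer_info = result
    if customer_info == "-1":
        # Nothing stored yet, so start from a minimal record
        customer_info = '{"customer_id": "100001"}'
    is_good = my_db_instance.save_info("customer", "100001", "", customer_info)
    if is_good:
        print("*** save successful")
    else:
        print(":::ERROR: save failed")

    # Try 3: same round trip for a second customer
    result2 = my_db_instance.get_info("customer", "100002", "")
    customer_info2 = result2
    if customer_info2 == "-1":
        customer_info2 = '{"customer_id": "100002"}'
    is_good = my_db_instance.save_info("customer", "100002", "", customer_info2)
    if is_good:
        print("*** save successful")
    else:
        print(":::ERROR: save failed")

    # Try 4: save a customer record directly from a JSON string
    my_db_instance.save_info("customer", "100003", "", '{"customer_id": "100003"}')

    # Try 5: save an order (customer id + order id), then read it back
    my_db_instance.save_info("order", "100003", "1000001", '{"order_id": "1000001", "customer_id": "100003"}')
    order_info = my_db_instance.get_info("order", "100003", "1000001")
    print("order_info: " + order_info)

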



############################################################
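class AMDatabase:
    """Flat-file store: one JSON file per record, all kept in a single folder."""

    def __init__(self, database_folder_location, if_save_history):
        # Folder that holds all record files
        self.database_folder_location = database_folder_location
        # True: keep replaced files with an ".old" suffix; False: delete them
        self.if_save_history = if_save_history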

    ############################################################
    def get_info(self, table_name, customer_id, table_unique_id):
        # Search the database folder for the newest file whose name follows the
        # naming convention for this table and unique ID(s).
        # Return the JSON string stored in that file, or "-1" if no file is found
        # or the lookup fails.

        print("get info from db")

        DATABASE_DIR = self.database_folder_location

        if table_unique_id == "":
            file_pattern = str(table_name) + "_" + str(customer_id)
        else:
            file_pattern = str(table_name) + "_" + str(customer_id) + "_" + str(table_unique_id)

        # Default result when no matching file exists
        file_content = "-1"
        try:
            src_files = os.listdir(DATABASE_DIR)
            # Sort in reverse so that newer timestamps come first
            src_files.sort(reverse=True)
            for file_name in src_files:
                full_file_name = os.path.join(DATABASE_DIR, file_name)
                # Match the exact prefix and skip ".old" history files
                is_match = file_name.startswith(file_pattern + "_") and file_name.endswith(".json")
                if os.path.isfile(full_file_name) and is_match:
                    print("full_file_name: " + full_file_name)

                    # Read the file and get the JSON string (without the trailing newline)
                    with open(full_file_name) as myfile:
                        file_content = myfile.read().strip()

                    print("file content is:" + str(file_content))

                    break
        except Exception as error:
            print(":::ERROR: Failed to get info: " + str(error))
            file_content = "-1"
        return file_content


    ############################################################
    def save_info(self, table_name, customer_id, table_unique_id, data):
        # First check whether a file already exists for this ID to decide between insert and update.
        # If there is no file with that ID on the file system, then it is an insert.
        # If there is one, then it is an update: write the new file, then rename the old
        # file (history on) or delete it (history off), so that ONLY the new file
        # follows the normal naming convention.

        print("save info to db")

        DATABASE_DIR = self.database_folder_location
        existing_full_file_name = ""

        if table_unique_id == "":
            file_pattern = str(table_name) + "_" + str(customer_id)
        else:
            file_pattern = str(table_name) + "_" + str(customer_id) + "_" + str(table_unique_id)

        is_success = True
        try:
            # Create the database folder on first use
            if not os.path.isdir(DATABASE_DIR):
                os.makedirs(DATABASE_DIR)

            src_files = os.listdir(DATABASE_DIR)
            src_files.sort(reverse=True)
            for file_name in src_files:
                full_file_name = os.path.join(DATABASE_DIR, file_name)
                # Match the exact prefix and skip ".old" history files
                is_match = file_name.startswith(file_pattern + "_") and file_name.endswith(".json")
                if os.path.isfile(full_file_name) and is_match:
                    print("full_file_name: " + full_file_name)
                    existing_full_file_name = full_file_name
                    break

            # Zero-padded timestamp (down to microseconds) keeps file names sortable
            now = datetime.datetime.now()
            output_file_name = file_pattern + "_" + now.strftime("%Y_%m_%d_%H_%M_%S_%f") + ".json"
            output_full_file_name = os.path.join(DATABASE_DIR, output_file_name)
            with open(output_full_file_name, "w") as myoutput_file:
                myoutput_file.write(data + "\n")

            # Retire the previous version, unless it is the file that was just written
            if existing_full_file_name != "" and existing_full_file_name != output_full_file_name:
                if self.if_save_history:
                    os.rename(existing_full_file_name, existing_full_file_name + ".old")
                else:
                    os.remove(existing_full_file_name)

            is_success = True
        except Exception as error:
            print(":::ERROR: Failed save info: " + str(error))
            is_success = False
        return is_success

    ############################################################

if __name__ == "__main__": main()
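
The class stores and returns plain strings, so converting between Python dictionaries and JSON text is left to the calling code. Here is a minimal sketch using the standard json module. The helper names to_record and from_record are assumptions for illustration, and the "-1" check follows the not-found convention of get_info above.

```python
import json


def to_record(info_dict):
    # sort_keys gives a stable key order in the saved text
    return json.dumps(info_dict, sort_keys=True)


def from_record(record_text):
    # get_info signals a missing record with the string "-1"
    if record_text == "-1":
        return None
    return json.loads(record_text)


order_text = to_record({"order_id": "1000001", "customer_id": "100003"})
print(order_text)
print(from_record(order_text)["order_id"])
print(from_record("-1"))
```

The explicit "-1" check matters because "-1" on its own is also valid JSON (the number -1), so passing it straight to json.loads would not raise an error.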


- almirsCorner.com -

