HOME | Blog | YouTube | LinkedIn | About Me         || Calculators    | SoftEng/Tech Posts    | Code/Scripts

Python - Lightweight Database Application Layer for your prototyping needs - Version 2 (IMPROVED)

Do you want to implement a quick prototype application and you don't want to be spending time setting up your choice of DBMS because the database aspect of your prototype is not the main focus?

If yes, then you can easily emulate the concept of NoSQL DBMS using just plain flat files and that can be easily abstracted from your application. For example, all your application knows is that it is saving a customer record somewhere or it saving an order record somewhere and your application can easily retrieve the information by passing the unique id for that table. You just need to specify the unique ids and the table name and everything else just work. That's all.

Below is a Python class that contains what you need. You just need import this file in your code and you will be able to save information into your simple database and get information from your simple database. The main() function contains a few use cases where I am doing the following:


  • getting a specific "customer" record from database
  • saving a specific customer record to database
  • getting a specific "order" record from database
  • saving a specific "order" record to database

If you look at the signature on my functions, I am assuming that customer id is something you would have in all your tables. That's why for the "order" table, I am passing both the customer id and the order id into the function. 

The class has a member variable that allows you specify if you want to keep the history of old records just in case.


#!/usr/bin/python

import os
import shutil
import datetime


############################################################

def main():
    # test1()
    test2()
    # test3()


def test1():
    my_db_instance = AMDatabase("./am_database", False)

    unique_id = my_db_instance.helper_calculate_next_unique_id("customer")
    print "unique_id=" + str(unique_id)

    unique_id = my_db_instance.helper_calculate_next_unique_id("order")
    print "unique_id=" + str(unique_id)


def test2():
    my_db_instance = AMDatabase("./am_database", True)

    my_db_instance.insert_info_to_db("customer", 0, '{"customer_name": "John Smith"}')
    my_db_instance.insert_info_to_db("customer", 0, '{"customer_name": "Jane McKey"}')

    my_db_instance.insert_info_to_db("order", 100001, '{"order_product": "phone", "order_price": "12.99"}')
    my_db_instance.insert_info_to_db("order", 100001, '{"order_product": "hdmi cable", "order_price": "9.99"}')

    my_db_instance.update_info("customer", 100001, 0, '{"customer_name": "John James Smith"}')
    my_db_instance.update_info("order", 100001, 1000001, '{"order_product": "hdmi cable", "order_price": "9.99"}')

    result = my_db_instance.get_info("customer", 100001, 0)
    if result == "-1":
        print "*** NOT found customer"
    else:
        print "*** customer found"
    print "result: " + result

    result = my_db_instance.get_info("order", 100001, 1000002)
    if result == "-1":
        print "*** NOT found order"
    else:
        print "*** order found"
    print "result: " + result


def test3():
    my_db_instance = AMDatabase("./am_database", False)

    my_db_instance.update_info("customer", 100002, 0, '{"customer_name": "Jane Smith"}')
    result = my_db_instance.get_info("customer", 100002, 0)
    if result == "-1":
        print "*** NOT updated customer"
    else:
        print "*** customer data updated"
    print "result: " + result

############################################################


class AMDatabase:

    def __init__(self, database_folder_location, if_save_history):
        self.database_folder_location = database_folder_location
        self.if_save_history = if_save_history

    ############################################################

    def get_info(self, table_name, customer_id, table_unique_id):
        print "get_info method start..."

        DATABASE_DIR = self.database_folder_location
        file_content = ""

        if table_unique_id == 0:
            file_pattern = str(table_name) + "_" + str(customer_id) + "_" + str(customer_id)
        else:
            file_pattern = str(table_name) + "_" + str(customer_id) + "_" + str(table_unique_id)

        is_success = True
        try:
            src_files = os.listdir(DATABASE_DIR)
            src_files.sort(reverse=True)
            for file_name in src_files:
                full_file_name = os.path.join(DATABASE_DIR, file_name)
                if os.path.isfile(full_file_name) and file_name.find(file_pattern) > -1:
                    #print "full_file_name: " + full_file_name

                    # Read the file and get the JSON string
                    myfile = open(full_file_name)
                    file_content = myfile.read()
                    myfile.close()

                    # print "file content is:" + str(file_content)
                    break

            is_success = True
        except:
            print ":::ERROR: get_info: Failed to get info"
            is_success = False

        if is_success:
            return file_content
        else:
            return "-1"


    ############################################################

    def update_info(self, table_name, customer_id, table_unique_id, data):
        print "update_info method start..."

        DATABASE_DIR = self.database_folder_location
        existing_full_file_name = ""

        if table_unique_id == 0:
            file_pattern = str(table_name) + "_" + str(customer_id) + "_" + str(customer_id)
        else:
            file_pattern = str(table_name) + "_" + str(customer_id) + "_" + str(table_unique_id)

        is_success = True
        try:
            src_files = os.listdir(DATABASE_DIR)
            src_files.sort(reverse=True)
            for file_name in src_files:
                full_file_name = os.path.join(DATABASE_DIR, file_name)
                if os.path.isfile(full_file_name) and file_name.find(file_pattern) > -1:
                    # print "full_file_name: " + full_file_name
                    existing_full_file_name = full_file_name
                    break

            now = datetime.datetime.now()
            output_file_name = file_pattern + "_" + str(now.year) + "_" + str(now.month).zfill(2) + "_" + str(now.day).zfill(2) + "_" + str(now.hour).zfill(2) + "_" + str(now.minute).zfill(2) + "_" + str(now.second).zfill(2) + "_" + str(now.microsecond) + ".json"
            output_full_file_name = os.path.join(DATABASE_DIR, output_file_name)
            myoutput_file = open(output_full_file_name, "w")
            myoutput_file.write(data + "\n")
            myoutput_file.close()

            if existing_full_file_name != "":
                if self.if_save_history:
                    os.rename(existing_full_file_name, existing_full_file_name + ".old")
                else:
                    os.remove(existing_full_file_name)

            is_success = True
        except:
            print ":::ERROR: update_info: Failed save info"
            is_success = False

        return is_success

    ############################################################

    def insert_info_to_db(self, table_name, customer_id, data):
        print "insert_info_to_db method start..."

        DATABASE_DIR = self.database_folder_location
        existing_full_file_name = ""

        table_unique_id = self.helper_calculate_next_unique_id(table_name)

        if table_unique_id != -1:
            if customer_id == 0:
                file_pattern = str(table_name) + "_" + str(table_unique_id) + "_" + str(table_unique_id)
            else:
                file_pattern = str(table_name) + "_" + str(customer_id) + "_" + str(table_unique_id)
        else:
            return False

        is_success = True
        try:
            src_files = os.listdir(DATABASE_DIR)
            src_files.sort(reverse=True)
            for file_name in src_files:
                full_file_name = os.path.join(DATABASE_DIR, file_name)
                if os.path.isfile(full_file_name) and file_name.find(file_pattern) > -1:
                    # print "full_file_name: " + full_file_name
                    existing_full_file_name = full_file_name
                    break

            now = datetime.datetime.now()
            output_file_name = file_pattern + "_" + str(now.year) + "_" + str(now.month).zfill(2) + "_" + str(now.day).zfill(2) + "_" + str(now.hour).zfill(2) + "_" + str(now.minute).zfill(2) + "_" + str(now.second).zfill(2) + "_" + str(now.microsecond) + ".json"
            output_full_file_name = os.path.join(DATABASE_DIR, output_file_name)
            myoutput_file = open(output_full_file_name, "w")
            myoutput_file.write(data + "\n")
            myoutput_file.close()

            if existing_full_file_name != "":
                if self.if_save_history:
                    os.rename(existing_full_file_name, existing_full_file_name + ".old")
                else:
                    os.remove(existing_full_file_name)

            is_success = True
        except:
            print ":::ERROR: insert_info_to_db: Failed save info"
            is_success = False

        return is_success

    ############################################################

    def helper_calculate_next_unique_id(self, table_name):
        unique_id = 100000
        last_full_file_name = ""
        last_id = 0

        _database_dir = self.database_folder_location

        file_pattern = str(table_name) + "_"

        try:
            src_files = os.listdir(_database_dir)
            src_files.sort(reverse=True)
            for file_name in src_files:
                full_file_name = os.path.join(_database_dir, file_name)
                if os.path.isfile(full_file_name) and file_name.find(file_pattern) > -1:
                    last_full_file_name = full_file_name
                    break

            item_in_file_name = last_full_file_name.split('_')
            if len(item_in_file_name) >= 4:
                last_id = int(item_in_file_name[3])

            unique_id = last_id + 1
        except:
            print ":::ERROR: helper_calculate_next_unique_id: Failed to calculate the next unique id"
            unique_id = -1

        return unique_id

    ############################################################


if __name__ == "__main__": main()




- almirsCorner.com -

#python #programming #code #coding #software #softwaredeveloper #softwareengineering #programmer

No comments:

Post a Comment