If yes, then you can easily emulate the concept of NoSQL DBMS using just plain flat files and that can be easily abstracted from your application. For example, all your application knows is that it is saving a customer record somewhere or it saving an order record somewhere and your application can easily retrieve the information by passing the unique id for that table. You just need to specify the unique ids and the table name and everything else just work. That's all.
Below is a Python class that contains what you need. You just need import this file in your code and you will be able to save information into your simple database and get information from your simple database. The main() function contains a few use cases where I am doing the following:
- getting a specific "customer" record from database
- saving a specific customer record to database
- getting a specific "order" record from database
- saving a specific "order" record to database
If you look at the signature on my functions, I am assuming that customer id is something you would have in all your tables. That's why for the "order" table, I am passing both the customer id and the order id into the function.
The class has a member variable that allows you specify if you want to keep the history of old records just in case.
#!/usr/bin/python
import os
import shutil
import datetime
############################################################
def main():
# test1()
test2()
# test3()
def test1():
my_db_instance = AMDatabase("./am_database", False)
unique_id = my_db_instance.helper_calculate_next_unique_id("customer")
print "unique_id=" + str(unique_id)
unique_id = my_db_instance.helper_calculate_next_unique_id("order")
print "unique_id=" + str(unique_id)
def test2():
my_db_instance = AMDatabase("./am_database", True)
my_db_instance.insert_info_to_db("customer", 0, '{"customer_name": "John Smith"}')
my_db_instance.insert_info_to_db("customer", 0, '{"customer_name": "Jane McKey"}')
my_db_instance.insert_info_to_db("order", 100001, '{"order_product": "phone", "order_price": "12.99"}')
my_db_instance.insert_info_to_db("order", 100001, '{"order_product": "hdmi cable", "order_price": "9.99"}')
my_db_instance.update_info("customer", 100001, 0, '{"customer_name": "John James Smith"}')
my_db_instance.update_info("order", 100001, 1000001, '{"order_product": "hdmi cable", "order_price": "9.99"}')
result = my_db_instance.get_info("customer", 100001, 0)
if result == "-1":
print "*** NOT found customer"
else:
print "*** customer found"
print "result: " + result
result = my_db_instance.get_info("order", 100001, 1000002)
if result == "-1":
print "*** NOT found order"
else:
print "*** order found"
print "result: " + result
def test3():
my_db_instance = AMDatabase("./am_database", False)
my_db_instance.update_info("customer", 100002, 0, '{"customer_name": "Jane Smith"}')
result = my_db_instance.get_info("customer", 100002, 0)
if result == "-1":
print "*** NOT updated customer"
else:
print "*** customer data updated"
print "result: " + result
############################################################
class AMDatabase:
def __init__(self, database_folder_location, if_save_history):
self.database_folder_location = database_folder_location
self.if_save_history = if_save_history
############################################################
def get_info(self, table_name, customer_id, table_unique_id):
print "get_info method start..."
DATABASE_DIR = self.database_folder_location
file_content = ""
if table_unique_id == 0:
file_pattern = str(table_name) + "_" + str(customer_id) + "_" + str(customer_id)
else:
file_pattern = str(table_name) + "_" + str(customer_id) + "_" + str(table_unique_id)
is_success = True
try:
src_files = os.listdir(DATABASE_DIR)
src_files.sort(reverse=True)
for file_name in src_files:
full_file_name = os.path.join(DATABASE_DIR, file_name)
if os.path.isfile(full_file_name) and file_name.find(file_pattern) > -1:
#print "full_file_name: " + full_file_name
# Read the file and get the JSON string
myfile = open(full_file_name)
file_content = myfile.read()
myfile.close()
# print "file content is:" + str(file_content)
break
is_success = True
except:
print ":::ERROR: get_info: Failed to get info"
is_success = False
if is_success:
return file_content
else:
return "-1"
############################################################
def update_info(self, table_name, customer_id, table_unique_id, data):
print "update_info method start..."
DATABASE_DIR = self.database_folder_location
existing_full_file_name = ""
if table_unique_id == 0:
file_pattern = str(table_name) + "_" + str(customer_id) + "_" + str(customer_id)
else:
file_pattern = str(table_name) + "_" + str(customer_id) + "_" + str(table_unique_id)
is_success = True
try:
src_files = os.listdir(DATABASE_DIR)
src_files.sort(reverse=True)
for file_name in src_files:
full_file_name = os.path.join(DATABASE_DIR, file_name)
if os.path.isfile(full_file_name) and file_name.find(file_pattern) > -1:
# print "full_file_name: " + full_file_name
existing_full_file_name = full_file_name
break
now = datetime.datetime.now()
output_file_name = file_pattern + "_" + str(now.year) + "_" + str(now.month).zfill(2) + "_" + str(now.day).zfill(2) + "_" + str(now.hour).zfill(2) + "_" + str(now.minute).zfill(2) + "_" + str(now.second).zfill(2) + "_" + str(now.microsecond) + ".json"
output_full_file_name = os.path.join(DATABASE_DIR, output_file_name)
myoutput_file = open(output_full_file_name, "w")
myoutput_file.write(data + "\n")
myoutput_file.close()
if existing_full_file_name != "":
if self.if_save_history:
os.rename(existing_full_file_name, existing_full_file_name + ".old")
else:
os.remove(existing_full_file_name)
is_success = True
except:
print ":::ERROR: update_info: Failed save info"
is_success = False
return is_success
############################################################
def insert_info_to_db(self, table_name, customer_id, data):
print "insert_info_to_db method start..."
DATABASE_DIR = self.database_folder_location
existing_full_file_name = ""
table_unique_id = self.helper_calculate_next_unique_id(table_name)
if table_unique_id != -1:
if customer_id == 0:
file_pattern = str(table_name) + "_" + str(table_unique_id) + "_" + str(table_unique_id)
else:
file_pattern = str(table_name) + "_" + str(customer_id) + "_" + str(table_unique_id)
else:
return False
is_success = True
try:
src_files = os.listdir(DATABASE_DIR)
src_files.sort(reverse=True)
for file_name in src_files:
full_file_name = os.path.join(DATABASE_DIR, file_name)
if os.path.isfile(full_file_name) and file_name.find(file_pattern) > -1:
# print "full_file_name: " + full_file_name
existing_full_file_name = full_file_name
break
now = datetime.datetime.now()
output_file_name = file_pattern + "_" + str(now.year) + "_" + str(now.month).zfill(2) + "_" + str(now.day).zfill(2) + "_" + str(now.hour).zfill(2) + "_" + str(now.minute).zfill(2) + "_" + str(now.second).zfill(2) + "_" + str(now.microsecond) + ".json"
output_full_file_name = os.path.join(DATABASE_DIR, output_file_name)
myoutput_file = open(output_full_file_name, "w")
myoutput_file.write(data + "\n")
myoutput_file.close()
if existing_full_file_name != "":
if self.if_save_history:
os.rename(existing_full_file_name, existing_full_file_name + ".old")
else:
os.remove(existing_full_file_name)
is_success = True
except:
print ":::ERROR: insert_info_to_db: Failed save info"
is_success = False
return is_success
############################################################
def helper_calculate_next_unique_id(self, table_name):
unique_id = 100000
last_full_file_name = ""
last_id = 0
_database_dir = self.database_folder_location
file_pattern = str(table_name) + "_"
try:
src_files = os.listdir(_database_dir)
src_files.sort(reverse=True)
for file_name in src_files:
full_file_name = os.path.join(_database_dir, file_name)
if os.path.isfile(full_file_name) and file_name.find(file_pattern) > -1:
last_full_file_name = full_file_name
break
item_in_file_name = last_full_file_name.split('_')
if len(item_in_file_name) >= 4:
last_id = int(item_in_file_name[3])
unique_id = last_id + 1
except:
print ":::ERROR: helper_calculate_next_unique_id: Failed to calculate the next unique id"
unique_id = -1
return unique_id
############################################################
if __name__ == "__main__": main()
- almirsCorner.com -
#python #programming #code #coding #software #softwaredeveloper #softwareengineering #programmer
No comments:
Post a Comment