Source code for src.common.sqlite_db

# =========================================================================== #
# File    : sqlite_db.py                                                      #
# Author  : Pfesesani V. van Zyl                                              #
# =========================================================================== #

# Standard library imports
# --------------------------------------------------------------------------- #
import os
import sys
import sqlite3
import numpy as np
from dataclasses import dataclass
from .msgConfiguration import msg_wrapper
from .miscellaneousFunctions import set_table_name

# Local imports
# --------------------------------------------------------------------------- #
[docs] @dataclass class SQLiteDB: dbPath: str # name of database log: object # logger # __dict__: dict
[docs] def create_db(self): """ Create a new database. """ msg_wrapper('debug',self.log.debug,'Opened new Database') self.conn = sqlite3.connect(self.dbPath) self.c = self.conn.cursor()
[docs] def close_db(self): """ Close the database connection.""" msg_wrapper('debug',self.log.debug,'Closed new Database') if self.c: self.conn.close()
[docs] def commit_changes(self): """ Commit/save changes you have implemented to the database. """ self.conn.commit() print('Commited changes')
[docs] def create_table_stmt(self, data, tableName): """ Create table from dictionary. """ sqlStmt = "" for key, value in data.items(): # print(key,value, type(value).__name__) if type(value).__name__ == 'list': # pass data[key]=';'.join(value) # print(type(value)) # Make the filename a foreign key if key == "FILENAME": sqlStmt +=f'CREATE TABLE IF NOT EXISTS {tableName} (' idKey = sqlStmt + "id INTEGER PRIMARY KEY AUTOINCREMENT" + ", " sqlStmt = idKey + key + " " + "TEXT" + " UNIQUE , " elif isinstance(value, float): sqlStmt = sqlStmt + key + " " + "REAL" + " , " elif type(value).__name__ == "float64": sqlStmt = sqlStmt + key + " " + "REAL" + " , " elif isinstance(value, int): sqlStmt = sqlStmt + key + " " + "INTEGER" + " , " elif isinstance(value, str): sqlStmt = sqlStmt + key + " " + "TEXT" + " , " elif isinstance(value, list): sqlStmt = sqlStmt + key + " " + "TEXT" + " , " return sqlStmt[:-2] + ")"
[docs] def create_table(self, data, tableName): """ Create an sql statement to create a table.""" tableName=set_table_name(tableName,self.log).upper() sqlStmt = self.create_table_stmt(data, tableName) # print(sqlStmt) # sys.exit() # self.c.execute(sqlStmt) try: # sqlStmt = self.create_table_stmt(data, tableName) self.c.execute(sqlStmt) print('added') except: # tableName=f'_{tableName}' # sqlStmt = self.create_table_stmt(data, tableName) # self.c.execute(sqlStmt) # print(data) print('Already exists') # print('added') return tableName
[docs] def insert_into_table_stmt_with_pk(self, data, tableName): """ Insert values into table and create a primary key.""" sqlStmt = "" dataListKey = list(data.keys()) dataListKeyString = "" dataListValues = list(data.values()) dataListValueString = "" for i in range(len(data)): if i == 0: dataListKeyString = dataListKeyString + dataListKey[i] else: dataListKeyString = dataListKeyString + ", " + dataListKey[i] placeHolders = "?,"*len(data) sqlStmt = "INSERT INTO " + tableName + \ " (" + dataListKeyString + ") VALUES (" + placeHolders[:-1] + ")" return sqlStmt, dataListValues
[docs] def populate_table(self, data, tableName, key=""): """ populate a database table with values. """ try: self.create_db() except Exception: pass #print('data: ',data) sqlStmt, values = self.insert_into_table_stmt_with_pk(data, tableName) # for i in range(len(values)): # print(sqlStmt) try: self.c.execute(sqlStmt, values) except Exception as e: print("issue: ", e) self.commit_changes()
[docs] def set_database_name(self, databaseName): """ Set the name of the database. """ msg_wrapper("debug", self.log.debug, "Setup database name") if ".db" in databaseName: self.databaseName = databaseName else: self.databaseName = databaseName+".db"
[docs] def get_table_names(self,db): """Get table names from the database. Args: db (str): The name of the database Returns: table_names (list): List of table names """ msg_wrapper('debug',self.log.debug,f"Getting tables from: {db}") self.set_database_name(db) table_names = [] sql_stmt = "SELECT name FROM sqlite_master WHERE type = 'table';" try: tables = self.c.execute(sql_stmt).fetchall() except sqlite3.OperationalError: print("Failed to fetch data from the server") sys.exit() for i in range(len(tables)): if tables[i][0].startswith("data"): table_names.append(tables[i][0]) for i in range(len(tables)): if tables[i][0].startswith("sqlite_sequence"): pass else: table_names.append(tables[i][0]) return table_names
[docs] def get_rows(self, tbname): """ Get the rows in the database table. Parameters ---------- tbname : str table name Returns ------- rows: str table row list """ # print(tbname) # open the database # self.create_db() # read from selected table stmt = f"SELECT * FROM '{tbname}' ORDER BY FILENAME ASC;" # print(stmt) self.c.execute(stmt) data = self.c.fetchall() # get filenames and return them rows = [] for row in data: rows.append(row) return rows
[docs] def get_all_table_coloumns(self, table_name): """ Get coloumns of table return index, coloumn name and coloumn type """ col_ind = [] col_name = [] col_type = [] res = self.c.execute("PRAGMA table_info('%s') " % table_name).fetchall() # print(res) # sys.exit() for i in range(len(res)): col_ind.append(res[i][0]) col_name.append(res[i][1]) col_type.append(res[i][2]) return col_ind, col_name, col_type