# =========================================================================== #
# File : sqlite_db.py #
# Author : Pfesesani V. van Zyl #
# =========================================================================== #
# Standard library imports
# --------------------------------------------------------------------------- #
import os
import sys
import sqlite3
import numpy as np
from dataclasses import dataclass
from .msgConfiguration import msg_wrapper
from .miscellaneousFunctions import set_table_name
# Local imports
# --------------------------------------------------------------------------- #
[docs]
@dataclass
class SQLiteDB:
dbPath: str # name of database
log: object # logger
# __dict__: dict
[docs]
def create_db(self):
""" Create a new database. """
msg_wrapper('debug',self.log.debug,'Opened new Database')
self.conn = sqlite3.connect(self.dbPath)
self.c = self.conn.cursor()
[docs]
def close_db(self):
""" Close the database connection."""
msg_wrapper('debug',self.log.debug,'Closed new Database')
if self.c:
self.conn.close()
[docs]
def commit_changes(self):
""" Commit/save changes you have implemented to the database. """
self.conn.commit()
print('Commited changes')
[docs]
def create_table_stmt(self, data, tableName):
""" Create table from dictionary. """
sqlStmt = ""
for key, value in data.items():
# print(key,value, type(value).__name__)
if type(value).__name__ == 'list':
# pass
data[key]=';'.join(value)
# print(type(value))
# Make the filename a foreign key
if key == "FILENAME":
sqlStmt +=f'CREATE TABLE IF NOT EXISTS {tableName} ('
idKey = sqlStmt + "id INTEGER PRIMARY KEY AUTOINCREMENT" + ", "
sqlStmt = idKey + key + " " + "TEXT" + " UNIQUE , "
elif isinstance(value, float):
sqlStmt = sqlStmt + key + " " + "REAL" + " , "
elif type(value).__name__ == "float64":
sqlStmt = sqlStmt + key + " " + "REAL" + " , "
elif isinstance(value, int):
sqlStmt = sqlStmt + key + " " + "INTEGER" + " , "
elif isinstance(value, str):
sqlStmt = sqlStmt + key + " " + "TEXT" + " , "
elif isinstance(value, list):
sqlStmt = sqlStmt + key + " " + "TEXT" + " , "
return sqlStmt[:-2] + ")"
[docs]
def create_table(self, data, tableName):
""" Create an sql statement to create a table."""
tableName=set_table_name(tableName,self.log).upper()
sqlStmt = self.create_table_stmt(data, tableName)
# print(sqlStmt)
# sys.exit()
# self.c.execute(sqlStmt)
try:
# sqlStmt = self.create_table_stmt(data, tableName)
self.c.execute(sqlStmt)
print('added')
except:
# tableName=f'_{tableName}'
# sqlStmt = self.create_table_stmt(data, tableName)
# self.c.execute(sqlStmt)
# print(data)
print('Already exists')
# print('added')
return tableName
[docs]
def insert_into_table_stmt_with_pk(self, data, tableName):
""" Insert values into table and create a primary key."""
sqlStmt = ""
dataListKey = list(data.keys())
dataListKeyString = ""
dataListValues = list(data.values())
dataListValueString = ""
for i in range(len(data)):
if i == 0:
dataListKeyString = dataListKeyString + dataListKey[i]
else:
dataListKeyString = dataListKeyString + ", " + dataListKey[i]
placeHolders = "?,"*len(data)
sqlStmt = "INSERT INTO " + tableName + \
" (" + dataListKeyString + ") VALUES (" + placeHolders[:-1] + ")"
return sqlStmt, dataListValues
[docs]
def populate_table(self, data, tableName, key=""):
""" populate a database table with values. """
try:
self.create_db()
except Exception:
pass
#print('data: ',data)
sqlStmt, values = self.insert_into_table_stmt_with_pk(data, tableName)
# for i in range(len(values)):
# print(sqlStmt)
try:
self.c.execute(sqlStmt, values)
except Exception as e:
print("issue: ", e)
self.commit_changes()
[docs]
def set_database_name(self, databaseName):
""" Set the name of the database. """
msg_wrapper("debug", self.log.debug, "Setup database name")
if ".db" in databaseName:
self.databaseName = databaseName
else:
self.databaseName = databaseName+".db"
[docs]
def get_table_names(self,db):
"""Get table names from the database.
Args:
db (str): The name of the database
Returns:
table_names (list): List of table names
"""
msg_wrapper('debug',self.log.debug,f"Getting tables from: {db}")
self.set_database_name(db)
table_names = []
sql_stmt = "SELECT name FROM sqlite_master WHERE type = 'table';"
try:
tables = self.c.execute(sql_stmt).fetchall()
except sqlite3.OperationalError:
print("Failed to fetch data from the server")
sys.exit()
for i in range(len(tables)):
if tables[i][0].startswith("data"):
table_names.append(tables[i][0])
for i in range(len(tables)):
if tables[i][0].startswith("sqlite_sequence"):
pass
else:
table_names.append(tables[i][0])
return table_names
[docs]
def get_rows(self, tbname):
""" Get the rows in the database table.
Parameters
----------
tbname : str
table name
Returns
-------
rows: str
table row list
"""
# print(tbname)
# open the database
# self.create_db()
# read from selected table
stmt = f"SELECT * FROM '{tbname}' ORDER BY FILENAME ASC;"
# print(stmt)
self.c.execute(stmt)
data = self.c.fetchall()
# get filenames and return them
rows = []
for row in data:
rows.append(row)
return rows
[docs]
def get_all_table_coloumns(self, table_name):
"""
Get coloumns of table
return index, coloumn name and coloumn type
"""
col_ind = []
col_name = []
col_type = []
res = self.c.execute("PRAGMA table_info('%s') " %
table_name).fetchall()
# print(res)
# sys.exit()
for i in range(len(res)):
col_ind.append(res[i][0])
col_name.append(res[i][1])
col_type.append(res[i][2])
return col_ind, col_name, col_type