import sqlite3 import threading from typing import cast from pathlib import Path from dataclasses import dataclass from typing import Callable, ParamSpec, TypeVar, Generic import dt import utils import config from logger import LOGGER T = TypeVar('T') P = ParamSpec('P') R = TypeVar('R') @dataclass(frozen=True) class ResponseBody(Generic[T]): """The generic response body for API return.""" success: bool """True if this operation is successful, otherwise false.""" error: str """The error message provided when operation failed.""" data: T | None """The payload provided when operation successed.""" class DbException(Exception): """Error occurs when manipulating with database.""" pass def SafeDatabaseOperation(inner: Callable[P, R]) -> Callable[P, ResponseBody[R]]: def wrapper(*args, **kwargs) -> ResponseBody[R]: # extract self from args self: 'CalendarDatabase' = args[0] # get config cfg = config.get_config() with self.mutex: # try to fetching database and allocate database cursor try: db = self._get_db() self._allocate_cursor() except Exception as e: self._free_cursor() if cfg.others.debug: LOGGER.exception(e) return ResponseBody(False, str(e), None) # do real data work try: currentTime = utils.GetCurrentTimestamp() if currentTime - self.latestClean > cfg.others.auto_token_clean_duration: self.latestClean = currentTime LOGGER.info('Cleaning outdated token...') self.tokenOper_clean() result = ResponseBody(True, '', inner(*args, **kwargs)) self._free_cursor() db.commit() return result except Exception as e: self._free_cursor() db.rollback() if cfg.others.debug: LOGGER.exception(e) return ResponseBody(False, str(e), None) return wrapper class CalendarDatabase: db: sqlite3.Connection | None cursor: sqlite3.Cursor | None mutex: threading.Lock latestClean: int def __init__(self): self.db = None self.cursor = None self.mutex = threading.Lock() self.latestClean = 0 def open(self): if (self.db is not None): raise DbException('Database is already opened') cfg = config.get_config() match cfg.database.driver: case config.DatabaseDriver.SQLITE: self.db = sqlite3.connect(cast(config.SqliteDatabaseConfig, cfg.database.config).path, check_same_thread = False) self.db.execute('PRAGMA encoding = "UTF-8";') self.db.execute('PRAGMA foreign_keys = ON;') case config.DatabaseDriver.MYSQL: raise DbException('Not implemented database') case _: raise DbException('Unknow database type') def init(self, username: str, password: str): if (self.db is not None): raise DbException('Database is already opened') # establish tables cfg = config.get_config() backend_path = Path(__file__).resolve().parent backend_sql_path = backend_path / 'sql' match cfg.database.driver: case config.DatabaseDriver.SQLITE: sql_file = backend_sql_path / 'sqlite.sql' case config.DatabaseDriver.MYSQL: raise DbException('Not implemented database') case _: raise DbException('Unknow database type') self.open() db = self._get_db() self._allocate_cursor() cursor = self._get_cursor() # execute script for creating tables with open(sql_file, 'r', encoding='utf-8') as fsql: cursor.executescript(fsql.read()) # add default user in user table cursor.execute('INSERT INTO user VALUES (?, ?, ?, ?);', ( username, utils.ComputePasswordHash(password), 1, utils.GenerateSalt() )) self._free_cursor() # commit to database db.commit() def close(self): if (self.db is None): LOGGER.warning('Try to close null database.') else: self._free_cursor() self.db.close() self.db = None def _get_db(self) -> sqlite3.Connection: if (self.db is None): raise DbException('There is no opened database') else: return self.db def _allocate_cursor(self) -> None: if (self.cursor is not None): raise DbException('There is already opened database cursor') else: self.cursor = self._get_db().cursor() def _get_cursor(self) -> sqlite3.Cursor: if (self.cursor is None): raise DbException('There is no opened database cursor') else: return self.cursor def _free_cursor(self) -> None: if (self.cursor is None): LOGGER.warning('Try to free null databse cursor.') else: self.cursor.close() self.cursor = None # ======================= token related internal operation def tokenOper_clean(self): # remove outdated token cursor = self._get_cursor() cursor.execute('DELETE FROM token WHERE [token_expire_on] <= ?',(utils.GetCurrentTimestamp(), )) def tokenOper_postpone_expireOn(self, token): cursor = self._get_cursor() cursor.execute('UPDATE token SET [token_expire_on] = ? WHERE [token] = ?;', ( utils.GetTokenExpireOn(), token )) def tokenOper_check_valid(self, token): self.tokenOper_get_username(token) def tokenOper_is_admin(self, username): cursor = self._get_cursor() cursor.execute('SELECT [is_admin] FROM user WHERE [name] = ?;',(username, )) cache = cursor.fetchone()[0] return cache == 1 def tokenOper_get_username(self, token): cursor = self._get_cursor() cursor.execute('SELECT [user] FROM token WHERE [token] = ? AND [token_expire_on] > ?;',( token, utils.GetCurrentTimestamp() )) result = cursor.fetchone()[0] # need postpone expire on time self.tokenOper_postpone_expireOn(token) return result # =============================== # =============================== operation function # =============================== common @SafeDatabaseOperation def common_salt(self, username): cursor = self._get_cursor() salt = utils.GenerateSalt() cursor.execute('UPDATE user SET [salt] = ? WHERE [name] = ?;', ( salt, username )) return salt @SafeDatabaseOperation def common_login(self, username, password, clientUa, clientIp): cursor = self._get_cursor() cursor.execute('SELECT [password], [salt] FROM user WHERE [name] = ?;', (username, )) (gotten_salt, gotten_password) = cursor.fetchone() if password == utils.ComputePasswordHashWithSalt(gotten_password, gotten_salt): token = utils.GenerateToken(username) cursor.execute('UPDATE user SET [salt] = ? WHERE [name] = ?;', ( utils.GenerateSalt(), # regenerate a new slat to prevent re-login try username )) cursor.execute('INSERT INTO token VALUES (?, ?, ?, ?, ?);', ( username, token, utils.GetTokenExpireOn(), # add 2 day from now clientUa, clientIp, )) return token else: # throw a exception to indicate fail to login raise DbException('Login authentication failed') @SafeDatabaseOperation def common_webLogin(self, username, password, clientUa, clientIp): cursor = self._get_cursor() LOGGER.debug(f'WebLogin Username: {username}') LOGGER.debug(f'WebLogin Password: {password}') passwordHash = utils.ComputePasswordHash(password) LOGGER.debug(f'WebLogin Password Hash: {passwordHash}') cursor.execute('SELECT [name] FROM user WHERE [name] = ? AND [password] = ?;', (username, passwordHash)) if len(cursor.fetchall()) != 0: token = utils.GenerateToken(username) cursor.execute('INSERT INTO token VALUES (?, ?, ?, ?, ?);', ( username, token, utils.GetTokenExpireOn(), # add 2 day from now clientUa, clientIp, )) return token else: # throw a exception to indicate fail to login raise DbException('Login authentication failed') @SafeDatabaseOperation def common_logout(self, token): cursor = self._get_cursor() self.tokenOper_check_valid(token) cursor.execute('DELETE FROM token WHERE [token] = ?;', (token, )) return True @SafeDatabaseOperation def common_tokenValid(self, token): self.tokenOper_check_valid(token) return True # =============================== calendar @SafeDatabaseOperation def calendar_getFull(self, token, startDateTime, endDateTime): cursor = self._get_cursor() username = self.tokenOper_get_username(token) cursor.execute('SELECT calendar.* FROM calendar INNER JOIN collection \ ON collection.uuid = calendar.belong_to \ WHERE (collection.user = ? AND calendar.loop_date_time_end >= ? AND calendar.loop_date_time_start - (calendar.event_date_time_end - calendar.event_date_time_start) <= ?);', (username, startDateTime, endDateTime)) return cursor.fetchall() @SafeDatabaseOperation def calendar_getList(self, token, startDateTime, endDateTime): cursor = self._get_cursor() username = self.tokenOper_get_username(token) cursor.execute('SELECT calendar.uuid FROM calendar INNER JOIN collection \ ON collection.uuid = calendar.belong_to \ WHERE (collection.user = ? AND calendar.loop_date_time_end >= ? AND calendar.loop_date_time_start - (calendar.event_date_time_end - calendar.event_date_time_start) <= ?);', (username, startDateTime, endDateTime)) return tuple(map(lambda x: x[0], cursor.fetchall())) @SafeDatabaseOperation def calendar_getDetail(self, token, uuid): cursor = self._get_cursor() self.tokenOper_check_valid(token) cursor.execute('SELECT * FROM calendar WHERE [uuid] = ?;', (uuid, )) return cursor.fetchone() @SafeDatabaseOperation def calendar_update(self, token, uuid, lastChange, **optArgs): cursor = self._get_cursor() self.tokenOper_check_valid(token) # get prev data cursor.execute('SELECT * FROM calendar WHERE [uuid] = ? AND [last_change] = ?;', (uuid, lastChange)) analyseData = list(cursor.fetchone()) # construct update data lastupdate = utils.GenerateUUID() sqlList = [ '[last_change] = ?', ] argumentsList = [ lastupdate, ] # analyse opt arg reAnalyseLoop = False cache = optArgs.get('belongTo', None) if cache is not None: sqlList.append('[belong_to] = ?') argumentsList.append(cache) cache = optArgs.get('title', None) if cache is not None: sqlList.append('[title] = ?') argumentsList.append(cache) cache = optArgs.get('description', None) if cache is not None: sqlList.append('[description] = ?') argumentsList.append(cache) cache = optArgs.get('eventDateTimeStart', None) if cache is not None: sqlList.append('[event_date_time_start] = ?') argumentsList.append(cache) reAnalyseLoop = True analyseData[5] = cache cache = optArgs.get('eventDateTimeEnd', None) if cache is not None: sqlList.append('[event_date_time_end] = ?') argumentsList.append(cache) cache = optArgs.get('loopRules', None) if cache is not None: sqlList.append('[loop_rules] = ?') argumentsList.append(cache) reAnalyseLoop = True analyseData[8] = cache cache = optArgs.get('timezoneOffset', None) if cache is not None: sqlList.append('[timezone_offset] = ?') argumentsList.append(cache) reAnalyseLoop = True analyseData[7] = cache if reAnalyseLoop: # re-compute loop data and upload it into list sqlList.append('[loop_date_time_start] = ?') argumentsList.append(analyseData[5]) sqlList.append('[loop_date_time_end] = ?') argumentsList.append(str(dt.ResolveLoopStr( analyseData[8], analyseData[5], analyseData[7] ))) # execute argumentsList.append(uuid) cursor.execute('UPDATE calendar SET {} WHERE [uuid] = ?;'.format(', '.join(sqlList)), tuple(argumentsList)) if cursor.rowcount != 1: raise DbException('Fail to update due to no matched rows or too much rows.') return lastupdate @SafeDatabaseOperation def calendar_add(self, token, belongTo, title, description, eventDateTimeStart, eventDateTimeEnd, loopRules, timezoneOffset): cursor = self._get_cursor() self.tokenOper_check_valid(token) newuuid = utils.GenerateUUID() lastupdate = utils.GenerateUUID() # analyse loopRules and output following 2 fileds. loopDateTimeStart = eventDateTimeStart loopDateTimeEnd = dt.ResolveLoopStr(loopRules, eventDateTimeStart, timezoneOffset) cursor.execute('INSERT INTO calendar VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);', (newuuid, belongTo, title, description, lastupdate, eventDateTimeStart, eventDateTimeEnd, timezoneOffset, loopRules, loopDateTimeStart, loopDateTimeEnd)) return newuuid @SafeDatabaseOperation def calendar_delete(self, token, uuid, lastChange): cursor = self._get_cursor() self.tokenOper_check_valid(token) cursor.execute('DELETE FROM calendar WHERE [uuid] = ? AND [last_change] = ?;', (uuid, lastChange)) if cursor.rowcount != 1: raise DbException('Fail to delete due to no matched rows or too much rows.') return True # =============================== collection @SafeDatabaseOperation def collection_getFullOwn(self, token): cursor = self._get_cursor() username = self.tokenOper_get_username(token) cursor.execute('SELECT [uuid], [name], [last_change] FROM collection WHERE [user] = ?;', (username, )) return cursor.fetchall() @SafeDatabaseOperation def collection_getListOwn(self, token): cursor = self._get_cursor() username = self.tokenOper_get_username(token) cursor.execute('SELECT [uuid] FROM collection WHERE [user] = ?;', (username, )) return tuple(map(lambda x: x[0], cursor.fetchall())) @SafeDatabaseOperation def collection_getDetailOwn(self, token, uuid): cursor = self._get_cursor() username = self.tokenOper_get_username(token) cursor.execute('SELECT [uuid], [name], [last_change] FROM collection WHERE [user] = ? AND [uuid] = ?;', (username, uuid)) return cursor.fetchone() @SafeDatabaseOperation def collection_addOwn(self, token, newname): cursor = self._get_cursor() username = self.tokenOper_get_username(token) newuuid = utils.GenerateUUID() lastupdate = utils.GenerateUUID() cursor.execute('INSERT INTO collection VALUES (?, ?, ?, ?);', (newuuid, newname, username, lastupdate)) return newuuid @SafeDatabaseOperation def collection_updateOwn(self, token, uuid, newname, lastChange): cursor = self._get_cursor() self.tokenOper_check_valid(token) lastupdate = utils.GenerateUUID() cursor.execute('UPDATE collection SET [name] = ?, [last_change] = ? WHERE [uuid] = ? AND [last_change] = ?;', ( newname, lastupdate, uuid, lastChange )) if cursor.rowcount != 1: raise DbException('Fail to update due to no matched rows or too much rows.') return lastupdate @SafeDatabaseOperation def collection_deleteOwn(self, token, uuid, lastChange): cursor = self._get_cursor() self.tokenOper_check_valid(token) cursor.execute('DELETE FROM collection WHERE [uuid] = ? AND [last_change] = ?;', ( uuid, lastChange )) if cursor.rowcount != 1: raise DbException('Fail to delete due to no matched rows or too much rows.') return True @SafeDatabaseOperation def collection_getSharing(self, token, uuid): cursor = self._get_cursor() self.tokenOper_check_valid(token) cursor.execute('SELECT [target] FROM share WHERE [uuid] = ?;', (uuid, )) return tuple(map(lambda x: x[0], cursor.fetchall())) @SafeDatabaseOperation def collection_deleteSharing(self, token, uuid, target, lastChange): cursor = self._get_cursor() self.tokenOper_check_valid(token) lastupdate = utils.GenerateUUID() cursor.execute('UPDATE collection SET [last_change] = ?, WHERE [uuid] = ? AND [last_change] = ?;', (lastupdate, uuid, lastChange)) if cursor.rowcount != 1: raise DbException('Fail to delete due to no matched rows or too much rows.') cursor.execute('DELETE FROM share WHERE [uuid] = ? AND [target] = ?;', (uuid, target)) if cursor.rowcount != 1: raise DbException('Fail to delete due to no matched rows or too much rows.') return lastupdate @SafeDatabaseOperation def collection_addSharing(self, token, uuid, target, lastChange): cursor = self._get_cursor() self.tokenOper_check_valid(token) lastupdate = utils.GenerateUUID() cursor.execute('UPDATE collection SET [last_change] = ? WHERE [uuid] = ? AND [last_change] = ?;', (lastupdate, uuid, lastChange)) if cursor.rowcount != 1: raise DbException('Fail to delete due to no matched rows or too much rows.') cursor.execute('SELECT * FROM share WHERE [uuid] = ? AND [target] = ?;', (uuid, target)) if len(cursor.fetchall()) != 0: raise DbException('Fail to insert duplicated item.') cursor.execute('INSERT INTO share VALUES (?, ?);', (uuid, target)) return lastupdate @SafeDatabaseOperation def collection_getShared(self, token): cursor = self._get_cursor() username = self.tokenOper_get_username(token) cursor.execute('SELECT collection.uuid, collection.name, collection.user \ FROM share INNER JOIN collection \ ON share.uuid = collection.uuid \ WHERE share.target = ?;', (username, )) return cursor.fetchall() # =============================== todo @SafeDatabaseOperation def todo_getFull(self, token): cursor = self._get_cursor() username = self.tokenOper_get_username(token) cursor.execute('SELECT * FROM todo WHERE [belong_to] = ?;', (username, )) return cursor.fetchall() @SafeDatabaseOperation def todo_getList(self, token): cursor = self._get_cursor() username = self.tokenOper_get_username(token) cursor.execute('SELECT [uuid] FROM todo WHERE [belong_to] = ?;', (username, )) return tuple(map(lambda x: x[0], cursor.fetchall())) @SafeDatabaseOperation def todo_getDetail(self, token, uuid): cursor = self._get_cursor() username = self.tokenOper_get_username(token) cursor.execute('SELECT * FROM todo WHERE [belong_to] = ? AND [uuid] = ?;', (username, uuid)) return cursor.fetchone() @SafeDatabaseOperation def todo_add(self, token): cursor = self._get_cursor() username = self.tokenOper_get_username(token) newuuid = utils.GenerateUUID() lastupdate = utils.GenerateUUID() returnedData = ( newuuid, username, '', lastupdate, ) cursor.execute('INSERT INTO todo VALUES (?, ?, ?, ?);', returnedData) return returnedData @SafeDatabaseOperation def todo_update(self, token, uuid, data, lastChange): cursor = self._get_cursor() # check valid token self.tokenOper_check_valid(token) # update newLastChange = utils.GenerateUUID() cursor.execute('UPDATE todo SET [data] = ?, [last_change] = ? WHERE [uuid] = ? AND [last_change] = ?;', ( data, newLastChange, uuid, lastChange )) if cursor.rowcount != 1: raise DbException('Fail to update due to no matched rows or too much rows.') return newLastChange @SafeDatabaseOperation def todo_delete(self, token, uuid, lastChange): cursor = self._get_cursor() # check valid token self.tokenOper_check_valid(token) # delete cursor.execute('DELETE FROM todo WHERE [uuid] = ? AND [last_change] = ?;', (uuid, lastChange)) if cursor.rowcount != 1: raise DbException('Fail to delete due to no matched rows or too much rows.') return True # =============================== admin @SafeDatabaseOperation def admin_get(self, token): cursor = self._get_cursor() username = self.tokenOper_get_username(token) if not self.tokenOper_is_admin(username): raise DbException('Permission denied.') cursor.execute('SELECT [name], [is_admin] FROM user;') return tuple(map(lambda x: (x[0], x[1] == 1), cursor.fetchall())) @SafeDatabaseOperation def admin_add(self, token, newname): cursor = self._get_cursor() username = self.tokenOper_get_username(token) if not self.tokenOper_is_admin(username): raise DbException('Permission denied.') newpassword = utils.ComputePasswordHash(utils.GenerateUUID()) cursor.execute('INSERT INTO user VALUES (?, ?, ?, ?);', ( newname, newpassword, 0, utils.GenerateSalt() )) return (newname, False) @SafeDatabaseOperation def admin_update(self, token, _username, **optArgs): cursor = self._get_cursor() username = self.tokenOper_get_username(token) if not self.tokenOper_is_admin(username): raise DbException('Permission denied.') # construct data sqlList = [] argumentsList = [] # analyse opt arg cache = optArgs.get('password', None) if cache is not None: sqlList.append('[password] = ?') argumentsList.append(utils.ComputePasswordHash(cache)) cache = optArgs.get('isAdmin', None) if cache is not None: sqlList.append('[is_admin] = ?') argumentsList.append(1 if cache else 0) # execute argumentsList.append(_username) cursor.execute('UPDATE user SET {} WHERE [name] = ?;'.format(', '.join(sqlList)), tuple(argumentsList)) LOGGER.debug(cache) LOGGER.debug(tuple(argumentsList)) if cursor.rowcount != 1: raise DbException('Fail to update due to no matched rows or too much rows.') return True @SafeDatabaseOperation def admin_delete(self, token, username): cursor = self._get_cursor() _username = self.tokenOper_get_username(token) if not self.tokenOper_is_admin(_username): raise DbException('Permission denied.') # delete cursor.execute('DELETE FROM user WHERE [name] = ?;', (username, )) if cursor.rowcount != 1: raise DbException('Fail to delete due to no matched rows or too much rows.') return True # =============================== profile @SafeDatabaseOperation def profile_isAdmin(self, token): cursor = self._get_cursor() username = self.tokenOper_get_username(token) return self.tokenOper_is_admin(username) @SafeDatabaseOperation def profile_changePassword(self, token, newpassword): cursor = self._get_cursor() username = self.tokenOper_get_username(token) cursor.execute('UPDATE user SET [password] = ? WHERE [name] = ?;', ( utils.ComputePasswordHash(newpassword), username )) return True @SafeDatabaseOperation def profile_getToken(self, token): cursor = self._get_cursor() username = self.tokenOper_get_username(token) cursor.execute('SELECT * FROM token WHERE [user] = ?;', ( username, )) return cursor.fetchall() @SafeDatabaseOperation def profile_deleteToken(self, token, deleteToken): cursor = self._get_cursor() _username = self.tokenOper_get_username(token) # delete cursor.execute('DELETE FROM token WHERE [user] = ? AND [token] = ?;', ( _username, deleteToken )) if cursor.rowcount != 1: raise DbException('Fail to delete due to no matched rows or too much rows.') return True