diff --git a/backend/database.py b/backend/database.py index 7e3452b..354e94b 100644 --- a/backend/database.py +++ b/backend/database.py @@ -3,33 +3,50 @@ import threading from typing import cast from pathlib import Path from dataclasses import dataclass -from typing import Any +from typing import Callable, ParamSpec, TypeVar, Generic import dt import utils import config from logger import LOGGER + +T = TypeVar('T') +P = ParamSpec('P') +R = TypeVar('R') + + @dataclass(frozen=True) -class ResponseBody: +class ResponseBody(Generic[T]): + """The generic response body for API return.""" + success: bool """True if this operation is successful, otherwise false.""" error: str """The error message provided when operation failed.""" - data: Any + data: T | None """The payload provided when operation successed.""" -def SafeDatabaseOperation(func): - def wrapper(self: 'CalendarDatabase', *args, **kwargs): + +class DbException(Exception): + """Error occurs when manipulating with database.""" + pass + + +def SafeDatabaseOperation(inner: Callable[P, R]) -> Callable[P, ResponseBody[R]]: + def wrapper(*args, **kwargs) -> ResponseBody[R]: + # extract self from args + self: 'CalendarDatabase' = args[0] + # get config cfg = config.get_config() with self.mutex: - # check database and acquire cursor + # try to fetching database and allocate database cursor try: - self.check_database() - self.cursor = self.db.cursor() + db = self._get_db() + self._allocate_cursor() except Exception as e: - self.cursor = None + self._free_cursor() if cfg.others.debug: LOGGER.exception(e) return ResponseBody(False, str(e), None) @@ -42,15 +59,13 @@ def SafeDatabaseOperation(func): LOGGER.info('Cleaning outdated token...') self.tokenOper_clean() - result = ResponseBody(True, '', func(self, *args, **kwargs)) - self.cursor.close() - self.cursor = None - self.db.commit() + result = ResponseBody(True, '', inner(*args, **kwargs)) + self._free_cursor() + db.commit() return result except Exception as e: - self.cursor.close() - self.cursor = None - self.db.rollback() + self._free_cursor() + db.rollback() if cfg.others.debug: LOGGER.exception(e) return ResponseBody(False, str(e), None) @@ -59,8 +74,8 @@ def SafeDatabaseOperation(func): class CalendarDatabase: - db: sqlite3.Connection - cursor: sqlite3.Cursor + db: sqlite3.Connection | None + cursor: sqlite3.Cursor | None mutex: threading.Lock latestClean: int @@ -71,8 +86,8 @@ class CalendarDatabase: self.latestClean = 0 def open(self): - if (self.is_database_valid()): - raise Exception('Databade is opened') + if (self.db is not None): + raise DbException('Database is already opened') cfg = config.get_config() match cfg.database.driver: @@ -81,13 +96,13 @@ class CalendarDatabase: self.db.execute('PRAGMA encoding = "UTF-8";') self.db.execute('PRAGMA foreign_keys = ON;') case config.DatabaseDriver.MYSQL: - raise Exception('Not implemented database') + raise DbException('Not implemented database') case _: - raise Exception('Unknow database type') + raise DbException('Unknow database type') - def init(self, username, password): - if (self.is_database_valid()): - raise Exception('Database is opened') + def init(self, username: str, password: str): + if (self.db is not None): + raise DbException('Database is already opened') # establish tables cfg = config.get_config() @@ -97,44 +112,74 @@ class CalendarDatabase: case config.DatabaseDriver.SQLITE: sql_file = backend_sql_path / 'sqlite.sql' case config.DatabaseDriver.MYSQL: - raise Exception('Not implemented database') + raise DbException('Not implemented database') case _: - raise Exception('Unknow database type') + raise DbException('Unknow database type') self.open() - cursor = self.db.cursor() + db = self._get_db() + + self._allocate_cursor() + cursor = self._get_cursor() + + # execute script for creating tables with open(sql_file, 'r', encoding='utf-8') as fsql: cursor.executescript(fsql.read()) - - # finish init + # add default user in user table cursor.execute('INSERT INTO user VALUES (?, ?, ?, ?);', ( username, utils.ComputePasswordHash(password), 1, utils.GenerateSalt() )) - cursor.close() - self.db.commit() + + self._free_cursor() + + # commit to database + db.commit() def close(self): - self.check_database() - self.db.close() - self.db = None + if (self.db is None): + LOGGER.warning('Try to close null database.') + else: + self._free_cursor() + self.db.close() + self.db = None - def check_database(self): - if (not self.is_database_valid()): - raise Exception('Databade is None') + def _get_db(self) -> sqlite3.Connection: + if (self.db is None): + raise DbException('There is no opened database') + else: + return self.db - def is_database_valid(self): - return not (self.db == None) + def _allocate_cursor(self) -> None: + if (self.cursor is not None): + raise DbException('There is already opened database cursor') + else: + self.cursor = self._get_db().cursor() + + def _get_cursor(self) -> sqlite3.Cursor: + if (self.cursor is None): + raise DbException('There is no opened database cursor') + else: + return self.cursor + + def _free_cursor(self) -> None: + if (self.cursor is None): + LOGGER.warning('Try to free null databse cursor.') + else: + self.cursor.close() + self.cursor = None # ======================= token related internal operation def tokenOper_clean(self): # remove outdated token - self.cursor.execute('DELETE FROM token WHERE [token_expire_on] <= ?',(utils.GetCurrentTimestamp(), )) + cursor = self._get_cursor() + cursor.execute('DELETE FROM token WHERE [token_expire_on] <= ?',(utils.GetCurrentTimestamp(), )) def tokenOper_postpone_expireOn(self, token): - self.cursor.execute('UPDATE token SET [token_expire_on] = ? WHERE [token] = ?;', ( + cursor = self._get_cursor() + cursor.execute('UPDATE token SET [token_expire_on] = ? WHERE [token] = ?;', ( utils.GetTokenExpireOn(), token )) @@ -143,16 +188,18 @@ class CalendarDatabase: self.tokenOper_get_username(token) def tokenOper_is_admin(self, username): - self.cursor.execute('SELECT [is_admin] FROM user WHERE [name] = ?;',(username, )) - cache = self.cursor.fetchone()[0] + cursor = self._get_cursor() + cursor.execute('SELECT [is_admin] FROM user WHERE [name] = ?;',(username, )) + cache = cursor.fetchone()[0] return cache == 1 def tokenOper_get_username(self, token): - self.cursor.execute('SELECT [user] FROM token WHERE [token] = ? AND [token_expire_on] > ?;',( + cursor = self._get_cursor() + cursor.execute('SELECT [user] FROM token WHERE [token] = ? AND [token_expire_on] > ?;',( token, utils.GetCurrentTimestamp() )) - result = self.cursor.fetchone()[0] + result = cursor.fetchone()[0] # need postpone expire on time self.tokenOper_postpone_expireOn(token) return result @@ -162,8 +209,9 @@ class CalendarDatabase: @SafeDatabaseOperation def common_salt(self, username): + cursor = self._get_cursor() salt = utils.GenerateSalt() - self.cursor.execute('UPDATE user SET [salt] = ? WHERE [name] = ?;', ( + cursor.execute('UPDATE user SET [salt] = ? WHERE [name] = ?;', ( salt, username )) @@ -171,16 +219,17 @@ class CalendarDatabase: @SafeDatabaseOperation def common_login(self, username, password, clientUa, clientIp): - self.cursor.execute('SELECT [password], [salt] FROM user WHERE [name] = ?;', (username, )) - (gotten_salt, gotten_password) = self.cursor.fetchone() + cursor = self._get_cursor() + cursor.execute('SELECT [password], [salt] FROM user WHERE [name] = ?;', (username, )) + (gotten_salt, gotten_password) = cursor.fetchone() if password == utils.ComputePasswordHashWithSalt(gotten_password, gotten_salt): token = utils.GenerateToken(username) - self.cursor.execute('UPDATE user SET [salt] = ? WHERE [name] = ?;', ( + cursor.execute('UPDATE user SET [salt] = ? WHERE [name] = ?;', ( utils.GenerateSalt(), # regenerate a new slat to prevent re-login try username )) - self.cursor.execute('INSERT INTO token VALUES (?, ?, ?, ?, ?);', ( + cursor.execute('INSERT INTO token VALUES (?, ?, ?, ?, ?);', ( username, token, utils.GetTokenExpireOn(), # add 2 day from now @@ -190,20 +239,21 @@ class CalendarDatabase: return token else: # throw a exception to indicate fail to login - raise Exception('Login authentication failed') + raise DbException('Login authentication failed') @SafeDatabaseOperation def common_webLogin(self, username, password, clientUa, clientIp): + cursor = self._get_cursor() LOGGER.debug(f'WebLogin Username: {username}') LOGGER.debug(f'WebLogin Password: {password}') passwordHash = utils.ComputePasswordHash(password) LOGGER.debug(f'WebLogin Password Hash: {passwordHash}') - self.cursor.execute('SELECT [name] FROM user WHERE [name] = ? AND [password] = ?;', (username, passwordHash)) + cursor.execute('SELECT [name] FROM user WHERE [name] = ? AND [password] = ?;', (username, passwordHash)) - if len(self.cursor.fetchall()) != 0: + if len(cursor.fetchall()) != 0: token = utils.GenerateToken(username) - self.cursor.execute('INSERT INTO token VALUES (?, ?, ?, ?, ?);', ( + cursor.execute('INSERT INTO token VALUES (?, ?, ?, ?, ?);', ( username, token, utils.GetTokenExpireOn(), # add 2 day from now @@ -213,12 +263,13 @@ class CalendarDatabase: return token else: # throw a exception to indicate fail to login - raise Exception('Login authentication failed') + raise DbException('Login authentication failed') @SafeDatabaseOperation def common_logout(self, token): + cursor = self._get_cursor() self.tokenOper_check_valid(token) - self.cursor.execute('DELETE FROM token WHERE [token] = ?;', (token, )) + cursor.execute('DELETE FROM token WHERE [token] = ?;', (token, )) return True @SafeDatabaseOperation @@ -229,35 +280,39 @@ class CalendarDatabase: # =============================== calendar @SafeDatabaseOperation def calendar_getFull(self, token, startDateTime, endDateTime): + cursor = self._get_cursor() username = self.tokenOper_get_username(token) - self.cursor.execute('SELECT calendar.* FROM calendar INNER JOIN collection \ + cursor.execute('SELECT calendar.* FROM calendar INNER JOIN collection \ ON collection.uuid = calendar.belong_to \ WHERE (collection.user = ? AND calendar.loop_date_time_end >= ? AND calendar.loop_date_time_start - (calendar.event_date_time_end - calendar.event_date_time_start) <= ?);', (username, startDateTime, endDateTime)) - return self.cursor.fetchall() + return cursor.fetchall() @SafeDatabaseOperation def calendar_getList(self, token, startDateTime, endDateTime): + cursor = self._get_cursor() username = self.tokenOper_get_username(token) - self.cursor.execute('SELECT calendar.uuid FROM calendar INNER JOIN collection \ + cursor.execute('SELECT calendar.uuid FROM calendar INNER JOIN collection \ ON collection.uuid = calendar.belong_to \ WHERE (collection.user = ? AND calendar.loop_date_time_end >= ? AND calendar.loop_date_time_start - (calendar.event_date_time_end - calendar.event_date_time_start) <= ?);', (username, startDateTime, endDateTime)) - return tuple(map(lambda x: x[0], self.cursor.fetchall())) + return tuple(map(lambda x: x[0], cursor.fetchall())) @SafeDatabaseOperation def calendar_getDetail(self, token, uuid): + cursor = self._get_cursor() self.tokenOper_check_valid(token) - self.cursor.execute('SELECT * FROM calendar WHERE [uuid] = ?;', (uuid, )) - return self.cursor.fetchone() + cursor.execute('SELECT * FROM calendar WHERE [uuid] = ?;', (uuid, )) + return cursor.fetchone() @SafeDatabaseOperation def calendar_update(self, token, uuid, lastChange, **optArgs): + cursor = self._get_cursor() self.tokenOper_check_valid(token) # get prev data - self.cursor.execute('SELECT * FROM calendar WHERE [uuid] = ? AND [last_change] = ?;', (uuid, lastChange)) - analyseData = list(self.cursor.fetchone()) + cursor.execute('SELECT * FROM calendar WHERE [uuid] = ? AND [last_change] = ?;', (uuid, lastChange)) + analyseData = list(cursor.fetchone()) # construct update data lastupdate = utils.GenerateUUID() @@ -319,14 +374,15 @@ class CalendarDatabase: # execute argumentsList.append(uuid) - self.cursor.execute('UPDATE calendar SET {} WHERE [uuid] = ?;'.format(', '.join(sqlList)), + cursor.execute('UPDATE calendar SET {} WHERE [uuid] = ?;'.format(', '.join(sqlList)), tuple(argumentsList)) - if self.cursor.rowcount != 1: - raise Exception('Fail to update due to no matched rows or too much rows.') + if cursor.rowcount != 1: + raise DbException('Fail to update due to no matched rows or too much rows.') return lastupdate @SafeDatabaseOperation def calendar_add(self, token, belongTo, title, description, eventDateTimeStart, eventDateTimeEnd, loopRules, timezoneOffset): + cursor = self._get_cursor() self.tokenOper_check_valid(token) newuuid = utils.GenerateUUID() @@ -336,7 +392,7 @@ class CalendarDatabase: loopDateTimeStart = eventDateTimeStart loopDateTimeEnd = dt.ResolveLoopStr(loopRules, eventDateTimeStart, timezoneOffset) - self.cursor.execute('INSERT INTO calendar VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);', + cursor.execute('INSERT INTO calendar VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);', (newuuid, belongTo, title, @@ -352,134 +408,149 @@ class CalendarDatabase: @SafeDatabaseOperation def calendar_delete(self, token, uuid, lastChange): + cursor = self._get_cursor() self.tokenOper_check_valid(token) - self.cursor.execute('DELETE FROM calendar WHERE [uuid] = ? AND [last_change] = ?;', (uuid, lastChange)) - if self.cursor.rowcount != 1: - raise Exception('Fail to delete due to no matched rows or too much rows.') + cursor.execute('DELETE FROM calendar WHERE [uuid] = ? AND [last_change] = ?;', (uuid, lastChange)) + if cursor.rowcount != 1: + raise DbException('Fail to delete due to no matched rows or too much rows.') return True # =============================== collection @SafeDatabaseOperation def collection_getFullOwn(self, token): + cursor = self._get_cursor() username = self.tokenOper_get_username(token) - self.cursor.execute('SELECT [uuid], [name], [last_change] FROM collection WHERE [user] = ?;', (username, )) - return self.cursor.fetchall() + cursor.execute('SELECT [uuid], [name], [last_change] FROM collection WHERE [user] = ?;', (username, )) + return cursor.fetchall() @SafeDatabaseOperation def collection_getListOwn(self, token): + cursor = self._get_cursor() username = self.tokenOper_get_username(token) - self.cursor.execute('SELECT [uuid] FROM collection WHERE [user] = ?;', (username, )) - return tuple(map(lambda x: x[0], self.cursor.fetchall())) + cursor.execute('SELECT [uuid] FROM collection WHERE [user] = ?;', (username, )) + return tuple(map(lambda x: x[0], cursor.fetchall())) @SafeDatabaseOperation def collection_getDetailOwn(self, token, uuid): + cursor = self._get_cursor() username = self.tokenOper_get_username(token) - self.cursor.execute('SELECT [uuid], [name], [last_change] FROM collection WHERE [user] = ? AND [uuid] = ?;', (username, uuid)) - return self.cursor.fetchone() + cursor.execute('SELECT [uuid], [name], [last_change] FROM collection WHERE [user] = ? AND [uuid] = ?;', (username, uuid)) + return cursor.fetchone() @SafeDatabaseOperation def collection_addOwn(self, token, newname): + cursor = self._get_cursor() username = self.tokenOper_get_username(token) newuuid = utils.GenerateUUID() lastupdate = utils.GenerateUUID() - self.cursor.execute('INSERT INTO collection VALUES (?, ?, ?, ?);', + cursor.execute('INSERT INTO collection VALUES (?, ?, ?, ?);', (newuuid, newname, username, lastupdate)) return newuuid @SafeDatabaseOperation def collection_updateOwn(self, token, uuid, newname, lastChange): + cursor = self._get_cursor() self.tokenOper_check_valid(token) lastupdate = utils.GenerateUUID() - self.cursor.execute('UPDATE collection SET [name] = ?, [last_change] = ? WHERE [uuid] = ? AND [last_change] = ?;', ( + cursor.execute('UPDATE collection SET [name] = ?, [last_change] = ? WHERE [uuid] = ? AND [last_change] = ?;', ( newname, lastupdate, uuid, lastChange )) - if self.cursor.rowcount != 1: - raise Exception('Fail to update due to no matched rows or too much rows.') + if cursor.rowcount != 1: + raise DbException('Fail to update due to no matched rows or too much rows.') return lastupdate @SafeDatabaseOperation def collection_deleteOwn(self, token, uuid, lastChange): + cursor = self._get_cursor() self.tokenOper_check_valid(token) - self.cursor.execute('DELETE FROM collection WHERE [uuid] = ? AND [last_change] = ?;', ( + cursor.execute('DELETE FROM collection WHERE [uuid] = ? AND [last_change] = ?;', ( uuid, lastChange )) - if self.cursor.rowcount != 1: - raise Exception('Fail to delete due to no matched rows or too much rows.') + if cursor.rowcount != 1: + raise DbException('Fail to delete due to no matched rows or too much rows.') return True @SafeDatabaseOperation def collection_getSharing(self, token, uuid): + cursor = self._get_cursor() self.tokenOper_check_valid(token) - self.cursor.execute('SELECT [target] FROM share WHERE [uuid] = ?;', (uuid, )) - return tuple(map(lambda x: x[0], self.cursor.fetchall())) + cursor.execute('SELECT [target] FROM share WHERE [uuid] = ?;', (uuid, )) + return tuple(map(lambda x: x[0], cursor.fetchall())) @SafeDatabaseOperation def collection_deleteSharing(self, token, uuid, target, lastChange): + cursor = self._get_cursor() self.tokenOper_check_valid(token) lastupdate = utils.GenerateUUID() - self.cursor.execute('UPDATE collection SET [last_change] = ?, WHERE [uuid] = ? AND [last_change] = ?;', (lastupdate, uuid, lastChange)) - if self.cursor.rowcount != 1: - raise Exception('Fail to delete due to no matched rows or too much rows.') + cursor.execute('UPDATE collection SET [last_change] = ?, WHERE [uuid] = ? AND [last_change] = ?;', (lastupdate, uuid, lastChange)) + if cursor.rowcount != 1: + raise DbException('Fail to delete due to no matched rows or too much rows.') - self.cursor.execute('DELETE FROM share WHERE [uuid] = ? AND [target] = ?;', (uuid, target)) - if self.cursor.rowcount != 1: - raise Exception('Fail to delete due to no matched rows or too much rows.') + cursor.execute('DELETE FROM share WHERE [uuid] = ? AND [target] = ?;', (uuid, target)) + if cursor.rowcount != 1: + raise DbException('Fail to delete due to no matched rows or too much rows.') return lastupdate @SafeDatabaseOperation def collection_addSharing(self, token, uuid, target, lastChange): + cursor = self._get_cursor() self.tokenOper_check_valid(token) lastupdate = utils.GenerateUUID() - self.cursor.execute('UPDATE collection SET [last_change] = ? WHERE [uuid] = ? AND [last_change] = ?;', (lastupdate, uuid, lastChange)) - if self.cursor.rowcount != 1: - raise Exception('Fail to delete due to no matched rows or too much rows.') + cursor.execute('UPDATE collection SET [last_change] = ? WHERE [uuid] = ? AND [last_change] = ?;', (lastupdate, uuid, lastChange)) + if cursor.rowcount != 1: + raise DbException('Fail to delete due to no matched rows or too much rows.') - self.cursor.execute('SELECT * FROM share WHERE [uuid] = ? AND [target] = ?;', (uuid, target)) - if len(self.cursor.fetchall()) != 0: - raise Exception('Fail to insert duplicated item.') - self.cursor.execute('INSERT INTO share VALUES (?, ?);', (uuid, target)) + cursor.execute('SELECT * FROM share WHERE [uuid] = ? AND [target] = ?;', (uuid, target)) + if len(cursor.fetchall()) != 0: + raise DbException('Fail to insert duplicated item.') + cursor.execute('INSERT INTO share VALUES (?, ?);', (uuid, target)) return lastupdate @SafeDatabaseOperation def collection_getShared(self, token): + cursor = self._get_cursor() username = self.tokenOper_get_username(token) - self.cursor.execute('SELECT collection.uuid, collection.name, collection.user \ + cursor.execute('SELECT collection.uuid, collection.name, collection.user \ FROM share INNER JOIN collection \ ON share.uuid = collection.uuid \ WHERE share.target = ?;', (username, )) - return self.cursor.fetchall() + return cursor.fetchall() # =============================== todo @SafeDatabaseOperation def todo_getFull(self, token): + cursor = self._get_cursor() username = self.tokenOper_get_username(token) - self.cursor.execute('SELECT * FROM todo WHERE [belong_to] = ?;', (username, )) - return self.cursor.fetchall() + cursor.execute('SELECT * FROM todo WHERE [belong_to] = ?;', (username, )) + return cursor.fetchall() @SafeDatabaseOperation def todo_getList(self, token): + cursor = self._get_cursor() username = self.tokenOper_get_username(token) - self.cursor.execute('SELECT [uuid] FROM todo WHERE [belong_to] = ?;', (username, )) - return tuple(map(lambda x: x[0], self.cursor.fetchall())) + cursor.execute('SELECT [uuid] FROM todo WHERE [belong_to] = ?;', (username, )) + return tuple(map(lambda x: x[0], cursor.fetchall())) @SafeDatabaseOperation def todo_getDetail(self, token, uuid): + cursor = self._get_cursor() username = self.tokenOper_get_username(token) - self.cursor.execute('SELECT * FROM todo WHERE [belong_to] = ? AND [uuid] = ?;', (username, uuid)) - return self.cursor.fetchone() + cursor.execute('SELECT * FROM todo WHERE [belong_to] = ? AND [uuid] = ?;', (username, uuid)) + return cursor.fetchone() @SafeDatabaseOperation def todo_add(self, token): + cursor = self._get_cursor() username = self.tokenOper_get_username(token) newuuid = utils.GenerateUUID() lastupdate = utils.GenerateUUID() @@ -489,56 +560,60 @@ class CalendarDatabase: '', lastupdate, ) - self.cursor.execute('INSERT INTO todo VALUES (?, ?, ?, ?);', returnedData) + cursor.execute('INSERT INTO todo VALUES (?, ?, ?, ?);', returnedData) return returnedData @SafeDatabaseOperation def todo_update(self, token, uuid, data, lastChange): + cursor = self._get_cursor() # check valid token self.tokenOper_check_valid(token) # update newLastChange = utils.GenerateUUID() - self.cursor.execute('UPDATE todo SET [data] = ?, [last_change] = ? WHERE [uuid] = ? AND [last_change] = ?;', ( + cursor.execute('UPDATE todo SET [data] = ?, [last_change] = ? WHERE [uuid] = ? AND [last_change] = ?;', ( data, newLastChange, uuid, lastChange )) - if self.cursor.rowcount != 1: - raise Exception('Fail to update due to no matched rows or too much rows.') + if cursor.rowcount != 1: + raise DbException('Fail to update due to no matched rows or too much rows.') return newLastChange @SafeDatabaseOperation def todo_delete(self, token, uuid, lastChange): + cursor = self._get_cursor() # check valid token self.tokenOper_check_valid(token) # delete - self.cursor.execute('DELETE FROM todo WHERE [uuid] = ? AND [last_change] = ?;', (uuid, lastChange)) - if self.cursor.rowcount != 1: - raise Exception('Fail to delete due to no matched rows or too much rows.') + cursor.execute('DELETE FROM todo WHERE [uuid] = ? AND [last_change] = ?;', (uuid, lastChange)) + if cursor.rowcount != 1: + raise DbException('Fail to delete due to no matched rows or too much rows.') return True # =============================== admin @SafeDatabaseOperation def admin_get(self, token): + cursor = self._get_cursor() username = self.tokenOper_get_username(token) if not self.tokenOper_is_admin(username): - raise Exception('Permission denied.') + raise DbException('Permission denied.') - self.cursor.execute('SELECT [name], [is_admin] FROM user;') - return tuple(map(lambda x: (x[0], x[1] == 1), self.cursor.fetchall())) + cursor.execute('SELECT [name], [is_admin] FROM user;') + return tuple(map(lambda x: (x[0], x[1] == 1), cursor.fetchall())) @SafeDatabaseOperation def admin_add(self, token, newname): + cursor = self._get_cursor() username = self.tokenOper_get_username(token) if not self.tokenOper_is_admin(username): - raise Exception('Permission denied.') + raise DbException('Permission denied.') newpassword = utils.ComputePasswordHash(utils.GenerateUUID()) - self.cursor.execute('INSERT INTO user VALUES (?, ?, ?, ?);', ( + cursor.execute('INSERT INTO user VALUES (?, ?, ?, ?);', ( newname, newpassword, 0, @@ -548,9 +623,10 @@ class CalendarDatabase: @SafeDatabaseOperation def admin_update(self, token, _username, **optArgs): + cursor = self._get_cursor() username = self.tokenOper_get_username(token) if not self.tokenOper_is_admin(username): - raise Exception('Permission denied.') + raise DbException('Permission denied.') # construct data sqlList = [] @@ -568,36 +644,39 @@ class CalendarDatabase: # execute argumentsList.append(_username) - self.cursor.execute('UPDATE user SET {} WHERE [name] = ?;'.format(', '.join(sqlList)), + cursor.execute('UPDATE user SET {} WHERE [name] = ?;'.format(', '.join(sqlList)), tuple(argumentsList)) LOGGER.debug(cache) LOGGER.debug(tuple(argumentsList)) - if self.cursor.rowcount != 1: - raise Exception('Fail to update due to no matched rows or too much rows.') + if cursor.rowcount != 1: + raise DbException('Fail to update due to no matched rows or too much rows.') return True @SafeDatabaseOperation def admin_delete(self, token, username): + cursor = self._get_cursor() _username = self.tokenOper_get_username(token) if not self.tokenOper_is_admin(_username): - raise Exception('Permission denied.') + raise DbException('Permission denied.') # delete - self.cursor.execute('DELETE FROM user WHERE [name] = ?;', (username, )) - if self.cursor.rowcount != 1: - raise Exception('Fail to delete due to no matched rows or too much rows.') + cursor.execute('DELETE FROM user WHERE [name] = ?;', (username, )) + if cursor.rowcount != 1: + raise DbException('Fail to delete due to no matched rows or too much rows.') return True # =============================== profile @SafeDatabaseOperation def profile_isAdmin(self, token): + cursor = self._get_cursor() username = self.tokenOper_get_username(token) return self.tokenOper_is_admin(username) @SafeDatabaseOperation def profile_changePassword(self, token, newpassword): + cursor = self._get_cursor() username = self.tokenOper_get_username(token) - self.cursor.execute('UPDATE user SET [password] = ? WHERE [name] = ?;', ( + cursor.execute('UPDATE user SET [password] = ? WHERE [name] = ?;', ( utils.ComputePasswordHash(newpassword), username )) @@ -605,23 +684,25 @@ class CalendarDatabase: @SafeDatabaseOperation def profile_getToken(self, token): + cursor = self._get_cursor() username = self.tokenOper_get_username(token) - self.cursor.execute('SELECT * FROM token WHERE [user] = ?;', ( + cursor.execute('SELECT * FROM token WHERE [user] = ?;', ( username, )) - return self.cursor.fetchall() + return cursor.fetchall() @SafeDatabaseOperation def profile_deleteToken(self, token, deleteToken): + cursor = self._get_cursor() _username = self.tokenOper_get_username(token) # delete - self.cursor.execute('DELETE FROM token WHERE [user] = ? AND [token] = ?;', ( + cursor.execute('DELETE FROM token WHERE [user] = ? AND [token] = ?;', ( _username, deleteToken )) - if self.cursor.rowcount != 1: - raise Exception('Fail to delete due to no matched rows or too much rows.') + if cursor.rowcount != 1: + raise DbException('Fail to delete due to no matched rows or too much rows.') return True diff --git a/backend/server.py b/backend/server.py index 56cecbf..5084940 100644 --- a/backend/server.py +++ b/backend/server.py @@ -1,12 +1,13 @@ from flask import Flask from flask import request from dataclasses import dataclass -from typing import Any, Callable +from typing import Any, Callable, ParamSpec, TypeVar, Generic import config import database import utils from logger import LOGGER +from database import ResponseBody app = Flask(__name__) calendar_db = database.CalendarDatabase() @@ -15,319 +16,427 @@ calendar_db = database.CalendarDatabase() # region: Common -@app.route('/common/salt', methods=['POST']) + +@app.route("/common/salt", methods=["POST"]) def api_common_saltHandle(): - return SmartDbCaller(calendar_db.common_salt, - (FormField('username', str, False), ), - None) + return SmartDbCaller( + calendar_db.common_salt, (FormField("username", str, False),), None + ) -@app.route('/common/login', methods=['POST']) + +@app.route("/common/login", methods=["POST"]) def api_common_loginHandle(): - # construct client data first - clientUa = request.user_agent.string - if request.headers.getlist("X-Forwarded-For"): - clientIp = request.headers.getlist("X-Forwarded-For")[0] - else: - clientIp = request.remote_addr + clientInfo = FetchClientNetworkInfo() - return SmartDbCaller(calendar_db.common_login, - (FormField('username', str, False), - FormField('password', str, False), - FormField('clientUa', str, False), - FormField('clientIp', str, False)), - { - 'clientUa': clientUa, - 'clientIp': clientIp - }) + return SmartDbCaller( + calendar_db.common_login, + ( + FormField("username", str, False), + FormField("password", str, False), + FormField("clientUa", str, False), + FormField("clientIp", str, False), + ), + {"clientUa": clientInfo.user_agent, "clientIp": clientInfo.ip_addr}, + ) -@app.route('/common/webLogin', methods=['POST']) + +@app.route("/common/webLogin", methods=["POST"]) def api_common_webLoginHandle(): - # construct client data first - clientUa = request.user_agent.string - if request.headers.getlist("X-Forwarded-For"): - clientIp = request.headers.getlist("X-Forwarded-For")[0] - else: - clientIp = request.remote_addr - - return SmartDbCaller(calendar_db.common_webLogin, - (FormField('username', str, False), - FormField('password', str, False), - FormField('clientUa', str, False), - FormField('clientIp', str, False)), - { - 'clientUa': clientUa, - 'clientIp': clientIp - }) + clientInfo = FetchClientNetworkInfo() -@app.route('/common/logout', methods=['POST']) + return SmartDbCaller( + calendar_db.common_webLogin, + ( + FormField("username", str, False), + FormField("password", str, False), + FormField("clientUa", str, False), + FormField("clientIp", str, False), + ), + {"clientUa": clientInfo.user_agent, "clientIp": clientInfo.ip_addr}, + ) + + +@app.route("/common/logout", methods=["POST"]) def api_common_logoutHandle(): - return SmartDbCaller(calendar_db.common_logout, - (FormField('token', str, False), ), - None) + return SmartDbCaller( + calendar_db.common_logout, (FormField("token", str, False),), None + ) -@app.route('/common/tokenValid', methods=['POST']) + +@app.route("/common/tokenValid", methods=["POST"]) def api_common_tokenValidHandle(): - return SmartDbCaller(calendar_db.common_tokenValid, - (FormField('token', str, False), ), - None) + return SmartDbCaller( + calendar_db.common_tokenValid, (FormField("token", str, False),), None + ) + # endregion # region: Calendar -@app.route('/calendar/getFull', methods=['POST']) + +@app.route("/calendar/getFull", methods=["POST"]) def api_calendar_getFullHandle(): - return SmartDbCaller(calendar_db.calendar_getFull, - (FormField('token', str, False), - FormField('startDateTime', int, False), - FormField('endDateTime', int, False)), - None) + return SmartDbCaller( + calendar_db.calendar_getFull, + ( + FormField("token", str, False), + FormField("startDateTime", int, False), + FormField("endDateTime", int, False), + ), + None, + ) -@app.route('/calendar/getList', methods=['POST']) + +@app.route("/calendar/getList", methods=["POST"]) def api_calendar_getListHandle(): - return SmartDbCaller(calendar_db.calendar_getList, - (FormField('token', str, False), - FormField('startDateTime', int, False), - FormField('endDateTime', int, False)), - None) + return SmartDbCaller( + calendar_db.calendar_getList, + ( + FormField("token", str, False), + FormField("startDateTime", int, False), + FormField("endDateTime", int, False), + ), + None, + ) -@app.route('/calendar/getDetail', methods=['POST']) + +@app.route("/calendar/getDetail", methods=["POST"]) def api_calendar_getDetailHandle(): - return SmartDbCaller(calendar_db.calendar_getDetail, - (FormField('token', str, False), - FormField('uuid', str, False)), - None) + return SmartDbCaller( + calendar_db.calendar_getDetail, + (FormField("token", str, False), FormField("uuid", str, False)), + None, + ) -@app.route('/calendar/update', methods=['POST']) + +@app.route("/calendar/update", methods=["POST"]) def api_calendar_updateHandle(): - return SmartDbCaller(calendar_db.calendar_update, - (FormField('token', str, False), - FormField('uuid', str, False), - FormField('belongTo', str, True), - FormField('title', str, True), - FormField('description', str, True), - FormField('eventDateTimeStart', int, True), - FormField('eventDateTimeEnd', int, True), - FormField('loopRules', str, True), - FormField('timezoneOffset', int, True), - FormField('lastChange', str, False)), - None) + return SmartDbCaller( + calendar_db.calendar_update, + ( + FormField("token", str, False), + FormField("uuid", str, False), + FormField("belongTo", str, True), + FormField("title", str, True), + FormField("description", str, True), + FormField("eventDateTimeStart", int, True), + FormField("eventDateTimeEnd", int, True), + FormField("loopRules", str, True), + FormField("timezoneOffset", int, True), + FormField("lastChange", str, False), + ), + None, + ) -@app.route('/calendar/add', methods=['POST']) + +@app.route("/calendar/add", methods=["POST"]) def api_calendar_addHandle(): - return SmartDbCaller(calendar_db.calendar_add, - (FormField('token', str, False), - FormField('belongTo', str, False), - FormField('title', str, False), - FormField('description', str, False), - FormField('eventDateTimeStart', int, False), - FormField('eventDateTimeEnd', int, False), - FormField('loopRules', str, False), - FormField('timezoneOffset', int, False)), - None) + return SmartDbCaller( + calendar_db.calendar_add, + ( + FormField("token", str, False), + FormField("belongTo", str, False), + FormField("title", str, False), + FormField("description", str, False), + FormField("eventDateTimeStart", int, False), + FormField("eventDateTimeEnd", int, False), + FormField("loopRules", str, False), + FormField("timezoneOffset", int, False), + ), + None, + ) -@app.route('/calendar/delete', methods=['POST']) + +@app.route("/calendar/delete", methods=["POST"]) def api_calendar_deleteHandle(): - return SmartDbCaller(calendar_db.calendar_delete, - (FormField('token', str, False), - FormField('uuid', str, False), - FormField('lastChange', str, False)), - None) + return SmartDbCaller( + calendar_db.calendar_delete, + ( + FormField("token", str, False), + FormField("uuid", str, False), + FormField("lastChange", str, False), + ), + None, + ) + # endregion # region: Collection -@app.route('/collection/getFullOwn', methods=['POST']) + +@app.route("/collection/getFullOwn", methods=["POST"]) def api_collection_getFullOwnHandle(): - return SmartDbCaller(calendar_db.collection_getFullOwn, - (FormField('token', str, False), ), - None) + return SmartDbCaller( + calendar_db.collection_getFullOwn, (FormField("token", str, False),), None + ) -@app.route('/collection/getListOwn', methods=['POST']) + +@app.route("/collection/getListOwn", methods=["POST"]) def api_collection_getListOwnHandle(): - return SmartDbCaller(calendar_db.collection_getListOwn, - (FormField('token', str, False), ), - None) + return SmartDbCaller( + calendar_db.collection_getListOwn, (FormField("token", str, False),), None + ) -@app.route('/collection/getDetailOwn', methods=['POST']) + +@app.route("/collection/getDetailOwn", methods=["POST"]) def api_collection_getDetailOwnHandle(): - return SmartDbCaller(calendar_db.collection_getDetailOwn, - (FormField('token', str, False), - FormField('uuid', str, False)), - None) + return SmartDbCaller( + calendar_db.collection_getDetailOwn, + (FormField("token", str, False), FormField("uuid", str, False)), + None, + ) -@app.route('/collection/addOwn', methods=['POST']) + +@app.route("/collection/addOwn", methods=["POST"]) def api_collection_addOwnHandle(): - return SmartDbCaller(calendar_db.collection_addOwn, - (FormField('token', str, False), - FormField('name', str, False)), - None) + return SmartDbCaller( + calendar_db.collection_addOwn, + (FormField("token", str, False), FormField("name", str, False)), + None, + ) -@app.route('/collection/updateOwn', methods=['POST']) + +@app.route("/collection/updateOwn", methods=["POST"]) def api_collection_updateOwnHandle(): - return SmartDbCaller(calendar_db.collection_updateOwn, - (FormField('token', str, False), - FormField('uuid', str, False), - FormField('name', str, False), - FormField('lastChange', str, False)), - None) + return SmartDbCaller( + calendar_db.collection_updateOwn, + ( + FormField("token", str, False), + FormField("uuid", str, False), + FormField("name", str, False), + FormField("lastChange", str, False), + ), + None, + ) -@app.route('/collection/deleteOwn', methods=['POST']) + +@app.route("/collection/deleteOwn", methods=["POST"]) def api_collection_deleteOwnHandle(): - return SmartDbCaller(calendar_db.collection_deleteOwn, - (FormField('token', str, False), - FormField('uuid', str, False), - FormField('lastChange', str, False)), - None) + return SmartDbCaller( + calendar_db.collection_deleteOwn, + ( + FormField("token", str, False), + FormField("uuid", str, False), + FormField("lastChange", str, False), + ), + None, + ) -@app.route('/collection/getSharing', methods=['POST']) +@app.route("/collection/getSharing", methods=["POST"]) def api_collection_getSharingHandle(): - return SmartDbCaller(calendar_db.collection_getSharing, - (FormField('token', str, False), - FormField('uuid', str, False)), - None) + return SmartDbCaller( + calendar_db.collection_getSharing, + (FormField("token", str, False), FormField("uuid", str, False)), + None, + ) -@app.route('/collection/deleteSharing', methods=['POST']) + +@app.route("/collection/deleteSharing", methods=["POST"]) def api_collection_deleteSharingHandle(): - return SmartDbCaller(calendar_db.collection_deleteSharing, - (FormField('token', str, False), - FormField('uuid', str, False), - FormField('target', str, False), - FormField('lastChange', str, False)), - None) + return SmartDbCaller( + calendar_db.collection_deleteSharing, + ( + FormField("token", str, False), + FormField("uuid", str, False), + FormField("target", str, False), + FormField("lastChange", str, False), + ), + None, + ) -@app.route('/collection/addSharing', methods=['POST']) + +@app.route("/collection/addSharing", methods=["POST"]) def api_collection_addSharingHandle(): - return SmartDbCaller(calendar_db.collection_addSharing, - (FormField('token', str, False), - FormField('uuid', str, False), - FormField('target', str, False), - FormField('lastChange', str, False)), - None) + return SmartDbCaller( + calendar_db.collection_addSharing, + ( + FormField("token", str, False), + FormField("uuid", str, False), + FormField("target", str, False), + FormField("lastChange", str, False), + ), + None, + ) -@app.route('/collection/getShared', methods=['POST']) +@app.route("/collection/getShared", methods=["POST"]) def api_collection_getSharedHandle(): - return SmartDbCaller(calendar_db.collection_getShared, - (FormField('token', str, False), ), - None) + return SmartDbCaller( + calendar_db.collection_getShared, (FormField("token", str, False),), None + ) + # endregion # region: Todo -@app.route('/todo/getFull', methods=['POST']) + +@app.route("/todo/getFull", methods=["POST"]) def api_todo_getFullHandle(): - return SmartDbCaller(calendar_db.todo_getFull, - (FormField('token', str, False), ), - None) + return SmartDbCaller( + calendar_db.todo_getFull, (FormField("token", str, False),), None + ) -@app.route('/todo/getList', methods=['POST']) + +@app.route("/todo/getList", methods=["POST"]) def api_todo_getListHandle(): - return SmartDbCaller(calendar_db.todo_getList, - (FormField('token', str, False), ), - None) + return SmartDbCaller( + calendar_db.todo_getList, (FormField("token", str, False),), None + ) -@app.route('/todo/getDetail', methods=['POST']) + +@app.route("/todo/getDetail", methods=["POST"]) def api_todo_getDetailHandle(): - return SmartDbCaller(calendar_db.todo_getDetail, - (FormField('token', str, False), - FormField('uuid', str, False)), - None) + return SmartDbCaller( + calendar_db.todo_getDetail, + (FormField("token", str, False), FormField("uuid", str, False)), + None, + ) -@app.route('/todo/add', methods=['POST']) + +@app.route("/todo/add", methods=["POST"]) def api_todo_addHandle(): - return SmartDbCaller(calendar_db.todo_add, - (FormField('token', str, False), ), - None) + return SmartDbCaller(calendar_db.todo_add, (FormField("token", str, False),), None) -@app.route('/todo/update', methods=['POST']) + +@app.route("/todo/update", methods=["POST"]) def api_todo_updateHandle(): - return SmartDbCaller(calendar_db.todo_update, - (FormField('token', str, False), - FormField('uuid', str, False), - FormField('data', str, False), - FormField('lastChange', str, False)), - None) + return SmartDbCaller( + calendar_db.todo_update, + ( + FormField("token", str, False), + FormField("uuid", str, False), + FormField("data", str, False), + FormField("lastChange", str, False), + ), + None, + ) -@app.route('/todo/delete', methods=['POST']) + +@app.route("/todo/delete", methods=["POST"]) def api_todo_deleteHandle(): - return SmartDbCaller(calendar_db.todo_delete, - (FormField('token', str, False), - FormField('uuid', str, False), - FormField('lastChange', str, False)), - None) + return SmartDbCaller( + calendar_db.todo_delete, + ( + FormField("token", str, False), + FormField("uuid", str, False), + FormField("lastChange", str, False), + ), + None, + ) + # endregion # region: Admin -@app.route('/admin/get', methods=['POST']) + +@app.route("/admin/get", methods=["POST"]) def api_admin_getHandle(): - return SmartDbCaller(calendar_db.admin_get, - (FormField('token', str, False), ), - None) + return SmartDbCaller(calendar_db.admin_get, (FormField("token", str, False),), None) -@app.route('/admin/add', methods=['POST']) + +@app.route("/admin/add", methods=["POST"]) def api_admin_addHandle(): - return SmartDbCaller(calendar_db.admin_add, - (FormField('token', str, False), - FormField('username', str, False)), - None) + return SmartDbCaller( + calendar_db.admin_add, + (FormField("token", str, False), FormField("username", str, False)), + None, + ) -@app.route('/admin/update', methods=['POST']) + +@app.route("/admin/update", methods=["POST"]) def api_admin_updateHandle(): - return SmartDbCaller(calendar_db.admin_update, - (FormField('token', str, False), - FormField('username', str, False), - FormField('password', str, True), - FormField('isAdmin', utils.Str2Bool, True)), - None) + return SmartDbCaller( + calendar_db.admin_update, + ( + FormField("token", str, False), + FormField("username", str, False), + FormField("password", str, True), + FormField("isAdmin", utils.Str2Bool, True), + ), + None, + ) -@app.route('/admin/delete', methods=['POST']) + +@app.route("/admin/delete", methods=["POST"]) def api_admin_deleteHandle(): - return SmartDbCaller(calendar_db.admin_delete, - (FormField('token', str, False), - FormField('username', str, False)), - None) + return SmartDbCaller( + calendar_db.admin_delete, + (FormField("token", str, False), FormField("username", str, False)), + None, + ) + # endregion # region: Profile -@app.route('/profile/isAdmin', methods=['POST']) + +@app.route("/profile/isAdmin", methods=["POST"]) def api_profile_isAdminHandle(): - return SmartDbCaller(calendar_db.profile_isAdmin, - (FormField('token', str, False), ), - None) + return SmartDbCaller( + calendar_db.profile_isAdmin, (FormField("token", str, False),), None + ) -@app.route('/profile/changePassword', methods=['POST']) + +@app.route("/profile/changePassword", methods=["POST"]) def api_profile_changePasswordHandle(): - return SmartDbCaller(calendar_db.profile_changePassword, - (FormField('token', str, False), - FormField('password', str, False)), - None) + return SmartDbCaller( + calendar_db.profile_changePassword, + (FormField("token", str, False), FormField("password", str, False)), + None, + ) -@app.route('/profile/getToken', methods=['POST']) + +@app.route("/profile/getToken", methods=["POST"]) def api_profile_getTokenHandle(): - return SmartDbCaller(calendar_db.profile_getToken, - (FormField('token', str, False), ), - None) + return SmartDbCaller( + calendar_db.profile_getToken, (FormField("token", str, False),), None + ) -@app.route('/profile/deleteToken', methods=['POST']) + +@app.route("/profile/deleteToken", methods=["POST"]) def api_profile_deleteTokenHandle(): - return SmartDbCaller(calendar_db.profile_deleteToken, - (FormField('token', str, False), - FormField('deleteToken', str, False)), - None) + return SmartDbCaller( + calendar_db.profile_deleteToken, + (FormField("token", str, False), FormField("deleteToken", str, False)), + None, + ) + # endregion # endregion -# region: Misc Functions +# region: Utilities + + +@dataclass(frozen=True) +class ClientNetworkInfo: + user_agent: str + """The user agent of client.""" + ip_addr: str + """The IP address of client.""" + + +def FetchClientNetworkInfo() -> ClientNetworkInfo: + clientUa = request.user_agent.string + forwardIpList = request.headers.getlist("X-Forwarded-For") + if forwardIpList: + clientIp = forwardIpList[0] + else: + directIp = request.remote_addr + if directIp is not None: + clientIp = directIp + else: + clientIp = "0.0.0.0" + + return ClientNetworkInfo(clientUa, clientIp) + @dataclass(frozen=True) class FormField: @@ -338,22 +447,31 @@ class FormField: is_optional: bool """True if this form field is optional, otherwise false.""" -def SmartDbCaller(db_method: Callable, fields: tuple[FormField, ...], padding_form: dict[str, Any] | None) -> dict[str, Any]: - result = ResponseBody(False, 'Invalid parameter', None) + +def SmartDbCaller( + db_method: Callable[..., ResponseBody[Any]], + fields: tuple[FormField, ...], + padding_form: dict[str, str] | None, +) -> dict[str, Any]: opt_param_counter = 0 + lost_required: bool = False param_list: list[Any] = [] opt_param_dict: dict[str, Any] = {} - - real_form = request.form.to_dict() - LOGGER.debug(f'Form: {real_form}') + + # fetch user passed form + user_form: dict[str, str] = request.form.to_dict() + LOGGER.debug(f"User Form: {user_form}") + # overwrite user form by our padding form if padding_form is not None: - real_form.update(padding_form) - + user_form.update(padding_form) + LOGGER.debug(f"Padded User Form: {user_form}") + + # check fields one by one for field in fields: - value = real_form.get(field.name, None) + value = user_form.get(field.name, None) if value is not None: value = field.ty(value) - + if field.is_optional: # optional param if value is not None: @@ -362,36 +480,34 @@ def SmartDbCaller(db_method: Callable, fields: tuple[FormField, ...], padding_fo else: # required param if value is None: - break - param_list.append(value) - - # at least one opt param - LOGGER.debug(f'All Optional Parameter: {opt_param_counter}') - LOGGER.debug(f'Optional Parameter Count: {len(opt_param_dict)}') - if opt_param_counter == 0 or len(opt_param_dict) != 0: - result: ResponseBody = db_method(*param_list, **opt_param_dict) + lost_required = True + else: + param_list.append(value) + + # Only execute database function if there is no lost required fields. + # And fulfill one of following requirements: + # 1. There are all required fields (optional parameter count is zero). + # 1. Or, there is some optional parameter. + LOGGER.debug(f"Has Lost Required Parameter: {lost_required}") + LOGGER.debug(f"All Optional Parameter Count: {opt_param_counter}") + LOGGER.debug(f"Available Optional Parameter Count: {len(opt_param_dict)}") + result: ResponseBody[Any] + if lost_required == False and (opt_param_counter == 0 or len(opt_param_dict) != 0): + result = db_method(*param_list, **opt_param_dict) + else: + result = ResponseBody(False, "Invalid parameter", None) return ConstructResponseBody(result) -@dataclass(frozen=True) -class ResponseBody: - success: bool - """True if this operation is successful, otherwise false.""" - error: str - """The error message provided when operation failed.""" - data: Any - """The payload provided when operation successed.""" -def ConstructResponseBody(body: ResponseBody) -> dict[str, Any]: - return { - 'success': body.success, - 'error': body.error, - 'data': body.data - } +def ConstructResponseBody(body: ResponseBody[Any]) -> dict[str, Any]: + return {"success": body.success, "error": body.error, "data": body.data} + + +# endregion + def run(): calendar_db.open() app.run(port=config.get_config().web.port) calendar_db.close() - -# endregion diff --git a/backend/utils.py b/backend/utils.py index 064f74b..1fc65c6 100644 --- a/backend/utils.py +++ b/backend/utils.py @@ -3,50 +3,62 @@ import random import uuid import time import math +import re -ValidUsername = set(map(lambda x:chr(x), range(48, 58, 1))) | set(map(lambda x:chr(x), range(65, 91, 1))) | set(map(lambda x:chr(x), range(97, 123, 1))) -ValidPassword = set(map(lambda x:chr(x), range(33, 127, 1))) +USERNAME_PATTERN: re.Pattern = re.compile("^[0-9A-Za-z]+$") +PASSWORD_PATTERN: re.Pattern = re.compile("^[!-~]+$") -def IsValidUsername(strl): - return (len(set(strl) - ValidUsername) == 0) -def IsValidPassword(strl): - return (len(set(strl) - ValidPassword) == 0) +def IsValidUsername(strl: str) -> bool: + return USERNAME_PATTERN.match(strl) is not None -def ComputePasswordHash(password): + +def IsValidPassword(strl: str) -> bool: + return PASSWORD_PATTERN.match(strl) is not None + + +def ComputePasswordHash(password: str) -> str: s = hashlib.sha256() - s.update(password.encode('utf-8')) + s.update(password.encode("utf-8")) return s.hexdigest() -def GenerateUUID(): + +def GenerateUUID() -> str: return str(uuid.uuid1()) -def GenerateToken(username): + +def GenerateToken(username: str) -> str: s = hashlib.sha256() - s.update(username.encode('utf-8')) - s.update(GenerateUUID().encode('utf-8')) + s.update(username.encode("utf-8")) + s.update(GenerateUUID().encode("utf-8")) return s.hexdigest() -def GenerateSalt(): + +def GenerateSalt() -> int: return random.randint(0, 6172748) -def ComputePasswordHashWithSalt(passwordHashed, salt): + +def ComputePasswordHashWithSalt(passwordHashed: str, salt: int) -> str: s = hashlib.sha256() - s.update((passwordHashed + str(salt)).encode('utf-8')) + s.update((passwordHashed + str(salt)).encode("utf-8")) return s.hexdigest() -def GetCurrentTimestamp(): + +def GetCurrentTimestamp() -> int: return int(time.time()) -def GetTokenExpireOn(): - return GetCurrentTimestamp() + 60 * 60 * 24 * 2 # add 2 day from now -def Str2Bool(strl): - return strl.lower() == 'true' +def GetTokenExpireOn() -> int: + return GetCurrentTimestamp() + 60 * 60 * 24 * 2 # add 2 day from now -def GCD(a, b): + +def Str2Bool(strl: str) -> bool: + return strl.lower() == "true" + + +def GCD(a: int, b: int) -> int: return math.gcd(a, b) -def LCM(a, b): - return int(a * b / GCD(a, b)) - \ No newline at end of file + +def LCM(a: int, b: int) -> int: + return (a * b) // GCD(a, b)