1
0

refactor: modify backend for better understand

This commit is contained in:
2026-05-15 11:08:57 +08:00
parent e484ded5be
commit 35fee0f473
3 changed files with 617 additions and 408 deletions

View File

@@ -3,33 +3,50 @@ import threading
from typing import cast from typing import cast
from pathlib import Path from pathlib import Path
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any from typing import Callable, ParamSpec, TypeVar, Generic
import dt import dt
import utils import utils
import config import config
from logger import LOGGER from logger import LOGGER
T = TypeVar('T')
P = ParamSpec('P')
R = TypeVar('R')
@dataclass(frozen=True) @dataclass(frozen=True)
class ResponseBody: class ResponseBody(Generic[T]):
"""The generic response body for API return."""
success: bool success: bool
"""True if this operation is successful, otherwise false.""" """True if this operation is successful, otherwise false."""
error: str error: str
"""The error message provided when operation failed.""" """The error message provided when operation failed."""
data: Any data: T | None
"""The payload provided when operation successed.""" """The payload provided when operation successed."""
def SafeDatabaseOperation(func):
def wrapper(self: 'CalendarDatabase', *args, **kwargs): class DbException(Exception):
"""Error occurs when manipulating with database."""
pass
def SafeDatabaseOperation(inner: Callable[P, R]) -> Callable[P, ResponseBody[R]]:
def wrapper(*args, **kwargs) -> ResponseBody[R]:
# extract self from args
self: 'CalendarDatabase' = args[0]
# get config
cfg = config.get_config() cfg = config.get_config()
with self.mutex: with self.mutex:
# check database and acquire cursor # try to fetching database and allocate database cursor
try: try:
self.check_database() db = self._get_db()
self.cursor = self.db.cursor() self._allocate_cursor()
except Exception as e: except Exception as e:
self.cursor = None self._free_cursor()
if cfg.others.debug: if cfg.others.debug:
LOGGER.exception(e) LOGGER.exception(e)
return ResponseBody(False, str(e), None) return ResponseBody(False, str(e), None)
@@ -42,15 +59,13 @@ def SafeDatabaseOperation(func):
LOGGER.info('Cleaning outdated token...') LOGGER.info('Cleaning outdated token...')
self.tokenOper_clean() self.tokenOper_clean()
result = ResponseBody(True, '', func(self, *args, **kwargs)) result = ResponseBody(True, '', inner(*args, **kwargs))
self.cursor.close() self._free_cursor()
self.cursor = None db.commit()
self.db.commit()
return result return result
except Exception as e: except Exception as e:
self.cursor.close() self._free_cursor()
self.cursor = None db.rollback()
self.db.rollback()
if cfg.others.debug: if cfg.others.debug:
LOGGER.exception(e) LOGGER.exception(e)
return ResponseBody(False, str(e), None) return ResponseBody(False, str(e), None)
@@ -59,8 +74,8 @@ def SafeDatabaseOperation(func):
class CalendarDatabase: class CalendarDatabase:
db: sqlite3.Connection db: sqlite3.Connection | None
cursor: sqlite3.Cursor cursor: sqlite3.Cursor | None
mutex: threading.Lock mutex: threading.Lock
latestClean: int latestClean: int
@@ -71,8 +86,8 @@ class CalendarDatabase:
self.latestClean = 0 self.latestClean = 0
def open(self): def open(self):
if (self.is_database_valid()): if (self.db is not None):
raise Exception('Databade is opened') raise DbException('Database is already opened')
cfg = config.get_config() cfg = config.get_config()
match cfg.database.driver: match cfg.database.driver:
@@ -81,13 +96,13 @@ class CalendarDatabase:
self.db.execute('PRAGMA encoding = "UTF-8";') self.db.execute('PRAGMA encoding = "UTF-8";')
self.db.execute('PRAGMA foreign_keys = ON;') self.db.execute('PRAGMA foreign_keys = ON;')
case config.DatabaseDriver.MYSQL: case config.DatabaseDriver.MYSQL:
raise Exception('Not implemented database') raise DbException('Not implemented database')
case _: case _:
raise Exception('Unknow database type') raise DbException('Unknow database type')
def init(self, username, password): def init(self, username: str, password: str):
if (self.is_database_valid()): if (self.db is not None):
raise Exception('Database is opened') raise DbException('Database is already opened')
# establish tables # establish tables
cfg = config.get_config() cfg = config.get_config()
@@ -97,44 +112,74 @@ class CalendarDatabase:
case config.DatabaseDriver.SQLITE: case config.DatabaseDriver.SQLITE:
sql_file = backend_sql_path / 'sqlite.sql' sql_file = backend_sql_path / 'sqlite.sql'
case config.DatabaseDriver.MYSQL: case config.DatabaseDriver.MYSQL:
raise Exception('Not implemented database') raise DbException('Not implemented database')
case _: case _:
raise Exception('Unknow database type') raise DbException('Unknow database type')
self.open() self.open()
cursor = self.db.cursor() db = self._get_db()
self._allocate_cursor()
cursor = self._get_cursor()
# execute script for creating tables
with open(sql_file, 'r', encoding='utf-8') as fsql: with open(sql_file, 'r', encoding='utf-8') as fsql:
cursor.executescript(fsql.read()) cursor.executescript(fsql.read())
# add default user in user table
# finish init
cursor.execute('INSERT INTO user VALUES (?, ?, ?, ?);', ( cursor.execute('INSERT INTO user VALUES (?, ?, ?, ?);', (
username, username,
utils.ComputePasswordHash(password), utils.ComputePasswordHash(password),
1, 1,
utils.GenerateSalt() utils.GenerateSalt()
)) ))
cursor.close()
self.db.commit() self._free_cursor()
# commit to database
db.commit()
def close(self): def close(self):
self.check_database() if (self.db is None):
LOGGER.warning('Try to close null database.')
else:
self._free_cursor()
self.db.close() self.db.close()
self.db = None self.db = None
def check_database(self): def _get_db(self) -> sqlite3.Connection:
if (not self.is_database_valid()): if (self.db is None):
raise Exception('Databade is None') raise DbException('There is no opened database')
else:
return self.db
def is_database_valid(self): def _allocate_cursor(self) -> None:
return not (self.db == None) if (self.cursor is not None):
raise DbException('There is already opened database cursor')
else:
self.cursor = self._get_db().cursor()
def _get_cursor(self) -> sqlite3.Cursor:
if (self.cursor is None):
raise DbException('There is no opened database cursor')
else:
return self.cursor
def _free_cursor(self) -> None:
if (self.cursor is None):
LOGGER.warning('Try to free null databse cursor.')
else:
self.cursor.close()
self.cursor = None
# ======================= token related internal operation # ======================= token related internal operation
def tokenOper_clean(self): def tokenOper_clean(self):
# remove outdated token # remove outdated token
self.cursor.execute('DELETE FROM token WHERE [token_expire_on] <= ?',(utils.GetCurrentTimestamp(), )) cursor = self._get_cursor()
cursor.execute('DELETE FROM token WHERE [token_expire_on] <= ?',(utils.GetCurrentTimestamp(), ))
def tokenOper_postpone_expireOn(self, token): def tokenOper_postpone_expireOn(self, token):
self.cursor.execute('UPDATE token SET [token_expire_on] = ? WHERE [token] = ?;', ( cursor = self._get_cursor()
cursor.execute('UPDATE token SET [token_expire_on] = ? WHERE [token] = ?;', (
utils.GetTokenExpireOn(), utils.GetTokenExpireOn(),
token token
)) ))
@@ -143,16 +188,18 @@ class CalendarDatabase:
self.tokenOper_get_username(token) self.tokenOper_get_username(token)
def tokenOper_is_admin(self, username): def tokenOper_is_admin(self, username):
self.cursor.execute('SELECT [is_admin] FROM user WHERE [name] = ?;',(username, )) cursor = self._get_cursor()
cache = self.cursor.fetchone()[0] cursor.execute('SELECT [is_admin] FROM user WHERE [name] = ?;',(username, ))
cache = cursor.fetchone()[0]
return cache == 1 return cache == 1
def tokenOper_get_username(self, token): def tokenOper_get_username(self, token):
self.cursor.execute('SELECT [user] FROM token WHERE [token] = ? AND [token_expire_on] > ?;',( cursor = self._get_cursor()
cursor.execute('SELECT [user] FROM token WHERE [token] = ? AND [token_expire_on] > ?;',(
token, token,
utils.GetCurrentTimestamp() utils.GetCurrentTimestamp()
)) ))
result = self.cursor.fetchone()[0] result = cursor.fetchone()[0]
# need postpone expire on time # need postpone expire on time
self.tokenOper_postpone_expireOn(token) self.tokenOper_postpone_expireOn(token)
return result return result
@@ -162,8 +209,9 @@ class CalendarDatabase:
@SafeDatabaseOperation @SafeDatabaseOperation
def common_salt(self, username): def common_salt(self, username):
cursor = self._get_cursor()
salt = utils.GenerateSalt() salt = utils.GenerateSalt()
self.cursor.execute('UPDATE user SET [salt] = ? WHERE [name] = ?;', ( cursor.execute('UPDATE user SET [salt] = ? WHERE [name] = ?;', (
salt, salt,
username username
)) ))
@@ -171,16 +219,17 @@ class CalendarDatabase:
@SafeDatabaseOperation @SafeDatabaseOperation
def common_login(self, username, password, clientUa, clientIp): def common_login(self, username, password, clientUa, clientIp):
self.cursor.execute('SELECT [password], [salt] FROM user WHERE [name] = ?;', (username, )) cursor = self._get_cursor()
(gotten_salt, gotten_password) = self.cursor.fetchone() cursor.execute('SELECT [password], [salt] FROM user WHERE [name] = ?;', (username, ))
(gotten_salt, gotten_password) = cursor.fetchone()
if password == utils.ComputePasswordHashWithSalt(gotten_password, gotten_salt): if password == utils.ComputePasswordHashWithSalt(gotten_password, gotten_salt):
token = utils.GenerateToken(username) token = utils.GenerateToken(username)
self.cursor.execute('UPDATE user SET [salt] = ? WHERE [name] = ?;', ( cursor.execute('UPDATE user SET [salt] = ? WHERE [name] = ?;', (
utils.GenerateSalt(), # regenerate a new slat to prevent re-login try utils.GenerateSalt(), # regenerate a new slat to prevent re-login try
username username
)) ))
self.cursor.execute('INSERT INTO token VALUES (?, ?, ?, ?, ?);', ( cursor.execute('INSERT INTO token VALUES (?, ?, ?, ?, ?);', (
username, username,
token, token,
utils.GetTokenExpireOn(), # add 2 day from now utils.GetTokenExpireOn(), # add 2 day from now
@@ -190,20 +239,21 @@ class CalendarDatabase:
return token return token
else: else:
# throw a exception to indicate fail to login # throw a exception to indicate fail to login
raise Exception('Login authentication failed') raise DbException('Login authentication failed')
@SafeDatabaseOperation @SafeDatabaseOperation
def common_webLogin(self, username, password, clientUa, clientIp): def common_webLogin(self, username, password, clientUa, clientIp):
cursor = self._get_cursor()
LOGGER.debug(f'WebLogin Username: {username}') LOGGER.debug(f'WebLogin Username: {username}')
LOGGER.debug(f'WebLogin Password: {password}') LOGGER.debug(f'WebLogin Password: {password}')
passwordHash = utils.ComputePasswordHash(password) passwordHash = utils.ComputePasswordHash(password)
LOGGER.debug(f'WebLogin Password Hash: {passwordHash}') LOGGER.debug(f'WebLogin Password Hash: {passwordHash}')
self.cursor.execute('SELECT [name] FROM user WHERE [name] = ? AND [password] = ?;', (username, passwordHash)) cursor.execute('SELECT [name] FROM user WHERE [name] = ? AND [password] = ?;', (username, passwordHash))
if len(self.cursor.fetchall()) != 0: if len(cursor.fetchall()) != 0:
token = utils.GenerateToken(username) token = utils.GenerateToken(username)
self.cursor.execute('INSERT INTO token VALUES (?, ?, ?, ?, ?);', ( cursor.execute('INSERT INTO token VALUES (?, ?, ?, ?, ?);', (
username, username,
token, token,
utils.GetTokenExpireOn(), # add 2 day from now utils.GetTokenExpireOn(), # add 2 day from now
@@ -213,12 +263,13 @@ class CalendarDatabase:
return token return token
else: else:
# throw a exception to indicate fail to login # throw a exception to indicate fail to login
raise Exception('Login authentication failed') raise DbException('Login authentication failed')
@SafeDatabaseOperation @SafeDatabaseOperation
def common_logout(self, token): def common_logout(self, token):
cursor = self._get_cursor()
self.tokenOper_check_valid(token) self.tokenOper_check_valid(token)
self.cursor.execute('DELETE FROM token WHERE [token] = ?;', (token, )) cursor.execute('DELETE FROM token WHERE [token] = ?;', (token, ))
return True return True
@SafeDatabaseOperation @SafeDatabaseOperation
@@ -229,35 +280,39 @@ class CalendarDatabase:
# =============================== calendar # =============================== calendar
@SafeDatabaseOperation @SafeDatabaseOperation
def calendar_getFull(self, token, startDateTime, endDateTime): def calendar_getFull(self, token, startDateTime, endDateTime):
cursor = self._get_cursor()
username = self.tokenOper_get_username(token) username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT calendar.* FROM calendar INNER JOIN collection \ cursor.execute('SELECT calendar.* FROM calendar INNER JOIN collection \
ON collection.uuid = calendar.belong_to \ ON collection.uuid = calendar.belong_to \
WHERE (collection.user = ? AND calendar.loop_date_time_end >= ? AND calendar.loop_date_time_start - (calendar.event_date_time_end - calendar.event_date_time_start) <= ?);', WHERE (collection.user = ? AND calendar.loop_date_time_end >= ? AND calendar.loop_date_time_start - (calendar.event_date_time_end - calendar.event_date_time_start) <= ?);',
(username, startDateTime, endDateTime)) (username, startDateTime, endDateTime))
return self.cursor.fetchall() return cursor.fetchall()
@SafeDatabaseOperation @SafeDatabaseOperation
def calendar_getList(self, token, startDateTime, endDateTime): def calendar_getList(self, token, startDateTime, endDateTime):
cursor = self._get_cursor()
username = self.tokenOper_get_username(token) username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT calendar.uuid FROM calendar INNER JOIN collection \ cursor.execute('SELECT calendar.uuid FROM calendar INNER JOIN collection \
ON collection.uuid = calendar.belong_to \ ON collection.uuid = calendar.belong_to \
WHERE (collection.user = ? AND calendar.loop_date_time_end >= ? AND calendar.loop_date_time_start - (calendar.event_date_time_end - calendar.event_date_time_start) <= ?);', WHERE (collection.user = ? AND calendar.loop_date_time_end >= ? AND calendar.loop_date_time_start - (calendar.event_date_time_end - calendar.event_date_time_start) <= ?);',
(username, startDateTime, endDateTime)) (username, startDateTime, endDateTime))
return tuple(map(lambda x: x[0], self.cursor.fetchall())) return tuple(map(lambda x: x[0], cursor.fetchall()))
@SafeDatabaseOperation @SafeDatabaseOperation
def calendar_getDetail(self, token, uuid): def calendar_getDetail(self, token, uuid):
cursor = self._get_cursor()
self.tokenOper_check_valid(token) self.tokenOper_check_valid(token)
self.cursor.execute('SELECT * FROM calendar WHERE [uuid] = ?;', (uuid, )) cursor.execute('SELECT * FROM calendar WHERE [uuid] = ?;', (uuid, ))
return self.cursor.fetchone() return cursor.fetchone()
@SafeDatabaseOperation @SafeDatabaseOperation
def calendar_update(self, token, uuid, lastChange, **optArgs): def calendar_update(self, token, uuid, lastChange, **optArgs):
cursor = self._get_cursor()
self.tokenOper_check_valid(token) self.tokenOper_check_valid(token)
# get prev data # get prev data
self.cursor.execute('SELECT * FROM calendar WHERE [uuid] = ? AND [last_change] = ?;', (uuid, lastChange)) cursor.execute('SELECT * FROM calendar WHERE [uuid] = ? AND [last_change] = ?;', (uuid, lastChange))
analyseData = list(self.cursor.fetchone()) analyseData = list(cursor.fetchone())
# construct update data # construct update data
lastupdate = utils.GenerateUUID() lastupdate = utils.GenerateUUID()
@@ -319,14 +374,15 @@ class CalendarDatabase:
# execute # execute
argumentsList.append(uuid) argumentsList.append(uuid)
self.cursor.execute('UPDATE calendar SET {} WHERE [uuid] = ?;'.format(', '.join(sqlList)), cursor.execute('UPDATE calendar SET {} WHERE [uuid] = ?;'.format(', '.join(sqlList)),
tuple(argumentsList)) tuple(argumentsList))
if self.cursor.rowcount != 1: if cursor.rowcount != 1:
raise Exception('Fail to update due to no matched rows or too much rows.') raise DbException('Fail to update due to no matched rows or too much rows.')
return lastupdate return lastupdate
@SafeDatabaseOperation @SafeDatabaseOperation
def calendar_add(self, token, belongTo, title, description, eventDateTimeStart, eventDateTimeEnd, loopRules, timezoneOffset): def calendar_add(self, token, belongTo, title, description, eventDateTimeStart, eventDateTimeEnd, loopRules, timezoneOffset):
cursor = self._get_cursor()
self.tokenOper_check_valid(token) self.tokenOper_check_valid(token)
newuuid = utils.GenerateUUID() newuuid = utils.GenerateUUID()
@@ -336,7 +392,7 @@ class CalendarDatabase:
loopDateTimeStart = eventDateTimeStart loopDateTimeStart = eventDateTimeStart
loopDateTimeEnd = dt.ResolveLoopStr(loopRules, eventDateTimeStart, timezoneOffset) loopDateTimeEnd = dt.ResolveLoopStr(loopRules, eventDateTimeStart, timezoneOffset)
self.cursor.execute('INSERT INTO calendar VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);', cursor.execute('INSERT INTO calendar VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);',
(newuuid, (newuuid,
belongTo, belongTo,
title, title,
@@ -352,134 +408,149 @@ class CalendarDatabase:
@SafeDatabaseOperation @SafeDatabaseOperation
def calendar_delete(self, token, uuid, lastChange): def calendar_delete(self, token, uuid, lastChange):
cursor = self._get_cursor()
self.tokenOper_check_valid(token) self.tokenOper_check_valid(token)
self.cursor.execute('DELETE FROM calendar WHERE [uuid] = ? AND [last_change] = ?;', (uuid, lastChange)) cursor.execute('DELETE FROM calendar WHERE [uuid] = ? AND [last_change] = ?;', (uuid, lastChange))
if self.cursor.rowcount != 1: if cursor.rowcount != 1:
raise Exception('Fail to delete due to no matched rows or too much rows.') raise DbException('Fail to delete due to no matched rows or too much rows.')
return True return True
# =============================== collection # =============================== collection
@SafeDatabaseOperation @SafeDatabaseOperation
def collection_getFullOwn(self, token): def collection_getFullOwn(self, token):
cursor = self._get_cursor()
username = self.tokenOper_get_username(token) username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT [uuid], [name], [last_change] FROM collection WHERE [user] = ?;', (username, )) cursor.execute('SELECT [uuid], [name], [last_change] FROM collection WHERE [user] = ?;', (username, ))
return self.cursor.fetchall() return cursor.fetchall()
@SafeDatabaseOperation @SafeDatabaseOperation
def collection_getListOwn(self, token): def collection_getListOwn(self, token):
cursor = self._get_cursor()
username = self.tokenOper_get_username(token) username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT [uuid] FROM collection WHERE [user] = ?;', (username, )) cursor.execute('SELECT [uuid] FROM collection WHERE [user] = ?;', (username, ))
return tuple(map(lambda x: x[0], self.cursor.fetchall())) return tuple(map(lambda x: x[0], cursor.fetchall()))
@SafeDatabaseOperation @SafeDatabaseOperation
def collection_getDetailOwn(self, token, uuid): def collection_getDetailOwn(self, token, uuid):
cursor = self._get_cursor()
username = self.tokenOper_get_username(token) username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT [uuid], [name], [last_change] FROM collection WHERE [user] = ? AND [uuid] = ?;', (username, uuid)) cursor.execute('SELECT [uuid], [name], [last_change] FROM collection WHERE [user] = ? AND [uuid] = ?;', (username, uuid))
return self.cursor.fetchone() return cursor.fetchone()
@SafeDatabaseOperation @SafeDatabaseOperation
def collection_addOwn(self, token, newname): def collection_addOwn(self, token, newname):
cursor = self._get_cursor()
username = self.tokenOper_get_username(token) username = self.tokenOper_get_username(token)
newuuid = utils.GenerateUUID() newuuid = utils.GenerateUUID()
lastupdate = utils.GenerateUUID() lastupdate = utils.GenerateUUID()
self.cursor.execute('INSERT INTO collection VALUES (?, ?, ?, ?);', cursor.execute('INSERT INTO collection VALUES (?, ?, ?, ?);',
(newuuid, newname, username, lastupdate)) (newuuid, newname, username, lastupdate))
return newuuid return newuuid
@SafeDatabaseOperation @SafeDatabaseOperation
def collection_updateOwn(self, token, uuid, newname, lastChange): def collection_updateOwn(self, token, uuid, newname, lastChange):
cursor = self._get_cursor()
self.tokenOper_check_valid(token) self.tokenOper_check_valid(token)
lastupdate = utils.GenerateUUID() lastupdate = utils.GenerateUUID()
self.cursor.execute('UPDATE collection SET [name] = ?, [last_change] = ? WHERE [uuid] = ? AND [last_change] = ?;', ( cursor.execute('UPDATE collection SET [name] = ?, [last_change] = ? WHERE [uuid] = ? AND [last_change] = ?;', (
newname, newname,
lastupdate, lastupdate,
uuid, uuid,
lastChange lastChange
)) ))
if self.cursor.rowcount != 1: if cursor.rowcount != 1:
raise Exception('Fail to update due to no matched rows or too much rows.') raise DbException('Fail to update due to no matched rows or too much rows.')
return lastupdate return lastupdate
@SafeDatabaseOperation @SafeDatabaseOperation
def collection_deleteOwn(self, token, uuid, lastChange): def collection_deleteOwn(self, token, uuid, lastChange):
cursor = self._get_cursor()
self.tokenOper_check_valid(token) self.tokenOper_check_valid(token)
self.cursor.execute('DELETE FROM collection WHERE [uuid] = ? AND [last_change] = ?;', ( cursor.execute('DELETE FROM collection WHERE [uuid] = ? AND [last_change] = ?;', (
uuid, uuid,
lastChange lastChange
)) ))
if self.cursor.rowcount != 1: if cursor.rowcount != 1:
raise Exception('Fail to delete due to no matched rows or too much rows.') raise DbException('Fail to delete due to no matched rows or too much rows.')
return True return True
@SafeDatabaseOperation @SafeDatabaseOperation
def collection_getSharing(self, token, uuid): def collection_getSharing(self, token, uuid):
cursor = self._get_cursor()
self.tokenOper_check_valid(token) self.tokenOper_check_valid(token)
self.cursor.execute('SELECT [target] FROM share WHERE [uuid] = ?;', (uuid, )) cursor.execute('SELECT [target] FROM share WHERE [uuid] = ?;', (uuid, ))
return tuple(map(lambda x: x[0], self.cursor.fetchall())) return tuple(map(lambda x: x[0], cursor.fetchall()))
@SafeDatabaseOperation @SafeDatabaseOperation
def collection_deleteSharing(self, token, uuid, target, lastChange): def collection_deleteSharing(self, token, uuid, target, lastChange):
cursor = self._get_cursor()
self.tokenOper_check_valid(token) self.tokenOper_check_valid(token)
lastupdate = utils.GenerateUUID() lastupdate = utils.GenerateUUID()
self.cursor.execute('UPDATE collection SET [last_change] = ?, WHERE [uuid] = ? AND [last_change] = ?;', (lastupdate, uuid, lastChange)) cursor.execute('UPDATE collection SET [last_change] = ?, WHERE [uuid] = ? AND [last_change] = ?;', (lastupdate, uuid, lastChange))
if self.cursor.rowcount != 1: if cursor.rowcount != 1:
raise Exception('Fail to delete due to no matched rows or too much rows.') raise DbException('Fail to delete due to no matched rows or too much rows.')
self.cursor.execute('DELETE FROM share WHERE [uuid] = ? AND [target] = ?;', (uuid, target)) cursor.execute('DELETE FROM share WHERE [uuid] = ? AND [target] = ?;', (uuid, target))
if self.cursor.rowcount != 1: if cursor.rowcount != 1:
raise Exception('Fail to delete due to no matched rows or too much rows.') raise DbException('Fail to delete due to no matched rows or too much rows.')
return lastupdate return lastupdate
@SafeDatabaseOperation @SafeDatabaseOperation
def collection_addSharing(self, token, uuid, target, lastChange): def collection_addSharing(self, token, uuid, target, lastChange):
cursor = self._get_cursor()
self.tokenOper_check_valid(token) self.tokenOper_check_valid(token)
lastupdate = utils.GenerateUUID() lastupdate = utils.GenerateUUID()
self.cursor.execute('UPDATE collection SET [last_change] = ? WHERE [uuid] = ? AND [last_change] = ?;', (lastupdate, uuid, lastChange)) cursor.execute('UPDATE collection SET [last_change] = ? WHERE [uuid] = ? AND [last_change] = ?;', (lastupdate, uuid, lastChange))
if self.cursor.rowcount != 1: if cursor.rowcount != 1:
raise Exception('Fail to delete due to no matched rows or too much rows.') raise DbException('Fail to delete due to no matched rows or too much rows.')
self.cursor.execute('SELECT * FROM share WHERE [uuid] = ? AND [target] = ?;', (uuid, target)) cursor.execute('SELECT * FROM share WHERE [uuid] = ? AND [target] = ?;', (uuid, target))
if len(self.cursor.fetchall()) != 0: if len(cursor.fetchall()) != 0:
raise Exception('Fail to insert duplicated item.') raise DbException('Fail to insert duplicated item.')
self.cursor.execute('INSERT INTO share VALUES (?, ?);', (uuid, target)) cursor.execute('INSERT INTO share VALUES (?, ?);', (uuid, target))
return lastupdate return lastupdate
@SafeDatabaseOperation @SafeDatabaseOperation
def collection_getShared(self, token): def collection_getShared(self, token):
cursor = self._get_cursor()
username = self.tokenOper_get_username(token) username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT collection.uuid, collection.name, collection.user \ cursor.execute('SELECT collection.uuid, collection.name, collection.user \
FROM share INNER JOIN collection \ FROM share INNER JOIN collection \
ON share.uuid = collection.uuid \ ON share.uuid = collection.uuid \
WHERE share.target = ?;', (username, )) WHERE share.target = ?;', (username, ))
return self.cursor.fetchall() return cursor.fetchall()
# =============================== todo # =============================== todo
@SafeDatabaseOperation @SafeDatabaseOperation
def todo_getFull(self, token): def todo_getFull(self, token):
cursor = self._get_cursor()
username = self.tokenOper_get_username(token) username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT * FROM todo WHERE [belong_to] = ?;', (username, )) cursor.execute('SELECT * FROM todo WHERE [belong_to] = ?;', (username, ))
return self.cursor.fetchall() return cursor.fetchall()
@SafeDatabaseOperation @SafeDatabaseOperation
def todo_getList(self, token): def todo_getList(self, token):
cursor = self._get_cursor()
username = self.tokenOper_get_username(token) username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT [uuid] FROM todo WHERE [belong_to] = ?;', (username, )) cursor.execute('SELECT [uuid] FROM todo WHERE [belong_to] = ?;', (username, ))
return tuple(map(lambda x: x[0], self.cursor.fetchall())) return tuple(map(lambda x: x[0], cursor.fetchall()))
@SafeDatabaseOperation @SafeDatabaseOperation
def todo_getDetail(self, token, uuid): def todo_getDetail(self, token, uuid):
cursor = self._get_cursor()
username = self.tokenOper_get_username(token) username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT * FROM todo WHERE [belong_to] = ? AND [uuid] = ?;', (username, uuid)) cursor.execute('SELECT * FROM todo WHERE [belong_to] = ? AND [uuid] = ?;', (username, uuid))
return self.cursor.fetchone() return cursor.fetchone()
@SafeDatabaseOperation @SafeDatabaseOperation
def todo_add(self, token): def todo_add(self, token):
cursor = self._get_cursor()
username = self.tokenOper_get_username(token) username = self.tokenOper_get_username(token)
newuuid = utils.GenerateUUID() newuuid = utils.GenerateUUID()
lastupdate = utils.GenerateUUID() lastupdate = utils.GenerateUUID()
@@ -489,56 +560,60 @@ class CalendarDatabase:
'', '',
lastupdate, lastupdate,
) )
self.cursor.execute('INSERT INTO todo VALUES (?, ?, ?, ?);', returnedData) cursor.execute('INSERT INTO todo VALUES (?, ?, ?, ?);', returnedData)
return returnedData return returnedData
@SafeDatabaseOperation @SafeDatabaseOperation
def todo_update(self, token, uuid, data, lastChange): def todo_update(self, token, uuid, data, lastChange):
cursor = self._get_cursor()
# check valid token # check valid token
self.tokenOper_check_valid(token) self.tokenOper_check_valid(token)
# update # update
newLastChange = utils.GenerateUUID() newLastChange = utils.GenerateUUID()
self.cursor.execute('UPDATE todo SET [data] = ?, [last_change] = ? WHERE [uuid] = ? AND [last_change] = ?;', ( cursor.execute('UPDATE todo SET [data] = ?, [last_change] = ? WHERE [uuid] = ? AND [last_change] = ?;', (
data, data,
newLastChange, newLastChange,
uuid, uuid,
lastChange lastChange
)) ))
if self.cursor.rowcount != 1: if cursor.rowcount != 1:
raise Exception('Fail to update due to no matched rows or too much rows.') raise DbException('Fail to update due to no matched rows or too much rows.')
return newLastChange return newLastChange
@SafeDatabaseOperation @SafeDatabaseOperation
def todo_delete(self, token, uuid, lastChange): def todo_delete(self, token, uuid, lastChange):
cursor = self._get_cursor()
# check valid token # check valid token
self.tokenOper_check_valid(token) self.tokenOper_check_valid(token)
# delete # delete
self.cursor.execute('DELETE FROM todo WHERE [uuid] = ? AND [last_change] = ?;', (uuid, lastChange)) cursor.execute('DELETE FROM todo WHERE [uuid] = ? AND [last_change] = ?;', (uuid, lastChange))
if self.cursor.rowcount != 1: if cursor.rowcount != 1:
raise Exception('Fail to delete due to no matched rows or too much rows.') raise DbException('Fail to delete due to no matched rows or too much rows.')
return True return True
# =============================== admin # =============================== admin
@SafeDatabaseOperation @SafeDatabaseOperation
def admin_get(self, token): def admin_get(self, token):
cursor = self._get_cursor()
username = self.tokenOper_get_username(token) username = self.tokenOper_get_username(token)
if not self.tokenOper_is_admin(username): if not self.tokenOper_is_admin(username):
raise Exception('Permission denied.') raise DbException('Permission denied.')
self.cursor.execute('SELECT [name], [is_admin] FROM user;') cursor.execute('SELECT [name], [is_admin] FROM user;')
return tuple(map(lambda x: (x[0], x[1] == 1), self.cursor.fetchall())) return tuple(map(lambda x: (x[0], x[1] == 1), cursor.fetchall()))
@SafeDatabaseOperation @SafeDatabaseOperation
def admin_add(self, token, newname): def admin_add(self, token, newname):
cursor = self._get_cursor()
username = self.tokenOper_get_username(token) username = self.tokenOper_get_username(token)
if not self.tokenOper_is_admin(username): if not self.tokenOper_is_admin(username):
raise Exception('Permission denied.') raise DbException('Permission denied.')
newpassword = utils.ComputePasswordHash(utils.GenerateUUID()) newpassword = utils.ComputePasswordHash(utils.GenerateUUID())
self.cursor.execute('INSERT INTO user VALUES (?, ?, ?, ?);', ( cursor.execute('INSERT INTO user VALUES (?, ?, ?, ?);', (
newname, newname,
newpassword, newpassword,
0, 0,
@@ -548,9 +623,10 @@ class CalendarDatabase:
@SafeDatabaseOperation @SafeDatabaseOperation
def admin_update(self, token, _username, **optArgs): def admin_update(self, token, _username, **optArgs):
cursor = self._get_cursor()
username = self.tokenOper_get_username(token) username = self.tokenOper_get_username(token)
if not self.tokenOper_is_admin(username): if not self.tokenOper_is_admin(username):
raise Exception('Permission denied.') raise DbException('Permission denied.')
# construct data # construct data
sqlList = [] sqlList = []
@@ -568,36 +644,39 @@ class CalendarDatabase:
# execute # execute
argumentsList.append(_username) argumentsList.append(_username)
self.cursor.execute('UPDATE user SET {} WHERE [name] = ?;'.format(', '.join(sqlList)), cursor.execute('UPDATE user SET {} WHERE [name] = ?;'.format(', '.join(sqlList)),
tuple(argumentsList)) tuple(argumentsList))
LOGGER.debug(cache) LOGGER.debug(cache)
LOGGER.debug(tuple(argumentsList)) LOGGER.debug(tuple(argumentsList))
if self.cursor.rowcount != 1: if cursor.rowcount != 1:
raise Exception('Fail to update due to no matched rows or too much rows.') raise DbException('Fail to update due to no matched rows or too much rows.')
return True return True
@SafeDatabaseOperation @SafeDatabaseOperation
def admin_delete(self, token, username): def admin_delete(self, token, username):
cursor = self._get_cursor()
_username = self.tokenOper_get_username(token) _username = self.tokenOper_get_username(token)
if not self.tokenOper_is_admin(_username): if not self.tokenOper_is_admin(_username):
raise Exception('Permission denied.') raise DbException('Permission denied.')
# delete # delete
self.cursor.execute('DELETE FROM user WHERE [name] = ?;', (username, )) cursor.execute('DELETE FROM user WHERE [name] = ?;', (username, ))
if self.cursor.rowcount != 1: if cursor.rowcount != 1:
raise Exception('Fail to delete due to no matched rows or too much rows.') raise DbException('Fail to delete due to no matched rows or too much rows.')
return True return True
# =============================== profile # =============================== profile
@SafeDatabaseOperation @SafeDatabaseOperation
def profile_isAdmin(self, token): def profile_isAdmin(self, token):
cursor = self._get_cursor()
username = self.tokenOper_get_username(token) username = self.tokenOper_get_username(token)
return self.tokenOper_is_admin(username) return self.tokenOper_is_admin(username)
@SafeDatabaseOperation @SafeDatabaseOperation
def profile_changePassword(self, token, newpassword): def profile_changePassword(self, token, newpassword):
cursor = self._get_cursor()
username = self.tokenOper_get_username(token) username = self.tokenOper_get_username(token)
self.cursor.execute('UPDATE user SET [password] = ? WHERE [name] = ?;', ( cursor.execute('UPDATE user SET [password] = ? WHERE [name] = ?;', (
utils.ComputePasswordHash(newpassword), utils.ComputePasswordHash(newpassword),
username username
)) ))
@@ -605,23 +684,25 @@ class CalendarDatabase:
@SafeDatabaseOperation @SafeDatabaseOperation
def profile_getToken(self, token): def profile_getToken(self, token):
cursor = self._get_cursor()
username = self.tokenOper_get_username(token) username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT * FROM token WHERE [user] = ?;', ( cursor.execute('SELECT * FROM token WHERE [user] = ?;', (
username, username,
)) ))
return self.cursor.fetchall() return cursor.fetchall()
@SafeDatabaseOperation @SafeDatabaseOperation
def profile_deleteToken(self, token, deleteToken): def profile_deleteToken(self, token, deleteToken):
cursor = self._get_cursor()
_username = self.tokenOper_get_username(token) _username = self.tokenOper_get_username(token)
# delete # delete
self.cursor.execute('DELETE FROM token WHERE [user] = ? AND [token] = ?;', ( cursor.execute('DELETE FROM token WHERE [user] = ? AND [token] = ?;', (
_username, _username,
deleteToken deleteToken
)) ))
if self.cursor.rowcount != 1: if cursor.rowcount != 1:
raise Exception('Fail to delete due to no matched rows or too much rows.') raise DbException('Fail to delete due to no matched rows or too much rows.')
return True return True

View File

@@ -1,12 +1,13 @@
from flask import Flask from flask import Flask
from flask import request from flask import request
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any, Callable from typing import Any, Callable, ParamSpec, TypeVar, Generic
import config import config
import database import database
import utils import utils
from logger import LOGGER from logger import LOGGER
from database import ResponseBody
app = Flask(__name__) app = Flask(__name__)
calendar_db = database.CalendarDatabase() calendar_db = database.CalendarDatabase()
@@ -15,319 +16,427 @@ calendar_db = database.CalendarDatabase()
# region: Common # region: Common
@app.route('/common/salt', methods=['POST'])
@app.route("/common/salt", methods=["POST"])
def api_common_saltHandle(): def api_common_saltHandle():
return SmartDbCaller(calendar_db.common_salt, return SmartDbCaller(
(FormField('username', str, False), ), calendar_db.common_salt, (FormField("username", str, False),), None
None) )
@app.route('/common/login', methods=['POST'])
@app.route("/common/login", methods=["POST"])
def api_common_loginHandle(): def api_common_loginHandle():
# construct client data first clientInfo = FetchClientNetworkInfo()
clientUa = request.user_agent.string
if request.headers.getlist("X-Forwarded-For"):
clientIp = request.headers.getlist("X-Forwarded-For")[0]
else:
clientIp = request.remote_addr
return SmartDbCaller(calendar_db.common_login, return SmartDbCaller(
(FormField('username', str, False), calendar_db.common_login,
FormField('password', str, False), (
FormField('clientUa', str, False), FormField("username", str, False),
FormField('clientIp', str, False)), FormField("password", str, False),
{ FormField("clientUa", str, False),
'clientUa': clientUa, FormField("clientIp", str, False),
'clientIp': clientIp ),
}) {"clientUa": clientInfo.user_agent, "clientIp": clientInfo.ip_addr},
)
@app.route('/common/webLogin', methods=['POST'])
@app.route("/common/webLogin", methods=["POST"])
def api_common_webLoginHandle(): def api_common_webLoginHandle():
# construct client data first clientInfo = FetchClientNetworkInfo()
clientUa = request.user_agent.string
if request.headers.getlist("X-Forwarded-For"):
clientIp = request.headers.getlist("X-Forwarded-For")[0]
else:
clientIp = request.remote_addr
return SmartDbCaller(calendar_db.common_webLogin, return SmartDbCaller(
(FormField('username', str, False), calendar_db.common_webLogin,
FormField('password', str, False), (
FormField('clientUa', str, False), FormField("username", str, False),
FormField('clientIp', str, False)), FormField("password", str, False),
{ FormField("clientUa", str, False),
'clientUa': clientUa, FormField("clientIp", str, False),
'clientIp': clientIp ),
}) {"clientUa": clientInfo.user_agent, "clientIp": clientInfo.ip_addr},
)
@app.route('/common/logout', methods=['POST'])
@app.route("/common/logout", methods=["POST"])
def api_common_logoutHandle(): def api_common_logoutHandle():
return SmartDbCaller(calendar_db.common_logout, return SmartDbCaller(
(FormField('token', str, False), ), calendar_db.common_logout, (FormField("token", str, False),), None
None) )
@app.route('/common/tokenValid', methods=['POST'])
@app.route("/common/tokenValid", methods=["POST"])
def api_common_tokenValidHandle(): def api_common_tokenValidHandle():
return SmartDbCaller(calendar_db.common_tokenValid, return SmartDbCaller(
(FormField('token', str, False), ), calendar_db.common_tokenValid, (FormField("token", str, False),), None
None) )
# endregion # endregion
# region: Calendar # region: Calendar
@app.route('/calendar/getFull', methods=['POST'])
@app.route("/calendar/getFull", methods=["POST"])
def api_calendar_getFullHandle(): def api_calendar_getFullHandle():
return SmartDbCaller(calendar_db.calendar_getFull, return SmartDbCaller(
(FormField('token', str, False), calendar_db.calendar_getFull,
FormField('startDateTime', int, False), (
FormField('endDateTime', int, False)), FormField("token", str, False),
None) FormField("startDateTime", int, False),
FormField("endDateTime", int, False),
),
None,
)
@app.route('/calendar/getList', methods=['POST'])
@app.route("/calendar/getList", methods=["POST"])
def api_calendar_getListHandle(): def api_calendar_getListHandle():
return SmartDbCaller(calendar_db.calendar_getList, return SmartDbCaller(
(FormField('token', str, False), calendar_db.calendar_getList,
FormField('startDateTime', int, False), (
FormField('endDateTime', int, False)), FormField("token", str, False),
None) FormField("startDateTime", int, False),
FormField("endDateTime", int, False),
),
None,
)
@app.route('/calendar/getDetail', methods=['POST'])
@app.route("/calendar/getDetail", methods=["POST"])
def api_calendar_getDetailHandle(): def api_calendar_getDetailHandle():
return SmartDbCaller(calendar_db.calendar_getDetail, return SmartDbCaller(
(FormField('token', str, False), calendar_db.calendar_getDetail,
FormField('uuid', str, False)), (FormField("token", str, False), FormField("uuid", str, False)),
None) None,
)
@app.route('/calendar/update', methods=['POST'])
@app.route("/calendar/update", methods=["POST"])
def api_calendar_updateHandle(): def api_calendar_updateHandle():
return SmartDbCaller(calendar_db.calendar_update, return SmartDbCaller(
(FormField('token', str, False), calendar_db.calendar_update,
FormField('uuid', str, False), (
FormField('belongTo', str, True), FormField("token", str, False),
FormField('title', str, True), FormField("uuid", str, False),
FormField('description', str, True), FormField("belongTo", str, True),
FormField('eventDateTimeStart', int, True), FormField("title", str, True),
FormField('eventDateTimeEnd', int, True), FormField("description", str, True),
FormField('loopRules', str, True), FormField("eventDateTimeStart", int, True),
FormField('timezoneOffset', int, True), FormField("eventDateTimeEnd", int, True),
FormField('lastChange', str, False)), FormField("loopRules", str, True),
None) FormField("timezoneOffset", int, True),
FormField("lastChange", str, False),
),
None,
)
@app.route('/calendar/add', methods=['POST'])
@app.route("/calendar/add", methods=["POST"])
def api_calendar_addHandle(): def api_calendar_addHandle():
return SmartDbCaller(calendar_db.calendar_add, return SmartDbCaller(
(FormField('token', str, False), calendar_db.calendar_add,
FormField('belongTo', str, False), (
FormField('title', str, False), FormField("token", str, False),
FormField('description', str, False), FormField("belongTo", str, False),
FormField('eventDateTimeStart', int, False), FormField("title", str, False),
FormField('eventDateTimeEnd', int, False), FormField("description", str, False),
FormField('loopRules', str, False), FormField("eventDateTimeStart", int, False),
FormField('timezoneOffset', int, False)), FormField("eventDateTimeEnd", int, False),
None) FormField("loopRules", str, False),
FormField("timezoneOffset", int, False),
),
None,
)
@app.route('/calendar/delete', methods=['POST'])
@app.route("/calendar/delete", methods=["POST"])
def api_calendar_deleteHandle(): def api_calendar_deleteHandle():
return SmartDbCaller(calendar_db.calendar_delete, return SmartDbCaller(
(FormField('token', str, False), calendar_db.calendar_delete,
FormField('uuid', str, False), (
FormField('lastChange', str, False)), FormField("token", str, False),
None) FormField("uuid", str, False),
FormField("lastChange", str, False),
),
None,
)
# endregion # endregion
# region: Collection # region: Collection
@app.route('/collection/getFullOwn', methods=['POST'])
@app.route("/collection/getFullOwn", methods=["POST"])
def api_collection_getFullOwnHandle(): def api_collection_getFullOwnHandle():
return SmartDbCaller(calendar_db.collection_getFullOwn, return SmartDbCaller(
(FormField('token', str, False), ), calendar_db.collection_getFullOwn, (FormField("token", str, False),), None
None) )
@app.route('/collection/getListOwn', methods=['POST'])
@app.route("/collection/getListOwn", methods=["POST"])
def api_collection_getListOwnHandle(): def api_collection_getListOwnHandle():
return SmartDbCaller(calendar_db.collection_getListOwn, return SmartDbCaller(
(FormField('token', str, False), ), calendar_db.collection_getListOwn, (FormField("token", str, False),), None
None) )
@app.route('/collection/getDetailOwn', methods=['POST'])
@app.route("/collection/getDetailOwn", methods=["POST"])
def api_collection_getDetailOwnHandle(): def api_collection_getDetailOwnHandle():
return SmartDbCaller(calendar_db.collection_getDetailOwn, return SmartDbCaller(
(FormField('token', str, False), calendar_db.collection_getDetailOwn,
FormField('uuid', str, False)), (FormField("token", str, False), FormField("uuid", str, False)),
None) None,
)
@app.route('/collection/addOwn', methods=['POST'])
@app.route("/collection/addOwn", methods=["POST"])
def api_collection_addOwnHandle(): def api_collection_addOwnHandle():
return SmartDbCaller(calendar_db.collection_addOwn, return SmartDbCaller(
(FormField('token', str, False), calendar_db.collection_addOwn,
FormField('name', str, False)), (FormField("token", str, False), FormField("name", str, False)),
None) None,
)
@app.route('/collection/updateOwn', methods=['POST'])
@app.route("/collection/updateOwn", methods=["POST"])
def api_collection_updateOwnHandle(): def api_collection_updateOwnHandle():
return SmartDbCaller(calendar_db.collection_updateOwn, return SmartDbCaller(
(FormField('token', str, False), calendar_db.collection_updateOwn,
FormField('uuid', str, False), (
FormField('name', str, False), FormField("token", str, False),
FormField('lastChange', str, False)), FormField("uuid", str, False),
None) FormField("name", str, False),
FormField("lastChange", str, False),
),
None,
)
@app.route('/collection/deleteOwn', methods=['POST'])
@app.route("/collection/deleteOwn", methods=["POST"])
def api_collection_deleteOwnHandle(): def api_collection_deleteOwnHandle():
return SmartDbCaller(calendar_db.collection_deleteOwn, return SmartDbCaller(
(FormField('token', str, False), calendar_db.collection_deleteOwn,
FormField('uuid', str, False), (
FormField('lastChange', str, False)), FormField("token", str, False),
None) FormField("uuid", str, False),
FormField("lastChange", str, False),
),
None,
)
@app.route('/collection/getSharing', methods=['POST']) @app.route("/collection/getSharing", methods=["POST"])
def api_collection_getSharingHandle(): def api_collection_getSharingHandle():
return SmartDbCaller(calendar_db.collection_getSharing, return SmartDbCaller(
(FormField('token', str, False), calendar_db.collection_getSharing,
FormField('uuid', str, False)), (FormField("token", str, False), FormField("uuid", str, False)),
None) None,
)
@app.route('/collection/deleteSharing', methods=['POST'])
@app.route("/collection/deleteSharing", methods=["POST"])
def api_collection_deleteSharingHandle(): def api_collection_deleteSharingHandle():
return SmartDbCaller(calendar_db.collection_deleteSharing, return SmartDbCaller(
(FormField('token', str, False), calendar_db.collection_deleteSharing,
FormField('uuid', str, False), (
FormField('target', str, False), FormField("token", str, False),
FormField('lastChange', str, False)), FormField("uuid", str, False),
None) FormField("target", str, False),
FormField("lastChange", str, False),
),
None,
)
@app.route('/collection/addSharing', methods=['POST'])
@app.route("/collection/addSharing", methods=["POST"])
def api_collection_addSharingHandle(): def api_collection_addSharingHandle():
return SmartDbCaller(calendar_db.collection_addSharing, return SmartDbCaller(
(FormField('token', str, False), calendar_db.collection_addSharing,
FormField('uuid', str, False), (
FormField('target', str, False), FormField("token", str, False),
FormField('lastChange', str, False)), FormField("uuid", str, False),
None) FormField("target", str, False),
FormField("lastChange", str, False),
),
None,
)
@app.route('/collection/getShared', methods=['POST']) @app.route("/collection/getShared", methods=["POST"])
def api_collection_getSharedHandle(): def api_collection_getSharedHandle():
return SmartDbCaller(calendar_db.collection_getShared, return SmartDbCaller(
(FormField('token', str, False), ), calendar_db.collection_getShared, (FormField("token", str, False),), None
None) )
# endregion # endregion
# region: Todo # region: Todo
@app.route('/todo/getFull', methods=['POST'])
@app.route("/todo/getFull", methods=["POST"])
def api_todo_getFullHandle(): def api_todo_getFullHandle():
return SmartDbCaller(calendar_db.todo_getFull, return SmartDbCaller(
(FormField('token', str, False), ), calendar_db.todo_getFull, (FormField("token", str, False),), None
None) )
@app.route('/todo/getList', methods=['POST'])
@app.route("/todo/getList", methods=["POST"])
def api_todo_getListHandle(): def api_todo_getListHandle():
return SmartDbCaller(calendar_db.todo_getList, return SmartDbCaller(
(FormField('token', str, False), ), calendar_db.todo_getList, (FormField("token", str, False),), None
None) )
@app.route('/todo/getDetail', methods=['POST'])
@app.route("/todo/getDetail", methods=["POST"])
def api_todo_getDetailHandle(): def api_todo_getDetailHandle():
return SmartDbCaller(calendar_db.todo_getDetail, return SmartDbCaller(
(FormField('token', str, False), calendar_db.todo_getDetail,
FormField('uuid', str, False)), (FormField("token", str, False), FormField("uuid", str, False)),
None) None,
)
@app.route('/todo/add', methods=['POST'])
@app.route("/todo/add", methods=["POST"])
def api_todo_addHandle(): def api_todo_addHandle():
return SmartDbCaller(calendar_db.todo_add, return SmartDbCaller(calendar_db.todo_add, (FormField("token", str, False),), None)
(FormField('token', str, False), ),
None)
@app.route('/todo/update', methods=['POST'])
@app.route("/todo/update", methods=["POST"])
def api_todo_updateHandle(): def api_todo_updateHandle():
return SmartDbCaller(calendar_db.todo_update, return SmartDbCaller(
(FormField('token', str, False), calendar_db.todo_update,
FormField('uuid', str, False), (
FormField('data', str, False), FormField("token", str, False),
FormField('lastChange', str, False)), FormField("uuid", str, False),
None) FormField("data", str, False),
FormField("lastChange", str, False),
),
None,
)
@app.route('/todo/delete', methods=['POST'])
@app.route("/todo/delete", methods=["POST"])
def api_todo_deleteHandle(): def api_todo_deleteHandle():
return SmartDbCaller(calendar_db.todo_delete, return SmartDbCaller(
(FormField('token', str, False), calendar_db.todo_delete,
FormField('uuid', str, False), (
FormField('lastChange', str, False)), FormField("token", str, False),
None) FormField("uuid", str, False),
FormField("lastChange", str, False),
),
None,
)
# endregion # endregion
# region: Admin # region: Admin
@app.route('/admin/get', methods=['POST'])
@app.route("/admin/get", methods=["POST"])
def api_admin_getHandle(): def api_admin_getHandle():
return SmartDbCaller(calendar_db.admin_get, return SmartDbCaller(calendar_db.admin_get, (FormField("token", str, False),), None)
(FormField('token', str, False), ),
None)
@app.route('/admin/add', methods=['POST'])
@app.route("/admin/add", methods=["POST"])
def api_admin_addHandle(): def api_admin_addHandle():
return SmartDbCaller(calendar_db.admin_add, return SmartDbCaller(
(FormField('token', str, False), calendar_db.admin_add,
FormField('username', str, False)), (FormField("token", str, False), FormField("username", str, False)),
None) None,
)
@app.route('/admin/update', methods=['POST'])
@app.route("/admin/update", methods=["POST"])
def api_admin_updateHandle(): def api_admin_updateHandle():
return SmartDbCaller(calendar_db.admin_update, return SmartDbCaller(
(FormField('token', str, False), calendar_db.admin_update,
FormField('username', str, False), (
FormField('password', str, True), FormField("token", str, False),
FormField('isAdmin', utils.Str2Bool, True)), FormField("username", str, False),
None) FormField("password", str, True),
FormField("isAdmin", utils.Str2Bool, True),
),
None,
)
@app.route('/admin/delete', methods=['POST'])
@app.route("/admin/delete", methods=["POST"])
def api_admin_deleteHandle(): def api_admin_deleteHandle():
return SmartDbCaller(calendar_db.admin_delete, return SmartDbCaller(
(FormField('token', str, False), calendar_db.admin_delete,
FormField('username', str, False)), (FormField("token", str, False), FormField("username", str, False)),
None) None,
)
# endregion # endregion
# region: Profile # region: Profile
@app.route('/profile/isAdmin', methods=['POST'])
@app.route("/profile/isAdmin", methods=["POST"])
def api_profile_isAdminHandle(): def api_profile_isAdminHandle():
return SmartDbCaller(calendar_db.profile_isAdmin, return SmartDbCaller(
(FormField('token', str, False), ), calendar_db.profile_isAdmin, (FormField("token", str, False),), None
None) )
@app.route('/profile/changePassword', methods=['POST'])
@app.route("/profile/changePassword", methods=["POST"])
def api_profile_changePasswordHandle(): def api_profile_changePasswordHandle():
return SmartDbCaller(calendar_db.profile_changePassword, return SmartDbCaller(
(FormField('token', str, False), calendar_db.profile_changePassword,
FormField('password', str, False)), (FormField("token", str, False), FormField("password", str, False)),
None) None,
)
@app.route('/profile/getToken', methods=['POST'])
@app.route("/profile/getToken", methods=["POST"])
def api_profile_getTokenHandle(): def api_profile_getTokenHandle():
return SmartDbCaller(calendar_db.profile_getToken, return SmartDbCaller(
(FormField('token', str, False), ), calendar_db.profile_getToken, (FormField("token", str, False),), None
None) )
@app.route('/profile/deleteToken', methods=['POST'])
@app.route("/profile/deleteToken", methods=["POST"])
def api_profile_deleteTokenHandle(): def api_profile_deleteTokenHandle():
return SmartDbCaller(calendar_db.profile_deleteToken, return SmartDbCaller(
(FormField('token', str, False), calendar_db.profile_deleteToken,
FormField('deleteToken', str, False)), (FormField("token", str, False), FormField("deleteToken", str, False)),
None) None,
)
# endregion # endregion
# endregion # endregion
# region: Misc Functions # region: Utilities
@dataclass(frozen=True)
class ClientNetworkInfo:
user_agent: str
"""The user agent of client."""
ip_addr: str
"""The IP address of client."""
def FetchClientNetworkInfo() -> ClientNetworkInfo:
clientUa = request.user_agent.string
forwardIpList = request.headers.getlist("X-Forwarded-For")
if forwardIpList:
clientIp = forwardIpList[0]
else:
directIp = request.remote_addr
if directIp is not None:
clientIp = directIp
else:
clientIp = "0.0.0.0"
return ClientNetworkInfo(clientUa, clientIp)
@dataclass(frozen=True) @dataclass(frozen=True)
class FormField: class FormField:
@@ -338,19 +447,28 @@ class FormField:
is_optional: bool is_optional: bool
"""True if this form field is optional, otherwise false.""" """True if this form field is optional, otherwise false."""
def SmartDbCaller(db_method: Callable, fields: tuple[FormField, ...], padding_form: dict[str, Any] | None) -> dict[str, Any]:
result = ResponseBody(False, 'Invalid parameter', None) def SmartDbCaller(
db_method: Callable[..., ResponseBody[Any]],
fields: tuple[FormField, ...],
padding_form: dict[str, str] | None,
) -> dict[str, Any]:
opt_param_counter = 0 opt_param_counter = 0
lost_required: bool = False
param_list: list[Any] = [] param_list: list[Any] = []
opt_param_dict: dict[str, Any] = {} opt_param_dict: dict[str, Any] = {}
real_form = request.form.to_dict() # fetch user passed form
LOGGER.debug(f'Form: {real_form}') user_form: dict[str, str] = request.form.to_dict()
LOGGER.debug(f"User Form: {user_form}")
# overwrite user form by our padding form
if padding_form is not None: if padding_form is not None:
real_form.update(padding_form) user_form.update(padding_form)
LOGGER.debug(f"Padded User Form: {user_form}")
# check fields one by one
for field in fields: for field in fields:
value = real_form.get(field.name, None) value = user_form.get(field.name, None)
if value is not None: if value is not None:
value = field.ty(value) value = field.ty(value)
@@ -362,36 +480,34 @@ def SmartDbCaller(db_method: Callable, fields: tuple[FormField, ...], padding_fo
else: else:
# required param # required param
if value is None: if value is None:
break lost_required = True
else:
param_list.append(value) param_list.append(value)
# at least one opt param # Only execute database function if there is no lost required fields.
LOGGER.debug(f'All Optional Parameter: {opt_param_counter}') # And fulfill one of following requirements:
LOGGER.debug(f'Optional Parameter Count: {len(opt_param_dict)}') # 1. There are all required fields (optional parameter count is zero).
if opt_param_counter == 0 or len(opt_param_dict) != 0: # 1. Or, there is some optional parameter.
result: ResponseBody = db_method(*param_list, **opt_param_dict) LOGGER.debug(f"Has Lost Required Parameter: {lost_required}")
LOGGER.debug(f"All Optional Parameter Count: {opt_param_counter}")
LOGGER.debug(f"Available Optional Parameter Count: {len(opt_param_dict)}")
result: ResponseBody[Any]
if lost_required == False and (opt_param_counter == 0 or len(opt_param_dict) != 0):
result = db_method(*param_list, **opt_param_dict)
else:
result = ResponseBody(False, "Invalid parameter", None)
return ConstructResponseBody(result) return ConstructResponseBody(result)
@dataclass(frozen=True)
class ResponseBody:
success: bool
"""True if this operation is successful, otherwise false."""
error: str
"""The error message provided when operation failed."""
data: Any
"""The payload provided when operation successed."""
def ConstructResponseBody(body: ResponseBody) -> dict[str, Any]: def ConstructResponseBody(body: ResponseBody[Any]) -> dict[str, Any]:
return { return {"success": body.success, "error": body.error, "data": body.data}
'success': body.success,
'error': body.error,
'data': body.data # endregion
}
def run(): def run():
calendar_db.open() calendar_db.open()
app.run(port=config.get_config().web.port) app.run(port=config.get_config().web.port)
calendar_db.close() calendar_db.close()
# endregion

View File

@@ -3,50 +3,62 @@ import random
import uuid import uuid
import time import time
import math import math
import re
ValidUsername = set(map(lambda x:chr(x), range(48, 58, 1))) | set(map(lambda x:chr(x), range(65, 91, 1))) | set(map(lambda x:chr(x), range(97, 123, 1))) USERNAME_PATTERN: re.Pattern = re.compile("^[0-9A-Za-z]+$")
ValidPassword = set(map(lambda x:chr(x), range(33, 127, 1))) PASSWORD_PATTERN: re.Pattern = re.compile("^[!-~]+$")
def IsValidUsername(strl):
return (len(set(strl) - ValidUsername) == 0)
def IsValidPassword(strl): def IsValidUsername(strl: str) -> bool:
return (len(set(strl) - ValidPassword) == 0) return USERNAME_PATTERN.match(strl) is not None
def ComputePasswordHash(password):
def IsValidPassword(strl: str) -> bool:
return PASSWORD_PATTERN.match(strl) is not None
def ComputePasswordHash(password: str) -> str:
s = hashlib.sha256() s = hashlib.sha256()
s.update(password.encode('utf-8')) s.update(password.encode("utf-8"))
return s.hexdigest() return s.hexdigest()
def GenerateUUID():
def GenerateUUID() -> str:
return str(uuid.uuid1()) return str(uuid.uuid1())
def GenerateToken(username):
def GenerateToken(username: str) -> str:
s = hashlib.sha256() s = hashlib.sha256()
s.update(username.encode('utf-8')) s.update(username.encode("utf-8"))
s.update(GenerateUUID().encode('utf-8')) s.update(GenerateUUID().encode("utf-8"))
return s.hexdigest() return s.hexdigest()
def GenerateSalt():
def GenerateSalt() -> int:
return random.randint(0, 6172748) return random.randint(0, 6172748)
def ComputePasswordHashWithSalt(passwordHashed, salt):
def ComputePasswordHashWithSalt(passwordHashed: str, salt: int) -> str:
s = hashlib.sha256() s = hashlib.sha256()
s.update((passwordHashed + str(salt)).encode('utf-8')) s.update((passwordHashed + str(salt)).encode("utf-8"))
return s.hexdigest() return s.hexdigest()
def GetCurrentTimestamp():
def GetCurrentTimestamp() -> int:
return int(time.time()) return int(time.time())
def GetTokenExpireOn():
def GetTokenExpireOn() -> int:
return GetCurrentTimestamp() + 60 * 60 * 24 * 2 # add 2 day from now return GetCurrentTimestamp() + 60 * 60 * 24 * 2 # add 2 day from now
def Str2Bool(strl):
return strl.lower() == 'true'
def GCD(a, b): def Str2Bool(strl: str) -> bool:
return strl.lower() == "true"
def GCD(a: int, b: int) -> int:
return math.gcd(a, b) return math.gcd(a, b)
def LCM(a, b):
return int(a * b / GCD(a, b))
def LCM(a: int, b: int) -> int:
return (a * b) // GCD(a, b)