1
0
Files
coconut-leaf/backend/database.py
yyc12345 2a280dcba0 feat: support login
- migrate old API into typescript but not finished (only webLogin works now)
- seperate the logger of backend due to the shitty behavior of Flask (change logging level)
2026-05-14 21:13:13 +08:00

628 lines
23 KiB
Python

import sqlite3
import threading
from typing import cast
from pathlib import Path
from dataclasses import dataclass
from typing import Any
import dt
import utils
import config
from logger import LOGGER
@dataclass(frozen=True)
class ResponseBody:
success: bool
"""True if this operation is successful, otherwise false."""
error: str
"""The error message provided when operation failed."""
data: Any
"""The payload provided when operation successed."""
def SafeDatabaseOperation(func):
def wrapper(self: 'CalendarDatabase', *args, **kwargs):
cfg = config.get_config()
with self.mutex:
# check database and acquire cursor
try:
self.check_database()
self.cursor = self.db.cursor()
except Exception as e:
self.cursor = None
if cfg.others.debug:
LOGGER.exception(e)
return ResponseBody(False, str(e), None)
# do real data work
try:
currentTime = utils.GetCurrentTimestamp()
if currentTime - self.latestClean > cfg.others.auto_token_clean_duration:
self.latestClean = currentTime
LOGGER.info('Cleaning outdated token...')
self.tokenOper_clean()
result = ResponseBody(True, '', func(self, *args, **kwargs))
self.cursor.close()
self.cursor = None
self.db.commit()
return result
except Exception as e:
self.cursor.close()
self.cursor = None
self.db.rollback()
if cfg.others.debug:
LOGGER.exception(e)
return ResponseBody(False, str(e), None)
return wrapper
class CalendarDatabase:
db: sqlite3.Connection
cursor: sqlite3.Cursor
mutex: threading.Lock
latestClean: int
def __init__(self):
self.db = None
self.cursor = None
self.mutex = threading.Lock()
self.latestClean = 0
def open(self):
if (self.is_database_valid()):
raise Exception('Databade is opened')
cfg = config.get_config()
match cfg.database.driver:
case config.DatabaseDriver.SQLITE:
self.db = sqlite3.connect(cast(config.SqliteDatabaseConfig, cfg.database.config).path, check_same_thread = False)
self.db.execute('PRAGMA encoding = "UTF-8";')
self.db.execute('PRAGMA foreign_keys = ON;')
case config.DatabaseDriver.MYSQL:
raise Exception('Not implemented database')
case _:
raise Exception('Unknow database type')
def init(self, username, password):
if (self.is_database_valid()):
raise Exception('Database is opened')
# establish tables
cfg = config.get_config()
backend_path = Path(__file__).resolve().parent
backend_sql_path = backend_path / 'sql'
match cfg.database.driver:
case config.DatabaseDriver.SQLITE:
sql_file = backend_sql_path / 'sqlite.sql'
case config.DatabaseDriver.MYSQL:
raise Exception('Not implemented database')
case _:
raise Exception('Unknow database type')
self.open()
cursor = self.db.cursor()
with open(sql_file, 'r', encoding='utf-8') as fsql:
cursor.executescript(fsql.read())
# finish init
cursor.execute('INSERT INTO user VALUES (?, ?, ?, ?);', (
username,
utils.ComputePasswordHash(password),
1,
utils.GenerateSalt()
))
cursor.close()
self.db.commit()
def close(self):
self.check_database()
self.db.close()
self.db = None
def check_database(self):
if (not self.is_database_valid()):
raise Exception('Databade is None')
def is_database_valid(self):
return not (self.db == None)
# ======================= token related internal operation
def tokenOper_clean(self):
# remove outdated token
self.cursor.execute('DELETE FROM token WHERE [token_expire_on] <= ?',(utils.GetCurrentTimestamp(), ))
def tokenOper_postpone_expireOn(self, token):
self.cursor.execute('UPDATE token SET [token_expire_on] = ? WHERE [token] = ?;', (
utils.GetTokenExpireOn(),
token
))
def tokenOper_check_valid(self, token):
self.tokenOper_get_username(token)
def tokenOper_is_admin(self, username):
self.cursor.execute('SELECT [is_admin] FROM user WHERE [name] = ?;',(username, ))
cache = self.cursor.fetchone()[0]
return cache == 1
def tokenOper_get_username(self, token):
self.cursor.execute('SELECT [user] FROM token WHERE [token] = ? AND [token_expire_on] > ?;',(
token,
utils.GetCurrentTimestamp()
))
result = self.cursor.fetchone()[0]
# need postpone expire on time
self.tokenOper_postpone_expireOn(token)
return result
# =============================== # =============================== operation function
# =============================== common
@SafeDatabaseOperation
def common_salt(self, username):
salt = utils.GenerateSalt()
self.cursor.execute('UPDATE user SET [salt] = ? WHERE [name] = ?;', (
salt,
username
))
return salt
@SafeDatabaseOperation
def common_login(self, username, password, clientUa, clientIp):
self.cursor.execute('SELECT [password], [salt] FROM user WHERE [name] = ?;', (username, ))
(gotten_salt, gotten_password) = self.cursor.fetchone()
if password == utils.ComputePasswordHashWithSalt(gotten_password, gotten_salt):
token = utils.GenerateToken(username)
self.cursor.execute('UPDATE user SET [salt] = ? WHERE [name] = ?;', (
utils.GenerateSalt(), # regenerate a new slat to prevent re-login try
username
))
self.cursor.execute('INSERT INTO token VALUES (?, ?, ?, ?, ?);', (
username,
token,
utils.GetTokenExpireOn(), # add 2 day from now
clientUa,
clientIp,
))
return token
else:
# throw a exception to indicate fail to login
raise Exception('Login authentication failed')
@SafeDatabaseOperation
def common_webLogin(self, username, password, clientUa, clientIp):
LOGGER.debug(f'WebLogin Username: {username}')
LOGGER.debug(f'WebLogin Password: {password}')
passwordHash = utils.ComputePasswordHash(password)
LOGGER.debug(f'WebLogin Password Hash: {passwordHash}')
self.cursor.execute('SELECT [name] FROM user WHERE [name] = ? AND [password] = ?;', (username, passwordHash))
if len(self.cursor.fetchall()) != 0:
token = utils.GenerateToken(username)
self.cursor.execute('INSERT INTO token VALUES (?, ?, ?, ?, ?);', (
username,
token,
utils.GetTokenExpireOn(), # add 2 day from now
clientUa,
clientIp,
))
return token
else:
# throw a exception to indicate fail to login
raise Exception('Login authentication failed')
@SafeDatabaseOperation
def common_logout(self, token):
self.tokenOper_check_valid(token)
self.cursor.execute('DELETE FROM token WHERE [token] = ?;', (token, ))
return True
@SafeDatabaseOperation
def common_tokenValid(self, token):
self.tokenOper_check_valid(token)
return True
# =============================== calendar
@SafeDatabaseOperation
def calendar_getFull(self, token, startDateTime, endDateTime):
username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT calendar.* FROM calendar INNER JOIN collection \
ON collection.uuid = calendar.belong_to \
WHERE (collection.user = ? AND calendar.loop_date_time_end >= ? AND calendar.loop_date_time_start - (calendar.event_date_time_end - calendar.event_date_time_start) <= ?);',
(username, startDateTime, endDateTime))
return self.cursor.fetchall()
@SafeDatabaseOperation
def calendar_getList(self, token, startDateTime, endDateTime):
username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT calendar.uuid FROM calendar INNER JOIN collection \
ON collection.uuid = calendar.belong_to \
WHERE (collection.user = ? AND calendar.loop_date_time_end >= ? AND calendar.loop_date_time_start - (calendar.event_date_time_end - calendar.event_date_time_start) <= ?);',
(username, startDateTime, endDateTime))
return tuple(map(lambda x: x[0], self.cursor.fetchall()))
@SafeDatabaseOperation
def calendar_getDetail(self, token, uuid):
self.tokenOper_check_valid(token)
self.cursor.execute('SELECT * FROM calendar WHERE [uuid] = ?;', (uuid, ))
return self.cursor.fetchone()
@SafeDatabaseOperation
def calendar_update(self, token, uuid, lastChange, **optArgs):
self.tokenOper_check_valid(token)
# get prev data
self.cursor.execute('SELECT * FROM calendar WHERE [uuid] = ? AND [last_change] = ?;', (uuid, lastChange))
analyseData = list(self.cursor.fetchone())
# construct update data
lastupdate = utils.GenerateUUID()
sqlList = [
'[last_change] = ?',
]
argumentsList = [
lastupdate,
]
# analyse opt arg
reAnalyseLoop = False
cache = optArgs.get('belongTo', None)
if cache is not None:
sqlList.append('[belong_to] = ?')
argumentsList.append(cache)
cache = optArgs.get('title', None)
if cache is not None:
sqlList.append('[title] = ?')
argumentsList.append(cache)
cache = optArgs.get('description', None)
if cache is not None:
sqlList.append('[description] = ?')
argumentsList.append(cache)
cache = optArgs.get('eventDateTimeStart', None)
if cache is not None:
sqlList.append('[event_date_time_start] = ?')
argumentsList.append(cache)
reAnalyseLoop = True
analyseData[5] = cache
cache = optArgs.get('eventDateTimeEnd', None)
if cache is not None:
sqlList.append('[event_date_time_end] = ?')
argumentsList.append(cache)
cache = optArgs.get('loopRules', None)
if cache is not None:
sqlList.append('[loop_rules] = ?')
argumentsList.append(cache)
reAnalyseLoop = True
analyseData[8] = cache
cache = optArgs.get('timezoneOffset', None)
if cache is not None:
sqlList.append('[timezone_offset] = ?')
argumentsList.append(cache)
reAnalyseLoop = True
analyseData[7] = cache
if reAnalyseLoop:
# re-compute loop data and upload it into list
sqlList.append('[loop_date_time_start] = ?')
argumentsList.append(analyseData[5])
sqlList.append('[loop_date_time_end] = ?')
argumentsList.append(str(dt.ResolveLoopStr(
analyseData[8],
analyseData[5],
analyseData[7]
)))
# execute
argumentsList.append(uuid)
self.cursor.execute('UPDATE calendar SET {} WHERE [uuid] = ?;'.format(', '.join(sqlList)),
tuple(argumentsList))
if self.cursor.rowcount != 1:
raise Exception('Fail to update due to no matched rows or too much rows.')
return lastupdate
@SafeDatabaseOperation
def calendar_add(self, token, belongTo, title, description, eventDateTimeStart, eventDateTimeEnd, loopRules, timezoneOffset):
self.tokenOper_check_valid(token)
newuuid = utils.GenerateUUID()
lastupdate = utils.GenerateUUID()
# analyse loopRules and output following 2 fileds.
loopDateTimeStart = eventDateTimeStart
loopDateTimeEnd = dt.ResolveLoopStr(loopRules, eventDateTimeStart, timezoneOffset)
self.cursor.execute('INSERT INTO calendar VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);',
(newuuid,
belongTo,
title,
description,
lastupdate,
eventDateTimeStart,
eventDateTimeEnd,
timezoneOffset,
loopRules,
loopDateTimeStart,
loopDateTimeEnd))
return newuuid
@SafeDatabaseOperation
def calendar_delete(self, token, uuid, lastChange):
self.tokenOper_check_valid(token)
self.cursor.execute('DELETE FROM calendar WHERE [uuid] = ? AND [last_change] = ?;', (uuid, lastChange))
if self.cursor.rowcount != 1:
raise Exception('Fail to delete due to no matched rows or too much rows.')
return True
# =============================== collection
@SafeDatabaseOperation
def collection_getFullOwn(self, token):
username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT [uuid], [name], [last_change] FROM collection WHERE [user] = ?;', (username, ))
return self.cursor.fetchall()
@SafeDatabaseOperation
def collection_getListOwn(self, token):
username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT [uuid] FROM collection WHERE [user] = ?;', (username, ))
return tuple(map(lambda x: x[0], self.cursor.fetchall()))
@SafeDatabaseOperation
def collection_getDetailOwn(self, token, uuid):
username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT [uuid], [name], [last_change] FROM collection WHERE [user] = ? AND [uuid] = ?;', (username, uuid))
return self.cursor.fetchone()
@SafeDatabaseOperation
def collection_addOwn(self, token, newname):
username = self.tokenOper_get_username(token)
newuuid = utils.GenerateUUID()
lastupdate = utils.GenerateUUID()
self.cursor.execute('INSERT INTO collection VALUES (?, ?, ?, ?);',
(newuuid, newname, username, lastupdate))
return newuuid
@SafeDatabaseOperation
def collection_updateOwn(self, token, uuid, newname, lastChange):
self.tokenOper_check_valid(token)
lastupdate = utils.GenerateUUID()
self.cursor.execute('UPDATE collection SET [name] = ?, [last_change] = ? WHERE [uuid] = ? AND [last_change] = ?;', (
newname,
lastupdate,
uuid,
lastChange
))
if self.cursor.rowcount != 1:
raise Exception('Fail to update due to no matched rows or too much rows.')
return lastupdate
@SafeDatabaseOperation
def collection_deleteOwn(self, token, uuid, lastChange):
self.tokenOper_check_valid(token)
self.cursor.execute('DELETE FROM collection WHERE [uuid] = ? AND [last_change] = ?;', (
uuid,
lastChange
))
if self.cursor.rowcount != 1:
raise Exception('Fail to delete due to no matched rows or too much rows.')
return True
@SafeDatabaseOperation
def collection_getSharing(self, token, uuid):
self.tokenOper_check_valid(token)
self.cursor.execute('SELECT [target] FROM share WHERE [uuid] = ?;', (uuid, ))
return tuple(map(lambda x: x[0], self.cursor.fetchall()))
@SafeDatabaseOperation
def collection_deleteSharing(self, token, uuid, target, lastChange):
self.tokenOper_check_valid(token)
lastupdate = utils.GenerateUUID()
self.cursor.execute('UPDATE collection SET [last_change] = ?, WHERE [uuid] = ? AND [last_change] = ?;', (lastupdate, uuid, lastChange))
if self.cursor.rowcount != 1:
raise Exception('Fail to delete due to no matched rows or too much rows.')
self.cursor.execute('DELETE FROM share WHERE [uuid] = ? AND [target] = ?;', (uuid, target))
if self.cursor.rowcount != 1:
raise Exception('Fail to delete due to no matched rows or too much rows.')
return lastupdate
@SafeDatabaseOperation
def collection_addSharing(self, token, uuid, target, lastChange):
self.tokenOper_check_valid(token)
lastupdate = utils.GenerateUUID()
self.cursor.execute('UPDATE collection SET [last_change] = ? WHERE [uuid] = ? AND [last_change] = ?;', (lastupdate, uuid, lastChange))
if self.cursor.rowcount != 1:
raise Exception('Fail to delete due to no matched rows or too much rows.')
self.cursor.execute('SELECT * FROM share WHERE [uuid] = ? AND [target] = ?;', (uuid, target))
if len(self.cursor.fetchall()) != 0:
raise Exception('Fail to insert duplicated item.')
self.cursor.execute('INSERT INTO share VALUES (?, ?);', (uuid, target))
return lastupdate
@SafeDatabaseOperation
def collection_getShared(self, token):
username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT collection.uuid, collection.name, collection.user \
FROM share INNER JOIN collection \
ON share.uuid = collection.uuid \
WHERE share.target = ?;', (username, ))
return self.cursor.fetchall()
# =============================== todo
@SafeDatabaseOperation
def todo_getFull(self, token):
username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT * FROM todo WHERE [belong_to] = ?;', (username, ))
return self.cursor.fetchall()
@SafeDatabaseOperation
def todo_getList(self, token):
username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT [uuid] FROM todo WHERE [belong_to] = ?;', (username, ))
return tuple(map(lambda x: x[0], self.cursor.fetchall()))
@SafeDatabaseOperation
def todo_getDetail(self, token, uuid):
username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT * FROM todo WHERE [belong_to] = ? AND [uuid] = ?;', (username, uuid))
return self.cursor.fetchone()
@SafeDatabaseOperation
def todo_add(self, token):
username = self.tokenOper_get_username(token)
newuuid = utils.GenerateUUID()
lastupdate = utils.GenerateUUID()
returnedData = (
newuuid,
username,
'',
lastupdate,
)
self.cursor.execute('INSERT INTO todo VALUES (?, ?, ?, ?);', returnedData)
return returnedData
@SafeDatabaseOperation
def todo_update(self, token, uuid, data, lastChange):
# check valid token
self.tokenOper_check_valid(token)
# update
newLastChange = utils.GenerateUUID()
self.cursor.execute('UPDATE todo SET [data] = ?, [last_change] = ? WHERE [uuid] = ? AND [last_change] = ?;', (
data,
newLastChange,
uuid,
lastChange
))
if self.cursor.rowcount != 1:
raise Exception('Fail to update due to no matched rows or too much rows.')
return newLastChange
@SafeDatabaseOperation
def todo_delete(self, token, uuid, lastChange):
# check valid token
self.tokenOper_check_valid(token)
# delete
self.cursor.execute('DELETE FROM todo WHERE [uuid] = ? AND [last_change] = ?;', (uuid, lastChange))
if self.cursor.rowcount != 1:
raise Exception('Fail to delete due to no matched rows or too much rows.')
return True
# =============================== admin
@SafeDatabaseOperation
def admin_get(self, token):
username = self.tokenOper_get_username(token)
if not self.tokenOper_is_admin(username):
raise Exception('Permission denied.')
self.cursor.execute('SELECT [name], [is_admin] FROM user;')
return tuple(map(lambda x: (x[0], x[1] == 1), self.cursor.fetchall()))
@SafeDatabaseOperation
def admin_add(self, token, newname):
username = self.tokenOper_get_username(token)
if not self.tokenOper_is_admin(username):
raise Exception('Permission denied.')
newpassword = utils.ComputePasswordHash(utils.GenerateUUID())
self.cursor.execute('INSERT INTO user VALUES (?, ?, ?, ?);', (
newname,
newpassword,
0,
utils.GenerateSalt()
))
return (newname, False)
@SafeDatabaseOperation
def admin_update(self, token, _username, **optArgs):
username = self.tokenOper_get_username(token)
if not self.tokenOper_is_admin(username):
raise Exception('Permission denied.')
# construct data
sqlList = []
argumentsList = []
# analyse opt arg
cache = optArgs.get('password', None)
if cache is not None:
sqlList.append('[password] = ?')
argumentsList.append(utils.ComputePasswordHash(cache))
cache = optArgs.get('isAdmin', None)
if cache is not None:
sqlList.append('[is_admin] = ?')
argumentsList.append(1 if cache else 0)
# execute
argumentsList.append(_username)
self.cursor.execute('UPDATE user SET {} WHERE [name] = ?;'.format(', '.join(sqlList)),
tuple(argumentsList))
LOGGER.debug(cache)
LOGGER.debug(tuple(argumentsList))
if self.cursor.rowcount != 1:
raise Exception('Fail to update due to no matched rows or too much rows.')
return True
@SafeDatabaseOperation
def admin_delete(self, token, username):
_username = self.tokenOper_get_username(token)
if not self.tokenOper_is_admin(_username):
raise Exception('Permission denied.')
# delete
self.cursor.execute('DELETE FROM user WHERE [name] = ?;', (username, ))
if self.cursor.rowcount != 1:
raise Exception('Fail to delete due to no matched rows or too much rows.')
return True
# =============================== profile
@SafeDatabaseOperation
def profile_isAdmin(self, token):
username = self.tokenOper_get_username(token)
return self.tokenOper_is_admin(username)
@SafeDatabaseOperation
def profile_changePassword(self, token, newpassword):
username = self.tokenOper_get_username(token)
self.cursor.execute('UPDATE user SET [password] = ? WHERE [name] = ?;', (
utils.ComputePasswordHash(newpassword),
username
))
return True
@SafeDatabaseOperation
def profile_getToken(self, token):
username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT * FROM token WHERE [user] = ?;', (
username,
))
return self.cursor.fetchall()
@SafeDatabaseOperation
def profile_deleteToken(self, token, deleteToken):
_username = self.tokenOper_get_username(token)
# delete
self.cursor.execute('DELETE FROM token WHERE [user] = ? AND [token] = ?;', (
_username,
deleteToken
))
if self.cursor.rowcount != 1:
raise Exception('Fail to delete due to no matched rows or too much rows.')
return True