1
0

refactor: refactor for modern layout

- split frontend and backend.
- update backend with modern Python dev strategies.
This commit is contained in:
2026-04-28 13:17:54 +08:00
parent 8e72e75a15
commit a0e3385670
65 changed files with 479 additions and 139 deletions

19
backend/.gitignore vendored Normal file
View File

@@ -0,0 +1,19 @@
## ======== Personal ========
# Database file
*.db
# Ignore setting file
coconut-leaf.toml
## ======== Python ========
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv

1
backend/.python-version Normal file
View File

@@ -0,0 +1 @@
3.11

0
backend/README.md Normal file
View File

76
backend/coconut-leaf.py Normal file
View File

@@ -0,0 +1,76 @@
import sys
import logging
from argparse import ArgumentParser
from typing import cast
from pathlib import Path
import server
import config
import utils
import database
def GetUsernamePassword():
print("What is the first username of this calendar system?")
cache = input()
while not utils.IsValidUsername(cache):
print("Sorry, invalid data. Please try again.")
cache = input()
username = cache
print("Input this user password:")
cache = input()
while not utils.IsValidPassword(cache):
print("Sorry, invalid data. Please try again.")
cache = input()
password = cache
return (username, password)
if __name__ == "__main__":
print("Coconut-leaf")
print("A self-host, multi-account calendar system.")
print("Project: https://github.com/yyc12345/coconut-leaf")
print("===================")
# Receive arguments
parser = ArgumentParser(description="Coconut-leaf")
parser.add_argument(
"-c",
"--config",
required=True,
type=Path,
action="store",
metavar="CONFIG_TOML",
dest="config",
help="The configuration file for coconut-leaf",
)
parser.add_argument(
"-i",
"--init",
action="store_true",
dest="init",
help="Set for initialize the calendar system",
)
args = parser.parse_args()
# Load config file
try:
config.setup_config(cast(Path, args.config))
except Exception as e:
print(f"Error loading config file: {e}")
sys.exit(1)
# Setup logging level
logging_level = logging.DEBUG if config.get_config().others.debug else logging.INFO
logging.basicConfig(format='[%(levelname)s] %(message)s', level=logging_level)
# Initialize the calendar system if needed
if cast(bool, args.init):
gotten_data = GetUsernamePassword()
calendar = database.CalendarDatabase()
calendar.init(*gotten_data)
calendar.close()
logging.info("Staring server...")
server.run()

View File

@@ -0,0 +1,22 @@
[database]
driver = "sqlite"
[database.config]
path = "coconut-leaf.db"
# [database]
# driver = "mysql"
#
# [database.config]
# host = "localhost"
# port = 3306
# user = "root"
# password = "password"
# database = "coconut_leaf"
[web]
port = 8888
[others]
auto-token-clean-duration = 86400
debug = true

View File

@@ -0,0 +1,14 @@
{
"database-type": "sqlite",
"database-config": {
"user": "",
"password": "",
"db": "",
"url": "",
"port": 3306
},
"web": {
"port": 8888
},
"debug": true
}

130
backend/config.py Normal file
View File

@@ -0,0 +1,130 @@
import tomllib
from dataclasses import dataclass
from enum import StrEnum
from typing import Optional
from pathlib import Path
class DatabaseDriver(StrEnum):
SQLITE = "sqlite"
MYSQL = "mysql"
@staticmethod
def from_raw(raw: dict):
return DatabaseDriver(raw["driver"])
@dataclass(frozen=True)
class SqliteDatabaseConfig:
path: str
"""Database path"""
@staticmethod
def from_raw(raw: dict):
return SqliteDatabaseConfig(raw["path"])
@dataclass(frozen=True)
class MysqlDatabaseConfig:
host: str
"""Database host"""
port: int
"""Database port"""
user: str
"""Database user"""
password: str
"""Database password"""
database: str
"""Database name"""
@staticmethod
def from_raw(raw: dict):
return MysqlDatabaseConfig(
raw["host"], raw["port"], raw["user"], raw["password"], raw["database"]
)
@dataclass(frozen=True)
class DatabaseConfig:
driver: DatabaseDriver
"""Database driver"""
config: SqliteDatabaseConfig | MysqlDatabaseConfig
"""Database config"""
@staticmethod
def from_raw(raw: dict):
if raw["driver"] == DatabaseDriver.SQLITE:
return DatabaseConfig(
DatabaseDriver.SQLITE, SqliteDatabaseConfig.from_raw(raw["config"])
)
elif raw["driver"] == DatabaseDriver.MYSQL:
return DatabaseConfig(
DatabaseDriver.MYSQL, MysqlDatabaseConfig.from_raw(raw["config"])
)
else:
raise ValueError("Invalid database driver")
@dataclass(frozen=True)
class WebConfig:
port: int
"""Web server port"""
@staticmethod
def from_raw(raw: dict):
return WebConfig(raw["port"])
@dataclass(frozen=True)
class OthersConfig:
debug: bool
"""Whether enable debug mode"""
auto_token_clean_duration: int
"""Auto token clean duration"""
@staticmethod
def from_raw(raw: dict):
return OthersConfig(raw["debug"], raw["auto-token-clean-duration"])
@dataclass(frozen=True)
class Config:
database: DatabaseConfig
web: WebConfig
others: OthersConfig
@staticmethod
def from_raw(raw: dict):
return Config(
database=DatabaseConfig.from_raw(raw["database"]),
web=WebConfig.from_raw(raw["web"]),
others=OthersConfig.from_raw(raw["others"]),
)
_CONFIG: Optional[Config] = None
def setup_config(p: Path) -> None:
"""
Setup config by given path.
Raise exception if config file is invalid.
"""
with open(p, "rb") as f:
raw = tomllib.load(f)
global _CONFIG
_CONFIG = Config.from_raw(raw)
def get_config() -> Config:
"""
Get config instance.
Raises RuntimeError if config is not loaded.
"""
if _CONFIG is None:
raise RuntimeError("Config is not loaded. Call setup_config() first.")
else:
return _CONFIG

610
backend/database.py Normal file
View File

@@ -0,0 +1,610 @@
import config
import sqlite3
import utils
import threading
import logging
import dt
from typing import cast
from pathlib import Path
def SafeDatabaseOperation(func):
def wrapper(self: 'CalendarDatabase', *args, **kwargs):
cfg = config.get_config()
with self.mutex:
# check database and acquire cursor
try:
self.check_database()
self.cursor = self.db.cursor()
except Exception as e:
self.cursor = None
if cfg.others.debug:
logging.exception(e)
return (False, str(e), None)
# do real data work
try:
currentTime = utils.GetCurrentTimestamp()
if currentTime - self.latestClean > cfg.others.auto_token_clean_duration:
self.latestClean = currentTime
logging.info('Cleaning outdated token...')
self.tokenOper_clean()
result = (True, '', func(self, *args, **kwargs))
self.cursor.close()
self.cursor = None
self.db.commit()
return result
except Exception as e:
self.cursor.close()
self.cursor = None
self.db.rollback()
if cfg.others.debug:
logging.exception(e)
return (False, str(e), None)
return wrapper
class CalendarDatabase:
db: sqlite3.Connection
cursor: sqlite3.Cursor
mutex: threading.Lock
latestClean: int
def __init__(self):
self.db = None
self.cursor = None
self.mutex = threading.Lock()
self.latestClean = 0
def open(self):
if (self.is_database_valid()):
raise Exception('Databade is opened')
cfg = config.get_config()
match cfg.database.driver:
case config.DatabaseDriver.SQLITE:
self.db = sqlite3.connect(cast(config.SqliteDatabaseConfig, cfg.database.config).path, check_same_thread = False)
self.db.execute('PRAGMA encoding = "UTF-8";')
self.db.execute('PRAGMA foreign_keys = ON;')
case config.DatabaseDriver.MYSQL:
raise Exception('Not implemented database')
case _:
raise Exception('Unknow database type')
def init(self, username, password):
if (self.is_database_valid()):
raise Exception('Database is opened')
# establish tables
cfg = config.get_config()
backend_path = Path(__file__).resolve().parent
backend_sql_path = backend_path / 'sql'
match cfg.database.driver:
case config.DatabaseDriver.SQLITE:
sql_file = backend_sql_path / 'sqlite.sql'
case config.DatabaseDriver.MYSQL:
raise Exception('Not implemented database')
case _:
raise Exception('Unknow database type')
self.open()
cursor = self.db.cursor()
with open(sql_file, 'r', encoding='utf-8') as fsql:
cursor.executescript(fsql.read())
# finish init
cursor.execute('INSERT INTO user VALUES (?, ?, ?, ?);', (
username,
utils.ComputePasswordHash(password),
1,
utils.GenerateSalt()
))
cursor.close()
self.db.commit()
def close(self):
self.check_database()
self.db.close()
self.db = None
def check_database(self):
if (not self.is_database_valid()):
raise Exception('Databade is None')
def is_database_valid(self):
return not (self.db == None)
# ======================= token related internal operation
def tokenOper_clean(self):
# remove outdated token
self.cursor.execute('DELETE FROM token WHERE [ccn_tokenExpireOn] <= ?',(utils.GetCurrentTimestamp(), ))
def tokenOper_postpone_expireOn(self, token):
self.cursor.execute('UPDATE token SET [ccn_tokenExpireOn] = ? WHERE [ccn_token] = ?;', (
utils.GetTokenExpireOn(),
token
))
def tokenOper_check_valid(self, token):
self.tokenOper_get_username(token)
def tokenOper_is_admin(self, username):
self.cursor.execute('SELECT [ccn_isAdmin] FROM user WHERE [ccn_name] = ?;',(username, ))
cache = self.cursor.fetchone()[0]
return cache == 1
def tokenOper_get_username(self, token):
self.cursor.execute('SELECT [ccn_user] FROM token WHERE [ccn_token] = ? AND [ccn_tokenExpireOn] > ?;',(
token,
utils.GetCurrentTimestamp()
))
result = self.cursor.fetchone()[0]
# need postpone expire on time
self.tokenOper_postpone_expireOn(token)
return result
# =============================== # =============================== operation function
# =============================== common
@SafeDatabaseOperation
def common_salt(self, username):
salt = utils.GenerateSalt()
self.cursor.execute('UPDATE user SET [ccn_salt] = ? WHERE [ccn_name] = ?;', (
salt,
username
))
return salt
@SafeDatabaseOperation
def common_login(self, username, password, clientUa, clientIp):
self.cursor.execute('SELECT [ccn_password], [ccn_salt] FROM user WHERE [ccn_name] = ?;', (username, ))
(gotten_salt, gotten_password) = self.cursor.fetchone()
if password == utils.ComputePasswordHashWithSalt(gotten_password, gotten_salt):
token = utils.GenerateToken(username)
self.cursor.execute('UPDATE user SET [ccn_salt] = ? WHERE [ccn_name] = ?;', (
utils.GenerateSalt(), # regenerate a new slat to prevent re-login try
username
))
self.cursor.execute('INSERT INTO token VALUES (?, ?, ?, ?, ?);', (
username,
token,
utils.GetTokenExpireOn(), # add 2 day from now
clientUa,
clientIp,
))
return token
else:
# throw a exception to indicate fail to login
raise Exception('Login authentication failed')
@SafeDatabaseOperation
def common_webLogin(self, username, password, clientUa, clientIp):
self.cursor.execute('SELECT [ccn_name] FROM user WHERE [ccn_name] = ? AND [ccn_password] = ?;', (username, utils.ComputePasswordHash(password)))
if len(self.cursor.fetchall()) != 0:
token = utils.GenerateToken(username)
self.cursor.execute('INSERT INTO token VALUES (?, ?, ?, ?, ?);', (
username,
token,
utils.GetTokenExpireOn(), # add 2 day from now
clientUa,
clientIp,
))
return token
else:
# throw a exception to indicate fail to login
raise Exception('Login authentication failed')
@SafeDatabaseOperation
def common_logout(self, token):
self.tokenOper_check_valid(token)
self.cursor.execute('DELETE FROM token WHERE [ccn_token] = ?;', (token, ))
return True
@SafeDatabaseOperation
def common_tokenValid(self, token):
self.tokenOper_check_valid(token)
return True
# =============================== calendar
@SafeDatabaseOperation
def calendar_getFull(self, token, startDateTime, endDateTime):
username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT calendar.* FROM calendar INNER JOIN collection \
ON collection.ccn_uuid = calendar.ccn_belongTo \
WHERE (collection.ccn_user = ? AND calendar.ccn_loopDateTimeEnd >= ? AND calendar.ccn_loopDateTimeStart - (calendar.ccn_eventDateTimeEnd - calendar.ccn_eventDateTimeStart) <= ?);',
(username, startDateTime, endDateTime))
return self.cursor.fetchall()
@SafeDatabaseOperation
def calendar_getList(self, token, startDateTime, endDateTime):
username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT calendar.ccn_uuid FROM calendar INNER JOIN collection \
ON collection.ccn_uuid = calendar.ccn_belongTo \
WHERE (collection.ccn_user = ? AND calendar.ccn_loopDateTimeEnd >= ? AND calendar.ccn_loopDateTimeStart - (calendar.ccn_eventDateTimeEnd - calendar.ccn_eventDateTimeStart) <= ?);',
(username, startDateTime, endDateTime))
return tuple(map(lambda x: x[0], self.cursor.fetchall()))
@SafeDatabaseOperation
def calendar_getDetail(self, token, uuid):
self.tokenOper_check_valid(token)
self.cursor.execute('SELECT * FROM calendar WHERE [ccn_uuid] = ?;', (uuid, ))
return self.cursor.fetchone()
@SafeDatabaseOperation
def calendar_update(self, token, uuid, lastChange, **optArgs):
self.tokenOper_check_valid(token)
# get prev data
self.cursor.execute('SELECT * FROM calendar WHERE [ccn_uuid] = ? AND [ccn_lastChange] = ?;', (uuid, lastChange))
analyseData = list(self.cursor.fetchone())
# construct update data
lastupdate = utils.GenerateUUID()
sqlList = [
'[ccn_lastChange] = ?',
]
argumentsList = [
lastupdate,
]
# analyse opt arg
reAnalyseLoop = False
cache = optArgs.get('belongTo', None)
if cache is not None:
sqlList.append('[ccn_belongTo] = ?')
argumentsList.append(cache)
cache = optArgs.get('title', None)
if cache is not None:
sqlList.append('[ccn_title] = ?')
argumentsList.append(cache)
cache = optArgs.get('description', None)
if cache is not None:
sqlList.append('[ccn_description] = ?')
argumentsList.append(cache)
cache = optArgs.get('eventDateTimeStart', None)
if cache is not None:
sqlList.append('[ccn_eventDateTimeStart] = ?')
argumentsList.append(cache)
reAnalyseLoop = True
analyseData[5] = cache
cache = optArgs.get('eventDateTimeEnd', None)
if cache is not None:
sqlList.append('[ccn_eventDateTimeEnd] = ?')
argumentsList.append(cache)
cache = optArgs.get('loopRules', None)
if cache is not None:
sqlList.append('[ccn_loopRules] = ?')
argumentsList.append(cache)
reAnalyseLoop = True
analyseData[8] = cache
cache = optArgs.get('timezoneOffset', None)
if cache is not None:
sqlList.append('[ccn_timezoneOffset] = ?')
argumentsList.append(cache)
reAnalyseLoop = True
analyseData[7] = cache
if reAnalyseLoop:
# re-compute loop data and upload it into list
sqlList.append('[ccn_loopDateTimeStart] = ?')
argumentsList.append(analyseData[5])
sqlList.append('[ccn_loopDateTimeEnd] = ?')
argumentsList.append(str(dt.ResolveLoopStr(
analyseData[8],
analyseData[5],
analyseData[7]
)))
# execute
argumentsList.append(uuid)
self.cursor.execute('UPDATE calendar SET {} WHERE [ccn_uuid] = ?;'.format(', '.join(sqlList)),
tuple(argumentsList))
if self.cursor.rowcount != 1:
raise Exception('Fail to update due to no matched rows or too much rows.')
return lastupdate
@SafeDatabaseOperation
def calendar_add(self, token, belongTo, title, description, eventDateTimeStart, eventDateTimeEnd, loopRules, timezoneOffset):
self.tokenOper_check_valid(token)
newuuid = utils.GenerateUUID()
lastupdate = utils.GenerateUUID()
# analyse loopRules and output following 2 fileds.
loopDateTimeStart = eventDateTimeStart
loopDateTimeEnd = dt.ResolveLoopStr(loopRules, eventDateTimeStart, timezoneOffset)
self.cursor.execute('INSERT INTO calendar VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);',
(newuuid,
belongTo,
title,
description,
lastupdate,
eventDateTimeStart,
eventDateTimeEnd,
timezoneOffset,
loopRules,
loopDateTimeStart,
loopDateTimeEnd))
return newuuid
@SafeDatabaseOperation
def calendar_delete(self, token, uuid, lastChange):
self.tokenOper_check_valid(token)
self.cursor.execute('DELETE FROM calendar WHERE [ccn_uuid] = ? AND [ccn_lastChange] = ?;', (uuid, lastChange))
if self.cursor.rowcount != 1:
raise Exception('Fail to delete due to no matched rows or too much rows.')
return True
# =============================== collection
@SafeDatabaseOperation
def collection_getFullOwn(self, token):
username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT [ccn_uuid], [ccn_name], [ccn_lastChange] FROM collection WHERE [ccn_user] = ?;', (username, ))
return self.cursor.fetchall()
@SafeDatabaseOperation
def collection_getListOwn(self, token):
username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT [ccn_uuid] FROM collection WHERE [ccn_user] = ?;', (username, ))
return tuple(map(lambda x: x[0], self.cursor.fetchall()))
@SafeDatabaseOperation
def collection_getDetailOwn(self, token, uuid):
username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT [ccn_uuid], [ccn_name], [ccn_lastChange] FROM collection WHERE [ccn_user] = ? AND [ccn_uuid] = ?;', (username, uuid))
return self.cursor.fetchone()
@SafeDatabaseOperation
def collection_addOwn(self, token, newname):
username = self.tokenOper_get_username(token)
newuuid = utils.GenerateUUID()
lastupdate = utils.GenerateUUID()
self.cursor.execute('INSERT INTO collection VALUES (?, ?, ?, ?);',
(newuuid, newname, username, lastupdate))
return newuuid
@SafeDatabaseOperation
def collection_updateOwn(self, token, uuid, newname, lastChange):
self.tokenOper_check_valid(token)
lastupdate = utils.GenerateUUID()
self.cursor.execute('UPDATE collection SET [ccn_name] = ?, [ccn_lastChange] = ? WHERE [ccn_uuid] = ? AND [ccn_lastChange] = ?;', (
newname,
lastupdate,
uuid,
lastChange
))
if self.cursor.rowcount != 1:
raise Exception('Fail to update due to no matched rows or too much rows.')
return lastupdate
@SafeDatabaseOperation
def collection_deleteOwn(self, token, uuid, lastChange):
self.tokenOper_check_valid(token)
self.cursor.execute('DELETE FROM collection WHERE [ccn_uuid] = ? AND [ccn_lastChange] = ?;', (
uuid,
lastChange
))
if self.cursor.rowcount != 1:
raise Exception('Fail to delete due to no matched rows or too much rows.')
return True
@SafeDatabaseOperation
def collection_getSharing(self, token, uuid):
self.tokenOper_check_valid(token)
self.cursor.execute('SELECT [ccn_target] FROM share WHERE [ccn_uuid] = ?;', (uuid, ))
return tuple(map(lambda x: x[0], self.cursor.fetchall()))
@SafeDatabaseOperation
def collection_deleteSharing(self, token, uuid, target, lastChange):
self.tokenOper_check_valid(token)
lastupdate = utils.GenerateUUID()
self.cursor.execute('UPDATE collection SET [ccn_lastChange] = ?, WHERE [ccn_uuid] = ? AND [ccn_lastChange] = ?;', (lastupdate, uuid, lastChange))
if self.cursor.rowcount != 1:
raise Exception('Fail to delete due to no matched rows or too much rows.')
self.cursor.execute('DELETE FROM share WHERE [ccn_uuid] = ? AND [ccn_target] = ?;', (uuid, target))
if self.cursor.rowcount != 1:
raise Exception('Fail to delete due to no matched rows or too much rows.')
return lastupdate
@SafeDatabaseOperation
def collection_addSharing(self, token, uuid, target, lastChange):
self.tokenOper_check_valid(token)
lastupdate = utils.GenerateUUID()
self.cursor.execute('UPDATE collection SET [ccn_lastChange] = ? WHERE [ccn_uuid] = ? AND [ccn_lastChange] = ?;', (lastupdate, uuid, lastChange))
if self.cursor.rowcount != 1:
raise Exception('Fail to delete due to no matched rows or too much rows.')
self.cursor.execute('SELECT * FROM share WHERE [ccn_uuid] = ? AND [ccn_target] = ?;', (uuid, target))
if len(self.cursor.fetchall()) != 0:
raise Exception('Fail to insert duplicated item.')
self.cursor.execute('INSERT INTO share VALUES (?, ?);', (uuid, target))
return lastupdate
@SafeDatabaseOperation
def collection_getShared(self, token):
username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT collection.ccn_uuid, collection.ccn_name, collection.ccn_user \
FROM share INNER JOIN collection \
ON share.ccn_uuid = collection.ccn_uuid \
WHERE share.ccn_target = ?;', (username, ))
return self.cursor.fetchall()
# =============================== todo
@SafeDatabaseOperation
def todo_getFull(self, token):
username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT * FROM todo WHERE [ccn_belongTo] = ?;', (username, ))
return self.cursor.fetchall()
@SafeDatabaseOperation
def todo_getList(self, token):
username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT [ccn_uuid] FROM todo WHERE [ccn_belongTo] = ?;', (username, ))
return tuple(map(lambda x: x[0], self.cursor.fetchall()))
@SafeDatabaseOperation
def todo_getDetail(self, token, uuid):
username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT * FROM todo WHERE [ccn_belongTo] = ? AND [ccn_uuid] = ?;', (username, uuid))
return self.cursor.fetchone()
@SafeDatabaseOperation
def todo_add(self, token):
username = self.tokenOper_get_username(token)
newuuid = utils.GenerateUUID()
lastupdate = utils.GenerateUUID()
returnedData = (
newuuid,
username,
'',
lastupdate,
)
self.cursor.execute('INSERT INTO todo VALUES (?, ?, ?, ?);', returnedData)
return returnedData
@SafeDatabaseOperation
def todo_update(self, token, uuid, data, lastChange):
# check valid token
self.tokenOper_check_valid(token)
# update
newLastChange = utils.GenerateUUID()
self.cursor.execute('UPDATE todo SET [ccn_data] = ?, [ccn_lastChange] = ? WHERE [ccn_uuid] = ? AND [ccn_lastChange] = ?;', (
data,
newLastChange,
uuid,
lastChange
))
if self.cursor.rowcount != 1:
raise Exception('Fail to update due to no matched rows or too much rows.')
return newLastChange
@SafeDatabaseOperation
def todo_delete(self, token, uuid, lastChange):
# check valid token
self.tokenOper_check_valid(token)
# delete
self.cursor.execute('DELETE FROM todo WHERE [ccn_uuid] = ? AND [ccn_lastChange] = ?;', (uuid, lastChange))
if self.cursor.rowcount != 1:
raise Exception('Fail to delete due to no matched rows or too much rows.')
return True
# =============================== admin
@SafeDatabaseOperation
def admin_get(self, token):
username = self.tokenOper_get_username(token)
if not self.tokenOper_is_admin(username):
raise Exception('Permission denied.')
self.cursor.execute('SELECT [ccn_name], [ccn_isAdmin] FROM user;')
return tuple(map(lambda x: (x[0], x[1] == 1), self.cursor.fetchall()))
@SafeDatabaseOperation
def admin_add(self, token, newname):
username = self.tokenOper_get_username(token)
if not self.tokenOper_is_admin(username):
raise Exception('Permission denied.')
newpassword = utils.ComputePasswordHash(utils.GenerateUUID())
self.cursor.execute('INSERT INTO user VALUES (?, ?, ?, ?);', (
newname,
newpassword,
0,
utils.GenerateSalt()
))
return (newname, False)
@SafeDatabaseOperation
def admin_update(self, token, _username, **optArgs):
username = self.tokenOper_get_username(token)
if not self.tokenOper_is_admin(username):
raise Exception('Permission denied.')
# construct data
sqlList = []
argumentsList = []
# analyse opt arg
cache = optArgs.get('password', None)
if cache is not None:
sqlList.append('[ccn_password] = ?')
argumentsList.append(utils.ComputePasswordHash(cache))
cache = optArgs.get('isAdmin', None)
if cache is not None:
sqlList.append('[ccn_isAdmin] = ?')
argumentsList.append(1 if cache else 0)
# execute
argumentsList.append(_username)
self.cursor.execute('UPDATE user SET {} WHERE [ccn_name] = ?;'.format(', '.join(sqlList)),
tuple(argumentsList))
logging.debug(cache)
logging.debug(tuple(argumentsList))
if self.cursor.rowcount != 1:
raise Exception('Fail to update due to no matched rows or too much rows.')
return True
@SafeDatabaseOperation
def admin_delete(self, token, username):
_username = self.tokenOper_get_username(token)
if not self.tokenOper_is_admin(_username):
raise Exception('Permission denied.')
# delete
self.cursor.execute('DELETE FROM user WHERE [ccn_name] = ?;', (username, ))
if self.cursor.rowcount != 1:
raise Exception('Fail to delete due to no matched rows or too much rows.')
return True
# =============================== profile
@SafeDatabaseOperation
def profile_isAdmin(self, token):
username = self.tokenOper_get_username(token)
return self.tokenOper_is_admin(username)
@SafeDatabaseOperation
def profile_changePassword(self, token, newpassword):
username = self.tokenOper_get_username(token)
self.cursor.execute('UPDATE user SET [ccn_password] = ? WHERE [ccn_name] = ?;', (
utils.ComputePasswordHash(newpassword),
username
))
return True
@SafeDatabaseOperation
def profile_getToken(self, token):
username = self.tokenOper_get_username(token)
self.cursor.execute('SELECT * FROM token WHERE [ccn_user] = ?;', (
username,
))
return self.cursor.fetchall()
@SafeDatabaseOperation
def profile_deleteToken(self, token, deleteToken):
_username = self.tokenOper_get_username(token)
# delete
self.cursor.execute('DELETE FROM token WHERE [ccn_user] = ? AND [ccn_token] = ?;', (
_username,
deleteToken
))
if self.cursor.rowcount != 1:
raise Exception('Fail to delete due to no matched rows or too much rows.')
return True

289
backend/dt.py Normal file
View File

@@ -0,0 +1,289 @@
import datetime
import re
import logging
import typing
from functools import reduce
import utils
MonthDayCount = (31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31)
MIN_DATETIME = datetime.datetime(1950, 1, 1, 0, 0, 0, 0, tzinfo=datetime.timezone.utc)
MAX_DATETIME = datetime.datetime(2200, 1, 1, 0, 0, 0, 0, tzinfo=datetime.timezone.utc)
MIN_TIMESTAMP = int(MIN_DATETIME.timestamp() / 60)
MAX_TIMESTAMP = int(MAX_DATETIME.timestamp() / 60)
DAY1_SPAN = 60 * 24
DAY7_SPAN = 7 * DAY1_SPAN
LoopHandle = typing.Callable[[re.Match, int, int, int], int]
def ResolveLoopStr(strl: str, starttime: int, tzoffset: int) -> int:
# check no loop
if strl == '':
return starttime
# try compute from loopStop
(loopRules, loopStopRules) = strl.split('-')
cache = precompiledLoopStopRules['infinity'].search(loopStopRules)
if cache is not None:
return MAX_TIMESTAMP
cache = precompiledLoopStopRules['datetime'].search(loopStopRules)
if cache is not None:
return int(cache.group(1)) # group 1 is datetime
cache = precompiledLoopStopRules['times'].search(loopStopRules)
if cache is not None:
loopTimes = int(cache.group(1)) # for follwing calc
else:
raise Exception('Invalid loopStopRules') # invalid rules
for rules in precompiledLoopRules:
cache = rules[0].search(loopRules)
if cache is not None:
return rules[1](cache, starttime, loopTimes, tzoffset)
else:
raise Exception('Invalid loopRules')
def LoopHandle_Year(searchResult: re.Match, starttime: int, times: int, tzoffset: int) -> int:
clientDate = datetime.datetime.fromtimestamp(starttime * 60, UTCTimezone(tzoffset))
isStrict = searchResult.group(1) == 'S'
yearSpan = int(searchResult.group(2))
times -= 1
newYear = clientYear = clientDate.year
newMonth = clientMonth = clientDate.month
newDay = clientDay = clientDate.day
if clientMonth == 2 and clientDay == 29:
if isStrict:
realSpan = utils.LCM(yearSpan, 4)
logging.debug(realSpan)
valCache = starttime
while valCache < MAX_TIMESTAMP and times > 0:
newYear += realSpan
if not IsLeapYear(newYear):
continue
valCache = starttime + DAY1_SPAN * (DaysCount(newYear, newMonth, newDay) - DaysCount(clientYear, clientMonth, clientDay))
times -= 1
else:
newYear += times * yearSpan
if not IsLeapYear(newYear):
newDay = 28 # migrate to 28
else:
# if times == 1, no extra datetime need to be added
newYear += times * yearSpan
val = starttime + DAY1_SPAN * (DaysCount(newYear, newMonth, newDay) - DaysCount(clientYear, clientMonth, clientDay))
return val if val < MAX_TIMESTAMP else MAX_TIMESTAMP
def LoopHandle_Month(searchResult: re.Match, starttime: int, times: int, tzoffset: int) -> int:
isStrict = searchResult.group(1) == 'S'
loopType = searchResult.group(2)
monthSpan = int(searchResult.group(3))
# we should get original data in each method
times -= 1
clientDate = datetime.datetime.fromtimestamp(starttime * 60, UTCTimezone(tzoffset))
newYear = clientYear = clientDate.year
newMonth = clientMonth = clientDate.month
newDay = clientDay = clientDate.day
# data struct
# dayStatistics =
# (dayForwards || dayBackwards || weeksForward, dayOfWeek || weeksBackwards, dayOfWeek)
# ( A || B || C || D )
dayStatistics = GetDayInMonth(clientYear, clientMonth, clientDay)
if isStrict:
if loopType == 'A':
while times > 0:
newMonth += monthSpan
if newMonth > 12:
newYear += int((newMonth - 1) / 12)
newMonth = ((newMonth - 1) % 12) + 1
if newYear > MAX_DATETIME.year:
break
maxDays = MonthDayCount[newMonth - 1] + (1 if newMonth == 2 and IsLeapYear(newYear) else 0)
if dayStatistics[0] <= maxDays:
times -= 1
elif loopType == 'B':
while times > 0:
newMonth += monthSpan
if newMonth > 12:
newYear += int((newMonth - 1) / 12)
newMonth = ((newMonth - 1) % 12) + 1
if newYear > MAX_DATETIME.year:
break
maxDays = MonthDayCount[newMonth - 1] + (1 if newMonth == 2 and IsLeapYear(newYear) else 0)
if dayStatistics[1] <= maxDays:
times -= 1
elif loopType == 'C':
while times > 0:
newMonth += monthSpan
if newMonth > 12:
newYear += int((newMonth - 1) / 12)
newMonth = ((newMonth - 1) % 12) + 1
if newYear > MAX_DATETIME.year:
break
monthStatistics = GetMonthWeekStatistics(newYear, newMonth)
if dayStatistics[2] <= monthStatistics[dayStatistics[3]]:
times -= 1
elif loopType == 'D':
while times > 0:
newMonth += monthSpan
if newMonth > 12:
newYear += int((newMonth - 1) / 12)
newMonth = ((newMonth - 1) % 12) + 1
if newYear > MAX_DATETIME.year:
break
monthStatistics = GetMonthWeekStatistics(newYear, newMonth)
if dayStatistics[4] <= monthStatistics[dayStatistics[5]]:
times -= 1
else:
newMonth += times * monthSpan
newYear += int((newMonth - 1) / 12)
newMonth = ((newMonth - 1) % 12) + 1
# all method need calc newDay and it should be the last day of current selected month
# so calc it in there
newDay = MonthDayCount[newMonth - 1] + (1 if newMonth == 2 and IsLeapYear(newYear) else 0)
val = starttime + DAY1_SPAN * (DaysCount(newYear, newMonth, newDay) - DaysCount(clientYear, clientMonth, clientDay))
return val if val < MAX_TIMESTAMP else MAX_TIMESTAMP
def LoopHandle_Week(searchResult: re.Match, starttime: int, times: int, tzoffset: int) -> int:
weekOccupied = tuple(map(lambda x: x == 'T', searchResult.group(1)))
weekEventCount = reduce(lambda x, y: x + (1 if y else 0), weekOccupied, 0)
if weekEventCount == 0:
raise Exception('Invalid week format')
weekSpan = int(searchResult.group(2))
nowDayOfWeek = datetime.datetime.fromtimestamp(starttime * 60, UTCTimezone(tzoffset)).weekday()
if not weekOccupied[nowDayOfWeek]:
times-=1 # if first event is not suit for week loop rules, minus one more event to suit it.
fullWeek = int(times / weekEventCount)
remainEvent = times % weekEventCount
val = starttime + DAY7_SPAN * fullWeek * weekSpan
if val > MAX_TIMESTAMP:
return MAX_TIMESTAMP # return now, to reduce calc usage
while remainEvent != 0:
val += DAY1_SPAN
if weekOccupied[nowDayOfWeek % 7]:
remainEvent -= 1
nowDayOfWeek += 1
val -= 1
return val if val < MAX_TIMESTAMP else MAX_TIMESTAMP
def LoopHandle_Day(searchResult: re.Match, starttime: int, times: int, tzoffset: int) -> int:
val = starttime + DAY1_SPAN * times * int(searchResult.group(1))
val -= 1
return val if val < MAX_TIMESTAMP else MAX_TIMESTAMP
precompiledLoopRules: tuple[tuple[re.Pattern, LoopHandle], ...] = (
(re.compile(r'^Y([SR]{1})([1-9]\d*)$'), LoopHandle_Year),
(re.compile(r'^M([SR]{1})([ABCD]{1})([1-9]\d*)$'), LoopHandle_Month),
(re.compile(r'^W([TF]{7})([1-9]\d*)$'), LoopHandle_Week),
(re.compile(r'^D([1-9]\d*)$'), LoopHandle_Day)
)
precompiledLoopStopRules: dict[str, re.Pattern] = {
'infinity': re.compile(r'^F$'),
'datetime': re.compile(r'^D([1-9]\d*|0)$'),
'times': re.compile(r'^T([1-9]\d*)$')
}
def LeapYearCountEx(endYear: int, includeThis: bool = False, baseYear: int = 1, includeBase: bool = True):
if not includeThis:
endYear -= 1
if includeBase:
baseYear -= 1
endly = int(endYear / 4)
endly -= int(endYear / 100)
endly += int(endYear / 400)
basely = int(baseYear / 4)
basely -= int(baseYear / 100)
basely += int(baseYear / 400)
return (endly - basely)
def LeapYearCount(year: int):
return LeapYearCountEx(year, False, 1, True)
def IsLeapYear(year: int):
isLeap = False
if year % 4 == 0:
isLeap = True
if year % 100 == 0:
isLeap = False
if year % 400 == 0:
isLeap = True
return isLeap
def DaysCount(year: int, month: int, day: int):
ly = LeapYearCountEx(year, False, 1, True)
days = 365 * (year - 1)
days += ly
for index in range(1, month, 1):
days += MonthDayCount[index - 1]
if (month > 2) and IsLeapYear(year):
days += 1
days += day - 1
return days
def DayOfWeek(year: int, month: int, day: int):
# as we know, 1/1/1900 is Monday.
# via this method, we can got 1/1/1 is Monday
# compute day span
days=DaysCount(year, month, day)
# return day of week (from 0 - 6, corresponding with python)
return days % 7
def GetDayInMonth(year: int, month: int, day: int):
days = MonthDayCount[month - 1] + (1 if (month == 2 and IsLeapYear(year)) else 0)
firstDayOfWeek = DayOfWeek(year, month, 1)
dayOfWeek = (firstDayOfWeek + day - 1) % 7
dayForwards = day
dayBackwards = days - day + 1
weeksForward = int((dayForwards - 1) / 7) + 1
weeksBackwards = int((dayBackwards - 1) / 7) + 1
return (dayForwards, dayBackwards, weeksForward, dayOfWeek, weeksBackwards, dayOfWeek)
def GetMonthWeekStatistics(year: int, month: int):
days = MonthDayCount[month - 1] + (1 if (month == 2 and IsLeapYear(year)) else 0)
firstDayOfWeek = DayOfWeek(year, month, 1)
lastDayOfWeek = (firstDayOfWeek + days - 1) % 7
result = [4, 4, 4, 4, 4, 4, 4]
remain = days % 7
week = firstDayOfWeek
while remain > 0:
result[week % 7] += 1
week += 1
remain -= 1
return tuple(result)
class UTCTimezone(datetime.tzinfo):
__offset: int
def __init__(self, offset: int = 0):
self.__offset = offset
def utcoffset(self, dt):
return datetime.timedelta(minutes=self.__offset)
def tzname(self, dt):
return 'UTC {}'.format(self.__offset)
def dst(self, dt):
return datetime.timedelta(0)

14
backend/pyproject.toml Normal file
View File

@@ -0,0 +1,14 @@
[project]
name = "coleaf-backend"
version = "1.1.0"
description = "The backend of coconut-leaf."
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"flask==2.2.3",
]
[tool.uv]
constraint-dependencies = [
"Werkzeug==2.2.2",
"MarkupSafe==2.1.5"
]

457
backend/server.py Normal file
View File

@@ -0,0 +1,457 @@
from flask import Flask
# from flask import g
from flask import render_template
from flask import url_for
from flask import request
# from flask import abort
from flask import redirect
# from functools import reduce
# import json
# import os
import config
import database
import utils
from pathlib import Path
_FRONTEND_PATH = Path(__file__).resolve().parent.parent / "frontend"
app = Flask(
__name__,
static_folder=_FRONTEND_PATH / "static",
template_folder=_FRONTEND_PATH / "templates",
)
calendar_db = database.CalendarDatabase()
# render_static_resources = None
# =============================================database
# def get_database():
# db = getattr(g, '_database', None)
# if db is None:
# db = database.CalendarDatabase()
# db.open()
# return db
# @app.teardown_appcontext
# def close_database(exception):
# db = getattr(g, '_database', None)
# if db is not None:
# db.close()
# ============================================= static page route
@app.route('/', methods=['GET'])
def nospecHandle():
return redirect(url_for('web_homeHandle'))
@app.route('/web/home', methods=['GET'])
def web_homeHandle():
# UpdateStaticResources()
return render_template("home.html")
@app.route('/web/calendar', methods=['GET'])
def web_calendarHandle():
# UpdateStaticResources()
return render_template("calendar.html")
@app.route('/web/todo', methods=['GET'])
def web_todoHandle():
# UpdateStaticResources()
return render_template("todo.html")
@app.route('/web/admin', methods=['GET'])
def web_adminHandle():
# UpdateStaticResources()
return render_template("admin.html")
@app.route('/web/login', methods=['GET'])
def web_loginHandle():
# UpdateStaticResources()
return render_template("login.html")
@app.route('/web/collection', methods=['GET'])
def web_collectionHandle():
# UpdateStaticResources()
return render_template("collection.html")
@app.route('/web/eventAdd', methods=['GET'])
def web_eventAddHandle():
# UpdateStaticResources()
return render_template("event.html",
uuidPath=''
)
@app.route('/web/eventUpdate/<path:uuidPath>', methods=['GET'])
def web_eventUpdateHandle(uuidPath):
# UpdateStaticResources()
return render_template("event.html",
uuidPath = uuidPath
)
# ============================================= query page route
# ================================ common
@app.route('/api/common/salt', methods=['POST'])
def api_common_saltHandle():
return SmartDbCaller(calendar_db.common_salt,
(('username', str, False), ),
None)
@app.route('/api/common/login', methods=['POST'])
def api_common_loginHandle():
# construct client data first
clientUa = request.user_agent.string
if request.headers.getlist("X-Forwarded-For"):
clientIp = request.headers.getlist("X-Forwarded-For")[0]
else:
clientIp = request.remote_addr
return SmartDbCaller(calendar_db.common_login,
(('username', str, False),
('password', str, False),
('clientUa', str, False),
('clientIp', str, False)),
{
'clientUa': clientUa,
'clientIp': clientIp
})
@app.route('/api/common/webLogin', methods=['POST'])
def api_common_webLoginHandle():
# construct client data first
clientUa = request.user_agent.string
if request.headers.getlist("X-Forwarded-For"):
clientIp = request.headers.getlist("X-Forwarded-For")[0]
else:
clientIp = request.remote_addr
return SmartDbCaller(calendar_db.common_webLogin,
(('username', str, False),
('password', str, False),
('clientUa', str, False),
('clientIp', str, False)),
{
'clientUa': clientUa,
'clientIp': clientIp
})
@app.route('/api/common/logout', methods=['POST'])
def api_common_logoutHandle():
return SmartDbCaller(calendar_db.common_logout,
(('token', str, False), ),
None)
@app.route('/api/common/tokenValid', methods=['POST'])
def api_common_tokenValidHandle():
return SmartDbCaller(calendar_db.common_tokenValid,
(('token', str, False), ),
None)
# ================================ calendar
@app.route('/api/calendar/getFull', methods=['POST'])
def api_calendar_getFullHandle():
return SmartDbCaller(calendar_db.calendar_getFull,
(('token', str, False),
('startDateTime', int, False),
('endDateTime', int, False)),
None)
@app.route('/api/calendar/getList', methods=['POST'])
def api_calendar_getListHandle():
return SmartDbCaller(calendar_db.calendar_getList,
(('token', str, False),
('startDateTime', int, False),
('endDateTime', int, False)),
None)
@app.route('/api/calendar/getDetail', methods=['POST'])
def api_calendar_getDetailHandle():
return SmartDbCaller(calendar_db.calendar_getDetail,
(('token', str, False),
('uuid', str, False)),
None)
@app.route('/api/calendar/update', methods=['POST'])
def api_calendar_updateHandle():
return SmartDbCaller(calendar_db.calendar_update,
(('token', str, False),
('uuid', str, False),
('belongTo', str, True),
('title', str, True),
('description', str, True),
('eventDateTimeStart', int, True),
('eventDateTimeEnd', int, True),
('loopRules', str, True),
('timezoneOffset', int, True),
('lastChange', str, False)),
None)
@app.route('/api/calendar/add', methods=['POST'])
def api_calendar_addHandle():
return SmartDbCaller(calendar_db.calendar_add,
(('token', str, False),
('belongTo', str, False),
('title', str, False),
('description', str, False),
('eventDateTimeStart', int, False),
('eventDateTimeEnd', int, False),
('loopRules', str, False),
('timezoneOffset', int, False)),
None)
@app.route('/api/calendar/delete', methods=['POST'])
def api_calendar_deleteHandle():
return SmartDbCaller(calendar_db.calendar_delete,
(('token', str, False),
('uuid', str, False),
('lastChange', str, False)),
None)
# ================================ collection
@app.route('/api/collection/getFullOwn', methods=['POST'])
def api_collection_getFullOwnHandle():
return SmartDbCaller(calendar_db.collection_getFullOwn,
(('token', str, False), ),
None)
@app.route('/api/collection/getListOwn', methods=['POST'])
def api_collection_getListOwnHandle():
return SmartDbCaller(calendar_db.collection_getListOwn,
(('token', str, False), ),
None)
@app.route('/api/collection/getDetailOwn', methods=['POST'])
def api_collection_getDetailOwnHandle():
return SmartDbCaller(calendar_db.collection_getDetailOwn,
(('token', str, False),
('uuid', str, False)),
None)
@app.route('/api/collection/addOwn', methods=['POST'])
def api_collection_addOwnHandle():
return SmartDbCaller(calendar_db.collection_addOwn,
(('token', str, False),
('name', str, False)),
None)
@app.route('/api/collection/updateOwn', methods=['POST'])
def api_collection_updateOwnHandle():
return SmartDbCaller(calendar_db.collection_updateOwn,
(('token', str, False),
('uuid', str, False),
('name', str, False),
('lastChange', str, False)),
None)
@app.route('/api/collection/deleteOwn', methods=['POST'])
def api_collection_deleteOwnHandle():
return SmartDbCaller(calendar_db.collection_deleteOwn,
(('token', str, False),
('uuid', str, False),
('lastChange', str, False)),
None)
@app.route('/api/collection/getSharing', methods=['POST'])
def api_collection_getSharingHandle():
return SmartDbCaller(calendar_db.collection_getSharing,
(('token', str, False),
('uuid', str, False)),
None)
@app.route('/api/collection/deleteSharing', methods=['POST'])
def api_collection_deleteSharingHandle():
return SmartDbCaller(calendar_db.collection_deleteSharing,
(('token', str, False),
('uuid', str, False),
('target', str, False),
('lastChange', str, False)),
None)
@app.route('/api/collection/addSharing', methods=['POST'])
def api_collection_addSharingHandle():
return SmartDbCaller(calendar_db.collection_addSharing,
(('token', str, False),
('uuid', str, False),
('target', str, False),
('lastChange', str, False)),
None)
@app.route('/api/collection/getShared', methods=['POST'])
def api_collection_getSharedHandle():
return SmartDbCaller(calendar_db.collection_getShared,
(('token', str, False), ),
None)
# ================================ todo
@app.route('/api/todo/getFull', methods=['POST'])
def api_todo_getFullHandle():
return SmartDbCaller(calendar_db.todo_getFull,
(('token', str, False), ),
None)
@app.route('/api/todo/getList', methods=['POST'])
def api_todo_getListHandle():
return SmartDbCaller(calendar_db.todo_getList,
(('token', str, False), ),
None)
@app.route('/api/todo/getDetail', methods=['POST'])
def api_todo_getDetailHandle():
return SmartDbCaller(calendar_db.todo_getDetail,
(('token', str, False),
('uuid', str, False)),
None)
@app.route('/api/todo/add', methods=['POST'])
def api_todo_addHandle():
return SmartDbCaller(calendar_db.todo_add,
(('token', str, False), ),
None)
@app.route('/api/todo/update', methods=['POST'])
def api_todo_updateHandle():
return SmartDbCaller(calendar_db.todo_update,
(('token', str, False),
('uuid', str, False),
('data', str, False),
('lastChange', str, False)),
None)
@app.route('/api/todo/delete', methods=['POST'])
def api_todo_deleteHandle():
return SmartDbCaller(calendar_db.todo_delete,
(('token', str, False),
('uuid', str, False),
('lastChange', str, False)),
None)
# ================================ admin
@app.route('/api/admin/get', methods=['POST'])
def api_admin_getHandle():
return SmartDbCaller(calendar_db.admin_get,
(('token', str, False), ),
None)
@app.route('/api/admin/add', methods=['POST'])
def api_admin_addHandle():
return SmartDbCaller(calendar_db.admin_add,
(('token', str, False),
('username', str, False)),
None)
@app.route('/api/admin/update', methods=['POST'])
def api_admin_updateHandle():
return SmartDbCaller(calendar_db.admin_update,
(('token', str, False),
('username', str, False),
('password', str, True),
('isAdmin', utils.Str2Bool, True)),
None)
@app.route('/api/admin/delete', methods=['POST'])
def api_admin_deleteHandle():
return SmartDbCaller(calendar_db.admin_delete,
(('token', str, False),
('username', str, False)),
None)
# ================================ profile
@app.route('/api/profile/isAdmin', methods=['POST'])
def api_profile_isAdminHandle():
return SmartDbCaller(calendar_db.profile_isAdmin,
(('token', str, False), ),
None)
@app.route('/api/profile/changePassword', methods=['POST'])
def api_profile_changePasswordHandle():
return SmartDbCaller(calendar_db.profile_changePassword,
(('token', str, False),
('password', str, False)),
None)
@app.route('/api/profile/getToken', methods=['POST'])
def api_profile_getTokenHandle():
return SmartDbCaller(calendar_db.profile_getToken,
(('token', str, False), ),
None)
@app.route('/api/profile/deleteToken', methods=['POST'])
def api_profile_deleteTokenHandle():
return SmartDbCaller(calendar_db.profile_deleteToken,
(('token', str, False),
('deleteToken', str, False)),
None)
# =============================================main run
'''
def UpdateStaticResources():
global render_static_resources
if render_static_resources is not None:
return
render_static_resources = {
'url_js_localStorageAssist': url_for('static', filename='js/localStorageAssist.js'),
'url_js_i18n': url_for('static', filename='js/i18n.js'),
'url_js_api': url_for('static', filename='js/api.js'),
'url_js_headerNav': url_for('static', filename='js/headerNav.js'),
'url_tmpl_headerNac': url_for('static', filename='tmpl/headerNav.tmpl'),
'url_js_pageHome': url_for('static', filename='js/page/home.js')
}
'''
def SmartDbCaller(dbMethod, paramTuple, extraDict):
result = (False, 'Invalid parameter', None)
optCount = 0
paramList = []
optParamDict = {}
# for each item,
# item[0] is field name.
# item[1] is type.
# item[2] is whether it is optional field
realForm = request.form.to_dict()
if extraDict is not None:
realForm.update(extraDict)
for item in paramTuple:
cache = item[1](realForm.get(item[0], None))
if item[2]:
# optional param
if cache is not None:
optParamDict[item[0]] = cache
optCount += 1
else:
if cache is None:
break
paramList.append(cache)
else:
# at least one opt param
if optCount == 0 or len(optParamDict) != 0:
result = dbMethod(*paramList, **optParamDict)
return ConstructResponseBody(result)
def ConstructResponseBody(returnedTuple):
return {
'success': returnedTuple[0],
'error': returnedTuple[1],
'data': returnedTuple[2]
}
def run():
calendar_db.open()
app.run(port=config.get_config().web.port)
calendar_db.close()

0
backend/sql/mysql.sql Normal file
View File

67
backend/sql/sqlite.sql Normal file
View File

@@ -0,0 +1,67 @@
CREATE TABLE user(
[ccn_name] TEXT NOT NULL,
[ccn_password] TEXT NOT NULL,
[ccn_isAdmin] TINYINT NOT NULL CHECK(ccn_isAdmin = 1 OR ccn_isAdmin = 0),
[ccn_salt] INTEGER NOT NULL,
PRIMARY KEY (ccn_name)
);
CREATE TABLE token(
[ccn_user] TEXT NOT NULL,
[ccn_token] TEXT UNIQUE NOT NULL,
[ccn_tokenExpireOn] BIGINT NOT NULL,
[ccn_ua] TEXT NOT NULL,
[ccn_ip] TEXT NOT NULL,
FOREIGN KEY (ccn_user) REFERENCES user(ccn_name) ON DELETE CASCADE
);
CREATE TABLE collection(
[ccn_uuid] TEXT NOT NULL,
[ccn_name] TEXT NOT NULL,
[ccn_user] TEXT NOT NULL,
[ccn_lastChange] TEXT NOT NULL,
PRIMARY KEY (ccn_uuid),
FOREIGN KEY (ccn_user) REFERENCES user(ccn_name) ON DELETE CASCADE
);
CREATE TABLE share(
[ccn_uuid] TEXT NOT NULL,
[ccn_target] TEXT NOT NULL,
FOREIGN KEY (ccn_uuid) REFERENCES collection(ccn_uuid) ON DELETE CASCADE
FOREIGN KEY (ccn_target) REFERENCES user(ccn_name) ON DELETE CASCADE
);
CREATE TABLE calendar(
[ccn_uuid] TEXT NOT NULL,
[ccn_belongTo] TEXT NOT NULL,
[ccn_title] TEXT NOT NULL,
[ccn_description] TEXT NOT NULL,
[ccn_lastChange] TEXT NOT NULL,
[ccn_eventDateTimeStart] BIGINT NOT NULL,
[ccn_eventDateTimeEnd] BIGINT NOT NULL,
[ccn_timezoneOffset] INT NOT NULL,
[ccn_loopRules] TEXT NOT NULL,
[ccn_loopDateTimeStart] BIGINT NOT NULL,
[ccn_loopDateTimeEnd] BIGINT NOT NULL,
PRIMARY KEY (ccn_uuid),
FOREIGN KEY (ccn_belongTo) REFERENCES collection(ccn_uuid) ON DELETE CASCADE
);
CREATE TABLE todo(
[ccn_uuid] TEXT NOT NULL,
[ccn_belongTo] TEXT NOT NULL,
[ccn_data] TEXT NOT NULL,
[ccn_lastChange] TEXT NOT NULL,
PRIMARY KEY (ccn_uuid),
FOREIGN KEY (ccn_belongTo) REFERENCES user(ccn_name) ON DELETE CASCADE
);

52
backend/utils.py Normal file
View File

@@ -0,0 +1,52 @@
import hashlib
import random
import uuid
import time
import math
ValidUsername = set(map(lambda x:chr(x), range(48, 58, 1))) | set(map(lambda x:chr(x), range(65, 91, 1))) | set(map(lambda x:chr(x), range(97, 123, 1)))
ValidPassword = set(map(lambda x:chr(x), range(33, 127, 1)))
def IsValidUsername(strl):
return (len(set(strl) - ValidUsername) == 0)
def IsValidPassword(strl):
return (len(set(strl) - ValidPassword) == 0)
def ComputePasswordHash(password):
s = hashlib.sha256()
s.update(password.encode('utf-8'))
return s.hexdigest()
def GenerateUUID():
return str(uuid.uuid1())
def GenerateToken(username):
s = hashlib.sha256()
s.update(username.encode('utf-8'))
s.update(GenerateUUID().encode('utf-8'))
return s.hexdigest()
def GenerateSalt():
return random.randint(0, 6172748)
def ComputePasswordHashWithSalt(passwordHashed, salt):
s = hashlib.sha256()
s.update((passwordHashed + str(salt)).encode('utf-8'))
return s.hexdigest()
def GetCurrentTimestamp():
return int(time.time())
def GetTokenExpireOn():
return GetCurrentTimestamp() + 60 * 60 * 24 * 2 # add 2 day from now
def Str2Bool(strl):
return strl.lower() == 'true'
def GCD(a, b):
return math.gcd(a, b)
def LCM(a, b):
return int(a * b / GCD(a, b))

117
backend/uv.lock generated Normal file
View File

@@ -0,0 +1,117 @@
version = 1
revision = 2
requires-python = ">=3.11"
[manifest]
constraints = [
{ name = "markupsafe", specifier = "==2.1.5" },
{ name = "werkzeug", specifier = "==2.2.2" },
]
[[package]]
name = "click"
version = "8.3.3"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/bb/63/f9e1ea081ce35720d8b92acde70daaedace594dc93b693c869e0d5910718/click-8.3.3.tar.gz", hash = "sha256:398329ad4837b2ff7cbe1dd166a4c0f8900c3ca3a218de04466f38f6497f18a2", size = 328061, upload-time = "2026-04-22T15:11:27.506Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ae/44/c1221527f6a71a01ec6fbad7fa78f1d50dfa02217385cf0fa3eec7087d59/click-8.3.3-py3-none-any.whl", hash = "sha256:a2bf429bb3033c89fa4936ffb35d5cb471e3719e1f3c8a7c3fff0b8314305613", size = 110502, upload-time = "2026-04-22T15:11:25.044Z" },
]
[[package]]
name = "coleaf-backend"
version = "1.1.0"
source = { virtual = "." }
dependencies = [
{ name = "flask" },
]
[package.metadata]
requires-dist = [{ name = "flask", specifier = "==2.2.3" }]
[[package]]
name = "colorama"
version = "0.4.6"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d8/53/6f443c9a4a8358a93a6792e2acffb9d9d5cb0a5cfd8802644b7b1c9a02e4/colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44", size = 27697, upload-time = "2022-10-25T02:36:22.414Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" },
]
[[package]]
name = "flask"
version = "2.2.3"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "click" },
{ name = "itsdangerous" },
{ name = "jinja2" },
{ name = "werkzeug" },
]
sdist = { url = "https://files.pythonhosted.org/packages/e8/5c/ff9047989bd995b1098d14b03013f160225db2282925b517bb4a967752ee/Flask-2.2.3.tar.gz", hash = "sha256:7eb373984bf1c770023fce9db164ed0c3353cd0b53f130f4693da0ca756a2e6d", size = 697599, upload-time = "2023-02-15T22:43:57.265Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/95/9c/a3542594ce4973786236a1b7b702b8ca81dbf40ea270f0f96284f0c27348/Flask-2.2.3-py3-none-any.whl", hash = "sha256:c0bec9477df1cb867e5a67c9e1ab758de9cb4a3e52dd70681f59fa40a62b3f2d", size = 101839, upload-time = "2023-02-15T22:43:55.501Z" },
]
[[package]]
name = "itsdangerous"
version = "2.2.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/9c/cb/8ac0172223afbccb63986cc25049b154ecfb5e85932587206f42317be31d/itsdangerous-2.2.0.tar.gz", hash = "sha256:e0050c0b7da1eea53ffaf149c0cfbb5c6e2e2b69c4bef22c81fa6eb73e5f6173", size = 54410, upload-time = "2024-04-16T21:28:15.614Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/04/96/92447566d16df59b2a776c0fb82dbc4d9e07cd95062562af01e408583fc4/itsdangerous-2.2.0-py3-none-any.whl", hash = "sha256:c6242fc49e35958c8b15141343aa660db5fc54d4f13a1db01a3f5891b98700ef", size = 16234, upload-time = "2024-04-16T21:28:14.499Z" },
]
[[package]]
name = "jinja2"
version = "3.1.6"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "markupsafe" },
]
sdist = { url = "https://files.pythonhosted.org/packages/df/bf/f7da0350254c0ed7c72f3e33cef02e048281fec7ecec5f032d4aac52226b/jinja2-3.1.6.tar.gz", hash = "sha256:0137fb05990d35f1275a587e9aee6d56da821fc83491a0fb838183be43f66d6d", size = 245115, upload-time = "2025-03-05T20:05:02.478Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/62/a1/3d680cbfd5f4b8f15abc1d571870c5fc3e594bb582bc3b64ea099db13e56/jinja2-3.1.6-py3-none-any.whl", hash = "sha256:85ece4451f492d0c13c5dd7c13a64681a86afae63a5f347908daf103ce6d2f67", size = 134899, upload-time = "2025-03-05T20:05:00.369Z" },
]
[[package]]
name = "markupsafe"
version = "2.1.5"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/87/5b/aae44c6655f3801e81aa3eef09dbbf012431987ba564d7231722f68df02d/MarkupSafe-2.1.5.tar.gz", hash = "sha256:d283d37a890ba4c1ae73ffadf8046435c76e7bc2247bbb63c00bd1a709c6544b", size = 19384, upload-time = "2024-02-02T16:31:22.863Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/11/e7/291e55127bb2ae67c64d66cef01432b5933859dfb7d6949daa721b89d0b3/MarkupSafe-2.1.5-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:629ddd2ca402ae6dbedfceeba9c46d5f7b2a61d9749597d4307f943ef198fc1f", size = 18219, upload-time = "2024-02-02T16:30:19.988Z" },
{ url = "https://files.pythonhosted.org/packages/6b/cb/aed7a284c00dfa7c0682d14df85ad4955a350a21d2e3b06d8240497359bf/MarkupSafe-2.1.5-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:5b7b716f97b52c5a14bffdf688f971b2d5ef4029127f1ad7a513973cfd818df2", size = 14098, upload-time = "2024-02-02T16:30:21.063Z" },
{ url = "https://files.pythonhosted.org/packages/1c/cf/35fe557e53709e93feb65575c93927942087e9b97213eabc3fe9d5b25a55/MarkupSafe-2.1.5-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6ec585f69cec0aa07d945b20805be741395e28ac1627333b1c5b0105962ffced", size = 29014, upload-time = "2024-02-02T16:30:22.926Z" },
{ url = "https://files.pythonhosted.org/packages/97/18/c30da5e7a0e7f4603abfc6780574131221d9148f323752c2755d48abad30/MarkupSafe-2.1.5-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b91c037585eba9095565a3556f611e3cbfaa42ca1e865f7b8015fe5c7336d5a5", size = 28220, upload-time = "2024-02-02T16:30:24.76Z" },
{ url = "https://files.pythonhosted.org/packages/0c/40/2e73e7d532d030b1e41180807a80d564eda53babaf04d65e15c1cf897e40/MarkupSafe-2.1.5-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7502934a33b54030eaf1194c21c692a534196063db72176b0c4028e140f8f32c", size = 27756, upload-time = "2024-02-02T16:30:25.877Z" },
{ url = "https://files.pythonhosted.org/packages/18/46/5dca760547e8c59c5311b332f70605d24c99d1303dd9a6e1fc3ed0d73561/MarkupSafe-2.1.5-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:0e397ac966fdf721b2c528cf028494e86172b4feba51d65f81ffd65c63798f3f", size = 33988, upload-time = "2024-02-02T16:30:26.935Z" },
{ url = "https://files.pythonhosted.org/packages/6d/c5/27febe918ac36397919cd4a67d5579cbbfa8da027fa1238af6285bb368ea/MarkupSafe-2.1.5-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:c061bb86a71b42465156a3ee7bd58c8c2ceacdbeb95d05a99893e08b8467359a", size = 32718, upload-time = "2024-02-02T16:30:28.111Z" },
{ url = "https://files.pythonhosted.org/packages/f8/81/56e567126a2c2bc2684d6391332e357589a96a76cb9f8e5052d85cb0ead8/MarkupSafe-2.1.5-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:3a57fdd7ce31c7ff06cdfbf31dafa96cc533c21e443d57f5b1ecc6cdc668ec7f", size = 33317, upload-time = "2024-02-02T16:30:29.214Z" },
{ url = "https://files.pythonhosted.org/packages/00/0b/23f4b2470accb53285c613a3ab9ec19dc944eaf53592cb6d9e2af8aa24cc/MarkupSafe-2.1.5-cp311-cp311-win32.whl", hash = "sha256:397081c1a0bfb5124355710fe79478cdbeb39626492b15d399526ae53422b906", size = 16670, upload-time = "2024-02-02T16:30:30.915Z" },
{ url = "https://files.pythonhosted.org/packages/b7/a2/c78a06a9ec6d04b3445a949615c4c7ed86a0b2eb68e44e7541b9d57067cc/MarkupSafe-2.1.5-cp311-cp311-win_amd64.whl", hash = "sha256:2b7c57a4dfc4f16f7142221afe5ba4e093e09e728ca65c51f5620c9aaeb9a617", size = 17224, upload-time = "2024-02-02T16:30:32.09Z" },
{ url = "https://files.pythonhosted.org/packages/53/bd/583bf3e4c8d6a321938c13f49d44024dbe5ed63e0a7ba127e454a66da974/MarkupSafe-2.1.5-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:8dec4936e9c3100156f8a2dc89c4b88d5c435175ff03413b443469c7c8c5f4d1", size = 18215, upload-time = "2024-02-02T16:30:33.081Z" },
{ url = "https://files.pythonhosted.org/packages/48/d6/e7cd795fc710292c3af3a06d80868ce4b02bfbbf370b7cee11d282815a2a/MarkupSafe-2.1.5-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:3c6b973f22eb18a789b1460b4b91bf04ae3f0c4234a0a6aa6b0a92f6f7b951d4", size = 14069, upload-time = "2024-02-02T16:30:34.148Z" },
{ url = "https://files.pythonhosted.org/packages/51/b5/5d8ec796e2a08fc814a2c7d2584b55f889a55cf17dd1a90f2beb70744e5c/MarkupSafe-2.1.5-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ac07bad82163452a6884fe8fa0963fb98c2346ba78d779ec06bd7a6262132aee", size = 29452, upload-time = "2024-02-02T16:30:35.149Z" },
{ url = "https://files.pythonhosted.org/packages/0a/0d/2454f072fae3b5a137c119abf15465d1771319dfe9e4acbb31722a0fff91/MarkupSafe-2.1.5-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f5dfb42c4604dddc8e4305050aa6deb084540643ed5804d7455b5df8fe16f5e5", size = 28462, upload-time = "2024-02-02T16:30:36.166Z" },
{ url = "https://files.pythonhosted.org/packages/2d/75/fd6cb2e68780f72d47e6671840ca517bda5ef663d30ada7616b0462ad1e3/MarkupSafe-2.1.5-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ea3d8a3d18833cf4304cd2fc9cbb1efe188ca9b5efef2bdac7adc20594a0e46b", size = 27869, upload-time = "2024-02-02T16:30:37.834Z" },
{ url = "https://files.pythonhosted.org/packages/b0/81/147c477391c2750e8fc7705829f7351cf1cd3be64406edcf900dc633feb2/MarkupSafe-2.1.5-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:d050b3361367a06d752db6ead6e7edeb0009be66bc3bae0ee9d97fb326badc2a", size = 33906, upload-time = "2024-02-02T16:30:39.366Z" },
{ url = "https://files.pythonhosted.org/packages/8b/ff/9a52b71839d7a256b563e85d11050e307121000dcebc97df120176b3ad93/MarkupSafe-2.1.5-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:bec0a414d016ac1a18862a519e54b2fd0fc8bbfd6890376898a6c0891dd82e9f", size = 32296, upload-time = "2024-02-02T16:30:40.413Z" },
{ url = "https://files.pythonhosted.org/packages/88/07/2dc76aa51b481eb96a4c3198894f38b480490e834479611a4053fbf08623/MarkupSafe-2.1.5-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:58c98fee265677f63a4385256a6d7683ab1832f3ddd1e66fe948d5880c21a169", size = 33038, upload-time = "2024-02-02T16:30:42.243Z" },
{ url = "https://files.pythonhosted.org/packages/96/0c/620c1fb3661858c0e37eb3cbffd8c6f732a67cd97296f725789679801b31/MarkupSafe-2.1.5-cp312-cp312-win32.whl", hash = "sha256:8590b4ae07a35970728874632fed7bd57b26b0102df2d2b233b6d9d82f6c62ad", size = 16572, upload-time = "2024-02-02T16:30:43.326Z" },
{ url = "https://files.pythonhosted.org/packages/3f/14/c3554d512d5f9100a95e737502f4a2323a1959f6d0d01e0d0997b35f7b10/MarkupSafe-2.1.5-cp312-cp312-win_amd64.whl", hash = "sha256:823b65d8706e32ad2df51ed89496147a42a2a6e01c13cfb6ffb8b1e92bc910bb", size = 17127, upload-time = "2024-02-02T16:30:44.418Z" },
]
[[package]]
name = "werkzeug"
version = "2.2.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "markupsafe" },
]
sdist = { url = "https://files.pythonhosted.org/packages/f8/c1/1c8e539f040acd80f844c69a5ef8e2fccdf8b442dabb969e497b55d544e1/Werkzeug-2.2.2.tar.gz", hash = "sha256:7ea2d48322cc7c0f8b3a215ed73eabd7b5d75d0b50e31ab006286ccff9e00b8f", size = 844378, upload-time = "2022-08-08T21:44:15.376Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c8/27/be6ddbcf60115305205de79c29004a0c6bc53cec814f733467b1bb89386d/Werkzeug-2.2.2-py3-none-any.whl", hash = "sha256:f979ab81f58d7318e064e99c4506445d60135ac5cd2e177a2de0089bfd4c9bd5", size = 232700, upload-time = "2022-08-08T21:44:13.251Z" },
]