# dinner_vote/dao.py
#
# Listing metadata: 168 lines, 4.0 KiB, Python

import sqlite3
from datetime import datetime, timedelta
from typing import Generator
from flask import request
# Path of the SQLite database file, relative to the process working directory.
db_path = './data.sqlite3'

# Schema bootstrap, executed once when this module is imported. Every statement
# is idempotent, so importing against an existing database is safe.
ddl_db = sqlite3.connect(db_path)
ddl_cursor = ddl_db.cursor()
try:
    # One vote per user per day: (user, datestr) is the primary key.
    ddl_cursor.execute('''
        create table if not exists user_menu
        (
            user    text,
            menu    text,
            datestr text,
            primary key (user, datestr)
        )
    ''')
    # One roll result per day.
    ddl_cursor.execute('''
        create table if not exists roll_result
        (
            datestr text primary key,
            value   text
        )
    ''')
    # Selectable menu entries. `order_num` is required because fetch_all_menu()
    # sorts by it; without the column that query fails with "no such column".
    ddl_cursor.execute('''
        create table if not exists menu
        (
            name      text primary key,
            label     text,
            order_num integer
        )
    ''')
    # `create table if not exists` leaves an already-existing table untouched,
    # so databases created before `order_num` was part of the schema need the
    # column added explicitly.
    _menu_columns = {info[1] for info in ddl_cursor.execute('pragma table_info(menu)')}
    if 'order_num' not in _menu_columns:
        ddl_cursor.execute('alter table menu add column order_num integer')
    ddl_db.commit()
finally:
    ddl_cursor.close()
    ddl_db.close()
def get_user() -> str:
    """
    Derive the user ID from the address of the incoming request.

    :return: the value of ``request.remote_addr``, used as the user ID
    """
    return request.remote_addr
def get_user_menu() -> str:
    """
    Return the vote content to show for the current user.

    Today's row is preferred. If the user has no row for today, the row with
    the greatest ``datestr`` for that user is used instead.

    :return: the stored menu value, or ``''`` when the user has no rows at all
    """
    voter = get_user()
    conn = sqlite3.connect(db_path)
    cur = conn.cursor()
    today = datetime.now().strftime('%Y-%m-%d')
    # Queries are tried in order; the first one that yields a row wins.
    lookups = (
        ("select menu from user_menu where user = ? and datestr=?", (voter, today)),
        ("select menu from user_menu where user = ? order by datestr desc limit 1", (voter,)),
    )
    try:
        for sql, params in lookups:
            cur.execute(sql, params)
            hit = cur.fetchone()
            if hit:
                return hit[0]
        return ''
    finally:
        cur.close()
        conn.close()
def set_user_menu(menu: str, user: str = None) -> None:
    """
    Store today's vote for a user.

    The row is written with ``insert or replace``, keyed by the user and
    today's date string.

    :param menu: vote content to store
    :param user: user ID to store the vote under; when ``None`` the ID
                 returned by ``get_user()`` is used
    """
    today = datetime.now().strftime('%Y-%m-%d')
    voter = get_user() if user is None else user
    conn = sqlite3.connect(db_path)
    cur = conn.cursor()
    try:
        cur.execute(
            "insert or replace into user_menu(user,menu,datestr) values(?,?,?)",
            (voter, menu, today),
        )
        conn.commit()
    finally:
        cur.close()
        conn.close()
def fetch_all_user_today_menu() -> Generator[tuple[str, str], None, None]:
    """
    Yield every non-empty vote stored for today, ordered by user.

    The database connection is opened on the first iteration and closed when
    the generator finishes or is closed.

    :return: generator of ``(user, menu)`` pairs
    """
    today = datetime.now().strftime('%Y-%m-%d')
    conn = sqlite3.connect(db_path)
    cur = conn.cursor()
    try:
        cur.execute(
            "select user, menu from user_menu where menu != '' and datestr=? order by user",
            (today,),
        )
        for voter, choice in cur:
            yield voter, choice
    finally:
        cur.close()
        conn.close()
def fetch_all_menu() -> Generator[tuple[str, str], None, None]:
    """
    Yield every selectable menu entry, sorted by ``order_num``.

    The query orders by ``order_num``, so the ``menu`` table must have that
    column. The database connection is opened on the first iteration and
    closed when the generator finishes or is closed.

    :return: generator of ``(name, label)`` pairs
    """
    conn = sqlite3.connect(db_path)
    cur = conn.cursor()
    try:
        cur.execute("select name, label from menu order by order_num")
        for name, label in cur:
            yield name, label
    finally:
        cur.close()
        conn.close()
def fetch_roll_result(interval: int = 0) -> str | None:
    """
    Look up the roll result stored for the current date shifted by ``interval`` days.

    :param interval: day offset added to the current date; 0 selects today and
                     a negative value selects an earlier day
    :return: the stored result, or ``None`` when no row exists for that date
    """
    target_day = (datetime.now() + timedelta(days=interval)).strftime('%Y-%m-%d')
    conn = sqlite3.connect(db_path)
    cur = conn.cursor()
    try:
        cur.execute(
            "select value from roll_result where datestr=?",
            (target_day,),
        )
        found = cur.fetchone()
        return found[0] if found else None
    finally:
        cur.close()
        conn.close()
def set_roll_result(value: str) -> None:
    """
    Store the roll result for today.

    The row is written with ``insert or replace``, keyed by today's date string.

    :param value: roll result to store
    """
    today = datetime.now().strftime('%Y-%m-%d')
    conn = sqlite3.connect(db_path)
    cur = conn.cursor()
    try:
        cur.execute(
            "insert or replace into roll_result(datestr, value) values(?,?)",
            (today, value),
        )
        conn.commit()
    finally:
        cur.close()
        conn.close()