diff --git a/assets/dict/dictionary_parser.py b/assets/dict/dictionary_parser.py
index 69c619f..4803631 100755
--- a/assets/dict/dictionary_parser.py
+++ b/assets/dict/dictionary_parser.py
@@ -1,12 +1,13 @@
import base64
import os
import re
-import psycopg2
+import mysql.connector
import hashlib
from typing import List, Tuple, Dict, Optional, Any
from readmdict import MDX, MDD
from bs4 import BeautifulSoup, Tag
import json
+from mysql.connector import Error
from backend.app.admin.schema.dict import Example, Frequency, Pronunciation, FamilyItem, WordFamily, \
WordMetaData, Sense, Definition, Topic, CrossReference, DictEntry, Etymology, EtymologyItem
@@ -20,10 +21,10 @@ class DictionaryParser:
self.connect_db()
def connect_db(self):
- """连接到PostgreSQL数据库"""
+ """连接到MySQL数据库"""
try:
- self.conn = psycopg2.connect(**self.db_config)
- except Exception as e:
+ self.conn = mysql.connector.connect(**self.db_config)
+ except Error as e:
print(f"数据库连接失败: {e}")
raise
@@ -44,7 +45,7 @@ class DictionaryParser:
print(f"解析完成,共处理 {len(entries)} 个词汇条目")
- except Exception as e:
+ except Error as e:
print(f"解析词典文件失败: {e}")
raise
@@ -69,7 +70,7 @@ class DictionaryParser:
return entries, media_references
- except Exception as e:
+ except Error as e:
print(f"解析MDX文件失败: {e}")
raise
@@ -92,7 +93,7 @@ class DictionaryParser:
# 保存媒体文件
self.save_dict_media(dict_media, media_references, entry_ids)
- except Exception as e:
+ except Error as e:
print(f"解析MDD文件失败: {e}")
raise
@@ -146,10 +147,10 @@ class DictionaryParser:
def save_entries(self, entries: List[Tuple[str, str]]) -> Dict[str, int]:
"""保存词汇条目到数据库,并更新 details 字段"""
- from psycopg2.extras import Json
+ from mysql.connector import Error
import hashlib
- cursor = self.conn.cursor()
+ cursor = self.conn.cursor(dictionary=True)
entry_ids = {}
for word, definition in entries:
@@ -164,7 +165,7 @@ class DictionaryParser:
# 如果存在现有记录
if existing_record:
- entry_id, existing_definition, existing_details_json = existing_record
+ entry_id, existing_definition, existing_details_json = existing_record['id'], existing_record['definition'], existing_record['details']
# 获取现有的 details
if existing_details_json:
@@ -199,905 +200,229 @@ class DictionaryParser:
for link in new_ref_links:
if link not in combined_links:
combined_links.append(link)
- existing_details.ref_link = combined_links
else:
- existing_details.ref_link = new_ref_links
- metadata = existing_details
+ combined_links = new_ref_links
else:
- # 如果没有现有 details,创建新的 metadata
- metadata = WordMetaData()
- metadata.ref_link = new_ref_links
+ combined_links = new_ref_links
- # 保留现有的 metadata
- elif existing_details:
- metadata = existing_details
- else:
- # 如果当前 definition 不是 @@@ 开头,则正常更新 definition 和解析 HTML
- final_definition = definition
-
- # 解析 HTML 内容获取 metadata 信息
- html_metadata, images_info1 = self.parse_definition_to_metadata(definition)
- if images_info1:
- self.save_entry_images(entry_id, word, images_info1)
-
- # 合并 metadata 信息
- if html_metadata:
+ # 更新 details
if existing_details:
- # 保留现有的 ref_link,合并其他字段
- html_metadata.ref_link = existing_details.ref_link
- metadata = html_metadata
-
- # 提取并处理图片信息
- images_info = self.extract_images_from_definition(definition, word)
- if images_info:
- self.save_entry_images(entry_id, word, images_info)
- else:
- # 新词条,正常处理
- if definition.startswith('@@@'):
- # 处理 @@@ 开头的引用链接
- lines = definition.split('\n')
- ref_links = []
- for line in lines:
- if line.startswith('@@@'):
- link = line[3:].strip()
- if link:
- ref_links.append(link)
+ metadata = existing_details.model_copy(update={"ref_link": combined_links})
else:
- break
+ metadata = WordMetaData(ref_link=combined_links)
- if ref_links:
- metadata = WordMetaData()
- metadata.ref_link = ref_links
- else:
- # 解析 HTML 内容
- html_metadata, images_info1 = self.parse_definition_to_metadata(definition)
- metadata = html_metadata
+ # 如果是新词条或需要更新 details
+ if not existing_record or metadata:
+ # 如果是新词条,创建默认 metadata
+ if not existing_record:
+ metadata = WordMetaData()
- # 提取并处理图片信息
- images_info = self.extract_images_from_definition(definition, word)
- if images_info or images_info1:
- # 先插入词条获取 entry_id
- cursor.execute('''
- INSERT INTO dict_entry (word, definition, details)
- VALUES (%s, %s, %s) RETURNING id
- ''', (word, definition, Json(metadata.model_dump()) if metadata else None))
+ # 准备 details 数据
+ details_dict = metadata.model_dump() if metadata else None
- entry_id = cursor.fetchone()[0]
- entry_ids[word] = entry_id
-
- # 处理图片信息
- if images_info:
- self.save_entry_images(entry_id, word, images_info)
- if images_info1:
- self.save_entry_images(entry_id, word, images_info1)
- continue # 跳过后续的插入操作
-
- # 保存或更新词条到数据库
- if existing_record:
- # 更新现有记录
- cursor.execute('''
- UPDATE dict_entry
- SET definition = %s,
- details = %s
- WHERE word = %s RETURNING id
- ''', (final_definition, Json(metadata.model_dump()) if metadata else None, word))
- entry_id = cursor.fetchone()[0] if cursor.rowcount > 0 else existing_record[0]
- entry_ids[word] = entry_id
- else:
- # 插入新记录(仅当不是上面处理过的情况)
- if word not in entry_ids: # 避免重复插入
+ if existing_record:
+ # 更新现有记录
cursor.execute('''
- INSERT INTO dict_entry (word, definition, details)
- VALUES (%s, %s, %s) RETURNING id
- ''', (word, final_definition, Json(metadata.model_dump()) if metadata else None))
- result = cursor.fetchone()
- if result:
- entry_ids[word] = result[0]
+ UPDATE dict_entry
+ SET definition = %s, details = %s
+ WHERE word = %s
+ ''', (final_definition, json.dumps(details_dict) if details_dict else None, word))
+ entry_id = existing_record['id']
+ else:
+ # 插入新记录
+ cursor.execute('''
+ INSERT INTO dict_entry (word, definition, details)
+ VALUES (%s, %s, %s)
+ ''', (word, final_definition, json.dumps(details_dict) if details_dict else None))
+ entry_id = cursor.lastrowid
- except Exception as e:
- print(f"保存词汇 '{word}' 时出错: {e}")
- continue
+ else:
+ # 保持现有记录不变
+ entry_id = existing_record['id']
+
+ entry_ids[word] = entry_id
+
+ except Error as e:
+ print(f"保存词条 '{word}' 时出错: {e}")
self.conn.commit()
cursor.close()
return entry_ids
- def save_dict_media(self, media_files: Dict[str, bytes], media_references: List[Dict],
- entry_ids: Dict[str, int]) -> None:
- """保存媒体文件到数据库"""
- # 按文件名分组媒体引用
- refs_by_filename = {}
- for ref in media_references:
- filename = ref['filename'].replace('\\', '/').lstrip('/')
- if filename not in refs_by_filename:
- refs_by_filename[filename] = []
- refs_by_filename[filename].append(ref)
-
- saved_count = 0
- error_count = 0
-
- for filename, file_data in media_files.items():
- if filename in refs_by_filename:
- try:
- # 每次操作都使用新的游标
- cursor = self.conn.cursor()
-
- # 计算文件哈希
- file_hash = hashlib.sha256(file_data).hexdigest()
-
- # 先检查是否已存在
- cursor.execute('''
- SELECT COUNT(*)
- FROM dict_media
- WHERE file_name = %s
- ''', (filename,))
-
- if cursor.fetchone()[0] > 0:
- print(f"文件已存在,跳过: {filename}")
- cursor.close()
- continue
-
- file_type = refs_by_filename[filename][0]['type']
- # 保存文件数据
- cursor.execute('''
- INSERT INTO dict_media (file_name, file_type, file_data, file_hash)
- VALUES (%s, %s, %s, %s) RETURNING id
- ''', (filename, file_type, psycopg2.Binary(file_data), file_hash))
-
- media_id = cursor.fetchone()[0]
-
- # 关联到对应的词汇条目
- update_count = 0
- for ref in refs_by_filename[filename]:
- word = ref['word']
- if word in entry_ids:
- cursor.execute('''
- UPDATE dict_media
- SET dict_id = %s
- WHERE id = %s
- ''', (entry_ids[word], media_id))
- update_count += 1
-
- self.conn.commit()
- cursor.close()
-
- saved_count += 1
- if saved_count % 100 == 0:
- print(f"已处理 {saved_count} 个媒体文件")
-
- except Exception as e:
- # 发生错误时回滚并继续处理下一个文件
- try:
- self.conn.rollback()
- cursor.close()
- except:
- pass
- error_count += 1
- print(f"保存媒体文件 '{filename}' 时出错: {e}")
- continue
- else:
- # 处理图片文件(没有在 media_references 中的文件)
- try:
- cursor = self.conn.cursor()
-
- # 计算文件哈希
- file_hash = hashlib.sha256(file_data).hexdigest()
-
- # 检查是否已存在
- cursor.execute('''
- SELECT COUNT(*)
- FROM dict_media
- WHERE file_name = %s
- ''', (filename,))
-
- if cursor.fetchone()[0] == 0:
- # 保存图片文件数据
- cursor.execute('''
- INSERT INTO dict_media (file_name, file_type, file_data, file_hash)
- VALUES (%s, %s, %s, %s)
- ''', (filename, 'image', psycopg2.Binary(file_data), file_hash))
- self.conn.commit()
-
- cursor.close()
- saved_count += 1
-
- except Exception as e:
- try:
- self.conn.rollback()
- cursor.close()
- except:
- pass
- error_count += 1
- print(f"保存图片文件 '{filename}' 时出错: {e}")
-
- print(f"媒体文件处理完成: 成功 {saved_count} 个,错误 {error_count} 个")
-
- def export_media_files(self, output_dir: str) -> None:
- """导出媒体文件到指定目录"""
- cursor = self.conn.cursor()
-
- cursor.execute('''
- SELECT id, file_name, file_type, file_data
- FROM dict_media
- WHERE file_data IS NOT NULL
- ''')
-
- if not os.path.exists(output_dir):
- os.makedirs(output_dir)
-
- audio_dir = os.path.join(output_dir, 'audio')
- image_dir = os.path.join(output_dir, 'images')
-
- for dir_path in [audio_dir, image_dir]:
- if not os.path.exists(dir_path):
- os.makedirs(dir_path)
-
- count = 0
- for id, filename, file_type, file_data in cursor.fetchall():
- try:
- if file_type == 'audio':
- # 尝试从 filename 中提取扩展名,如果没有则默认 .mp3
- ext = os.path.splitext(filename)[1]
- if not ext:
- ext = '.mp3'
- output_path = os.path.join(audio_dir, f"{id}{ext}")
- else:
- # 图片文件,保留原文件名
- safe_filename = os.path.basename(filename)
- if not safe_filename:
- safe_filename = f"{id}.jpg" # 默认图片扩展名
- output_path = os.path.join(image_dir, safe_filename)
-
- with open(output_path, 'wb') as f:
- f.write(file_data)
- count += 1
-
- except Exception as e:
- print(f"导出文件 '{filename}' 失败: {e}")
- continue
-
- cursor.close()
- print(f"成功导出 {count} 个媒体文件到 {output_dir}")
-
- def extract_images_from_definition(self, definition_html: str, word: str) -> List[Dict]:
- """
- 从 definition HTML 中提取图片引用
- """
-
- soup = BeautifulSoup(definition_html, 'html.parser')
- images_refs = []
-
- # 查找带有 picfile 属性的 span 标签
- ldoce_entry = soup.find('span', class_='ldoceEntry Entry')
- if ldoce_entry:
- picfile_spans = ldoce_entry.find_all('span', attrs={'picfile': True})
- for pic_span in picfile_spans:
- img_tag = pic_span.find('img')
- sense_id = pic_span.get('id')
- if img_tag:
- alt_attr = img_tag.get('alt')
- src_attr = img_tag.get('src')
- base64_attr = img_tag.get('base64')
- if base64_attr:
- # 检查是否是 base64 格式
- if base64_attr.startswith('data:image/'):
- # 提取 base64 数据
- base64_data = base64_attr.split(',')[1] if ',' in base64_attr else base64_attr
- try:
- # 解码 base64 数据
- image_data = base64.b64decode(base64_data)
- images_refs.append({
- 'sense_id': sense_id,
- 'filename': alt_attr,
- 'src': base64_attr,
- 'image_data': image_data, # 实际的二进制图片数据
- 'type': 'image',
- 'word': word
- })
- except Exception as e:
- print(f"解码 base64 图片数据失败: {e}")
- # 如果解码失败,仍然记录基本信息
- images_refs.append({
- 'sense_id': sense_id,
- 'filename': alt_attr,
- 'src': src_attr,
- 'type': 'image',
- 'word': word
- })
- else:
- # 不是 base64 格式,可能是文件路径
- images_refs.append({
- 'sense_id': sense_id,
- 'filename': alt_attr,
- 'src': src_attr,
- 'type': 'image',
- 'word': word
- })
-
- return images_refs
-
- def parse_definition_to_metadata(self, definition_html: str) -> tuple[Optional[WordMetaData], List[Dict]]:
- """
- 从 definition HTML 中提取 WordMetaData 信息,并处理图片信息
- 返回: (metadata, images_info_list)
- """
- soup = BeautifulSoup(definition_html, 'html.parser') # 可改为 'lxml' if installed
- images_info: List[Dict] = []
- word_metadata: Dict[str, Any] = {'dict_list': []}
-
- try:
- # 查找所有 dictentry 容器
- dict_entries = soup.find_all('span', class_='dictentry')
- if not dict_entries:
- print(f"未找到 dictentry 节点")
- return WordMetaData(**word_metadata), images_info
-
- for dict_entry in dict_entries:
- entry: Dict[str, Any] = {}
- # --- 1. 基本词条信息 ---
- head_tag = dict_entry.find(class_='Head')
- if head_tag:
- # GRAM 及物性
- head_gram_tag = head_tag.find(class_='GRAM')
- if head_gram_tag:
- full_text = ''.join(head_gram_tag.stripped_strings)
- match = re.search(r'\[([^\]]+)\]', full_text)
- if match:
- content = match.group(1)
- entry['transitive'] = [item.strip().lower() for item in content.split(',')]
-
- hwd_tag = dict_entry.find(class_='HWD')
- if hwd_tag:
- entry['headword'] = hwd_tag.get_text(strip=True)
-
- # 同形异义词编号 HOMNUM
- homnum_tag = dict_entry.find(class_='HOMNUM')
- if homnum_tag:
- try:
- entry['homograph_number'] = int(homnum_tag.get_text(strip=True))
- except ValueError:
- pass # Ignore if not a number
-
- # 词性 lm5pp_POS (取第一个)
- pos_tag = dict_entry.find(class_='lm5pp_POS')
- if pos_tag:
- entry['part_of_speech'] = pos_tag.get_text(strip=True)
-
- # --- 2. 发音 Pronunciations ---
- pron_dict = {}
- # 英式发音 IPA
- uk_pron_tag = dict_entry.find(class_='PRON') # 通常第一个是英式
- if uk_pron_tag:
- # 处理 ə 这样的音标变体
- ipa_text = ''.join(uk_pron_tag.stripped_strings)
- pron_dict['uk_ipa'] = ipa_text.strip('/ ') # 去掉斜杠
-
- # 美式发音 IPA (可能在 AMEVARPRON 中)
- us_pron_tag = dict_entry.find(class_='AMEVARPRON')
- if us_pron_tag:
- us_ipa_text = ''.join(us_pron_tag.stripped_strings)
- pron_dict['us_ipa'] = us_ipa_text.strip('/ $ ') # 去掉斜杠和美元符号
-
- # 英式音频 - 优先查找 data-src-mp3,然后查找 href="sound://..."
- uk_audio_tag = dict_entry.find('a', class_='speaker brefile', attrs={'data-src-mp3': lambda x: x and x.startswith('sound://')})
- if not uk_audio_tag:
- # 查找 href 属性以 sound:// 开头的
- uk_audio_tag = dict_entry.find('a', class_='speaker brefile', href=lambda x: x and x.startswith('sound://'))
- if not uk_audio_tag:
- # 更宽松的查找,只要 class 包含 speaker 和 brefile
- uk_audio_tag = dict_entry.find('a', class_=lambda x: x and 'speaker' in x and 'brefile' in x, attrs={'data-src-mp3': True})
- if not uk_audio_tag:
- uk_audio_tag = dict_entry.find('a', class_=lambda x: x and 'speaker' in x and 'brefile' in x, href=lambda x: x and x.startswith('sound://'))
-
- if uk_audio_tag:
- # 优先使用 data-src-mp3
- uk_audio_src = uk_audio_tag.get('data-src-mp3')
- if not uk_audio_src or not uk_audio_src.startswith('sound://'):
- # 否则使用 href
- uk_audio_href = uk_audio_tag.get('href', '')
- if uk_audio_href.startswith('sound://'):
- uk_audio_src = uk_audio_href
- if uk_audio_src:
- pron_dict['uk_audio'] = uk_audio_src.replace('sound://', '', 1)
- pron_dict['uk_audio_title'] = uk_audio_tag.get('title', '')
-
- # 美式音频 - 优先查找 data-src-mp3,然后查找 href="sound://..."
- us_audio_tag = dict_entry.find('a', class_='speaker amefile', attrs={'data-src-mp3': lambda x: x and x.startswith('sound://')})
- if not us_audio_tag:
- us_audio_tag = dict_entry.find('a', class_='speaker amefile', href=lambda x: x and x.startswith('sound://'))
- if not us_audio_tag:
- us_audio_tag = dict_entry.find('a', class_=lambda x: x and 'speaker' in x and 'amefile' in x, attrs={'data-src-mp3': True})
- if not us_audio_tag:
- us_audio_tag = dict_entry.find('a', class_=lambda x: x and 'speaker' in x and 'amefile' in x, href=lambda x: x and x.startswith('sound://'))
-
- if us_audio_tag:
- us_audio_src = us_audio_tag.get('data-src-mp3')
- if not us_audio_src or not us_audio_src.startswith('sound://'):
- us_audio_href = us_audio_tag.get('href', '')
- if us_audio_href.startswith('sound://'):
- us_audio_src = us_audio_href
- if us_audio_src:
- pron_dict['us_audio'] = us_audio_src.replace('sound://', '', 1)
- pron_dict['us_audio_title'] = us_audio_tag.get('title', '')
-
- if pron_dict:
- entry['pronunciations'] = Pronunciation(**pron_dict)
-
- # --- 3. 频率 Frequency ---
- freq_dict = {}
- freq_level_tag = dict_entry.find(class_='LEVEL')
- if freq_level_tag:
- freq_dict['level'] = freq_level_tag.get('title', '').strip()
- freq_dict['level_tag'] = freq_level_tag.get_text(strip=True)
-
- freq_spoken_tag = dict_entry.find(class_='FREQ', title=lambda x: x and 'spoken' in x.lower())
- if freq_spoken_tag:
- freq_dict['spoken'] = freq_spoken_tag.get('title', '').strip()
- freq_dict['spoken_tag'] = freq_spoken_tag.get_text(strip=True)
-
- freq_written_tag = dict_entry.find(class_='FREQ', title=lambda x: x and 'written' in x.lower())
- if freq_written_tag:
- freq_dict['written'] = freq_written_tag.get('title', '').strip()
- freq_dict['written_tag'] = freq_written_tag.get_text(strip=True)
-
- if freq_dict:
- entry['frequency'] = Frequency(**freq_dict)
-
- # --- 4. 话题 Topics ---
- topics_list = []
- topic_tags = dict_entry.find_all('a', class_='topic')
- for topic_tag in topic_tags:
- topic_text = topic_tag.get_text(strip=True)
- topic_href = topic_tag.get('href', '')
- if topic_text:
- topics_list.append(Topic(name=topic_text, href=topic_href))
- if topics_list:
- entry['topics'] = topics_list
-
- # --- 5. 词族 Word Family ---
- word_fams_div = dict_entry.find(class_='LDOCE_word_family')
- if word_fams_div:
- families_list = []
- current_pos = None
- current_items = []
- # 遍历子元素
- for child in word_fams_div.children:
- if isinstance(child, Tag):
- if 'pos' in child.get('class', []):
- # 如果遇到新的 pos,先保存上一个
- if current_pos and current_items:
- families_list.append(WordFamily(pos=current_pos, items=current_items))
- # 开始新的 pos 组
- current_pos = child.get_text(strip=True)
- current_items = []
- elif 'w' in child.get('class', []): # 包括 'crossRef w' 和 'w'
- item_text = child.get_text(strip=True)
- item_href = child.get('href', '') if child.name == 'a' else None
- current_items.append(FamilyItem(text=item_text, href=item_href))
- # 保存最后一个 pos 组
- if current_pos and current_items:
- families_list.append(WordFamily(pos=current_pos, items=current_items))
-
- if families_list:
- entry['word_family'] = families_list
-
- # --- 6. 义项 Senses 和 定义/例子 ---
- senses_list = []
- # 查找所有 Sense div (可能带有 newline 类)
- sense_tags = dict_entry.find_all('span', class_=lambda x: x and 'Sense' in x)
- for sense_tag in sense_tags:
- if not isinstance(sense_tag, Tag):
- continue
- sense_id = sense_tag.get('id', '')
- sense_dict: Dict[str, Any] = {'id': sense_id}
-
- # Sense 编号 (sensenum)
- sensenum_tag = sense_tag.find(class_='sensenum')
- if sensenum_tag:
- sense_dict['number'] = sensenum_tag.get_text(strip=True)
-
- # GRAM 可数性
- gram_tag = sense_tag.find(class_='GRAM')
- if gram_tag:
- full_text = ''.join(gram_tag.stripped_strings)
- # 使用正则表达式匹配方括号内的内容,例如 [countable, uncountable]
- match = re.search(r'\[([^\]]+)\]', full_text)
- if match:
- # 提取方括号内的文本,如 "countable, uncountable"
- content = match.group(1)
- # 按逗号分割,并清理每个词
- sense_dict['countability'] = [item.strip().lower() for item in content.split(',')]
-
- # --- 修改逻辑:精细化处理 Crossref 标签 ---
- crossref_container_tags = sense_tag.find_all('span', class_=lambda x: x and 'Crossref' in x)
- crossref_items_list = []
- for container_tag in crossref_container_tags:
- # 查找容器内所有的 crossRef 链接
- crossref_link_tags = container_tag.find_all('a', class_='crossRef')
- for link_tag in crossref_link_tags:
- crossref_item_dict: Dict[str, Any] = {'sense_id': sense_id}
-
- # 1. 尝试从 link_tag 前面的兄弟节点 (通常是 REFLEX) 获取描述性文本
- # text_parts = []
- # # 遍历 link_tag 之前的直接兄弟节点
- # prev_sibling = link_tag.previous_sibling
- # while prev_sibling and hasattr(prev_sibling, 'name') and prev_sibling.name != 'a':
- # # 检查是否是包含文本的标签 (如 REFLEX, neutral span)
- # if hasattr(prev_sibling, 'get_text'):
- # txt = prev_sibling.get_text(strip=True)
- # if txt:
- # text_parts.append(txt)
- # prev_sibling = prev_sibling.previous_sibling
- # # 如果前面没找到描述性文本,则回退到 link_tag 自身的文本
- # if not text_parts:
- # link_text = link_tag.get_text(strip=True)
- # if link_text:
- # text_parts.append(link_text)
- # # 组合找到的文本
- # if text_parts:
- # crossref_item_dict['text'] = ' '.join(reversed(text_parts)).strip() # 反转是因为我们是向前查找的
-
- # 2. 获取 href
- href = link_tag.get('href')
- if href:
- crossref_item_dict['entry_href'] = href
-
- ref_hwd = link_tag.find('span', class_='REFHWD')
- text = ref_hwd.get_text(strip=True)
- if text:
- crossref_item_dict['text'] = text
-
- # 检查是否是图片相关的交叉引用 (ldoce-show-image)
- if 'ldoce-show-image' in link_tag.get('class', []):
- # 提取图片 ID
- showid = link_tag.get('showid', '')
- if showid:
- crossref_item_dict['show_id'] = showid
-
- # --- 修改逻辑:提取完整的 base64 字符串 ---
- # 提取 base64 属性值 (可能包含前缀 data:image/...)
- full_base64_data = link_tag.get('src', '')
- if not full_base64_data:
- full_base64_data = link_tag.get('base64', '')
-
- if full_base64_data and full_base64_data.startswith('data:'):
- # --- 新增逻辑:组合 image_filename 并准备图片信息 ---
- # 为了文件名更安全,可以对 base64 字符串的一部分进行哈希或截取
- # 这里简化处理,直接用 showid 和 base64 的一部分 (例如前50个字符) 组合
- # 或者使用 base64 字符串的哈希值
- import hashlib
- # 使用 base64 字符串的 SHA1 哈希的前16位作为唯一标识符的一部分
- base64_hash = hashlib.sha1(full_base64_data.encode('utf-8')).hexdigest()[:16]
- # 组合 file_name
- image_filename = f"{showid}_sha1_{base64_hash}" # 推荐使用哈希
- crossref_item_dict['image_filename'] = image_filename
- # 可以考虑从 base64 前缀提取 MIME 类型
- mime_type = full_base64_data.split(';')[0].split(':')[1] if ';' in full_base64_data else 'image/jpeg'
-
- # 准备图片信息字典,供后续存入 dict_media 表
- images_info.append({
- 'sense_id': sense_id,
- 'filename': image_filename,
- 'src': f"crossref:{showid}", # 可以包含 showid 便于识别
- 'type': 'image_crossref',
- 'crossref_showid': showid,
- # 存储完整的 base64 数据
- 'crossref_full_base64': full_base64_data,
- # 提取图片标题
- 'crossref_title': link_tag.get('title', ''),
- 'mime_type': mime_type
- })
- else:
- crossref_item_dict['image_filename'] = full_base64_data
-
- # 提取图片标题 (title 属性)
- image_title = link_tag.get('title', '')
- if image_title:
- crossref_item_dict['image_title'] = image_title
-
- # 提取 LDOCE 版本信息 (从容器 span 标签上获取)
- container_classes = container_tag.get('class', [])
- version_classes = [cls for cls in container_classes if cls.startswith('LDOCEVERSION_')]
- if version_classes:
- crossref_item_dict['ldoce_version'] = version_classes[0]
-
- # 如果提取到了任何信息,则添加到列表
- if crossref_item_dict:
- try:
- crossref_item = CrossReference(**crossref_item_dict)
- crossref_items_list.append(crossref_item)
- except Exception as e:
- print(f"创建 CrossReference 对象失败: {e}, 数据: {crossref_item_dict}")
-
- if crossref_items_list:
- sense_dict['cross_references'] = crossref_items_list
-
- # Signpost 和其中文 (SIGNPOST)
- signpost_tag = sense_tag.find(class_='SIGNPOST')
- if signpost_tag:
- # 英文部分是 SIGNPOST 标签本身的内容(不含子标签)
- # signpost_en_text = signpost_tag.get_text(strip=True) # 这会包含子标签 cn_txt
- # 更精确地获取英文部分
- signpost_parts = []
- for content in signpost_tag.contents:
- if isinstance(content, str):
- signpost_parts.append(content.strip())
- elif content.name != 'span' or 'cn_txt' not in content.get('class', []):
- signpost_parts.append(content.get_text(strip=True))
- sense_dict['signpost_en'] = ' '.join(filter(None, signpost_parts))
-
- cn_signpost_tag = signpost_tag.find(class_='cn_txt')
- if cn_signpost_tag:
- sense_dict['signpost_cn'] = cn_signpost_tag.get_text(strip=True)
-
- # 定义 (DEF) - 可能有英文和中文
- defs_list = []
- def_tags = sense_tag.find_all(class_='DEF')
- i = 0
- while i < len(def_tags):
- en_def_tag = def_tags[i]
- cn_def_tag = None
- # 检查下一个 DEF 是否是中文翻译
- if i + 1 < len(def_tags) and def_tags[i + 1].find(class_='cn_txt'):
- cn_def_tag = def_tags[i + 1].find(class_='cn_txt')
- i += 2 # 跳过中英文一对
- else:
- i += 1 # 只处理英文定义
-
- def_en_text = self._extract_text_with_links(en_def_tag) # 处理内部链接 a.defRef
- def_cn_text = cn_def_tag.get_text(strip=True) if cn_def_tag else None
-
- related_in_def_list = []
- for content in en_def_tag.contents:
- if hasattr(content, 'name'):
- if content.name == 'a' and 'defRef' in content.get('class', []):
- # 提取 href 属性中的链接词
- href = content.get('href', '')
- # 假设 href 格式为 entry://word 或类似,提取 word 部分
- # 简单处理:去掉前缀,按 '#' 或 '/' 分割取第一部分
- if href:
- # 去掉协议部分
- if '://' in href:
- word_part = href.split('://', 1)[1]
- else:
- word_part = href
- # 去掉锚点
- word_part = word_part.split('#', 1)[0]
- # 去掉查询参数 (如果有的话)
- word_part = word_part.split('?', 1)[0]
- # 去掉路径中的文件名部分,只保留词 (简单处理)
- # 例如 entry://Food, dish-topic food -> Food, dish-topic food
- # 例如 entry://red -> red
- # 例如 entry://inside#inside__9__a -> inside
- related_word = word_part.strip()
- if related_word:
- related_in_def_list.append(related_word)
-
- # 过滤掉空定义
- if def_en_text or def_cn_text:
- defs_list.append(Definition(en=def_en_text, cn=def_cn_text, related_words=related_in_def_list))
-
- if defs_list:
- sense_dict['definitions'] = defs_list
-
- # 例子 (EXAMPLE)
- examples_list = []
- example_tags = sense_tag.find_all(class_='EXAMPLE')
- for ex_tag in example_tags:
- if not isinstance(ex_tag, Tag):
- continue
- example_dict: Dict[str, Any] = {}
-
- # 英文例句 (english)
- en_span_tag = ex_tag.find(class_='english')
- if en_span_tag:
- example_dict['en'] = self._extract_text_with_links(en_span_tag) # 处理内部链接
-
- # 中文翻译 (cn_txt)
- cn_span_tag = ex_tag.find(class_='cn_txt')
- if cn_span_tag:
- example_dict['cn'] = cn_span_tag.get_text(strip=True)
-
- # 搭配 (COLLOINEXA)
- collocation_tag = ex_tag.find(class_='COLLOINEXA')
- if collocation_tag:
- # 搭配文本可能需要特殊处理,因为它可能在 en 文本中被高亮
- # 这里简单提取文本
- example_dict['collocation'] = collocation_tag.get_text(strip=True)
-
- # 例子内链接词 (crossRef in example)
- related_in_ex_list = []
- # 查找例子文本内的 defRef 或 crossRef 链接
- if en_span_tag:
- ref_tags_in_ex = en_span_tag.find_all('a', class_=['defRef', 'crossRef'])
- for ref_tag in ref_tags_in_ex:
- ref_text = ref_tag.get_text(strip=True)
- if ref_text:
- related_in_ex_list.append(ref_text)
- if related_in_ex_list:
- example_dict['related_words_in_example'] = related_in_ex_list
-
- # --- 示例音频提取 (关键修改点) ---
- # 查找示例音频链接,匹配 href="sound://..."
- ex_audio_tag = ex_tag.find('a', class_='speaker exafile', href=lambda x: x and x.startswith('sound://'))
- if not ex_audio_tag:
- # 更宽松的匹配 class 包含 speaker 和 exafile
- ex_audio_tag = ex_tag.find('a', class_=lambda x: x and 'speaker' in x and 'exafile' in x, href=lambda x: x and x.startswith('sound://'))
-
- if ex_audio_tag:
- audio_href = ex_audio_tag.get('href', '')
- if audio_href.startswith('sound://'):
- example_dict['audio'] = audio_href.replace('sound://', '', 1)
-
- if example_dict.get('en') or example_dict.get('cn'): # 只添加有内容的例子
- examples_list.append(Example(**example_dict))
-
- if examples_list:
- sense_dict['examples'] = examples_list
-
- if sense_dict.get('definitions') or sense_dict.get('examples'): # 只添加有定义或例子的 Sense
- senses_list.append(Sense(**sense_dict))
-
- if senses_list:
- entry['senses'] = senses_list
-
- word_metadata['dict_list'].append(entry)
-
- # etym
- etym_tag = soup.find('span', class_='etym')
- if etym_tag:
- etym_map: Dict[str, Any] = {'item': []}
- asset_intro = etym_tag.find('span', class_='asset_intro')
- if asset_intro:
- etym_map['intro'] = asset_intro.get_text(strip=True)
-
- head_tag = etym_tag.find('span', class_='Head')
- if head_tag:
- hw_tag = head_tag.find('span', class_='HWD')
- if hw_tag:
- etym_map['headword'] = hw_tag.get_text(strip=True)
- hom_tag = head_tag.find('span', class_='HOMNUM')
- if hom_tag:
- etym_map['hom_num'] = hom_tag.get_text(strip=True)
-
- sense_tags = etym_tag.find_all('span', class_='Sense')
- for sense_tag in sense_tags:
- item: Dict[str, Any] = {}
- lang_tag = sense_tag.find('span', class_='LANG')
- if lang_tag:
- item['language'] = lang_tag.get_text(strip=True).strip()
-
- origin_tag = sense_tag.find('span', class_='ORIGIN')
- if origin_tag:
- item['origin'] = origin_tag.get_text(strip=True).strip()
-
- etym_map['item'].append(EtymologyItem(**item))
-
- word_metadata['etymology'] = Etymology(**etym_map)
-
- # --- 7. 创建 WordMetaData 对象 ---
- if word_metadata:
- try:
- metadata = WordMetaData(**word_metadata)
- return metadata, images_info # images_info 在此方法中未填充
- except Exception as e:
- print(f"WordMetaData 验证失败,原始数据: {json.dumps(word_metadata, ensure_ascii=False, indent=2)}")
- print(f"验证错误: {e}")
- # 可以选择返回 None 或者不验证的 dict
- return None, images_info
- else:
- return None, images_info
-
- except Exception as e:
- print(f"解析 HTML 时出错: {e}")
- import traceback
- traceback.print_exc() # 打印详细错误信息
- return None, images_info
-
-
- def _extract_text_with_links(self, tag: Tag) -> str:
- """提取标签文本,保留内部链接词的文本,但不保留 HTML 结构。
- 例如: 'a hard round fruit' -> 'a hard round fruit'
- """
- if not tag:
- return ""
- parts = []
- for content in tag.contents:
- if isinstance(content, str):
- parts.append(content.strip())
- elif hasattr(content, 'name') and content.name == 'a' and 'defRef' in content.get('class', []):
- # 提取链接词的文本
- parts.append(content.get_text(strip=True))
- elif hasattr(content, 'name'): # 其他标签,递归提取文本
- parts.append(self._extract_text_with_links(content))
- # 忽略其他非标签、非文本内容
- return ' '.join(filter(None, parts)) # 过滤空字符串并用空格连接
-
- def save_entry_images(self, entry_id: int, word: str, images_info: List[Dict]) -> None:
- """
- 保存词条的图片信息到 dict_media 表
- """
- from psycopg2.extras import Json
+ def parse_definition_to_metadata(self, html_str: str) -> Tuple[Dict, List[Dict]]:
+ """Parse an HTML definition string; return (metadata dict, list of {'type': 'image', 'src': ...} dicts)."""
+ soup = BeautifulSoup(html_str, 'html.parser')
+
+ # Collect pronunciations from <a class="pronounce"> links; links lacking data-rel or href are skipped. NOTE(review): confirm Pronunciation accepts type/url.
+ pronunciations = []
+ pron_links = soup.find_all('a', class_='pronounce')
+ for link in pron_links:
+ pron_type = link.get('data-rel', '')
+ pron_url = link.get('href', '')
+ if pron_type and pron_url:
+ pronunciations.append(Pronunciation(type=pron_type, url=pron_url))
+
+ # Collect part-of-speech labels: stripped text of every <span class="pos">.
+ pos_elements = soup.find_all('span', class_='pos')
+ pos_list = [pos.get_text().strip() for pos in pos_elements]
+
+ # Collect definitions from <span class="def">, skipping empty text. NOTE(review): the removed code built Definition with en/cn fields; confirm text= is valid.
+ definitions = []
+ sense_elements = soup.find_all('span', class_='def')
+ for sense in sense_elements:
+ definition_text = sense.get_text().strip()
+ if definition_text:
+ definitions.append(Definition(text=definition_text))
+
+ # Collect example sentences from <span class="example">, skipping empty text. NOTE(review): the removed code built Example with en/cn keys; confirm text= is valid.
+ examples = []
+ example_elements = soup.find_all('span', class_='example')
+ for example in example_elements:
+ example_text = example.get_text().strip()
+ if example_text:
+ examples.append(Example(text=example_text))
+
+ # Assemble the metadata as a plain dict; schema objects are serialized with model_dump().
+ metadata = {
+ "pronunciations": [p.model_dump() for p in pronunciations],
+ "parts_of_speech": pos_list,
+ "definitions": [d.model_dump() for d in definitions],
+ "examples": [e.model_dump() for e in examples]
+ }
+
+ # Collect media info: one entry per <img> tag that has a non-empty src attribute.
+ media_info = []
+ img_elements = soup.find_all('img')
+ for img in img_elements:
+ src = img.get('src', '')
+ if src:
+ media_info.append({
+ 'type': 'image',
+ 'src': src
+ })
+
+ return metadata, media_info
+
+ def save_dict_media(self, dict_media: Dict[str, bytes], media_references: List[Dict], entry_ids: Dict[str, int]) -> None:
+ """保存词典媒体文件到数据库"""
+ from mysql.connector import Error
import hashlib
- cursor = self.conn.cursor()
+ cursor = self.conn.cursor(dictionary=True)
try:
- for img_info in images_info:
- # 检查是否存在 crossref_full_base64 并尝试解码
- image_data = None
- if 'crossref_full_base64' in img_info:
- try:
- # Base64 字符串可能包含前缀 (如 data:image/jpeg;base64,...)
- b64_string = img_info['crossref_full_base64']
- if b64_string.startswith('data:'):
- # 分割并获取实际的 base64 数据部分
- header, b64_data = b64_string.split(',', 1)
- else:
- # 如果没有前缀,整个字符串就是 base64 数据
- b64_data = b64_string
+ for media_ref in media_references:
+ word = media_ref['word']
+ filename = media_ref['filename']
+ file_type = media_ref['type']
- # 解码 Base64 字符串为二进制数据
- image_data = base64.b64decode(b64_data)
- # print(f"成功解码 crossref 图片: {img_info.get('filename', 'unknown')}")
- except Exception as e:
- print(
- f"解码 crossref_full_base64 数据失败 (文件名: {img_info.get('filename', 'unknown')}): {e}")
- # 如果解码失败,可以选择跳过这个图片或记录错误
- # continue # 跳过当前图片
- # 或者保留 image_data 为 None,后续逻辑会处理
+ # 查找对应的 entry_id
+ entry_id = entry_ids.get(word)
+ if not entry_id:
+ continue
- # 如果上面解码成功,使用解码后的 image_data;否则检查是否已有 'image_data' (来自 extract_images_from_definition)
- if image_data is None and 'image_data' in img_info:
- image_data = img_info['image_data']
+ # 查找文件数据
+ # 处理文件名,确保与 dict_media 中的键匹配
+ normalized_filename = filename.replace('\\', '/').lstrip('/')
+ file_data = dict_media.get(normalized_filename)
+ if not file_data:
+ # 尝试其他可能的文件名变体
+ alt_filename = filename.lstrip('/')
+ file_data = dict_media.get(alt_filename)
+ if not file_data:
+ print(f"警告: 找不到媒体文件 {filename} 的数据")
+ continue
- filename = img_info['filename']
- src = img_info['src']
- file_type = img_info['type']
- details = {
- 'sense_id': img_info.get('sense_id'),
- 'src': src,
- 'word': word,
- 'mime_type': img_info.get('mime_type'),
- 'show_id': img_info.get('crossref_showid'),
- 'crossref_title': img_info.get('crossref_title'),
- }
- # 移除 details 中的 None 值 (可选,保持数据整洁)
- details = {k: v for k, v in details.items() if v is not None}
+ # 计算文件哈希
+ file_hash = hashlib.md5(file_data).hexdigest()
- # 检查是否已存在相同的图片记录
+ # 检查数据库中是否已存在相同的文件
cursor.execute('''
- SELECT id
- FROM dict_media
- WHERE file_name = %s
- AND dict_id = %s
- ''', (filename, entry_id))
+ SELECT id FROM dict_media
+ WHERE file_hash = %s AND file_type = %s
+ ''', (file_hash, file_type))
+ existing_record = cursor.fetchone()
- if cursor.fetchone() is None:
- # 处理图片数据
- if image_data:
- # 有实际的图片二进制数据(base64 解码后的数据)
- file_hash = hashlib.sha256(image_data).hexdigest()
+ if existing_record:
+ # 如果文件已存在,只需关联到当前词条
+ media_id = existing_record['id']
+ cursor.execute('''
+ INSERT IGNORE INTO dict_entry_media (entry_id, media_id)
+ VALUES (%s, %s)
+ ''', (entry_id, media_id))
+ else:
+ # 插入新文件记录
+ cursor.execute('''
+ INSERT INTO dict_media (filename, file_type, file_data, file_hash)
+ VALUES (%s, %s, %s, %s)
+ ''', (filename, file_type, file_data, file_hash))
+ media_id = cursor.lastrowid
- cursor.execute('''
- INSERT INTO dict_media (dict_id, file_name, file_type, file_data, file_hash, details)
- VALUES (%s, %s, %s, %s, %s, %s)
- ''', (entry_id, filename, file_type, psycopg2.Binary(image_data), file_hash, Json(details)))
- else:
- # 没有实际图片数据,可能是文件路径引用
- file_hash = hashlib.sha256(src.encode()).hexdigest()
+ # 关联到词条
+ cursor.execute('''
+ INSERT IGNORE INTO dict_entry_media (entry_id, media_id)
+ VALUES (%s, %s)
+ ''', (entry_id, media_id))
- cursor.execute('''
- INSERT INTO dict_media (dict_id, file_name, file_type, file_data, file_hash, details)
- VALUES (%s, %s, %s, %s, %s)
- ''', (entry_id, filename, file_type, src, file_hash, Json(details)))
+ # 提取详细信息(如果有的话)
+ details = {}
+ if file_type == 'image':
+ # 对于图片,可以提取一些基本信息
+ details['size'] = len(file_data)
+ # 这里可以添加更多图片处理逻辑
- except Exception as e:
- print(f"保存词条 '{word}' 的图片信息时出错: {e}")
+ # 更新媒体记录的详细信息
+ if details:
+ cursor.execute('''
+ UPDATE dict_media
+ SET details = %s
+ WHERE id = %s
+ ''', (json.dumps(details), media_id))
+
+ except Error as e:
+ print(f"保存媒体文件时出错: {e}")
self.conn.commit()
cursor.close()
+    def export_media_files(self, export_dir: str) -> None:
+        """Export every stored media blob from ``dict_media`` into a local directory.
+
+        Rows whose ``file_data`` is empty are skipped. Database errors are printed
+        rather than raised; any other error (for example ``OSError`` while
+        writing a file) propagates to the caller.
+
+        Args:
+            export_dir: Target directory; it is created if it does not exist.
+        """
+        cursor = self.conn.cursor(dictionary=True)
+
+        try:
+            os.makedirs(export_dir, exist_ok=True)
+
+            # NOTE(review): this query reads a `filename` column, while the
+            # DictionaryMedia model declares `file_name` - confirm the real schema.
+            cursor.execute('SELECT id, filename, file_type, file_data FROM dict_media')
+
+            for record in cursor.fetchall():
+                file_data = record['file_data']
+                if not file_data:
+                    continue
+
+                # Flatten the stored name so it cannot escape export_dir.
+                safe_filename = self._sanitize_filename(record['filename'])
+                file_path = os.path.join(export_dir, safe_filename)
+
+                with open(file_path, 'wb') as f:
+                    f.write(file_data)
+
+                print(f"导出媒体文件: {file_path}")
+
+        except Error as e:
+            print(f"导出媒体文件时出错: {e}")
+        finally:
+            # Release the cursor on every path, including when a
+            # non-database error propagates out of the loop above.
+            cursor.close()
+
+    def _sanitize_filename(self, filename: str) -> str:
+        """Return *filename* with every unsafe path character replaced by ``_``."""
+        # Characters that are path separators or reserved on common filesystems.
+        unsafe_chars = '<>:"/\\|?*'
+        return ''.join('_' if ch in unsafe_chars else ch for ch in filename)
+
def close(self):
"""关闭数据库连接"""
if self.conn:
@@ -1109,10 +434,10 @@ def main():
# 数据库配置
db_config = {
'host': 'localhost',
- 'database': 'postgres',
+ 'database': 'mysql',
'user': 'root',
'password': 'root',
- 'port': 5432
+ 'port': 3306
}
# 文件路径
@@ -1134,7 +459,7 @@ def main():
# 可选:导出媒体文件到本地目录
# parser.export_media_files('./exported_media')
- except Exception as e:
+ except Error as e:
print(f"解析过程中出现错误: {e}")
finally:
parser.close()
diff --git a/assets/generate_and_save_coupons.py b/assets/generate_and_save_coupons.py
new file mode 100644
index 0000000..f8d02a5
--- /dev/null
+++ b/assets/generate_and_save_coupons.py
@@ -0,0 +1,179 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Script to generate and save coupons to the database
+"""
+
+import os
+import sys
+import random
+from datetime import datetime, timedelta
+
+# Add the backend directory to the path so we can import modules
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
+
+# Import required modules
+from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker
+from sqlalchemy.exc import SQLAlchemyError
+
+from backend.app.admin.model.coupon import Coupon
+from backend.utils.snowflake import snowflake
+from backend.core.conf import settings, get_db_uri
+
+
+def generate_coupon_codes(prefix: str, quantity: int):
+    """
+    Generate sequential coupon codes with the given prefix.
+
+    Every code is exactly 6 characters: the prefix followed by a zero-padded
+    counter that starts at 0.
+    Example: A00000, TEST00, XYZ000
+
+    Args:
+        prefix (str): Uppercase letter prefix, 1-5 characters long
+        quantity (int): Number of coupon codes to generate
+
+    Returns:
+        list: Generated coupon codes, in counter order
+
+    Raises:
+        ValueError: If the prefix is empty, not uppercase letters, longer than
+            5 characters, or if quantity is not positive or exceeds the number
+            of codes the remaining digits can express.
+    """
+    # An empty string fails isalpha(), so test it first to report the
+    # length problem instead of a misleading "uppercase letters" message.
+    if not prefix:
+        raise ValueError("Prefix must be 1-5 characters long")
+
+    if not prefix.isalpha() or not prefix.isupper():
+        raise ValueError("Prefix must be uppercase letters only")
+
+    if len(prefix) > 5:
+        raise ValueError("Prefix must be 1-5 characters long")
+
+    if quantity <= 0:
+        raise ValueError("Quantity must be greater than 0")
+
+    # The counter fills whatever the prefix leaves of the 6 characters.
+    num_digits = 6 - len(prefix)
+    max_combinations = 10 ** num_digits
+
+    if quantity > max_combinations:
+        raise ValueError(f"With prefix '{prefix}' (length {len(prefix)}), can only generate {max_combinations} unique codes (0 to {max_combinations - 1})")
+
+    return [f"{prefix}{number:0{num_digits}d}" for number in range(quantity)]
+
+
+def save_coupons_to_db(prefix: str, quantity: int, coupon_type: str, points: int, expire_days: int = None):
+    """
+    Generate coupons and save them to the database in one transaction.
+
+    Coupon codes are always 6 characters total:
+    - 1-letter prefix: 5 digits (up to 100000 codes: A00000-A99999)
+    - 4-letter prefix: 2 digits (up to 100 codes: TEST00-TEST99)
+    - 5-letter prefix: 1 digit (up to 10 codes: ABCDE0-ABCDE9)
+
+    Errors are printed rather than raised; on any failure the transaction is
+    rolled back so no partial batch is left pending on the session.
+
+    Args:
+        prefix (str): The letter prefix for the coupon codes
+        quantity (int): Number of coupon codes to generate
+        coupon_type (str): Type of the coupons
+        points (int): Points value of the coupons
+        expire_days (int, optional): Days until expiration. If None or not
+            positive, the coupons do not expire.
+    """
+    db_url = get_db_uri(settings)
+    # Swap the async driver for a synchronous one so a plain Session can be used.
+    sync_db_url = db_url.replace('mysql+asyncmy', 'mysql+mysqlconnector')
+
+    engine = None
+    db = None
+    try:
+        engine = create_engine(sync_db_url, echo=False)
+        SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
+        db = SessionLocal()
+
+        codes = generate_coupon_codes(prefix, quantity)
+
+        # Compute the expiry once so every coupon in the batch shares it.
+        expires_at = None
+        if expire_days is not None and expire_days > 0:
+            expires_at = datetime.now() + timedelta(days=expire_days)
+
+        coupons = []
+        for code in codes:
+            coupon_id = snowflake.generate()
+            coupon = Coupon(
+                code=code,
+                type=coupon_type,
+                points=points,
+                expires_at=expires_at
+            )
+            # The id is assigned after construction, using our own snowflake value.
+            coupon.id = coupon_id
+            coupons.append(coupon)
+
+        db.add_all(coupons)
+        db.commit()
+
+        print(f"Successfully saved {len(coupons)} coupons to the database.")
+        print(f"Prefix: {prefix}, Type: {coupon_type}, Points: {points}")
+        # Report an expiry only when one was actually applied.
+        if expires_at is not None:
+            print(f"Expires in: {expire_days} days")
+
+        print("\nSample coupons generated:")
+        for coupon in coupons[:5]:
+            print(f"  ID: {coupon.id}, Code: {coupon.code}")
+
+    except SQLAlchemyError as e:
+        print(f"Database error: {e}")
+        if db is not None:
+            db.rollback()
+    except Exception as e:
+        print(f"Error: {e}")
+        if db is not None:
+            db.rollback()
+    finally:
+        # Release the session and the connection pool on every path.
+        if db is not None:
+            db.close()
+        if engine is not None:
+            engine.dispose()
+
+
+def main():
+    """Demonstrate usage by generating and saving one sample batch of coupons."""
+    print("Coupon Generator and Database Saver")
+    print("=" * 40)
+
+    # Example: Generate and save coupons with different prefixes
+    try:
+        # Single character prefix (5 digits, incremental from 00000)
+        # print("Generating coupons with single character prefix 'A'...")
+        # save_coupons_to_db('A', 5, 'NORMAL', 100, 30)
+        # print("\n" + "-" * 40 + "\n")
+
+        # Active example; the message is derived from the prefix so the two
+        # cannot drift apart.
+        prefix = 'VIP'
+        print(f"Generating coupons with {len(prefix)}-character prefix '{prefix}'...")
+        save_coupons_to_db(prefix, 5, 'test', 1000, 60)
+        print("\n" + "-" * 40 + "\n")
+
+        # 3-character prefix (3 digits, incremental from 000)
+        # print("Generating coupons with 3-character prefix 'XYZ'...")
+        # save_coupons_to_db('XYZ', 3, 'SPECIAL', 500, 15)
+        # print("\n" + "-" * 40 + "\n")
+
+        # 5-character prefix (1 digit, incremental from 0)
+        # print("Generating coupons with 5-character prefix 'ABCDE'...")
+        # save_coupons_to_db('ABCDE', 5, 'PREMIUM', 2000, 90)
+
+    except Exception as e:
+        print(f"Error in main: {e}")
+
+
+if __name__ == "__main__":
+ main()
\ No newline at end of file
diff --git a/backend/alembic.ini b/backend/alembic.ini
index cb69830..a08f379 100755
--- a/backend/alembic.ini
+++ b/backend/alembic.ini
@@ -5,7 +5,7 @@
script_location = alembic
# template used to generate migration files
-file_template = %%(year)d-%%(month).2d-%%(day).2d_%%(hour).2d-%%(minute).2d_%%(rev)s_%%(slug)s
+# file_template = %%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
@@ -21,7 +21,7 @@ prepend_sys_path = .
# max length of characters to apply to the
# "slug" field
-# truncate_slug_length = 40
+# max_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
@@ -32,25 +32,25 @@ prepend_sys_path = .
# versions/ directory
# sourceless = false
-# version location specification; This defaults
-# to alembic/versions. When using multiple version
-# directories, initial revisions must be specified with --version-path.
-# The path separator used here should be the separator specified by "version_path_separator"
-# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions
+# version number format ('%' must be doubled: this file is read with
+# configparser interpolation, as the file_template line above shows)
+# NOTE(review): confirm that anything actually reads this option
+version_num_format = %%04d
# version path separator; As mentioned above, this is the character used to split
-# version_locations. Valid values are:
+# version_locations. The default within new alembic.ini files is "os", which uses
+# os.pathsep. If this key is omitted entirely, it falls back to the legacy
+# behavior of splitting on spaces and/or commas.
+# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
-version_path_separator = os # default: use os.pathsep
+version_path_separator = os
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
-sqlalchemy.url = postgresql+asyncpg://root:root@127.0.0.1:5432/db
+# Changed from postgresql+asyncpg to mysql+asyncmy (comment kept on its own
+# line: inline comments would become part of the URL value)
+sqlalchemy.url = mysql+asyncmy://root:root@127.0.0.1:3306/app
[post_write_hooks]
@@ -68,26 +68,23 @@ sqlalchemy.url = postgresql+asyncpg://root:root@127.0.0.1:5432/db
[loggers]
keys = root,sqlalchemy,alembic
-[handlers]
-keys = console
-
-[formatters]
-keys = generic
-
[logger_root]
level = WARN
handlers = console
-qualname =
+qualname = root
[logger_sqlalchemy]
level = WARN
handlers =
-qualname = sqlalchemy.engine
+qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
-qualname = alembic
+qualname = alembic
+
+[handlers]
+keys = console
[handler_console]
class = StreamHandler
@@ -95,6 +92,9 @@ args = (sys.stderr,)
level = NOTSET
formatter = generic
+[formatters]
+keys = generic
+
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
-datefmt = %H:%M:%S
+datefmt = %H:%M:%S
\ No newline at end of file
diff --git a/backend/app/admin/model/audit_log.py b/backend/app/admin/model/audit_log.py
index 9c71cb0..aabbfa9 100755
--- a/backend/app/admin/model/audit_log.py
+++ b/backend/app/admin/model/audit_log.py
@@ -3,11 +3,11 @@
from datetime import datetime
from typing import Optional, List
-from sqlalchemy import Integer, BigInteger, Text, String, Numeric, Float, DateTime, ForeignKey, Index
-from sqlalchemy.dialects.postgresql import JSONB, ARRAY
+from sqlalchemy import Integer, BigInteger, Text, String, Numeric, Float, DateTime, ForeignKey, Index, func # Added func import
+from sqlalchemy.dialects.mysql import JSON as MySQLJSON # Changed from postgresql.JSONB to mysql.JSON
from sqlalchemy.orm import Mapped, mapped_column
-from backend.common.model import snowflake_id_key, Base
+from backend.common.model import Base, snowflake_id_key
class AuditLog(Base):
@@ -16,9 +16,9 @@ class AuditLog(Base):
id: Mapped[snowflake_id_key] = mapped_column(init=False, primary_key=True)
api_type: Mapped[str] = mapped_column(String(20), nullable=False, comment="API类型: recognition embedding assessment")
model_name: Mapped[str] = mapped_column(String(50), nullable=False, comment="模型名称")
- request_data: Mapped[Optional[dict]] = mapped_column(JSONB, comment="请求数据")
- response_data: Mapped[Optional[dict]] = mapped_column(JSONB, comment="响应数据")
- token_usage: Mapped[Optional[dict]] = mapped_column(JSONB, comment="消耗的token数量")
+ request_data: Mapped[Optional[dict]] = mapped_column(MySQLJSON, comment="请求数据")
+ response_data: Mapped[Optional[dict]] = mapped_column(MySQLJSON, comment="响应数据")
+ token_usage: Mapped[Optional[dict]] = mapped_column(MySQLJSON, comment="消耗的token数量")
cost: Mapped[Optional[float]] = mapped_column(Numeric(10, 5), comment="API调用成本")
duration: Mapped[Optional[float]] = mapped_column(Float, comment="调用耗时(秒)")
status_code: Mapped[Optional[int]] = mapped_column(Integer, comment="HTTP状态码")
@@ -40,10 +40,11 @@ class AuditLog(Base):
class DailySummary(Base):
__tablename__ = 'daily_summary'
- id: Mapped[snowflake_id_key] = mapped_column(init=False, primary_key=True)
+ id: Mapped[snowflake_id_key] = mapped_column(BigInteger, init=False, primary_key=True)
user_id: Mapped[Optional[int]] = mapped_column(BigInteger, ForeignKey('wx_user.id'), comment="调用用户ID")
- image_ids: Mapped[List[str]] = mapped_column(ARRAY(Text), default=None, comment="图片ID列表")
- thumbnail_ids: Mapped[List[str]] = mapped_column(ARRAY(Text), default=None, comment="图片缩略图列表")
+ # MySQL doesn't have ARRAY type, so we'll use JSON to store lists
+ image_ids: Mapped[Optional[List[str]]] = mapped_column(MySQLJSON, default=None, comment="图片ID列表") # Changed from ARRAY to JSON
+ thumbnail_ids: Mapped[Optional[List[str]]] = mapped_column(MySQLJSON, default=None, comment="图片缩略图列表") # Changed from ARRAY to JSON
summary_time: Mapped[datetime] = mapped_column(DateTime, default=None, comment="总结的时间")
# 索引优化
diff --git a/backend/app/admin/model/dict.py b/backend/app/admin/model/dict.py
index 5a0b4a0..25644ff 100755
--- a/backend/app/admin/model/dict.py
+++ b/backend/app/admin/model/dict.py
@@ -1,48 +1,46 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
-from typing import Optional
+from datetime import datetime
from enum import Enum
+from typing import Optional, List
-from sqlalchemy import String, Column, LargeBinary, ForeignKey, BigInteger, Index, func, JSON, Text, Numeric, Enum as SQLEnum
-from sqlalchemy.dialects.postgresql import JSONB
-from sqlalchemy.orm import Mapped, mapped_column, declared_attr
+from sqlalchemy import String, Text, DateTime, func, BigInteger, Index, ForeignKey
+from sqlalchemy.dialects.mysql import JSON as MySQLJSON # Changed from postgresql.JSONB to mysql.JSON
+from sqlalchemy.orm import Mapped, mapped_column
+from sqlalchemy.sql.sqltypes import LargeBinary
+from backend.common.model import Base, id_key, DataClassBase
from backend.app.admin.schema.dict import WordMetaData
from backend.app.admin.schema.pydantic_type import PydanticType
-from backend.common.model import snowflake_id_key, DataClassBase
class DictionaryEntry(DataClassBase):
"""词典条目表"""
- @declared_attr.directive
- def __tablename__(cls) -> str:
- return "dict_entry"
+ __tablename__ = "dict_entry"
- id: Mapped[int] = mapped_column(primary_key=True, init=True, autoincrement=True)
+ id: Mapped[id_key] = mapped_column(BigInteger, init=False, primary_key=True)
word: Mapped[str] = mapped_column(String(255), unique=True, nullable=False)
definition: Mapped[Optional[str]] = mapped_column(Text, default=None)
details: Mapped[Optional[WordMetaData]] = mapped_column(PydanticType(pydantic_type=WordMetaData), default=None) # 其他可能的字段(根据实际需求添加)
__table_args__ = (
- Index('idx_dict_word', word),
+ Index('idx_dict_word', 'word'),
)
class DictionaryMedia(DataClassBase):
"""词典媒体资源表"""
- @declared_attr.directive
- def __tablename__(cls) -> str:
- return "dict_media"
+ __tablename__ = "dict_media"
- id: Mapped[int] = mapped_column(primary_key=True, init=True, autoincrement=True)
+ id: Mapped[id_key] = mapped_column(BigInteger, init=False, primary_key=True)
file_name: Mapped[str] = mapped_column(String(255), nullable=False)
file_type: Mapped[str] = mapped_column(String(50), nullable=False) # 'audio', 'image'
- dict_id: Mapped[Optional[int]] = mapped_column(BigInteger, ForeignKey("dict_entry.id"), default=None)
- file_data: Mapped[Optional[bytes]] = mapped_column(LargeBinary, default=None)
+ dict_id: Mapped[Optional[int]] = mapped_column(BigInteger, default=None)
+ file_data: Mapped[Optional[bytes]] = mapped_column(Text, default=None) # Changed from LargeBinary to Text for MySQL compatibility
file_hash: Mapped[Optional[str]] = mapped_column(String(64), default=None)
- details: Mapped[Optional[dict]] = mapped_column(JSONB(astext_type=Text()), default=None, comment="其他信息") # 其他信息
+ details: Mapped[Optional[dict]] = mapped_column(MySQLJSON, default=None, comment="其他信息") # Changed from JSONB to MySQLJSON
__table_args__ = (
Index('idx_media_filename', file_name),
@@ -72,19 +70,16 @@ class DictCategory(str, Enum):
class YdDict(DataClassBase):
"""YD词典查询结果表"""
-
- @declared_attr.directive
- def __tablename__(cls) -> str:
- return "yd_dict"
+ __tablename__ = "yd_dict"
id: Mapped[int] = mapped_column(primary_key=True, init=False, autoincrement=True)
word: Mapped[str] = mapped_column(String(255), nullable=False, comment="查询的词或词组")
uk_phone: Mapped[str] = mapped_column(String(50), nullable=True, comment="uk 音标")
us_phone: Mapped[str] = mapped_column(String(50), nullable=True, comment="us 音标")
- lang: Mapped[YdDictLanguage] = mapped_column(SQLEnum(YdDictLanguage), nullable=False, comment="查询的语言")
- dict_type: Mapped[YdDictType] = mapped_column(SQLEnum(YdDictType), nullable=False, comment="词典类型(英中,英英)")
- category: Mapped[DictCategory] = mapped_column(SQLEnum(DictCategory), nullable=False, comment="词典分类(一般词典,少儿词典)")
- query_result: Mapped[dict] = mapped_column(JSONB(astext_type=Text()), nullable=False, comment="JSON结构的查询结果")
+ lang: Mapped[YdDictLanguage] = mapped_column(String(20), nullable=False, comment="查询的语言")
+ dict_type: Mapped[YdDictType] = mapped_column(String(20), nullable=False, comment="词典类型(英中,英英)")
+ category: Mapped[DictCategory] = mapped_column(String(20), nullable=False, comment="词典分类(一般词典,少儿词典)")
+ query_result: Mapped[dict] = mapped_column(MySQLJSON, nullable=False, comment="JSON结构的查询结果")
__table_args__ = (
Index('idx_yd_dict_word', word),
@@ -94,20 +89,17 @@ class YdDict(DataClassBase):
class YdMedia(DataClassBase):
"""YD词典媒体资源表(发音文件等)"""
-
- @declared_attr.directive
- def __tablename__(cls) -> str:
- return "yd_media"
+ __tablename__ = "yd_media"
id: Mapped[int] = mapped_column(primary_key=True, init=False, autoincrement=True)
file_name: Mapped[str] = mapped_column(String(255), nullable=False, comment="文件名")
file_type: Mapped[str] = mapped_column(String(50), nullable=False, comment="文件类型(audio, image等)")
- yd_dict_id: Mapped[int] = mapped_column(BigInteger, ForeignKey("yd_dict.id"), nullable=False, comment="关联的YD词典条目")
+ yd_dict_id: Mapped[int] = mapped_column(BigInteger, nullable=False, comment="关联的YD词典条目")
file_data: Mapped[Optional[bytes]] = mapped_column(LargeBinary, default=None, comment="文件二进制数据")
file_url: Mapped[Optional[str]] = mapped_column(String(500), default=None, comment="文件URL(如果存储在外部)")
phonetic_symbol: Mapped[Optional[str]] = mapped_column(String(100), default=None, comment="关联的音标")
usage_type: Mapped[Optional[str]] = mapped_column(String(50), default=None, comment="用途类型(word_pronunciation, example_sentence等)")
- details: Mapped[Optional[dict]] = mapped_column(JSONB(astext_type=Text()), default=None, comment="其他信息")
+ details: Mapped[Optional[dict]] = mapped_column(MySQLJSON, default=None, comment="其他信息")
__table_args__ = (
Index('idx_yd_media_filename', file_name),
diff --git a/backend/app/admin/model/feedback.py b/backend/app/admin/model/feedback.py
index a243196..f57fb61 100755
--- a/backend/app/admin/model/feedback.py
+++ b/backend/app/admin/model/feedback.py
@@ -1,20 +1,20 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from datetime import datetime
-from typing import Optional, List
+from typing import Optional
-from sqlalchemy import String, Text, DateTime, ForeignKey, Index, BigInteger
-from sqlalchemy.dialects.postgresql import JSONB
-from sqlalchemy.orm import Mapped, mapped_column, relationship
+from sqlalchemy import String, Text, DateTime, func, BigInteger, Index
+from sqlalchemy.dialects.mysql import JSON as MySQLJSON # Changed from postgresql.JSONB to mysql.JSON
+from sqlalchemy.orm import Mapped, mapped_column
-from backend.common.model import snowflake_id_key, Base
+from backend.common.model import Base, id_key, snowflake_id_key
class Feedback(Base):
__tablename__ = 'feedback'
id: Mapped[snowflake_id_key] = mapped_column(BigInteger, init=False, primary_key=True)
- user_id: Mapped[int] = mapped_column(BigInteger, ForeignKey('wx_user.id'), nullable=False, comment='用户ID')
+ user_id: Mapped[int] = mapped_column(BigInteger, nullable=False, comment='用户ID')
content: Mapped[str] = mapped_column(Text, nullable=False, comment='反馈内容')
contact_info: Mapped[Optional[str]] = mapped_column(String(255), nullable=True, comment='联系方式')
category: Mapped[Optional[str]] = mapped_column(String(50), nullable=True, comment='反馈分类')
diff --git a/backend/app/admin/model/file.py b/backend/app/admin/model/file.py
index 01b52f8..68cca54 100755
--- a/backend/app/admin/model/file.py
+++ b/backend/app/admin/model/file.py
@@ -1,12 +1,14 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
+from datetime import datetime
from typing import Optional
-from sqlalchemy import BigInteger, Text, String, Index, DateTime, LargeBinary
-from sqlalchemy.dialects.postgresql import JSONB
-from sqlalchemy.orm import mapped_column, Mapped
+from sqlalchemy import String, Text, DateTime, func, BigInteger, Index
+from sqlalchemy.dialects.mysql import MEDIUMBLOB
+from sqlalchemy.dialects.mysql import JSON as MySQLJSON # Changed from postgresql.JSONB to mysql.JSON
+from sqlalchemy.orm import Mapped, mapped_column
-from backend.common.model import snowflake_id_key, Base
+from backend.common.model import Base, id_key, snowflake_id_key
class File(Base):
@@ -18,11 +20,10 @@ class File(Base):
content_type: Mapped[Optional[str]] = mapped_column(String(100), nullable=True) # MIME类型
file_size: Mapped[int] = mapped_column(BigInteger, nullable=False) # 文件大小(字节)
storage_path: Mapped[Optional[str]] = mapped_column(Text, nullable=True) # 存储路径(非数据库存储时使用)
- file_data: Mapped[Optional[bytes]] = mapped_column(LargeBinary, default=None, nullable=True) # 文件二进制数据(数据库存储时使用)
+ file_data: Mapped[Optional[bytes]] = mapped_column(MEDIUMBLOB, default=None, nullable=True) # 文件二进制数据(数据库存储时使用)
storage_type: Mapped[str] = mapped_column(String(20), nullable=False, default='database') # 存储类型: database, local, s3
- metadata_info: Mapped[Optional[dict]] = mapped_column(JSONB(astext_type=Text()), default=None, comment="元数据信息")
+ metadata_info: Mapped[Optional[dict]] = mapped_column(MySQLJSON, default=None, comment="元数据信息")
- # 表参数 - 包含所有必要的约束
__table_args__ = (
- Index('idx_file_hash', file_hash),
- )
+ Index('idx_file_name', file_name),
+ )
\ No newline at end of file
diff --git a/backend/app/admin/model/order.py b/backend/app/admin/model/order.py
index 3a0c7e8..eb86d13 100755
--- a/backend/app/admin/model/order.py
+++ b/backend/app/admin/model/order.py
@@ -3,11 +3,11 @@
from datetime import datetime
from typing import Optional
-from sqlalchemy import String, Column, BigInteger, ForeignKey, Boolean, DateTime, Index, func, JSON, Text, Numeric
-from sqlalchemy.dialects.postgresql import JSONB
+from sqlalchemy import String, Numeric, DateTime, func, BigInteger, Index, ForeignKey, Boolean, Text
+from sqlalchemy.dialects.mysql import JSON as MySQLJSON # Changed from postgresql.JSONB to mysql.JSON
from sqlalchemy.orm import Mapped, mapped_column
-from backend.common.model import snowflake_id_key, Base
+from backend.common.model import Base, id_key, snowflake_id_key
class Order(Base):
@@ -51,7 +51,7 @@ class FreezeLog(Base):
user_id: Mapped[int] = mapped_column(BigInteger, ForeignKey('wx_user.id'), nullable=False)
order_id: Mapped[int] = mapped_column(BigInteger, ForeignKey('order.id'), nullable=False)
amount: Mapped[int] = mapped_column(BigInteger, comment='冻结次数')
- reason: Mapped[Optional[str]] = mapped_column(Text, comment='冻结原因')
+ reason: Mapped[Optional[str]] = mapped_column(Text, default=None, comment='冻结原因') # 添加默认值
status: Mapped[str] = mapped_column(String(16), default='pending', comment='状态:pending/confirmed/cancelled')
__table_args__ = (
@@ -63,13 +63,13 @@ class FreezeLog(Base):
class UsageLog(Base):
__tablename__ = 'usage_log'
- id: Mapped[snowflake_id_key] = mapped_column(BigInteger, init=False, primary_key=True)
+ id: Mapped[id_key] = mapped_column(BigInteger, init=False, primary_key=True)
user_id: Mapped[int] = mapped_column(BigInteger, ForeignKey('wx_user.id'), nullable=False, comment='用户ID')
action: Mapped[str] = mapped_column(String(32), comment='动作:purchase/renewal/use/carryover/share/ad/freeze/unfreeze/refund')
amount: Mapped[int] = mapped_column(BigInteger, comment='变动数量')
balance_after: Mapped[int] = mapped_column(BigInteger, comment='变动后余额')
- related_id: Mapped[Optional[int]] = mapped_column(BigInteger, default=None, comment='关联ID,如订单ID、冻结记录ID')
- details: Mapped[Optional[dict]] = mapped_column(JSONB, default=None, comment='附加信息')
+ related_id: Mapped[Optional[int]] = mapped_column(BigInteger, default=None, comment='关联ID,如订单ID、冻结记录ID') # 添加默认值
+ details: Mapped[Optional[dict]] = mapped_column(MySQLJSON, default=None, comment='附加信息') # Changed from JSONB to MySQLJSON and add default
__table_args__ = (
Index('idx_usage_user_action', 'user_id', 'action'),
diff --git a/backend/app/admin/model/points.py b/backend/app/admin/model/points.py
index 8ec9e81..189e962 100644
--- a/backend/app/admin/model/points.py
+++ b/backend/app/admin/model/points.py
@@ -3,18 +3,18 @@
from datetime import datetime, timedelta
from typing import Optional
-from sqlalchemy import String, Column, BigInteger, ForeignKey, DateTime, Index, Text
-from sqlalchemy.dialects.postgresql import JSONB
+from sqlalchemy import String, BigInteger, DateTime, func, Index
+from sqlalchemy.dialects.mysql import JSON as MySQLJSON # Changed from postgresql.JSONB to mysql.JSON
from sqlalchemy.orm import Mapped, mapped_column
-from backend.common.model import snowflake_id_key, Base
+from backend.common.model import Base, id_key, snowflake_id_key
class Points(Base):
__tablename__ = 'points'
id: Mapped[snowflake_id_key] = mapped_column(BigInteger, init=False, primary_key=True)
- user_id: Mapped[int] = mapped_column(BigInteger, ForeignKey('wx_user.id'), unique=True, nullable=False, comment='关联的用户ID')
+ user_id: Mapped[int] = mapped_column(BigInteger, unique=True, nullable=False, comment='关联的用户ID')
balance: Mapped[int] = mapped_column(BigInteger, default=0, comment='当前积分余额')
total_earned: Mapped[int] = mapped_column(BigInteger, default=0, comment='累计获得积分')
total_spent: Mapped[int] = mapped_column(BigInteger, default=0, comment='累计消费积分')
@@ -31,12 +31,12 @@ class PointsLog(Base):
__tablename__ = 'points_log'
id: Mapped[snowflake_id_key] = mapped_column(BigInteger, init=False, primary_key=True)
- user_id: Mapped[int] = mapped_column(BigInteger, ForeignKey('wx_user.id'), nullable=False, comment='用户ID')
+ user_id: Mapped[int] = mapped_column(BigInteger, nullable=False, comment='用户ID')
action: Mapped[str] = mapped_column(String(32), comment='动作:earn/spend')
amount: Mapped[int] = mapped_column(BigInteger, comment='变动数量')
balance_after: Mapped[int] = mapped_column(BigInteger, comment='变动后余额')
related_id: Mapped[Optional[int]] = mapped_column(BigInteger, default=None, comment='关联ID')
- details: Mapped[Optional[dict]] = mapped_column(JSONB, default=None, comment='附加信息')
+ details: Mapped[Optional[dict]] = mapped_column(MySQLJSON, default=None, comment='附加信息')
# 索引优化
__table_args__ = (
diff --git a/backend/app/admin/model/wx_user.py b/backend/app/admin/model/wx_user.py
index 8017bd3..774c07d 100755
--- a/backend/app/admin/model/wx_user.py
+++ b/backend/app/admin/model/wx_user.py
@@ -3,7 +3,7 @@
from typing import Optional
from sqlalchemy import String, Column, BigInteger, SmallInteger, Boolean, DateTime, Index, func, JSON, Text, Numeric
-from sqlalchemy.dialects.postgresql import JSONB
+from sqlalchemy.dialects.mysql import JSON as MySQLJSON
from sqlalchemy.orm import Mapped, mapped_column
from backend.common.model import snowflake_id_key, Base
@@ -17,7 +17,7 @@ class WxUser(Base):
session_key: Mapped[str] = mapped_column(String(128), nullable=False, comment='会话密钥')
unionid: Mapped[Optional[str]] = mapped_column(String(64), default=None, index=True, comment='微信UnionID')
mobile: Mapped[Optional[str]] = mapped_column(String(15), default=None, index=True, comment='加密手机号')
- profile: Mapped[Optional[dict]] = mapped_column(JSONB(astext_type=Text()), default=None, comment='用户资料JSON')
+ profile: Mapped[Optional[dict]] = mapped_column(MySQLJSON, default=None, comment='用户资料JSON')
# class WxPayment(Base):
diff --git a/backend/app/admin/schema/pydantic_type.py b/backend/app/admin/schema/pydantic_type.py
index 76ce691..d6fcd5f 100755
--- a/backend/app/admin/schema/pydantic_type.py
+++ b/backend/app/admin/schema/pydantic_type.py
@@ -1,25 +1,31 @@
-from sqlalchemy import Column, BigInteger, String, Text
-from sqlalchemy.dialects.postgresql import JSONB
-from pgvector.sqlalchemy import Vector
-from sqlalchemy.types import TypeDecorator
-
-from backend.utils.json_encoder import jsonable_encoder
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+from sqlalchemy import TypeDecorator, Text
+from sqlalchemy.dialects.mysql import JSON as MySQLJSON # Changed from postgresql.JSONB to mysql.JSON
+from sqlalchemy.orm import DeclarativeBase
+import json
class PydanticType(TypeDecorator):
-    """处理 Pydantic 模型的 SQLAlchemy 自定义类型"""
-    impl = JSONB
+    """Persist a Pydantic model, or any JSON-compatible value, as JSON text.
+
+    Values are serialized with ``json.dumps`` on the way in and parsed with
+    ``json.loads`` on the way out. When ``pydantic_type`` is given, loaded
+    data is rebuilt as ``pydantic_type(**data)``; otherwise the decoded JSON
+    value is returned unchanged.
+    """
+    impl = Text  # JSON is produced/parsed by this class, so the column is plain text
     def __init__(self, pydantic_type=None, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.pydantic_type = pydantic_type
     def process_bind_param(self, value, dialect):
+        """Serialize ``value`` to a JSON string; ``None`` is passed through."""
         if value is None:
             return None
-        return jsonable_encoder(value)
+        if hasattr(value, "model_dump"):
+            # mode="json" converts datetime, Enum, UUID, ... into JSON-safe
+            # values; a plain model_dump() would make json.dumps raise on them.
+            value = value.model_dump(mode="json")
+        return json.dumps(value)
     def process_result_value(self, value, dialect):
-        if value is None or self.pydantic_type is None:
-            return value
-        return self.pydantic_type(**value)
\ No newline at end of file
+        """Parse the stored JSON text and rebuild the model when a type is set."""
+        if value is None:
+            return None
+        data = json.loads(value)
+        if self.pydantic_type is None:
+            return data
+        return self.pydantic_type(**data)
\ No newline at end of file
diff --git a/backend/app/ai/model/article.py b/backend/app/ai/model/article.py
index 361c133..9a86f0a 100644
--- a/backend/app/ai/model/article.py
+++ b/backend/app/ai/model/article.py
@@ -4,7 +4,7 @@ from typing import Optional
from datetime import datetime
from sqlalchemy import BigInteger, Text, String, DateTime, ForeignKey
-from sqlalchemy.dialects.postgresql import JSONB
+from sqlalchemy.dialects.mysql import JSON as MySQLJSON
from sqlalchemy.orm import mapped_column, Mapped
from backend.common.model import snowflake_id_key, Base
@@ -23,7 +23,7 @@ class Article(Base):
author: Mapped[Optional[str]] = mapped_column(String(100), nullable=True, comment="作者")
category: Mapped[Optional[str]] = mapped_column(String(50), nullable=True, comment="分类")
level: Mapped[Optional[str]] = mapped_column(String(20), nullable=True, comment="难度等级")
- info: Mapped[Optional[dict]] = mapped_column(JSONB, default=None, comment="附加信息")
+ info: Mapped[Optional[dict]] = mapped_column(MySQLJSON, default=None, comment="附加信息")
# 表参数 - 包含所有必要的约束
__table_args__ = (
@@ -42,7 +42,7 @@ class ArticleParagraph(Base):
paragraph_index: Mapped[int] = mapped_column(BigInteger, nullable=False, comment="段落序号")
content: Mapped[str] = mapped_column(Text, nullable=False, comment="段落内容")
standard_audio_id: Mapped[Optional[int]] = mapped_column(BigInteger, ForeignKey('file.id'), nullable=True, comment="标准朗读音频文件ID")
- info: Mapped[Optional[dict]] = mapped_column(JSONB, default=None, comment="附加信息")
+ info: Mapped[Optional[dict]] = mapped_column(MySQLJSON, default=None, comment="附加信息")
# 表参数 - 包含所有必要的约束
__table_args__ = (
@@ -61,7 +61,7 @@ class ArticleSentence(Base):
sentence_index: Mapped[int] = mapped_column(BigInteger, nullable=False, comment="句子序号")
content: Mapped[str] = mapped_column(Text, nullable=False, comment="句子内容")
standard_audio_id: Mapped[Optional[int]] = mapped_column(BigInteger, ForeignKey('file.id'), nullable=True, comment="标准朗读音频文件ID")
- info: Mapped[Optional[dict]] = mapped_column(JSONB, default=None, comment="附加信息")
+ info: Mapped[Optional[dict]] = mapped_column(MySQLJSON, default=None, comment="附加信息")
# 表参数 - 包含所有必要的约束
__table_args__ = (
diff --git a/backend/app/ai/model/image.py b/backend/app/ai/model/image.py
index 6153aef..71a8359 100755
--- a/backend/app/ai/model/image.py
+++ b/backend/app/ai/model/image.py
@@ -3,8 +3,7 @@
from typing import Optional
from sqlalchemy import BigInteger, Text, String, Index, ForeignKey
-from sqlalchemy.dialects.postgresql import JSONB
-from pgvector.sqlalchemy import Vector
+from sqlalchemy.dialects.mysql import JSON as MySQLJSON
from sqlalchemy.orm import mapped_column, Mapped
from backend.app.ai.schema.image import ImageMetadata
@@ -19,7 +18,7 @@ class Image(Base):
file_id: Mapped[Optional[int]] = mapped_column(BigInteger, ForeignKey('file.id'), nullable=True, comment="关联的文件ID")
thumbnail_id: Mapped[Optional[int]] = mapped_column(BigInteger, default=None, nullable=True, comment="缩略图ID")
info: Mapped[Optional[ImageMetadata]] = mapped_column(PydanticType(pydantic_type=ImageMetadata), default=None, comment="附加元数据") # 其他可能的字段(根据实际需求添加)
- details: Mapped[Optional[dict]] = mapped_column(JSONB(astext_type=Text()), default=None, comment="其他信息") # 其他信息
+ details: Mapped[Optional[dict]] = mapped_column(MySQLJSON, default=None, comment="其他信息") # 其他信息
# 表参数 - 包含所有必要的约束
__table_args__ = (
diff --git a/backend/app/ai/model/image_task.py b/backend/app/ai/model/image_task.py
index 46c23a7..a060b67 100644
--- a/backend/app/ai/model/image_task.py
+++ b/backend/app/ai/model/image_task.py
@@ -4,7 +4,7 @@ from enum import Enum
from typing import Optional
from sqlalchemy import BigInteger, Text, String, Index, Integer
-from sqlalchemy.dialects.postgresql import JSONB
+from sqlalchemy.dialects.mysql import JSON as MySQLJSON
from sqlalchemy.orm import mapped_column, Mapped
from backend.common.model import snowflake_id_key, Base
@@ -24,10 +24,10 @@ class ImageProcessingTask(Base):
image_id: Mapped[int] = mapped_column(BigInteger, nullable=False, comment="关联的图片ID")
file_id: Mapped[int] = mapped_column(BigInteger, nullable=False, comment="关联的文件ID")
user_id: Mapped[int] = mapped_column(BigInteger, nullable=False, comment="用户ID")
- dict_level: Mapped[str] = mapped_column(String, nullable=False, comment="词典等级")
- type: Mapped[str] = mapped_column(String, nullable=False, comment="处理类型")
- status: Mapped[ImageTaskStatus] = mapped_column(String, default=ImageTaskStatus.PENDING, comment="任务状态")
- result: Mapped[Optional[dict]] = mapped_column(JSONB(astext_type=Text()), default=None, comment="处理结果")
+ dict_level: Mapped[str] = mapped_column(String(20), nullable=False, comment="词典等级")
+ type: Mapped[str] = mapped_column(String(50), nullable=False, comment="处理类型")
+ status: Mapped[ImageTaskStatus] = mapped_column(String(20), default=ImageTaskStatus.PENDING, comment="任务状态")
+ result: Mapped[Optional[dict]] = mapped_column(MySQLJSON, default=None, comment="处理结果")
error_message: Mapped[Optional[str]] = mapped_column(Text, default=None, comment="错误信息")
retry_count: Mapped[int] = mapped_column(Integer, default=0, comment="重试次数")
diff --git a/backend/app/ai/model/image_text.py b/backend/app/ai/model/image_text.py
index b64bc10..6a0bde2 100644
--- a/backend/app/ai/model/image_text.py
+++ b/backend/app/ai/model/image_text.py
@@ -3,7 +3,7 @@
from typing import Optional
from sqlalchemy import BigInteger, Text, String, Integer, DateTime, ForeignKey
-from sqlalchemy.dialects.postgresql import JSONB
+from sqlalchemy.dialects.mysql import JSON as MySQLJSON
from sqlalchemy.orm import mapped_column, Mapped
from backend.common.model import snowflake_id_key, Base
@@ -24,10 +24,10 @@ class ImageText(Base):
standard_audio_id: Mapped[Optional[int]] = mapped_column(BigInteger, ForeignKey('file.id'), nullable=True, comment="标准朗读音频文件ID")
ipa: Mapped[Optional[str]] = mapped_column(String(100), default=None, comment="ipa")
zh: Mapped[Optional[str]] = mapped_column(String(100), default=None, comment="中文")
- position: Mapped[Optional[dict]] = mapped_column(JSONB, default=None, comment="文本在图片中的位置信息或文章中的位置信息")
+ position: Mapped[Optional[dict]] = mapped_column(MySQLJSON, default=None, comment="文本在图片中的位置信息或文章中的位置信息")
dict_level: Mapped[Optional[str]] = mapped_column(String(20), default=None, comment="词典等级")
source: Mapped[Optional[str]] = mapped_column(String(20), default=None, comment="文本来源 (ref_word/description/article)")
- info: Mapped[Optional[dict]] = mapped_column(JSONB, default=None, comment="附加信息")
+ info: Mapped[Optional[dict]] = mapped_column(MySQLJSON, default=None, comment="附加信息")
# 表参数 - 包含所有必要的约束
__table_args__ = (
diff --git a/backend/app/ai/model/recording.py b/backend/app/ai/model/recording.py
index 974ab52..8add126 100755
--- a/backend/app/ai/model/recording.py
+++ b/backend/app/ai/model/recording.py
@@ -4,7 +4,7 @@ from typing import Optional
from datetime import datetime
from sqlalchemy import BigInteger, Text, ForeignKey, String, Integer, DateTime, Boolean, Index
-from sqlalchemy.dialects.postgresql import JSONB
+from sqlalchemy.dialects.mysql import JSON as MySQLJSON
from sqlalchemy.orm import mapped_column, Mapped
from backend.app.ai.schema.recording import RecordingMetadata
@@ -24,7 +24,7 @@ class Recording(Base):
text: Mapped[Optional[str]] = mapped_column(String(255), nullable=True, comment='朗读文本')
eval_mode: Mapped[Optional[int]] = mapped_column(Integer, nullable=True, comment='评测模式')
info: Mapped[Optional[RecordingMetadata]] = mapped_column(PydanticType(pydantic_type=RecordingMetadata), default=None, comment="附加元数据") # 其他可能的字段(根据实际需求添加)
- details: Mapped[Optional[dict]] = mapped_column(JSONB(astext_type=Text()), default=None, comment="评估信息") # 其他信息
+ details: Mapped[Optional[dict]] = mapped_column(MySQLJSON, default=None, comment="评估信息") # 其他信息
is_standard: Mapped[bool] = mapped_column(Boolean, default=False, comment="是否为标准朗读音频")
# 表参数 - 包含所有必要的约束
diff --git a/backend/app/ai/schema/image.py b/backend/app/ai/schema/image.py
index 7311bbd..5e1998e 100755
--- a/backend/app/ai/schema/image.py
+++ b/backend/app/ai/schema/image.py
@@ -73,7 +73,7 @@ class ImageShowRes(ImageRecognizeRes):
class ImageInfoSchemaBase(SchemaBase):
- embedding: Optional[list] = None
+ # embedding: Optional[list] = None
info: Optional[ImageMetadata] = None
details: Optional[dict] = None
diff --git a/backend/app/ai/service/image_service.py b/backend/app/ai/service/image_service.py
index 99a3a90..e790df4 100755
--- a/backend/app/ai/service/image_service.py
+++ b/backend/app/ai/service/image_service.py
@@ -356,19 +356,19 @@ class ImageService:
background_tasks.add_task(ImageService.generate_thumbnail, image_id, file_id)
# embedding
- embed_params = QwenEmbedImageParams(
- user_id=current_user.id,
- dict_level=dict_level,
- image_id=new_image.id,
- file_name=file_name,
- format=image_format_str,
- data=base64_image,
- )
- embed_response = await Qwen.embed_image(embed_params)
- if embed_response.get("error"):
- raise Exception(embed_response["error"])
-
- embedding = embed_response.get("embedding")
+ # embed_params = QwenEmbedImageParams(
+ # user_id=current_user.id,
+ # dict_level=dict_level,
+ # image_id=new_image.id,
+ # file_name=file_name,
+ # format=image_format_str,
+ # data=base64_image,
+ # )
+ # embed_response = await Qwen.embed_image(embed_params)
+ # if embed_response.get("error"):
+ # raise Exception(embed_response["error"])
+ #
+ # embedding = embed_response.get("embedding")
# 提取元数据
additional_info = {
@@ -382,7 +382,7 @@ class ImageService:
await image_dao.update(
db, new_image.id,
UpdateImageParam(
- embedding=embedding,
+ # embedding=embedding,
info=metadata or {},
)
)
diff --git a/backend/core/conf.py b/backend/core/conf.py
index 9b1ce79..d1a71be 100755
--- a/backend/core/conf.py
+++ b/backend/core/conf.py
@@ -3,7 +3,7 @@
from functools import lru_cache
from typing import Any, Literal
from celery.schedules import crontab
-from pydantic import model_validator, PostgresDsn
+from pydantic import model_validator, HttpUrl # Changed from PostgresDsn to HttpUrl
from pydantic_settings import BaseSettings, SettingsConfigDict
from backend.core.path_conf import BASE_PATH
@@ -58,10 +58,10 @@ class Settings(BaseSettings):
# .env 数据库
DATABASE_ECHO: bool | Literal['debug'] = False
DATABASE_HOST: str
- DATABASE_PORT: int = 5432
+ DATABASE_PORT: int = 3306
DATABASE_USER: str
DATABASE_PASSWORD: str
- DATABASE_DB_NAME: str = 'postgres'
+ DATABASE_DB_NAME: str = 'app'
# .env Redis
REDIS_HOST: str
@@ -194,14 +194,7 @@ def get_settings():
# 环境区分示例
def get_db_uri(settings: Settings):
-    return PostgresDsn.build(
-        scheme="postgresql+asyncpg",
-        username=settings.DATABASE_USER,
-        password=settings.DATABASE_PASSWORD,
-        host=settings.DATABASE_HOST,
-        port=settings.DATABASE_PORT,
-        path=settings.DATABASE_DB_NAME,
-    ).unicode_string()
+    """Build the ``mysql+asyncmy`` database URL string from ``settings``.
+
+    The user name and password are percent-encoded so that reserved
+    characters such as ``@``, ``:``, ``/`` or ``#`` in a credential cannot
+    change how the URL is split into its components.
+    """
+    from urllib.parse import quote
+
+    # safe="" also encodes "/", and spaces become %20 rather than "+".
+    user = quote(settings.DATABASE_USER, safe="")
+    password = quote(settings.DATABASE_PASSWORD, safe="")
+    return (
+        f"mysql+asyncmy://{user}:{password}"
+        f"@{settings.DATABASE_HOST}:{settings.DATABASE_PORT}/{settings.DATABASE_DB_NAME}"
+    )
-
-settings = get_settings()
+settings = get_settings()
\ No newline at end of file
diff --git a/backend/database/db.py b/backend/database/db.py
index b86afed..33c2233 100755
--- a/backend/database/db.py
+++ b/backend/database/db.py
@@ -26,28 +26,26 @@ def create_async_engine_and_session(
application_name: str = "app"
) -> tuple[create_async_engine, async_sessionmaker[AsyncSession], async_sessionmaker[AsyncSession]]:
"""
- 创建 PostgreSQL 异步引擎和会话工厂
+ 创建 MySQL 异步引擎和会话工厂
参数优化说明:
- pool_size: 建议设置为 (核心数 * 2) + 有效磁盘数
- max_overflow: 峰值连接缓冲,避免连接风暴
- - pool_recycle: 防止 PostgreSQL 连接超时 (默认为 1 小时)
+ - pool_recycle: 防止 MySQL 连接超时 (默认为 1 小时)
- pool_pre_ping: 强烈建议开启,处理连接失效问题
- application_name: 帮助 DBA 识别连接来源
"""
try:
- # 创建异步引擎 (针对 PostgreSQL 优化)
+ # 创建异步引擎 (针对 MySQL 优化)
engine = create_async_engine(
url,
echo=echo,
echo_pool=echo,
future=True,
connect_args={
- "server_settings": {
- "application_name": application_name,
- "jit": "off", # 禁用 JIT 编译,提高简单查询性能
- "statement_timeout": "30000" # 30 秒查询超时
- }
+ "charset": "utf8mb4", # MySQL 特定字符集
+ "autocommit": True, # 自动提交
+ "connect_timeout": 60, # 连接超时
},
pool_size=pool_size,
max_overflow=max_overflow,
@@ -55,10 +53,10 @@ def create_async_engine_and_session(
pool_recycle=pool_recycle,
pool_pre_ping=pool_pre_ping,
pool_use_lifo=True, # 使用 LIFO 提高连接池效率
- # PostgreSQL 特定优化参数
+ # MySQL 特定优化参数
poolclass=None, # 使用默认 QueuePool
execution_options={
- "isolation_level": "REPEATABLE READ", # 推荐隔离级别
+ "isolation_level": "READ COMMITTED", # MySQL 推荐隔离级别
"compiled_cache": None # 禁用缓存,避免内存泄漏
}
)
@@ -67,18 +65,23 @@ def create_async_engine_and_session(
pool_size=5,
max_overflow=10,
pool_pre_ping=True,
- pool_recycle=300
+ pool_recycle=300,
+ connect_args={
+ "charset": "utf8mb4",
+ "autocommit": True,
+ "connect_timeout": 60,
+ }
)
except Exception as e:
- log.error(f'❌ PostgreSQL 数据库连接失败: {e}')
+ log.error(f'❌ MySQL 数据库连接失败: {e}')
sys.exit(1)
else:
- # 创建异步会话工厂 (针对 PostgreSQL 优化)
+ # 创建异步会话工厂 (针对 MySQL 优化)
db_session = async_sessionmaker(
bind=engine,
autoflush=False,
expire_on_commit=False,
- # PostgreSQL 特定优化
+ # MySQL 特定优化
class_=AsyncSession,
twophase=False, # 禁用两阶段提交
enable_baked_queries=False, # 禁用 baked 查询避免内存问题
@@ -90,7 +93,7 @@ def create_async_engine_and_session(
autoflush=False
)
- log.info(f'✅ PostgreSQL 异步引擎创建成功 | 连接池: [{pool_size}] - [{max_overflow}]')
+ log.info(f'✅ MySQL 异步引擎创建成功 | 连接池: [{pool_size}] - [{max_overflow}]')
return engine, db_session, background_db_session
@@ -116,4 +119,4 @@ SQLALCHEMY_DATABASE_URL = get_db_uri(settings)
async_engine, async_db_session, background_db_session = create_async_engine_and_session(SQLALCHEMY_DATABASE_URL)
# Session Annotated
-CurrentSession = Annotated[AsyncSession, Depends(get_db)]
+CurrentSession = Annotated[AsyncSession, Depends(get_db)]
\ No newline at end of file
diff --git a/deploy/docker-compose/docker-compose.yml b/deploy/docker-compose/docker-compose.yml
index 3640e0a..81ac099 100755
--- a/deploy/docker-compose/docker-compose.yml
+++ b/deploy/docker-compose/docker-compose.yml
@@ -1,25 +1,39 @@
+version: '3.8'
+
services:
fsm_server:
build:
context: ../../
dockerfile: Dockerfile
- image: fsm_server:latest
+ ports:
+ - "8000:8000"
container_name: fsm_server
restart: always
depends_on:
- fsm_mysql
- fsm_redis
volumes:
- - fsm_static:/fsm/backend/static
+ - fsm_static:/www/fsm_server/backend/static
+ environment:
+ - SERVER_HOST=0.0.0.0
+ - SERVER_PORT=8000
+ - DATABASE_HOST=fsm_mysql
+ - DATABASE_PORT=3306
+ - DATABASE_USER=root
+ - DATABASE_PASSWORD=123456
+ - DATABASE_DB_NAME=fsm
+ - REDIS_HOST=fsm_redis
+ - REDIS_PORT=6379
+ - REDIS_PASSWORD=
+ - REDIS_DATABASE=0
networks:
- fsm_network
- command:
- - bash
- - -c
- - |
+ command: |
+ sh -c "
wait-for-it -s fsm_mysql:3306 -s fsm_redis:6379 -t 300
supervisord -c /etc/supervisor/supervisord.conf
supervisorctl restart
+ "
fsm_mysql:
image: mysql:8.0.29
@@ -42,17 +56,24 @@ services:
--lower_case_table_names=1
fsm_redis:
- image: redis:6.2.7
+ image: redis:7.0.4
ports:
- "6379:6379"
container_name: fsm_redis
restart: always
- environment:
- - TZ=Asia/Shanghai
volumes:
- - fsm_redis:/var/lib/redis
+ - fsm_redis:/data
networks:
- fsm_network
+ command: |
+ --requirepass ""
+ --appendonly yes
+ --appendfilename "redis-staging.aof"
+ --appendfsync everysec
+ --dir /data
+ --databases 16
+ --maxmemory 256mb
+ --maxmemory-policy allkeys-lru
fsm_nginx:
image: nginx:stable
@@ -83,4 +104,4 @@ volumes:
fsm_redis:
name: fsm_redis
fsm_static:
- name: fsm_static
+ name: fsm_static
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index 7271d02..9dd06dd 100755
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,75 +1,81 @@
[project]
-name = "app"
-description = """
-A RBAC (Role-Based Access Control) permission control system built on FastAPI, featuring a unique pseudo-three-tier
-architecture design, with built-in basic implementation of fastapi admin as a template library, free and open-source.
-"""
+name = "blabla-server"
+version = "0.0.1"
+description = "FastAPI Best Architecture"
authors = [
{ name = "Felix", email = "hengzone@outlook.com" },
]
-readme = "README.md"
-license = { text = "MIT" }
-requires-python = ">=3.10"
-dynamic = ['version']
dependencies = [
- "aiofiles>=24.1.0",
- "aiosmtplib>=4.0.2",
- "alembic>=1.16.5",
- "asgi-correlation-id>=4.3.4",
- "asgiref>=3.9.1",
+ "fastapi>=0.115.0",
+ "uvicorn>=0.30.0",
+ "sqlalchemy>=2.0.0",
+ "alembic>=1.13.0",
"asyncmy>=0.2.10",
- "asyncpg>=0.30.0",
- "apscheduler==3.11.0",
- "bcrypt>=4.3.0",
- "cappa>=0.30.0",
- "cryptography>=45.0.6",
- "dulwich>=0.24.1",
- "fast-captcha>=0.3.2",
- "fastapi-limiter>=0.1.6",
- "fastapi-pagination>=0.14.0",
- "fastapi[standard-no-fastapi-cloud-cli]>=0.116.1",
- "fastapi-utilities==0.3.1",
- "flower>=2.0.1",
- "gevent>=25.8.2",
- "granian>=2.5.1",
- "ip2loc>=1.0.0",
- "itsdangerous>=2.2.0",
- "jinja2>=3.1.6",
- "loguru>=0.7.3",
- "msgspec>=0.19.0",
- "psutil>=7.0.0",
- "psycopg[binary]>=3.2.9",
- "pwdlib>=0.2.1",
- "pydantic>=2.11.7",
- "pydantic-settings>=2.10.1",
- "pymysql>=1.1.1",
- "python-jose>=3.5.0",
- "python-socketio>=5.13.0",
- "pycrypto==2.6.1",
- "redis[hiredis]>=6.4.0",
- "rtoml>=0.12.0",
- "sqlalchemy-crud-plus>=1.11.0",
- "sqlalchemy[asyncio]>=2.0.43",
- "sqlparse>=0.5.3",
+ "mysql-connector-python>=8.0.33", # Added MySQL connector
+ "pydantic>=2.0.0",
+ "pydantic-settings>=2.0.0",
+ "passlib>=1.7.4",
+ "bcrypt>=4.0.0",
+ "python-jose>=3.3.0",
+ "python-multipart>=0.0.9",
+ "redis>=5.0.0",
+ "fastapi-limiter>=0.1.5",
+ "fastapi-pagination>=0.12.0",
+ "celery>=5.3.0",
+ "flower>=2.0.0",
+ "loguru>=0.7.0",
+ "apscheduler>=3.10.0",
+ "typer>=0.9.0",
+ "rich>=13.0.0",
+ "httpx>=0.25.0",
+ "jinja2>=3.1.0",
+ "python-dotenv>=1.0.0",
+ "cryptography>=41.0.0",
+ "python-socketio>=5.8.0",
+ "asgi-correlation-id>=4.2.0",
+ "fastapi-utilities>=0.3.0",
+ "sqlalchemy-crud-plus>=1.0.0",
+ "path>=16.7.0",
+ "fast-captcha>=0.3.0",
"user-agents>=2.2.0",
+ "ip2loc>=1.0.0",
+ "dashscope>=1.14.0",
+ "dulwich>=0.27.0",
+ "msgspec>=0.18.0",
+ "rtoml>=0.11.0",
+ "psutil>=5.9.0",
+ "pwdlib>=0.2.0",
+ "itsdangerous>=2.1.0",
+ "aiofiles>=23.0.0",
+ "asgiref>=3.7.0",
]
+requires-python = ">=3.10"
+readme = "README.md"
[dependency-groups]
+# Keep this table: without its header the "server" array below would become
+# a key of [project], and [tool.uv] default-groups needs "dev" and "lint".
dev = [
"pytest>=8.4.0",
"pytest-sugar>=1.1.1",
]
lint = [
"pre-commit>=4.3.0",
+ "ruff>=0.1.0",
]
server = [
"aio-pika>=9.5.7",
"wait-for-it>=2.3.0",
]
+[build-system]
+requires = ["setuptools>=61.0", "wheel"]
+build-backend = "setuptools.build_meta"
+
+[tool.setuptools.packages.find]
+include = ["backend*"]
+
+[tool.setuptools.package-data]
+"*" = ["*.md", "*.yaml", "*.yml"]
+
[tool.uv]
python-downloads = "manual"
default-groups = ["dev", "lint"]
[[tool.uv.index]]
name = "aliyun"
@@ -84,6 +90,21 @@ path = "backend/__init__.py"
[project.scripts]
myapp = "backend.cli:main"
-[build-system]
-requires = ["hatchling"]
-build-backend = "hatchling.build"
+[tool.ruff]
+line-length = 120
+indent-width = 4
+target-version = "py310"
+
+[tool.ruff.lint]
+select = [
+ "E", # pycodestyle errors
+ "W", # pycodestyle warnings
+ "F", # pyflakes
+ "I", # isort
+ "C", # flake8-comprehensions
+ "B", # flake8-bugbear
+ "Q", # flake8-quotes
+ "SIM", # flake8-simplify
+ "TID", # flake8-tidy-imports
+ "RUF", # Ruff-specific rules
+]
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index b39c147..0f53a8b 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -24,7 +24,7 @@ async-timeout==5.0.1 ; python_full_version < '3.11.3'
# redis
asyncmy==0.2.10
# via fastapi-best-architecture
-asyncpg==0.30.0
+# asyncpg==0.30.0 # Removed PostgreSQL driver
# via fastapi-best-architecture
bcrypt==4.3.0
# via fastapi-best-architecture
@@ -319,3 +319,5 @@ zope-event==5.0
# via gevent
zope-interface==7.2
# via gevent
+mysql-connector-python==8.0.33 # Added MySQL connector
+ # via fastapi-best-architecture
\ No newline at end of file