From 0fc4f421fcb46ae1bf564b8b410de47f8b1848d8 Mon Sep 17 00:00:00 2001 From: Felix Date: Thu, 4 Dec 2025 15:30:04 +0800 Subject: [PATCH] fix code --- .DS_Store | Bin 8196 -> 6148 bytes assets/call_api/request.py | 20 -- assets/dict/dictionary_parser.py | 469 ---------------------------- assets/generate_and_save_coupons.py | 179 ----------- 4 files changed, 668 deletions(-) delete mode 100644 assets/call_api/request.py delete mode 100755 assets/dict/dictionary_parser.py delete mode 100644 assets/generate_and_save_coupons.py diff --git a/.DS_Store b/.DS_Store index 3099daff72326caf7431b806f202a168c424205f..b2aa329618febf2627e516444ff235e17ee2ba2c 100644 GIT binary patch delta 273 zcmZp1XfcprU|?W$DortDU=RQ@Ie-{MGjdEU6q~50#gUV47@VA+TQIS3bv+wUf`vhk zA)O(Up#&xY6k}Ks_hkvPTyDOLOHxjL5>O0iU_<7Tg5!>;@+o-b3o;;<0`)MgSXcm} z*cp;U84?+i8L}Bt8S)rXQi_vvlJfI&7$ygbOKdC*U|Y=2!6C>DQVR3|H;`}z`C?7>({vNOA- zHZ}FhOVPv^jf(na)CbfkM0_$)q9(=%f<$={W8xDp-tfW1_@6nmg%(JBP}As~=A8eW z|8i#jf4+at{!_*nS_*mtW3`MinL4MMO2cg$m-Bv0lLAf!QII`j9&@uJ?r_#}vbSi5 zj4%RW1i}b}5eOp?MqoBXfX-~*9VTCIdPlsQ>Dq!7l-b zmJ{1=G^aYiw}}Nb8PEwqeN);~R1XMD5tJAZ<|L1Cb)w0DP6#T@8H71QaAgD)3jC{+ zUCdW!ND3N`VFbbm+!_I1yH~IbbLr2RQNN$h4qK+{B@&k*m6gw_sFW*ZRX&m)a7NNz z#w)ljnfxKX>U10{TbgIG2Tfz7U#m_zwwEz&BkvoSxhnMp~SLt-HQ$S_ouC zR{FIiV`EL5H`d1*wrm)$kBv1nCK~Evo3=EKk1KLbeErq~{e#w!?Mw??nGoIo0YYSZRac9o4 z_vMWu`PQ9ttiq5@l7+d9l`D|4(QH|1^HHIl%R82}&v8x9bnM<9&p7FE8Q*A{sHe|$ z@?Of{tjDDNg;o{6#@=CW>>f1oE>T4}GkytXbr((7 z%vwfo#?2YFZraDYMoirs=%cXOp81GrO1Yska4N1cFEGamqj8%=Y~ybm!3k_ zJW(*cQ7taZ;u?_^aYfe79@fdp4oBxZ>b@irS+QD{4{?4fZ_p^%YGiFa8Du(Layhuo zM1!JeZs#C9dts_>+$1ZS-eqO7h83_L*h;M3-9_PXz^vIWD?K@9Xo#F~1uv3nm1XU- zqx{0{8Q0r8Y}iw`l-eQ7eeOUeZ|of*#OEe$y6HK2KN@>hL?XR>t7cs{ zbaHxW?UJfUI?^R;7s*C5?V?For)MA_FULz*iznBJY5b$X34i9tIG#)hcC{c=BR}CH zEWujXE_Rd+vr#t5o@cMI_t{x?j$L41vdio$`+@z)equkf>+E-wV=k((3@fo3b+`|W z*nlQ%LJIBZz#jA=2OS2E!$JW?j9?Ux;R!s6r!a}<@Dg6eX}pTp@iyMUyZ8_v;}cxK zr?`x7aTVX;8h*uf{3a< zNTWoiy+bJ7f^UG*ncJsZ^x+*lckOPUwdwZFe4b!;Ro5(999y|&ebbhQnx|kWP+J1( z%?aRRep;sZnD=op6R&`EYloZCv8>uK#vy_lpB~M=Y64$;*jYqXPls00yO0A1( z6_he!xi-2ks!5coVws3Gs#+Q4q2k+ajIL8P$YOE7x>eQ6DI>*lo4Q=pm?9P}(blNO z^Gp7xkp4Nl$i60|Un8XdfilQgj2KoSLCD^QB%0BRHbVD4>_-}%ID&p0Lzb{^!G?nq z$YTuSg!BnKj;C=7&)`|SfENkxr|}A*{SCZ{x9}d$;v7D}c^~&*;456g_cM^(F&)YH zWhRn~>AY<@_Hp8K^jEShos~H@NN1(ZbluHaSvvc;sDkOl9A*}C*yzmylFiaqChn`2 z*Z*yI{{BBp?>@Y(Falu&{*DNsyfxk0LOYcX_?esXT02Jl7 None: - """解析MDX和MDD文件""" - try: - # 解析MDX文件 - entries, media_references = self.parse_mdx_file_mdict(mdx_path) - - # 保存词汇条目 - entry_ids = self.save_entries(entries) - - # 如果有MDD文件,解析媒体文件 - if mdd_path and os.path.exists(mdd_path): - self.parse_mdd_file(mdd_path, media_references, entry_ids) - else: - print("未提供MDD文件或文件不存在") - - print(f"解析完成,共处理 {len(entries)} 个词汇条目") - - except Error as e: - print(f"解析词典文件失败: {e}") - raise - - def parse_mdx_file_mdict(self, mdx_path: str) -> Tuple[List[Tuple[str, str]], List[Dict]]: - """使用 mdict_reader 解析 MDX 文件""" - print(f"正在解析MDX文件: {mdx_path}") - - try: - mdx = MDX(mdx_path) - entries = [] - media_references = [] - - for key, value in mdx.items(): - word = key.decode('utf-8') if isinstance(key, bytes) else str(key) - definition = value.decode('utf-8') if isinstance(value, bytes) else str(value) - - if word and definition: - entries.append((word, definition)) - # 提取媒体文件引用 - media_refs = self.extract_media_references(definition, word) - media_references.extend(media_refs) - - return entries, media_references - - except Error as e: - print(f"解析MDX文件失败: {e}") - raise - - def parse_mdd_file(self, mdd_path: str, media_references: List[Dict], entry_ids: Dict[str, int]) -> None: - """解析MDD文件中的媒体资源 - 使用 mdict_reader""" - print(f"正在解析MDD文件: {mdd_path}") - - try: - # 使用 mdict_reader 解析 MDD 文件 - mdd = MDD(mdd_path) - - # 创建文件名到媒体数据的映射 - dict_media = {} - for key, value in mdd.items(): - filename = key.decode('utf-8') if isinstance(key, bytes) else str(key) - # 确保文件名格式统一 - filename = filename.replace('\\', '/').lstrip('/') - dict_media[filename] = value - - # 保存媒体文件 - self.save_dict_media(dict_media, media_references, entry_ids) - - except Error as e: - print(f"解析MDD文件失败: {e}") - raise - - def extract_media_references(self, definition: str, word: str) -> List[Dict]: - """从定义中提取媒体文件引用""" - media_refs = [] - - # 提取音频文件引用 - 更通用的模式,匹配 sound:// 或 href="sound://..." - # 这个模式应该能覆盖 aeroplane.txt 中的 sound://media/english/... 链接 - audio_patterns = [ - r'sound://([^"\s>]+\.mp3)', # 直接 sound:// 开头,后跟非空格/"/>字符直到 .mp3 - r'href\s*=\s*["\']sound://([^"\'>]+\.mp3)["\']', # href="sound://..." - r'href\s*=\s*["\']sound://([^"\'>]+)["\']', # 更宽松的 href="sound://...",不一定以.mp3结尾 - r'data-src-mp3\s*=\s*["\']sound://([^"\'>]+\.mp3)["\']', # data-src-mp3="sound://..." - r'data-src-mp3\s*=\s*["\']([^"\'>]+\.mp3)["\']', # data-src-mp3="..." (相对路径) - r'audio\s*=\s*["\']([^"\']+)["\']', # audio="..." - ] - - for pattern in audio_patterns: - matches = re.findall(pattern, definition, re.IGNORECASE) - for match in matches: - # 清理可能的多余字符(如结尾的引号或空格,虽然正则应该已经避免了) - clean_filename = match.strip()#.rstrip('"\'') - if clean_filename: - media_refs.append({ - 'filename': clean_filename, - 'type': 'audio', - 'word': word - }) - - # 提取图片文件引用 - image_patterns = [ - r']*src\s*=\s*["\']([^"\']+\.(?:jpg|jpeg|png|gif|bmp))["\']', # src="..." - r'\[image:([^\]]+\.(?:jpg|jpeg|png|gif|bmp))\]', # [image:...] - r'src\s*=\s*["\']([^"\']+\.(?:jpg|jpeg|png|gif|bmp))["\']' # 更宽松的 src="..." - ] - - for pattern in image_patterns: - matches = re.findall(pattern, definition, re.IGNORECASE) - for match in matches: - # 清理可能的多余字符 - clean_filename = match.strip()#.rstrip('"\'') - if clean_filename: - media_refs.append({ - 'filename': clean_filename, - 'type': 'image', - 'word': word - }) - - return media_refs - - def save_entries(self, entries: List[Tuple[str, str]]) -> Dict[str, int]: - """保存词汇条目到数据库,并更新 details 字段""" - from mysql.connector import Error - import hashlib - - cursor = self.conn.cursor(dictionary=True) - entry_ids = {} - - for word, definition in entries: - try: - # 检查数据库中是否已存在该词条 - cursor.execute('SELECT id, definition, details FROM dict_entry WHERE word = %s', (word,)) - existing_record = cursor.fetchone() - - metadata = None - existing_details = None - final_definition = definition # 默认使用当前 definition - - # 如果存在现有记录 - if existing_record: - entry_id, existing_definition, existing_details_json = existing_record['id'], existing_record['definition'], existing_record['details'] - - # 获取现有的 details - if existing_details_json: - try: - existing_details = WordMetaData(**existing_details_json) - except: - existing_details = None - - # 如果当前 definition 是以 @@@ 开头的引用链接 - if definition.startswith('@@@'): - # 保留现有的 definition,只更新 details 中的 ref_link - final_definition = existing_definition # 保持原有的 definition - - # 提取新的 @@@ 链接 - lines = definition.split('\n') - new_ref_links = [] - for line in lines: - if line.startswith('@@@'): - link = line[3:].strip() - if link: - new_ref_links.append(link) - else: - break - - # 合并链接信息 - if new_ref_links: - if existing_details: - # 如果已有 details,合并 ref_link - if existing_details.ref_link: - # 合并现有链接和新链接,去重但保持顺序 - combined_links = existing_details.ref_link[:] - for link in new_ref_links: - if link not in combined_links: - combined_links.append(link) - else: - combined_links = new_ref_links - else: - combined_links = new_ref_links - - # 更新 details - if existing_details: - metadata = existing_details.model_copy(update={"ref_link": combined_links}) - else: - metadata = WordMetaData(ref_link=combined_links) - - # 如果是新词条或需要更新 details - if not existing_record or metadata: - # 如果是新词条,创建默认 metadata - if not existing_record: - metadata = WordMetaData() - - # 准备 details 数据 - details_dict = metadata.model_dump() if metadata else None - - if existing_record: - # 更新现有记录 - cursor.execute(''' - UPDATE dict_entry - SET definition = %s, details = %s - WHERE word = %s - ''', (final_definition, json.dumps(details_dict) if details_dict else None, word)) - entry_id = existing_record['id'] - else: - # 插入新记录 - cursor.execute(''' - INSERT INTO dict_entry (word, definition, details) - VALUES (%s, %s, %s) - ''', (word, final_definition, json.dumps(details_dict) if details_dict else None)) - entry_id = cursor.lastrowid - - else: - # 保持现有记录不变 - entry_id = existing_record['id'] - - entry_ids[word] = entry_id - - except Error as e: - print(f"保存词条 '{word}' 时出错: {e}") - - self.conn.commit() - cursor.close() - return entry_ids - - def parse_definition_to_metadata(self, html_str: str) -> Tuple[Dict, List[Dict]]: - """解析HTML定义字符串,提取元数据""" - soup = BeautifulSoup(html_str, 'html.parser') - - # 提取发音 - pronunciations = [] - pron_links = soup.find_all('a', class_='pronounce') - for link in pron_links: - pron_type = link.get('data-rel', '') - pron_url = link.get('href', '') - if pron_type and pron_url: - pronunciations.append(Pronunciation(type=pron_type, url=pron_url)) - - # 提取词性 - pos_elements = soup.find_all('span', class_='pos') - pos_list = [pos.get_text().strip() for pos in pos_elements] - - # 提取释义 - definitions = [] - sense_elements = soup.find_all('span', class_='def') - for sense in sense_elements: - definition_text = sense.get_text().strip() - if definition_text: - definitions.append(Definition(text=definition_text)) - - # 提取例句 - examples = [] - example_elements = soup.find_all('span', class_='example') - for example in example_elements: - example_text = example.get_text().strip() - if example_text: - examples.append(Example(text=example_text)) - - # 构建元数据 - metadata = { - "pronunciations": [p.model_dump() for p in pronunciations], - "parts_of_speech": pos_list, - "definitions": [d.model_dump() for d in definitions], - "examples": [e.model_dump() for e in examples] - } - - # 提取媒体信息 - media_info = [] - img_elements = soup.find_all('img') - for img in img_elements: - src = img.get('src', '') - if src: - media_info.append({ - 'type': 'image', - 'src': src - }) - - return metadata, media_info - - def save_dict_media(self, dict_media: Dict[str, bytes], media_references: List[Dict], entry_ids: Dict[str, int]) -> None: - """保存词典媒体文件到数据库""" - from mysql.connector import Error - import hashlib - - cursor = self.conn.cursor(dictionary=True) - - try: - for media_ref in media_references: - word = media_ref['word'] - filename = media_ref['filename'] - file_type = media_ref['type'] - - # 查找对应的 entry_id - entry_id = entry_ids.get(word) - if not entry_id: - continue - - # 查找文件数据 - # 处理文件名,确保与 dict_media 中的键匹配 - normalized_filename = filename.replace('\\', '/').lstrip('/') - file_data = dict_media.get(normalized_filename) - if not file_data: - # 尝试其他可能的文件名变体 - alt_filename = filename.lstrip('/') - file_data = dict_media.get(alt_filename) - if not file_data: - print(f"警告: 找不到媒体文件 {filename} 的数据") - continue - - # 计算文件哈希 - file_hash = hashlib.md5(file_data).hexdigest() - - # 检查数据库中是否已存在相同的文件 - cursor.execute(''' - SELECT id FROM dict_media - WHERE file_hash = %s AND file_type = %s - ''', (file_hash, file_type)) - existing_record = cursor.fetchone() - - if existing_record: - # 如果文件已存在,只需关联到当前词条 - media_id = existing_record['id'] - cursor.execute(''' - INSERT IGNORE INTO dict_entry_media (entry_id, media_id) - VALUES (%s, %s) - ''', (entry_id, media_id)) - else: - # 插入新文件记录 - cursor.execute(''' - INSERT INTO dict_media (filename, file_type, file_data, file_hash) - VALUES (%s, %s, %s, %s) - ''', (filename, file_type, file_data, file_hash)) - media_id = cursor.lastrowid - - # 关联到词条 - cursor.execute(''' - INSERT IGNORE INTO dict_entry_media (entry_id, media_id) - VALUES (%s, %s) - ''', (entry_id, media_id)) - - # 提取详细信息(如果有的话) - details = {} - if file_type == 'image': - # 对于图片,可以提取一些基本信息 - details['size'] = len(file_data) - # 这里可以添加更多图片处理逻辑 - - # 更新媒体记录的详细信息 - if details: - cursor.execute(''' - UPDATE dict_media - SET details = %s - WHERE id = %s - ''', (json.dumps(details), media_id)) - - except Error as e: - print(f"保存媒体文件时出错: {e}") - - self.conn.commit() - cursor.close() - - def export_media_files(self, export_dir: str) -> None: - """导出媒体文件到本地目录""" - cursor = self.conn.cursor(dictionary=True) - - try: - # 创建导出目录 - os.makedirs(export_dir, exist_ok=True) - - # 查询所有媒体文件 - cursor.execute('SELECT id, filename, file_type, file_data FROM dict_media') - media_records = cursor.fetchall() - - for record in media_records: - media_id, filename, file_type, file_data = record['id'], record['filename'], record['file_type'], record['file_data'] - if file_data: - # 确保文件名安全 - safe_filename = self._sanitize_filename(filename) - file_path = os.path.join(export_dir, safe_filename) - - # 写入文件 - with open(file_path, 'wb') as f: - f.write(file_data) - - print(f"导出媒体文件: {file_path}") - - except Error as e: - print(f"导出媒体文件时出错: {e}") - - cursor.close() - - def _sanitize_filename(self, filename: str) -> str: - """清理文件名,确保安全""" - # 移除或替换不安全的字符 - unsafe_chars = '<>:"/\\|?*' - for char in unsafe_chars: - filename = filename.replace(char, '_') - return filename - - def close(self): - """关闭数据库连接""" - if self.conn: - self.conn.close() - - -# 使用示例 -def main(): - # 数据库配置 - db_config = { - 'host': 'localhost', - 'database': 'mysql', - 'user': 'root', - 'password': 'root', - 'port': 3306 - } - - # 文件路径 - mdx_path = './LDOCE5.mdx' - mdd_path = './LDOCE5.mdd' # 可选 - - # 创建解析器实例 - parser = DictionaryParser(db_config) - - try: - # with open('./exported_media/kernel.html', 'r', encoding='utf-8') as file: - # html_str = file.read() - # de,image_info = parser.parse_definition_to_metadata(html_str) - # print(de) - - # 解析词典文件 - parser.parse_mdx_mdd(mdx_path, mdd_path) - - # 可选:导出媒体文件到本地目录 - # parser.export_media_files('./exported_media') - - except Error as e: - print(f"解析过程中出现错误: {e}") - finally: - parser.close() - - -if __name__ == "__main__": - main() diff --git a/assets/generate_and_save_coupons.py b/assets/generate_and_save_coupons.py deleted file mode 100644 index f8d02a5..0000000 --- a/assets/generate_and_save_coupons.py +++ /dev/null @@ -1,179 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -Script to generate and save coupons to the database -""" - -import os -import sys -import random -from datetime import datetime, timedelta - -# Add the backend directory to the path so we can import modules -sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) - -# Import required modules -from sqlalchemy import create_engine -from sqlalchemy.orm import sessionmaker -from sqlalchemy.exc import SQLAlchemyError - -from backend.app.admin.model.coupon import Coupon -from backend.utils.snowflake import snowflake -from backend.core.conf import settings, get_db_uri - - -def generate_coupon_codes(prefix: str, quantity: int): - """ - Generate coupon codes with specified prefix and quantity. - - Format: [PREFIX][NUMBER] - Total 6 characters - Example: A12345, TEST0, XYZ999 - - Args: - prefix (str): The letter prefix for the coupon codes (should be uppercase) - quantity (int): Number of coupon codes to generate - - Returns: - list: List of generated coupon codes - """ - if not prefix.isalpha() or not prefix.isupper(): - raise ValueError("Prefix must be uppercase letters only") - - if len(prefix) == 0 or len(prefix) > 5: - raise ValueError("Prefix must be 1-5 characters long") - - if quantity <= 0: - raise ValueError("Quantity must be greater than 0") - - # Calculate number of digits based on prefix length (total 6 characters) - num_digits = 6 - len(prefix) - - # Maximum possible combinations - max_combinations = 10 ** num_digits - - if quantity > max_combinations: - raise ValueError(f"With prefix '{prefix}' (length {len(prefix)}), can only generate {max_combinations} unique codes (0 to {max_combinations - 1})") - - codes = [] - # Generate incremental numbers starting from 0 - for i in range(quantity): - # Format with leading zeros to make it the required number of digits - formatted_number = f"{i:0{num_digits}d}" - # Combine prefix with formatted number - coupon_code = f"{prefix}{formatted_number}" - codes.append(coupon_code) - - return codes - - -def save_coupons_to_db(prefix: str, quantity: int, coupon_type: str, points: int, expire_days: int = None): - """ - Generate and save coupons to the database. - - Coupon codes are always 6 characters total: - - 1-letter prefix: 5 digits (up to 100000 codes: A00000-A99999) - - 4-letter prefix: 2 digits (up to 100 codes: TEST00-TEST99) - - 5-letter prefix: 1 digit (up to 10 codes: ABCDE0-ABCDE9) - - Args: - prefix (str): The letter prefix for the coupon codes - quantity (int): Number of coupon codes to generate - coupon_type (str): Type of the coupons - points (int): Points value of the coupons - expire_days (int, optional): Days until expiration. If None, no expiration. - """ - # Create database engine and session - db_url = get_db_uri(settings) - # Replace asyncmy with mysql+mysqlconnector for synchronous connection - sync_db_url = db_url.replace('mysql+asyncmy', 'mysql+mysqlconnector') - - try: - engine = create_engine(sync_db_url, echo=False) - SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) - db = SessionLocal() - - # Generate coupon codes - codes = generate_coupon_codes(prefix, quantity) - - # Create coupon objects - coupons = [] - for code in codes: - # Generate snowflake ID - coupon_id = snowflake.generate() - - # Calculate expiration date if needed - expires_at = None - if expire_days is not None and expire_days > 0: - expires_at = datetime.now() + timedelta(days=expire_days) - - # Create coupon object - # Note: id is auto-generated by snowflake, but we want to use our own snowflake generator - coupon = Coupon( - code=code, - type=coupon_type, - points=points, - expires_at=expires_at - ) - # Set the id manually after creation - coupon.id = coupon_id - coupons.append(coupon) - - # Bulk insert coupons - db.add_all(coupons) - db.commit() - - print(f"Successfully saved {len(coupons)} coupons to the database.") - print(f"Prefix: {prefix}, Type: {coupon_type}, Points: {points}") - if expire_days: - print(f"Expires in: {expire_days} days") - - # Display first 5 coupons as examples - print("\nSample coupons generated:") - for coupon in coupons[:5]: - print(f" ID: {coupon.id}, Code: {coupon.code}") - - db.close() - - except SQLAlchemyError as e: - print(f"Database error: {e}") - if 'db' in locals(): - db.rollback() - db.close() - except Exception as e: - print(f"Error: {e}") - if 'db' in locals(): - db.close() - - -def main(): - """Main function to demonstrate usage""" - print("Coupon Generator and Database Saver") - print("=" * 40) - - # Example: Generate and save coupons with different prefixes - try: - # Single character prefix (5 digits, incremental from 00000) - # print("Generating coupons with single character prefix 'A'...") - # save_coupons_to_db('A', 5, 'NORMAL', 100, 30) - # print("\n" + "-" * 40 + "\n") - - # 4-character prefix (2 digits, incremental from 00) - print("Generating coupons with 4-character prefix 'TEST'...") - save_coupons_to_db('VIP', 5, 'test', 1000, 60) - print("\n" + "-" * 40 + "\n") - - # 3-character prefix (3 digits, incremental from 000) - # print("Generating coupons with 3-character prefix 'XYZ'...") - # save_coupons_to_db('XYZ', 3, 'SPECIAL', 500, 15) - # print("\n" + "-" * 40 + "\n") - - # 5-character prefix (1 digit, incremental from 0) - # print("Generating coupons with 5-character prefix 'ABCDE'...") - # save_coupons_to_db('ABCDE', 5, 'PREMIUM', 2000, 90) - - except Exception as e: - print(f"Error in main: {e}") - - -if __name__ == "__main__": - main() \ No newline at end of file