import base64
import hashlib
import json
import os
import re
from typing import Any, Dict, List, Optional, Tuple

import mysql.connector
from mysql.connector import Error
from readmdict import MDD, MDX
from bs4 import BeautifulSoup, Tag

from backend.app.admin.schema.dict import Example, Frequency, Pronunciation, FamilyItem, WordFamily, \
    WordMetaData, Sense, Definition, Topic, CrossReference, DictEntry, Etymology, EtymologyItem


class DictionaryParser:
    """Parse MDX/MDD dictionary files and persist entries and media assets to MySQL.

    Tables touched: ``dict_entry`` (word, definition, details JSON),
    ``dict_media`` (deduplicated by MD5 hash), and the ``dict_entry_media``
    association table.
    """

    def __init__(self, db_config: Dict):
        """Store the MySQL settings and open the connection immediately.

        :param db_config: keyword arguments accepted by ``mysql.connector.connect``.
        """
        self.db_config = db_config
        self.conn = None
        self.connect_db()

    def connect_db(self) -> None:
        """Connect to the MySQL database; log and re-raise on failure."""
        try:
            self.conn = mysql.connector.connect(**self.db_config)
        except Error as e:
            print(f"数据库连接失败: {e}")
            raise

    def parse_mdx_mdd(self, mdx_path: str, mdd_path: str = None) -> None:
        """Parse an MDX file (plus optional MDD media archive) and save everything.

        :param mdx_path: path to the ``.mdx`` dictionary file.
        :param mdd_path: optional path to the companion ``.mdd`` media file.
        :raises Exception: re-raises whatever parsing or persistence raised,
            after printing a diagnostic.
        """
        try:
            # Parse the MDX file into (word, definition) pairs + media references.
            entries, media_references = self.parse_mdx_file_mdict(mdx_path)
            # Persist the entries and remember their database ids per word.
            entry_ids = self.save_entries(entries)
            # Parse the MDD media archive when one is available.
            if mdd_path and os.path.exists(mdd_path):
                self.parse_mdd_file(mdd_path, media_references, entry_ids)
            else:
                print("未提供MDD文件或文件不存在")
            print(f"解析完成,共处理 {len(entries)} 个词汇条目")
        except Exception as e:
            # Fixed: previously `except Error` (mysql.connector.Error), which can
            # never be raised by readmdict parsing, so MDX/MDD failures skipped
            # this diagnostic print entirely. Still re-raised either way.
            print(f"解析词典文件失败: {e}")
            raise

    def parse_mdx_file_mdict(self, mdx_path: str) -> Tuple[List[Tuple[str, str]], List[Dict]]:
        """Parse the MDX file with ``readmdict``.

        :param mdx_path: path to the ``.mdx`` file.
        :return: ``(entries, media_references)`` where entries are
            ``(word, definition_html)`` pairs and media_references are the dicts
            produced by :meth:`extract_media_references`.
        """
        print(f"正在解析MDX文件: {mdx_path}")
        try:
            mdx = MDX(mdx_path)
            entries: List[Tuple[str, str]] = []
            media_references: List[Dict] = []
            for key, value in mdx.items():
                # readmdict yields bytes; decode defensively either way.
                word = key.decode('utf-8') if isinstance(key, bytes) else str(key)
                definition = value.decode('utf-8') if isinstance(value, bytes) else str(value)
                if word and definition:
                    entries.append((word, definition))
                    # Collect sound:// and image references embedded in the HTML.
                    media_references.extend(self.extract_media_references(definition, word))
            return entries, media_references
        except Exception as e:
            # Fixed: was `except Error` — see parse_mdx_mdd.
            print(f"解析MDX文件失败: {e}")
            raise

    def parse_mdd_file(self, mdd_path: str, media_references: List[Dict],
                       entry_ids: Dict[str, int]) -> None:
        """Parse the MDD media archive and persist the referenced files.

        :param mdd_path: path to the ``.mdd`` file.
        :param media_references: references previously extracted from definitions.
        :param entry_ids: mapping word -> dict_entry id, from :meth:`save_entries`.
        """
        print(f"正在解析MDD文件: {mdd_path}")
        try:
            mdd = MDD(mdd_path)
            # Map normalized file name -> raw bytes for every archived asset.
            dict_media: Dict[str, bytes] = {}
            for key, value in mdd.items():
                filename = key.decode('utf-8') if isinstance(key, bytes) else str(key)
                # Normalize path separators and strip the leading slash so that
                # lookups from extracted references match.
                filename = filename.replace('\\', '/').lstrip('/')
                dict_media[filename] = value
            self.save_dict_media(dict_media, media_references, entry_ids)
        except Exception as e:
            # Fixed: was `except Error` — see parse_mdx_mdd.
            print(f"解析MDD文件失败: {e}")
            raise

    def extract_media_references(self, definition: str, word: str) -> List[Dict]:
        """Extract audio and image file references from a definition's HTML.

        :param definition: the raw HTML definition string.
        :param word: headword the definition belongs to (kept with each ref).
        :return: list of ``{'filename', 'type', 'word'}`` dicts,
            ``type`` being ``'audio'`` or ``'image'``.
        """
        media_refs: List[Dict] = []

        # Audio references: sound:// links in several spellings, plus
        # data-src-mp3 attributes and bare audio="..." attributes.
        audio_patterns = [
            r'sound://([^"\s>]+\.mp3)',
            r'href\s*=\s*["\']sound://([^"\'>]+\.mp3)["\']',
            r'href\s*=\s*["\']sound://([^"\'>]+)["\']',
            r'data-src-mp3\s*=\s*["\']sound://([^"\'>]+\.mp3)["\']',
            r'data-src-mp3\s*=\s*["\']([^"\'>]+\.mp3)["\']',
            r'audio\s*=\s*["\']([^"\']+)["\']',
        ]
        for pattern in audio_patterns:
            for match in re.findall(pattern, definition, re.IGNORECASE):
                clean_filename = match.strip()
                if clean_filename:
                    media_refs.append({
                        'filename': clean_filename,
                        'type': 'audio',
                        'word': word,
                    })

        # Image references: <img src=...>, [image:...] markers, and bare src=.
        image_patterns = [
            # Fixed: this pattern previously began with a stray `]*` instead of
            # `<img[^>]*`, so it matched literal ']' runs rather than anchoring
            # on the <img> tag as the comment said.
            r'<img[^>]*src\s*=\s*["\']([^"\']+\.(?:jpg|jpeg|png|gif|bmp))["\']',
            r'\[image:([^\]]+\.(?:jpg|jpeg|png|gif|bmp))\]',
            r'src\s*=\s*["\']([^"\']+\.(?:jpg|jpeg|png|gif|bmp))["\']',
        ]
        for pattern in image_patterns:
            for match in re.findall(pattern, definition, re.IGNORECASE):
                clean_filename = match.strip()
                if clean_filename:
                    media_refs.append({
                        'filename': clean_filename,
                        'type': 'image',
                        'word': word,
                    })

        return media_refs

    def save_entries(self, entries: List[Tuple[str, str]]) -> Dict[str, int]:
        """Save entries to ``dict_entry``, merging ``@@@`` reference links into details.

        A definition starting with ``@@@`` is treated as a redirect: the stored
        definition is kept and the links are merged into ``details.ref_link``.
        Existing entries with a non-``@@@`` definition are left unchanged.

        :param entries: ``(word, definition)`` pairs.
        :return: mapping word -> dict_entry id.
        """
        cursor = self.conn.cursor(dictionary=True)
        entry_ids: Dict[str, int] = {}

        for word, definition in entries:
            try:
                cursor.execute(
                    'SELECT id, definition, details FROM dict_entry WHERE word = %s',
                    (word,),
                )
                existing_record = cursor.fetchone()

                metadata: Optional[WordMetaData] = None
                existing_details: Optional[WordMetaData] = None
                final_definition = definition  # default: store the incoming definition

                if existing_record:
                    entry_id = existing_record['id']
                    existing_definition = existing_record['definition']
                    existing_details_json = existing_record['details']

                    # Rehydrate existing details. The JSON column may come back
                    # as a str depending on the connector's converters, so
                    # decode it first (previously `**str` always failed and a
                    # bare `except:` silently discarded the details).
                    if existing_details_json:
                        try:
                            if isinstance(existing_details_json, str):
                                existing_details_json = json.loads(existing_details_json)
                            existing_details = WordMetaData(**existing_details_json)
                        except Exception:
                            existing_details = None

                    if definition.startswith('@@@'):
                        # Redirect entry: keep the stored definition, merge links.
                        final_definition = existing_definition

                        # Collect the leading run of @@@ lines.
                        # NOTE(review): the loop stops at the first non-@@@ line,
                        # matching the original's apparent intent — confirm.
                        new_ref_links: List[str] = []
                        for line in definition.split('\n'):
                            if line.startswith('@@@'):
                                link = line[3:].strip()
                                if link:
                                    new_ref_links.append(link)
                            else:
                                break

                        if new_ref_links:
                            if existing_details and existing_details.ref_link:
                                # Merge, de-duplicating while keeping order.
                                combined_links = existing_details.ref_link[:]
                                for link in new_ref_links:
                                    if link not in combined_links:
                                        combined_links.append(link)
                            else:
                                combined_links = new_ref_links

                            if existing_details:
                                metadata = existing_details.model_copy(
                                    update={"ref_link": combined_links})
                            else:
                                metadata = WordMetaData(ref_link=combined_links)

                if not existing_record or metadata:
                    # New entry, or an existing one whose details changed.
                    if not existing_record:
                        metadata = WordMetaData()

                    details_dict = metadata.model_dump() if metadata else None
                    details_json = json.dumps(details_dict) if details_dict else None

                    if existing_record:
                        cursor.execute(
                            'UPDATE dict_entry SET definition = %s, details = %s '
                            'WHERE word = %s',
                            (final_definition, details_json, word),
                        )
                        entry_id = existing_record['id']
                    else:
                        cursor.execute(
                            'INSERT INTO dict_entry (word, definition, details) '
                            'VALUES (%s, %s, %s)',
                            (word, final_definition, details_json),
                        )
                        entry_id = cursor.lastrowid
                else:
                    # Existing entry with an ordinary definition: keep it as-is.
                    entry_id = existing_record['id']

                entry_ids[word] = entry_id
            except Error as e:
                print(f"保存词条 '{word}' 时出错: {e}")

        self.conn.commit()
        cursor.close()
        return entry_ids

    def parse_definition_to_metadata(self, html_str: str) -> Tuple[Dict, List[Dict]]:
        """Parse a definition's HTML and extract structured metadata.

        :param html_str: HTML definition string.
        :return: ``(metadata, media_info)`` — metadata holds pronunciations,
            parts of speech, definitions and examples; media_info lists the
            ``<img>`` sources found.
        """
        soup = BeautifulSoup(html_str, 'html.parser')

        # Pronunciations: <a class="pronounce" data-rel=... href=...>.
        pronunciations = []
        for link in soup.find_all('a', class_='pronounce'):
            pron_type = link.get('data-rel', '')
            pron_url = link.get('href', '')
            if pron_type and pron_url:
                pronunciations.append(Pronunciation(type=pron_type, url=pron_url))

        # Parts of speech: <span class="pos">.
        pos_list = [pos.get_text().strip()
                    for pos in soup.find_all('span', class_='pos')]

        # Definitions: <span class="def">.
        definitions = []
        for sense in soup.find_all('span', class_='def'):
            definition_text = sense.get_text().strip()
            if definition_text:
                definitions.append(Definition(text=definition_text))

        # Examples: <span class="example">.
        examples = []
        for example in soup.find_all('span', class_='example'):
            example_text = example.get_text().strip()
            if example_text:
                examples.append(Example(text=example_text))

        metadata = {
            "pronunciations": [p.model_dump() for p in pronunciations],
            "parts_of_speech": pos_list,
            "definitions": [d.model_dump() for d in definitions],
            "examples": [e.model_dump() for e in examples],
        }

        # Inline images referenced by the definition.
        media_info = []
        for img in soup.find_all('img'):
            src = img.get('src', '')
            if src:
                media_info.append({'type': 'image', 'src': src})

        return metadata, media_info

    def save_dict_media(self, dict_media: Dict[str, bytes], media_references: List[Dict],
                        entry_ids: Dict[str, int]) -> None:
        """Persist media files and link them to their entries.

        Files are de-duplicated by (MD5 hash, type); each reference is linked to
        its entry through ``dict_entry_media`` (``INSERT IGNORE`` makes the
        association idempotent).

        :param dict_media: normalized filename -> raw bytes, from the MDD file.
        :param media_references: dicts from :meth:`extract_media_references`.
        :param entry_ids: mapping word -> dict_entry id.
        """
        cursor = self.conn.cursor(dictionary=True)
        try:
            for media_ref in media_references:
                word = media_ref['word']
                filename = media_ref['filename']
                file_type = media_ref['type']

                entry_id = entry_ids.get(word)
                if not entry_id:
                    continue

                # Normalize the reference the same way parse_mdd_file normalized
                # the archive keys, then try a looser variant as a fallback.
                normalized_filename = filename.replace('\\', '/').lstrip('/')
                file_data = dict_media.get(normalized_filename)
                if not file_data:
                    file_data = dict_media.get(filename.lstrip('/'))

                if not file_data:
                    # Fixed: the warning previously printed the literal text
                    # "(unknown)" instead of the missing file's name.
                    print(f"警告: 找不到媒体文件 {filename} 的数据")
                    continue

                # De-duplicate stored blobs by content hash.
                file_hash = hashlib.md5(file_data).hexdigest()
                cursor.execute(
                    'SELECT id FROM dict_media WHERE file_hash = %s AND file_type = %s',
                    (file_hash, file_type),
                )
                existing_record = cursor.fetchone()

                if existing_record:
                    # Already stored: just associate it with this entry.
                    media_id = existing_record['id']
                    cursor.execute(
                        'INSERT IGNORE INTO dict_entry_media (entry_id, media_id) '
                        'VALUES (%s, %s)',
                        (entry_id, media_id),
                    )
                else:
                    cursor.execute(
                        'INSERT INTO dict_media (filename, file_type, file_data, file_hash) '
                        'VALUES (%s, %s, %s, %s)',
                        (filename, file_type, file_data, file_hash),
                    )
                    media_id = cursor.lastrowid
                    cursor.execute(
                        'INSERT IGNORE INTO dict_entry_media (entry_id, media_id) '
                        'VALUES (%s, %s)',
                        (entry_id, media_id),
                    )

                # Attach basic details (currently only the byte size of images).
                details: Dict[str, Any] = {}
                if file_type == 'image':
                    details['size'] = len(file_data)
                if details:
                    cursor.execute(
                        'UPDATE dict_media SET details = %s WHERE id = %s',
                        (json.dumps(details), media_id),
                    )
        except Error as e:
            print(f"保存媒体文件时出错: {e}")

        self.conn.commit()
        cursor.close()

    def export_media_files(self, export_dir: str) -> None:
        """Export every stored media blob to a local directory.

        :param export_dir: destination directory (created if missing).
        """
        cursor = self.conn.cursor(dictionary=True)
        try:
            os.makedirs(export_dir, exist_ok=True)

            cursor.execute('SELECT id, filename, file_type, file_data FROM dict_media')
            for record in cursor.fetchall():
                filename = record['filename']
                file_data = record['file_data']
                if file_data:
                    # Sanitize the name so it cannot escape export_dir.
                    safe_filename = self._sanitize_filename(filename)
                    file_path = os.path.join(export_dir, safe_filename)
                    with open(file_path, 'wb') as f:
                        f.write(file_data)
                    print(f"导出媒体文件: {file_path}")
        except Error as e:
            print(f"导出媒体文件时出错: {e}")
        cursor.close()

    def _sanitize_filename(self, filename: str) -> str:
        """Replace characters that are unsafe in file names with underscores."""
        unsafe_chars = '<>:"/\\|?*'
        for char in unsafe_chars:
            filename = filename.replace(char, '_')
        return filename

    def close(self) -> None:
        """Close the database connection if it is open."""
        if self.conn:
            self.conn.close()


def main():
    """Example entry point: parse the LDOCE5 MDX/MDD pair into MySQL."""
    db_config = {
        'host': 'localhost',
        'database': 'mysql',
        'user': 'root',
        'password': 'root',
        'port': 3306,
    }

    mdx_path = './LDOCE5.mdx'
    mdd_path = './LDOCE5.mdd'  # optional

    parser = DictionaryParser(db_config)
    try:
        parser.parse_mdx_mdd(mdx_path, mdd_path)
        # Optional: export media files to a local directory.
        # parser.export_media_files('./exported_media')
    except Exception as e:
        # Broadened from mysql Error so parse failures are also reported here.
        print(f"解析过程中出现错误: {e}")
    finally:
        parser.close()


if __name__ == "__main__":
    main()