Files
backend/assets/dict/dictionary_parser.py
2025-11-22 10:26:30 +08:00

470 lines
18 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import base64
import os
import re
import mysql.connector
import hashlib
from typing import List, Tuple, Dict, Optional, Any
from readmdict import MDX, MDD
from bs4 import BeautifulSoup, Tag
import json
from mysql.connector import Error
from backend.app.admin.schema.dict import Example, Frequency, Pronunciation, FamilyItem, WordFamily, \
WordMetaData, Sense, Definition, Topic, CrossReference, DictEntry, Etymology, EtymologyItem
class DictionaryParser:
def __init__(self, db_config: Dict):
    """Remember the connection settings and open the database connection.

    Args:
        db_config: Connection settings; ``connect_db`` unpacks them as
            keyword arguments for ``mysql.connector.connect``.

    Raises:
        mysql.connector.Error: Propagated from ``connect_db`` when the
            connection cannot be established.
    """
    # Both attributes exist before connect_db() runs, so a failed connection
    # still leaves the instance with a well-defined ``conn`` of None.
    self.conn = None
    self.db_config = db_config
    self.connect_db()
def connect_db(self):
    """Open a MySQL connection from ``self.db_config`` and keep it in ``self.conn``.

    Raises:
        mysql.connector.Error: Re-raised after the failure is printed.
    """
    try:
        connection = mysql.connector.connect(**self.db_config)
    except Error as e:
        print(f"数据库连接失败: {e}")
        raise
    # Only reached when the connection attempt succeeded.
    self.conn = connection
def parse_mdx_mdd(self, mdx_path: str, mdd_path: str = None) -> None:
    """Import an MDX dictionary and, when available, its MDD media archive.

    The MDX entries are parsed and saved first; the row ids returned by
    ``save_entries`` are then handed to ``parse_mdd_file`` together with the
    media references collected while parsing.

    Args:
        mdx_path: Path of the MDX file.
        mdd_path: Optional path of the MDD file. It is skipped (with a
            printed notice) when empty or when the path does not exist.

    Raises:
        mysql.connector.Error: Re-raised after the failure is printed.
    """
    try:
        entries, media_references = self.parse_mdx_file_mdict(mdx_path)
        entry_ids = self.save_entries(entries)

        has_media_archive = bool(mdd_path) and os.path.exists(mdd_path)
        if not has_media_archive:
            print("未提供MDD文件或文件不存在")
        else:
            self.parse_mdd_file(mdd_path, media_references, entry_ids)

        print(f"解析完成,共处理 {len(entries)} 个词汇条目")
    except Error as e:
        print(f"解析词典文件失败: {e}")
        raise
def parse_mdx_file_mdict(self, mdx_path: str) -> Tuple[List[Tuple[str, str]], List[Dict]]:
    """Read every record of an MDX file and collect its media references.

    Args:
        mdx_path: Path of the MDX file handed to ``MDX``.

    Returns:
        A pair ``(entries, media_references)``. ``entries`` holds one
        ``(word, definition)`` tuple per record whose key and value are both
        non-empty after decoding; ``media_references`` concatenates what
        ``extract_media_references`` returns for each kept record.

    Raises:
        Exception: Any failure while opening, iterating or decoding the file
            is printed and then re-raised unchanged.
    """
    print(f"正在解析MDX文件: {mdx_path}")
    try:
        mdx = MDX(mdx_path)
        entries = []
        media_references = []
        for key, value in mdx.items():
            # Keys/values are decoded as UTF-8 when they arrive as bytes.
            word = key.decode('utf-8') if isinstance(key, bytes) else str(key)
            definition = value.decode('utf-8') if isinstance(value, bytes) else str(value)
            if not (word and definition):
                continue
            entries.append((word, definition))
            media_references.extend(self.extract_media_references(definition, word))
        return entries, media_references
    except Exception as e:
        # Nothing in this block talks to MySQL, so catching only the
        # connector's Error meant file/decoding failures were never reported.
        print(f"解析MDX文件失败: {e}")
        raise
def parse_mdd_file(self, mdd_path: str, media_references: List[Dict], entry_ids: Dict[str, int]) -> None:
    """Load the resources of an MDD file and store the referenced ones.

    Args:
        mdd_path: Path of the MDD file handed to ``MDD``.
        media_references: Reference dicts passed through to
            ``save_dict_media``.
        entry_ids: Mapping of word to entry id, passed through to
            ``save_dict_media``.

    Raises:
        Exception: Any failure while reading the archive or saving its
            content is printed and then re-raised unchanged.
    """
    print(f"正在解析MDD文件: {mdd_path}")
    try:
        mdd = MDD(mdd_path)
        dict_media = {}
        for key, value in mdd.items():
            filename = key.decode('utf-8') if isinstance(key, bytes) else str(key)
            # Normalise to forward slashes without a leading separator so the
            # keys line up with the normalisation done in save_dict_media.
            filename = filename.replace('\\', '/').lstrip('/')
            dict_media[filename] = value
        self.save_dict_media(dict_media, media_references, entry_ids)
    except Exception as e:
        # Opening/iterating the archive does not raise the MySQL connector's
        # Error, so the narrower handler skipped the report for those failures.
        print(f"解析MDD文件失败: {e}")
        raise
def extract_media_references(self, definition: str, word: str) -> List[Dict]:
    """Find the audio and image files referenced by an entry's HTML.

    Several patterns deliberately overlap (a strict and a lenient form of the
    same attribute), so one link is usually matched more than once. Each
    distinct ``(type, filename)`` pair is reported only once, in order of
    first appearance, and a leading ``sound://`` scheme is removed from audio
    names so every pattern yields the same filename for the same link.

    Args:
        definition: Entry HTML to scan.
        word: Headword the references belong to; copied into every result.

    Returns:
        A list of dicts with the keys ``filename``, ``type`` (``'audio'`` or
        ``'image'``) and ``word``. Audio references come first.
    """
    audio_patterns = [
        r'sound://([^"\s>]+\.mp3)',  # bare sound:// link ending in .mp3
        r'href\s*=\s*["\']sound://([^"\'>]+\.mp3)["\']',  # href="sound://....mp3"
        r'href\s*=\s*["\']sound://([^"\'>]+)["\']',  # href="sound://..." with any extension
        r'data-src-mp3\s*=\s*["\']sound://([^"\'>]+\.mp3)["\']',  # data-src-mp3="sound://..."
        r'data-src-mp3\s*=\s*["\']([^"\'>]+\.mp3)["\']',  # data-src-mp3 with a relative path
        r'audio\s*=\s*["\']([^"\']+)["\']',  # audio="..."
    ]
    image_patterns = [
        r'<img[^>]*src\s*=\s*["\']([^"\']+\.(?:jpg|jpeg|png|gif|bmp))["\']',  # <img src="...">
        r'\[image:([^\]]+\.(?:jpg|jpeg|png|gif|bmp))\]',  # [image:...]
        r'src\s*=\s*["\']([^"\']+\.(?:jpg|jpeg|png|gif|bmp))["\']',  # any src="..."
    ]
    scheme = 'sound://'

    media_refs = []
    seen = set()
    for media_type, patterns in (('audio', audio_patterns), ('image', image_patterns)):
        for pattern in patterns:
            for match in re.findall(pattern, definition, re.IGNORECASE):
                filename = match.strip()
                # The relative data-src-mp3 and audio="..." patterns capture
                # the scheme as part of the name; drop it.
                if media_type == 'audio' and filename.lower().startswith(scheme):
                    filename = filename[len(scheme):]
                identity = (media_type, filename)
                if not filename or identity in seen:
                    continue
                seen.add(identity)
                media_refs.append({
                    'filename': filename,
                    'type': media_type,
                    'word': word,
                })
    return media_refs
def save_entries(self, entries: List[Tuple[str, str]]) -> Dict[str, int]:
    """Insert or update dictionary entries and return their row ids.

    For each ``(word, definition)`` pair:

    * A word not yet in ``dict_entry`` is inserted with a default
      ``WordMetaData`` as its ``details``.
    * For an existing word whose new definition starts with ``@@@`` (a
      redirect), the stored definition is kept and the redirect targets are
      merged into ``details.ref_link`` (existing links first, no duplicates).
    * Any other existing word is left untouched.

    Args:
        entries: ``(word, definition)`` pairs to store.

    Returns:
        Mapping of word to the id of its ``dict_entry`` row. Words whose
        statement failed with a MySQL error are reported on stdout and left
        out of the mapping.
    """
    cursor = self.conn.cursor(dictionary=True)
    entry_ids = {}
    try:
        for word, definition in entries:
            try:
                cursor.execute('SELECT id, definition, details FROM dict_entry WHERE word = %s', (word,))
                existing_record = cursor.fetchone()
                metadata = None
                final_definition = definition

                if existing_record:
                    existing_details = None
                    raw_details = existing_record['details']
                    if raw_details:
                        try:
                            # The driver may hand a JSON column back as text;
                            # decode it before unpacking, otherwise the stored
                            # details (and their ref_link) are silently lost.
                            if isinstance(raw_details, (str, bytes, bytearray)):
                                raw_details = json.loads(raw_details)
                            existing_details = WordMetaData(**raw_details)
                        except Exception:
                            # Best effort: unreadable details are treated as absent.
                            existing_details = None

                    if definition.startswith('@@@'):
                        # A redirect never overwrites the real definition.
                        final_definition = existing_record['definition']

                        # Only the leading run of @@@ lines counts as links.
                        new_ref_links = []
                        for line in definition.split('\n'):
                            if not line.startswith('@@@'):
                                break
                            link = line[3:].strip()
                            if link:
                                new_ref_links.append(link)

                        if new_ref_links:
                            if existing_details and existing_details.ref_link:
                                combined_links = list(existing_details.ref_link)
                                for link in new_ref_links:
                                    if link not in combined_links:
                                        combined_links.append(link)
                            else:
                                combined_links = new_ref_links

                            if existing_details:
                                metadata = existing_details.model_copy(update={"ref_link": combined_links})
                            else:
                                metadata = WordMetaData(ref_link=combined_links)

                if not existing_record:
                    metadata = WordMetaData()
                    details_dict = metadata.model_dump() if metadata else None
                    cursor.execute('''
                        INSERT INTO dict_entry (word, definition, details)
                        VALUES (%s, %s, %s)
                    ''', (word, final_definition, json.dumps(details_dict) if details_dict else None))
                    entry_id = cursor.lastrowid
                elif metadata:
                    details_dict = metadata.model_dump()
                    cursor.execute('''
                        UPDATE dict_entry
                        SET definition = %s, details = %s
                        WHERE word = %s
                    ''', (final_definition, json.dumps(details_dict) if details_dict else None, word))
                    entry_id = existing_record['id']
                else:
                    # Existing row with nothing to merge: keep it as it is.
                    entry_id = existing_record['id']

                entry_ids[word] = entry_id
            except Error as e:
                print(f"保存词条 '{word}' 时出错: {e}")
        self.conn.commit()
    finally:
        # Release the cursor even when an unexpected exception escapes.
        cursor.close()
    return entry_ids
def parse_definition_to_metadata(self, html_str: str) -> Tuple[Dict, List[Dict]]:
    """Extract structured metadata and image references from entry HTML.

    Args:
        html_str: HTML of a single dictionary entry.

    Returns:
        A pair ``(metadata, media_info)``. ``metadata`` is a dict with the
        keys ``pronunciations``, ``parts_of_speech``, ``definitions`` and
        ``examples``; ``media_info`` lists one ``{'type': 'image', 'src': ...}``
        dict per ``<img>`` tag that has a non-empty ``src``.
    """
    soup = BeautifulSoup(html_str, 'html.parser')

    # Pronunciations: <a class="pronounce"> needs both data-rel and href.
    pron_pairs = (
        (anchor.get('data-rel', ''), anchor.get('href', ''))
        for anchor in soup.find_all('a', class_='pronounce')
    )
    pronunciations = [
        Pronunciation(type=kind, url=url)
        for kind, url in pron_pairs
        if kind and url
    ]

    # Parts of speech: text of every <span class="pos">, empty ones included.
    pos_list = [node.get_text().strip() for node in soup.find_all('span', class_='pos')]

    # Senses: non-empty text of every <span class="def">.
    sense_texts = (node.get_text().strip() for node in soup.find_all('span', class_='def'))
    definitions = [Definition(text=text) for text in sense_texts if text]

    # Examples: non-empty text of every <span class="example">.
    example_texts = (node.get_text().strip() for node in soup.find_all('span', class_='example'))
    examples = [Example(text=text) for text in example_texts if text]

    metadata = {
        "pronunciations": [item.model_dump() for item in pronunciations],
        "parts_of_speech": pos_list,
        "definitions": [item.model_dump() for item in definitions],
        "examples": [item.model_dump() for item in examples]
    }

    # Images: every <img> carrying a non-empty src attribute.
    sources = (img.get('src', '') for img in soup.find_all('img'))
    media_info = [{'type': 'image', 'src': src} for src in sources if src]

    return metadata, media_info
def save_dict_media(self, dict_media: Dict[str, bytes], media_references: List[Dict], entry_ids: Dict[str, int]) -> None:
"""保存词典媒体文件到数据库"""
from mysql.connector import Error
import hashlib
cursor = self.conn.cursor(dictionary=True)
try:
for media_ref in media_references:
word = media_ref['word']
filename = media_ref['filename']
file_type = media_ref['type']
# 查找对应的 entry_id
entry_id = entry_ids.get(word)
if not entry_id:
continue
# 查找文件数据
# 处理文件名,确保与 dict_media 中的键匹配
normalized_filename = filename.replace('\\', '/').lstrip('/')
file_data = dict_media.get(normalized_filename)
if not file_data:
# 尝试其他可能的文件名变体
alt_filename = filename.lstrip('/')
file_data = dict_media.get(alt_filename)
if not file_data:
print(f"警告: 找不到媒体文件 {filename} 的数据")
continue
# 计算文件哈希
file_hash = hashlib.md5(file_data).hexdigest()
# 检查数据库中是否已存在相同的文件
cursor.execute('''
SELECT id FROM dict_media
WHERE file_hash = %s AND file_type = %s
''', (file_hash, file_type))
existing_record = cursor.fetchone()
if existing_record:
# 如果文件已存在,只需关联到当前词条
media_id = existing_record['id']
cursor.execute('''
INSERT IGNORE INTO dict_entry_media (entry_id, media_id)
VALUES (%s, %s)
''', (entry_id, media_id))
else:
# 插入新文件记录
cursor.execute('''
INSERT INTO dict_media (filename, file_type, file_data, file_hash)
VALUES (%s, %s, %s, %s)
''', (filename, file_type, file_data, file_hash))
media_id = cursor.lastrowid
# 关联到词条
cursor.execute('''
INSERT IGNORE INTO dict_entry_media (entry_id, media_id)
VALUES (%s, %s)
''', (entry_id, media_id))
# 提取详细信息(如果有的话)
details = {}
if file_type == 'image':
# 对于图片,可以提取一些基本信息
details['size'] = len(file_data)
# 这里可以添加更多图片处理逻辑
# 更新媒体记录的详细信息
if details:
cursor.execute('''
UPDATE dict_media
SET details = %s
WHERE id = %s
''', (json.dumps(details), media_id))
except Error as e:
print(f"保存媒体文件时出错: {e}")
self.conn.commit()
cursor.close()
def export_media_files(self, export_dir: str) -> None:
    """Write every stored media file into ``export_dir``.

    The directory is created when missing. Each row of ``dict_media`` that
    has content is written to a file named after its sanitised ``filename``;
    rows without content are skipped. MySQL errors are printed and
    swallowed; file-system errors propagate. The cursor is closed on every
    exit path.

    Args:
        export_dir: Target directory for the exported files.
    """
    cursor = self.conn.cursor(dictionary=True)
    try:
        os.makedirs(export_dir, exist_ok=True)
        cursor.execute('SELECT id, filename, file_type, file_data FROM dict_media')
        for record in cursor.fetchall():
            file_data = record['file_data']
            if not file_data:
                continue
            # Sanitising flattens path separators, so every file lands
            # directly inside export_dir.
            safe_filename = self._sanitize_filename(record['filename'])
            file_path = os.path.join(export_dir, safe_filename)
            with open(file_path, 'wb') as f:
                f.write(file_data)
            print(f"导出媒体文件: {file_path}")
    except Error as e:
        print(f"导出媒体文件时出错: {e}")
    finally:
        # Previously skipped when makedirs/open/write raised an OSError.
        cursor.close()
def _sanitize_filename(self, filename: str) -> str:
    """Return ``filename`` with each of ``< > : " / \\ | ? *`` replaced by ``_``.

    Args:
        filename: Name to clean; path separators are replaced too.

    Returns:
        The cleaned name, with the same length as the input.
    """
    # One translation pass replaces every unsafe character.
    replacements = str.maketrans({char: '_' for char in '<>:"/\\|?*'})
    return filename.translate(replacements)
def close(self):
    """Close the database connection when one is held."""
    connection = self.conn
    if not connection:
        return
    connection.close()
# Usage example
def main():
    """Import the LDOCE5 dictionary files from the current directory.

    Builds a ``DictionaryParser`` for a local MySQL server, parses
    ``./LDOCE5.mdx`` together with ``./LDOCE5.mdd``, and always closes the
    parser afterwards. MySQL errors raised while parsing are printed instead
    of propagating.
    """
    # NOTE(review): connection settings, including credentials, are
    # hard-coded here.
    db_config = dict(
        host='localhost',
        database='mysql',
        user='root',
        password='root',
        port=3306,
    )

    mdx_path = './LDOCE5.mdx'
    mdd_path = './LDOCE5.mdd'  # optional companion archive

    parser = DictionaryParser(db_config)
    try:
        parser.parse_mdx_mdd(mdx_path, mdd_path)
        # Optional follow-up: parser.export_media_files('./exported_media')
        # writes the stored media files to a local directory.
    except Error as e:
        print(f"解析过程中出现错误: {e}")
    finally:
        parser.close()


if __name__ == "__main__":
    main()