Files
backend/assets/dict/dictionary_parser.py
2025-10-18 10:54:08 +08:00

1145 lines
57 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import base64
import os
import re
import psycopg2
import hashlib
from typing import List, Tuple, Dict, Optional, Any
from readmdict import MDX, MDD
from bs4 import BeautifulSoup, Tag
import json
from backend.app.admin.schema.dict import Example, Frequency, Pronunciation, FamilyItem, WordFamily, \
WordMetaData, Sense, Definition, Topic, CrossReference, DictEntry, Etymology, EtymologyItem
class DictionaryParser:
def __init__(self, db_config: Dict):
    """Remember the connection settings and open the database connection.

    Args:
        db_config: Keyword arguments forwarded to ``psycopg2.connect``.
    """
    self.conn = None
    self.db_config = db_config
    # Connect eagerly so a bad configuration fails at construction time.
    self.connect_db()
def connect_db(self):
    """Open a PostgreSQL connection from ``self.db_config`` and keep it on ``self.conn``.

    Raises:
        Exception: Whatever ``psycopg2.connect`` raised, after the failure
            has been printed.
    """
    try:
        connection = psycopg2.connect(**self.db_config)
    except Exception as exc:
        print(f"数据库连接失败: {exc}")
        raise
    self.conn = connection
def parse_mdx_mdd(self, mdx_path: str, mdd_path: str = None) -> None:
    """Import an MDX dictionary and, when available, its MDD media archive.

    Args:
        mdx_path: Path of the MDX file holding the entries.
        mdd_path: Optional path of the MDD file holding the media resources.

    Raises:
        Exception: Any failure from the parsing/saving steps, re-raised
            after it has been printed.
    """
    try:
        # Entries first: the media step needs the word -> row id mapping.
        entries, media_references = self.parse_mdx_file_mdict(mdx_path)
        entry_ids = self.save_entries(entries)

        mdd_available = bool(mdd_path) and os.path.exists(mdd_path)
        if not mdd_available:
            print("未提供MDD文件或文件不存在")
        else:
            self.parse_mdd_file(mdd_path, media_references, entry_ids)

        print(f"解析完成,共处理 {len(entries)} 个词汇条目")
    except Exception as exc:
        print(f"解析词典文件失败: {exc}")
        raise
def parse_mdx_file_mdict(self, mdx_path: str) -> Tuple[List[Tuple[str, str]], List[Dict]]:
    """Read every record of an MDX file with ``readmdict.MDX``.

    Args:
        mdx_path: Path of the MDX file.

    Returns:
        ``(entries, media_references)``: ``entries`` is a list of
        ``(word, definition)`` pairs with both parts non-empty, and
        ``media_references`` is the concatenation of what
        ``extract_media_references`` found in each kept definition.

    Raises:
        Exception: Any reader failure, re-raised after it has been printed.
    """
    print(f"正在解析MDX文件: {mdx_path}")

    def as_text(raw):
        # Records may be bytes or already decoded; normalise to str.
        return raw.decode('utf-8') if isinstance(raw, bytes) else str(raw)

    try:
        entries = []
        media_references = []
        for raw_key, raw_value in MDX(mdx_path).items():
            word = as_text(raw_key)
            definition = as_text(raw_value)
            if not (word and definition):
                continue
            entries.append((word, definition))
            # Remember which media files this definition points at.
            media_references.extend(self.extract_media_references(definition, word))
        return entries, media_references
    except Exception as exc:
        print(f"解析MDX文件失败: {exc}")
        raise
def parse_mdd_file(self, mdd_path: str, media_references: List[Dict], entry_ids: Dict[str, int]) -> None:
    """Read the media resources of an MDD file with ``readmdict.MDD`` and store them.

    Args:
        mdd_path: Path of the MDD file.
        media_references: References collected while parsing the MDX file.
        entry_ids: Mapping of word to its ``dict_entry`` row id.

    Raises:
        Exception: Any failure, re-raised after it has been printed.
    """
    print(f"正在解析MDD文件: {mdd_path}")
    try:
        dict_media = {}
        for raw_key, payload in MDD(mdd_path).items():
            name = raw_key.decode('utf-8') if isinstance(raw_key, bytes) else str(raw_key)
            # Normalise to forward slashes without a leading slash so the
            # keys use the same form save_dict_media applies to references.
            dict_media[name.replace('\\', '/').lstrip('/')] = payload
        self.save_dict_media(dict_media, media_references, entry_ids)
    except Exception as exc:
        print(f"解析MDD文件失败: {exc}")
        raise
def extract_media_references(self, definition: str, word: str) -> List[Dict]:
    """Collect the audio and image files referenced by an entry's HTML.

    The patterns deliberately overlap (a strict and a loose form of the same
    attribute), so a single link can match several of them.  Each
    ``(type, filename)`` pair is therefore reported only once, in order of
    first appearance, and a leading ``sound://`` scheme is removed so the
    name has the same form whichever pattern matched it.

    Args:
        definition: Raw definition HTML of one entry.
        word: Headword the definition belongs to; copied into every reference.

    Returns:
        A list of ``{'filename', 'type', 'word'}`` dicts where ``type`` is
        ``'audio'`` or ``'image'``; audio references come first.
    """
    audio_patterns = [
        r'sound://([^"\s>]+\.mp3)',  # bare sound:// link ending in .mp3
        r'href\s*=\s*["\']sound://([^"\'>]+\.mp3)["\']',  # href="sound://...mp3"
        r'href\s*=\s*["\']sound://([^"\'>]+)["\']',  # looser href="sound://...", any extension
        r'data-src-mp3\s*=\s*["\']sound://([^"\'>]+\.mp3)["\']',  # data-src-mp3="sound://..."
        r'data-src-mp3\s*=\s*["\']([^"\'>]+\.mp3)["\']',  # data-src-mp3="..." (relative path)
        r'audio\s*=\s*["\']([^"\']+)["\']',  # audio="..."
    ]
    image_patterns = [
        r'<img[^>]*src\s*=\s*["\']([^"\']+\.(?:jpg|jpeg|png|gif|bmp))["\']',  # <img src="...">
        r'\[image:([^\]]+\.(?:jpg|jpeg|png|gif|bmp))\]',  # [image:...]
        r'src\s*=\s*["\']([^"\']+\.(?:jpg|jpeg|png|gif|bmp))["\']',  # looser src="..."
    ]
    scheme = 'sound://'

    media_refs: List[Dict] = []
    seen = set()
    for media_type, patterns in (('audio', audio_patterns), ('image', image_patterns)):
        for pattern in patterns:
            for match in re.findall(pattern, definition, re.IGNORECASE):
                filename = match.strip()
                # The relative data-src-mp3 pattern also captures values
                # that still carry the scheme; drop it (matching is
                # case-insensitive, so compare case-insensitively too).
                if filename.lower().startswith(scheme):
                    filename = filename[len(scheme):]
                key = (media_type, filename)
                if not filename or key in seen:
                    continue
                seen.add(key)
                media_refs.append({
                    'filename': filename,
                    'type': media_type,
                    'word': word,
                })
    return media_refs
def save_entries(self, entries: List[Tuple[str, str]]) -> Dict[str, int]:
    """Insert or update every dictionary entry and return a word -> row id map.

    For each ``(word, definition)`` pair:

    * A definition starting with ``@@@`` is a redirect.  For an existing row
      the stored definition is kept and the redirect targets are merged into
      ``details.ref_link``; for a new row the redirect text is stored as the
      definition and the targets become ``details.ref_link``.
    * Any other definition is parsed into ``WordMetaData`` (an existing
      row's ``ref_link`` is carried over) and its images are stored through
      ``save_entry_images``.

    Every entry is committed on its own and rolled back on failure.  Without
    the rollback PostgreSQL keeps the transaction in the aborted state and
    rejects every statement that follows, so one bad entry would make all
    later entries fail as well.

    Args:
        entries: ``(word, definition)`` pairs as read from the MDX file.

    Returns:
        Mapping of each successfully saved word to its ``dict_entry`` id.
    """
    from psycopg2.extras import Json

    def collect_ref_links(text: str) -> List[str]:
        """Return the targets of the leading ``@@@`` lines of a redirect."""
        links: List[str] = []
        for line in text.split('\n'):
            if not line.startswith('@@@'):
                break  # redirect lines only count at the top of the text
            link = line[3:].strip()
            if link:
                links.append(link)
        return links

    def as_json(metadata):
        """Wrap metadata for a JSON column, or pass NULL when there is none."""
        return Json(metadata.model_dump()) if metadata else None

    entry_ids: Dict[str, int] = {}
    cursor = self.conn.cursor()
    try:
        for word, definition in entries:
            try:
                cursor.execute(
                    'SELECT id, definition, details FROM dict_entry WHERE word = %s',
                    (word,),
                )
                existing_record = cursor.fetchone()
                is_redirect = definition.startswith('@@@')
                metadata = None

                if existing_record:
                    entry_id, existing_definition, existing_details_json = existing_record
                    existing_details = None
                    if existing_details_json:
                        try:
                            existing_details = WordMetaData(**existing_details_json)
                        except Exception:
                            # Best effort: stored details that no longer
                            # validate are treated as absent.
                            existing_details = None

                    if is_redirect:
                        # Keep the real definition; only the links change.
                        final_definition = existing_definition
                        new_ref_links = collect_ref_links(definition)
                        if new_ref_links and existing_details:
                            # Merge without duplicates, preserving order.
                            merged = list(existing_details.ref_link or [])
                            for link in new_ref_links:
                                if link not in merged:
                                    merged.append(link)
                            existing_details.ref_link = merged
                            metadata = existing_details
                        elif new_ref_links:
                            metadata = WordMetaData()
                            metadata.ref_link = new_ref_links
                        elif existing_details:
                            metadata = existing_details
                    else:
                        final_definition = definition
                        html_metadata, crossref_images = self.parse_definition_to_metadata(definition)
                        if crossref_images:
                            self.save_entry_images(entry_id, word, crossref_images)
                        if html_metadata:
                            if existing_details:
                                # Links collected from earlier redirects survive a re-parse.
                                html_metadata.ref_link = existing_details.ref_link
                            metadata = html_metadata
                        inline_images = self.extract_images_from_definition(definition, word)
                        if inline_images:
                            self.save_entry_images(entry_id, word, inline_images)

                    cursor.execute('''
                        UPDATE dict_entry
                        SET definition = %s,
                            details = %s
                        WHERE word = %s RETURNING id
                    ''', (final_definition, as_json(metadata), word))
                    row = cursor.fetchone()
                    saved_id = row[0] if row else entry_id
                else:
                    crossref_images: List[Dict] = []
                    inline_images: List[Dict] = []
                    if is_redirect:
                        ref_links = collect_ref_links(definition)
                        if ref_links:
                            metadata = WordMetaData()
                            metadata.ref_link = ref_links
                    else:
                        metadata, crossref_images = self.parse_definition_to_metadata(definition)
                        inline_images = self.extract_images_from_definition(definition, word)

                    # The row must exist before images can point at its id.
                    cursor.execute('''
                        INSERT INTO dict_entry (word, definition, details)
                        VALUES (%s, %s, %s) RETURNING id
                    ''', (word, definition, as_json(metadata)))
                    saved_id = cursor.fetchone()[0]
                    if inline_images:
                        self.save_entry_images(saved_id, word, inline_images)
                    if crossref_images:
                        self.save_entry_images(saved_id, word, crossref_images)

                self.conn.commit()
                # Only report ids of rows that were really committed.
                entry_ids[word] = saved_id
            except Exception as e:
                # Leave the aborted transaction so the next entry can run.
                self.conn.rollback()
                print(f"保存词汇 '{word}' 时出错: {e}")
    finally:
        cursor.close()
    return entry_ids
def save_dict_media(self, media_files: Dict[str, bytes], media_references: List[Dict],
                    entry_ids: Dict[str, int]) -> None:
    """Store the MDD media files in ``dict_media``, one transaction per file.

    A file whose name already exists in ``dict_media`` is skipped.  A file
    that entries reference takes the type of its first reference and is
    linked to an entry.  A file nobody references is stored with type
    ``'image'`` and no entry link.

    Args:
        media_files: Normalised file name -> raw file content.
        media_references: Reference dicts with ``filename``, ``type`` and
            ``word`` keys, as built by ``extract_media_references``.
        entry_ids: Mapping of word to its ``dict_entry`` row id.
    """
    # Group references by the same normalised name the media keys use.
    refs_by_filename: Dict[str, List[Dict]] = {}
    for ref in media_references:
        normalized = ref['filename'].replace('\\', '/').lstrip('/')
        refs_by_filename.setdefault(normalized, []).append(ref)

    saved_count = 0
    error_count = 0
    for filename, file_data in media_files.items():
        refs = refs_by_filename.get(filename)
        cursor = None
        try:
            # A fresh cursor per file keeps a failure local to that file.
            cursor = self.conn.cursor()
            cursor.execute('''
                SELECT COUNT(*)
                FROM dict_media
                WHERE file_name = %s
            ''', (filename,))
            if cursor.fetchone()[0] > 0:
                if refs:
                    print(f"文件已存在,跳过: {filename}")
                continue

            file_hash = hashlib.sha256(file_data).hexdigest()
            # NOTE(review): unreferenced files are always labelled 'image',
            # whatever their extension - confirm this is intended.
            file_type = refs[0]['type'] if refs else 'image'
            cursor.execute('''
                INSERT INTO dict_media (file_name, file_type, file_data, file_hash)
                VALUES (%s, %s, %s, %s) RETURNING id
            ''', (filename, file_type, psycopg2.Binary(file_data), file_hash))
            media_id = cursor.fetchone()[0]

            if refs:
                # A media row has a single dict_id, so when several entries
                # reference the file the last one wins; one UPDATE gives the
                # same final state as updating once per reference.
                linked_ids = [entry_ids[ref['word']] for ref in refs if ref['word'] in entry_ids]
                if linked_ids:
                    cursor.execute('''
                        UPDATE dict_media
                        SET dict_id = %s
                        WHERE id = %s
                    ''', (linked_ids[-1], media_id))

            self.conn.commit()
            saved_count += 1
            if refs and saved_count % 100 == 0:
                print(f"已处理 {saved_count} 个媒体文件")
        except Exception as e:
            # Roll back so the next file starts from a clean transaction.
            try:
                self.conn.rollback()
            except psycopg2.Error:
                pass  # connection already unusable; the error is reported below
            error_count += 1
            label = '媒体文件' if refs else '图片文件'
            print(f"保存{label} '{filename}' 时出错: {e}")
        finally:
            if cursor is not None:
                cursor.close()
    print(f"媒体文件处理完成: 成功 {saved_count} 个,错误 {error_count}")
def export_media_files(self, output_dir: str) -> None:
    """Write every stored media file to ``output_dir``.

    Audio goes to ``<output_dir>/audio/<row id><ext>`` (``.mp3`` when the
    stored name has no extension).  Everything else goes to
    ``<output_dir>/images/<base name>``, falling back to ``<row id>.jpg``
    when the stored name has no base name.  A file that cannot be written is
    reported and skipped.

    Args:
        output_dir: Destination directory; created when missing.
    """
    audio_dir = os.path.join(output_dir, 'audio')
    image_dir = os.path.join(output_dir, 'images')
    for dir_path in (audio_dir, image_dir):
        # exist_ok avoids the check-then-create race and also creates output_dir.
        os.makedirs(dir_path, exist_ok=True)

    count = 0
    cursor = self.conn.cursor()
    try:
        cursor.execute('''
            SELECT id, file_name, file_type, file_data
            FROM dict_media
            WHERE file_data IS NOT NULL
        ''')
        for media_id, filename, file_type, file_data in cursor.fetchall():
            try:
                if file_type == 'audio':
                    # Name audio by row id; keep the original extension if any.
                    ext = os.path.splitext(filename)[1] or '.mp3'
                    output_path = os.path.join(audio_dir, f"{media_id}{ext}")
                else:
                    # basename drops directory parts so the file stays inside image_dir.
                    safe_filename = os.path.basename(filename) or f"{media_id}.jpg"
                    output_path = os.path.join(image_dir, safe_filename)
                with open(output_path, 'wb') as f:
                    f.write(file_data)
                count += 1
            except Exception as e:
                print(f"导出文件 '{filename}' 失败: {e}")
    finally:
        # Release the cursor even when the query itself fails.
        cursor.close()
    print(f"成功导出 {count} 个媒体文件到 {output_dir}")
def extract_images_from_definition(self, definition_html: str, word: str) -> List[Dict]:
"""
Extract image references from the definition HTML.

Only the first span whose class is 'ldoceEntry Entry' is searched. Inside it,
every span carrying a 'picfile' attribute that contains an <img> is inspected.
Returns a list of dicts with the keys 'sense_id' (id of the picfile span),
'filename' (the img alt text), 'src', 'type' (always 'image') and 'word';
'image_data' (decoded bytes) is present only when the base64 payload decoded.

NOTE(review): the indentation of this block is lost in this copy, so it is not
visible whether the final `else:` pairs with `if base64_attr:` or with
`if base64_attr.startswith('data:image/'):` - confirm against the real file.
"""
soup = BeautifulSoup(definition_html, 'html.parser')
images_refs = []
# Look for span tags that carry a picfile attribute
ldoce_entry = soup.find('span', class_='ldoceEntry Entry')
if ldoce_entry:
picfile_spans = ldoce_entry.find_all('span', attrs={'picfile': True})
for pic_span in picfile_spans:
img_tag = pic_span.find('img')
sense_id = pic_span.get('id')
if img_tag:
alt_attr = img_tag.get('alt')
src_attr = img_tag.get('src')
base64_attr = img_tag.get('base64')
if base64_attr:
# Check whether the value is a base64 data URI
if base64_attr.startswith('data:image/'):
# Take the payload after the first comma (whole value if there is no comma)
base64_data = base64_attr.split(',')[1] if ',' in base64_attr else base64_attr
try:
# Decode the base64 payload
image_data = base64.b64decode(base64_data)
images_refs.append({
'sense_id': sense_id,
'filename': alt_attr,
'src': base64_attr,
'image_data': image_data, # the actual binary image data
'type': 'image',
'word': word
})
except Exception as e:
print(f"解码 base64 图片数据失败: {e}")
# Decoding failed: still record the basic information (without image_data)
images_refs.append({
'sense_id': sense_id,
'filename': alt_attr,
'src': src_attr,
'type': 'image',
'word': word
})
else:
# Not base64 data; the src may be a file path
images_refs.append({
'sense_id': sense_id,
'filename': alt_attr,
'src': src_attr,
'type': 'image',
'word': word
})
return images_refs
def parse_definition_to_metadata(self, definition_html: str) -> tuple[Optional[WordMetaData], List[Dict]]:
    """Parse one definition's HTML into ``WordMetaData`` plus image records.

    Every ``span.dictentry`` becomes one item of ``dict_list``; the first
    ``span.etym`` (if any) becomes ``etymology``.  Images embedded as
    ``data:`` URIs in ``ldoce-show-image`` cross references are collected
    into the second return value for ``save_entry_images``.

    Args:
        definition_html: Raw definition HTML of one entry.

    Returns:
        ``(metadata, images_info)``.  ``metadata`` is ``None`` when parsing
        or model validation failed; images collected up to that point are
        still returned.
    """
    soup = BeautifulSoup(definition_html, 'html.parser')  # 'lxml' also works if installed
    images_info: List[Dict] = []
    word_metadata: Dict[str, Any] = {'dict_list': []}
    try:
        dict_entries = soup.find_all('span', class_='dictentry')
        if not dict_entries:
            print("未找到 dictentry 节点")
            return WordMetaData(**word_metadata), images_info
        for dict_entry in dict_entries:
            word_metadata['dict_list'].append(self._parse_dict_entry(dict_entry, images_info))
        etymology = self._parse_etymology(soup)
        if etymology is not None:
            word_metadata['etymology'] = etymology
    except Exception as e:
        print(f"解析 HTML 时出错: {e}")
        import traceback
        traceback.print_exc()  # full trace helps locate the offending markup
        return None, images_info

    try:
        return WordMetaData(**word_metadata), images_info
    except Exception as e:
        # default=str: the dump holds model objects that json cannot encode;
        # without it the dump itself raised and hid the validation error.
        dumped = json.dumps(word_metadata, ensure_ascii=False, indent=2, default=str)
        print(f"WordMetaData 验证失败,原始数据: {dumped}")
        print(f"验证错误: {e}")
        return None, images_info

def _bracketed_terms(self, gram_tag: Tag) -> Optional[List[str]]:
    """Return the lower-cased, comma-separated terms inside ``[...]`` of a GRAM tag."""
    match = re.search(r'\[([^\]]+)\]', ''.join(gram_tag.stripped_strings))
    if not match:
        return None
    return [item.strip().lower() for item in match.group(1).split(',')]

def _find_speaker_audio(self, scope: Tag, variant: str) -> Optional[Tuple[str, str]]:
    """Return ``(audio path, title)`` of the ``speaker <variant>`` link in ``scope``."""
    def is_sound(value):
        return value and value.startswith('sound://')

    def loose_class(value):
        # Accept class strings that merely contain both names.
        return value and 'speaker' in value and variant in value

    exact_class = f'speaker {variant}'
    # Strictest match first: exact class, data-src-mp3 before href.
    audio_tag = scope.find('a', class_=exact_class, attrs={'data-src-mp3': is_sound})
    if not audio_tag:
        audio_tag = scope.find('a', class_=exact_class, href=is_sound)
    if not audio_tag:
        audio_tag = scope.find('a', class_=loose_class, attrs={'data-src-mp3': True})
    if not audio_tag:
        audio_tag = scope.find('a', class_=loose_class, href=is_sound)
    if not audio_tag:
        return None

    source = audio_tag.get('data-src-mp3')
    if not source or not source.startswith('sound://'):
        href = audio_tag.get('href', '')
        if href.startswith('sound://'):
            source = href
    if not source:
        return None
    return source.replace('sound://', '', 1), audio_tag.get('title', '')

def _parse_pronunciations(self, dict_entry: Tag) -> Dict[str, Any]:
    """Collect IPA strings and audio paths for the UK and US pronunciations."""
    pron: Dict[str, Any] = {}
    uk_pron_tag = dict_entry.find(class_='PRON')  # the first PRON is normally the British one
    if uk_pron_tag:
        # Joining stripped strings keeps variants such as <span class="i">ə</span>.
        pron['uk_ipa'] = ''.join(uk_pron_tag.stripped_strings).strip('/ ')
    us_pron_tag = dict_entry.find(class_='AMEVARPRON')
    if us_pron_tag:
        pron['us_ipa'] = ''.join(us_pron_tag.stripped_strings).strip('/ $ ')
    for region, variant in (('uk', 'brefile'), ('us', 'amefile')):
        audio = self._find_speaker_audio(dict_entry, variant)
        if audio:
            pron[f'{region}_audio'], pron[f'{region}_audio_title'] = audio
    return pron

def _parse_frequency(self, dict_entry: Tag) -> Dict[str, Any]:
    """Collect the LEVEL marker and the spoken/written FREQ markers."""
    freq: Dict[str, Any] = {}
    level_tag = dict_entry.find(class_='LEVEL')
    if level_tag:
        freq['level'] = level_tag.get('title', '').strip()
        freq['level_tag'] = level_tag.get_text(strip=True)
    for kind in ('spoken', 'written'):
        freq_tag = dict_entry.find(class_='FREQ', title=lambda x: x and kind in x.lower())
        if freq_tag:
            freq[kind] = freq_tag.get('title', '').strip()
            freq[f'{kind}_tag'] = freq_tag.get_text(strip=True)
    return freq

def _parse_word_family(self, dict_entry: Tag) -> List[Any]:
    """Group the word-family items of an entry under their part of speech."""
    families: List[Any] = []
    family_box = dict_entry.find(class_='LDOCE_word_family')
    if not family_box:
        return families
    current_pos = None
    current_items: List[Any] = []
    for child in family_box.children:
        if not isinstance(child, Tag):
            continue
        classes = child.get('class', [])
        if 'pos' in classes:
            # A new part of speech closes the previous group.
            if current_pos and current_items:
                families.append(WordFamily(pos=current_pos, items=current_items))
            current_pos = child.get_text(strip=True)
            current_items = []
        elif 'w' in classes:  # covers both 'crossRef w' links and plain 'w'
            href = child.get('href', '') if child.name == 'a' else None
            current_items.append(FamilyItem(text=child.get_text(strip=True), href=href))
    if current_pos and current_items:
        families.append(WordFamily(pos=current_pos, items=current_items))
    return families

def _parse_cross_references(self, sense_tag: Tag, sense_id: str, images_info: List[Dict]) -> List[Any]:
    """Build the cross references of a sense; embedded images go to ``images_info``."""
    items: List[Any] = []
    for container_tag in sense_tag.find_all('span', class_=lambda x: x and 'Crossref' in x):
        # The LDOCE version marker lives on the container span.
        version_classes = [cls for cls in container_tag.get('class', [])
                           if cls.startswith('LDOCEVERSION_')]
        for link_tag in container_tag.find_all('a', class_='crossRef'):
            item: Dict[str, Any] = {'sense_id': sense_id}
            href = link_tag.get('href')
            if href:
                item['entry_href'] = href

            # Not every link wraps its label in a REFHWD span; fall back to
            # the link's own text instead of failing on None.
            ref_hwd = link_tag.find('span', class_='REFHWD')
            label_tag = ref_hwd if ref_hwd is not None else link_tag
            text = label_tag.get_text(strip=True)
            if text:
                item['text'] = text

            if 'ldoce-show-image' in link_tag.get('class', []):
                showid = link_tag.get('showid', '')
                if showid:
                    item['show_id'] = showid
                image_src = link_tag.get('src', '') or link_tag.get('base64', '')
                if image_src and image_src.startswith('data:'):
                    # Short hash of the data URI keeps the file name unique per image.
                    digest = hashlib.sha1(image_src.encode('utf-8')).hexdigest()[:16]
                    image_filename = f"{showid}_sha1_{digest}"
                    item['image_filename'] = image_filename
                    if ';' in image_src:
                        mime_type = image_src.split(';')[0].split(':')[1]
                    else:
                        mime_type = 'image/jpeg'
                    images_info.append({
                        'sense_id': sense_id,
                        'filename': image_filename,
                        'src': f"crossref:{showid}",
                        'type': 'image_crossref',
                        'crossref_showid': showid,
                        'crossref_full_base64': image_src,
                        'crossref_title': link_tag.get('title', ''),
                        'mime_type': mime_type,
                    })
                else:
                    item['image_filename'] = image_src
                image_title = link_tag.get('title', '')
                if image_title:
                    item['image_title'] = image_title

            if version_classes:
                item['ldoce_version'] = version_classes[0]
            try:
                items.append(CrossReference(**item))
            except Exception as e:
                print(f"创建 CrossReference 对象失败: {e}, 数据: {item}")
    return items

def _parse_definitions(self, sense_tag: Tag) -> List[Any]:
    """Pair each English DEF with a following Chinese DEF and list linked words."""
    definitions: List[Any] = []
    def_tags = sense_tag.find_all(class_='DEF')
    index = 0
    while index < len(def_tags):
        en_def_tag = def_tags[index]
        cn_def_tag = None
        if index + 1 < len(def_tags):
            # The next DEF is the translation when it contains a cn_txt node.
            cn_def_tag = def_tags[index + 1].find(class_='cn_txt')
        index += 2 if cn_def_tag is not None else 1

        en_text = self._extract_text_with_links(en_def_tag)
        cn_text = cn_def_tag.get_text(strip=True) if cn_def_tag is not None else None

        related_words = []
        for content in en_def_tag.contents:
            if not (isinstance(content, Tag) and content.name == 'a'
                    and 'defRef' in content.get('class', [])):
                continue
            href = content.get('href', '')
            if not href:
                continue
            # entry://inside#inside__9__a -> inside
            target = href.split('://', 1)[1] if '://' in href else href
            target = target.split('#', 1)[0].split('?', 1)[0].strip()
            if target:
                related_words.append(target)

        if en_text or cn_text:
            definitions.append(Definition(en=en_text, cn=cn_text, related_words=related_words))
    return definitions

def _parse_examples(self, sense_tag: Tag) -> List[Any]:
    """Build the examples of a sense (text, translation, collocation, audio)."""
    def is_sound(value):
        return value and value.startswith('sound://')

    examples: List[Any] = []
    for ex_tag in sense_tag.find_all(class_='EXAMPLE'):
        if not isinstance(ex_tag, Tag):
            continue
        example: Dict[str, Any] = {}
        en_span_tag = ex_tag.find(class_='english')
        if en_span_tag:
            example['en'] = self._extract_text_with_links(en_span_tag)
        cn_span_tag = ex_tag.find(class_='cn_txt')
        if cn_span_tag:
            example['cn'] = cn_span_tag.get_text(strip=True)
        collocation_tag = ex_tag.find(class_='COLLOINEXA')
        if collocation_tag:
            example['collocation'] = collocation_tag.get_text(strip=True)

        if en_span_tag:
            related_words = []
            for ref_tag in en_span_tag.find_all('a', class_=['defRef', 'crossRef']):
                ref_text = ref_tag.get_text(strip=True)
                if ref_text:
                    related_words.append(ref_text)
            if related_words:
                example['related_words_in_example'] = related_words

        audio_tag = ex_tag.find('a', class_='speaker exafile', href=is_sound)
        if not audio_tag:
            audio_tag = ex_tag.find(
                'a', class_=lambda x: x and 'speaker' in x and 'exafile' in x, href=is_sound)
        if audio_tag:
            audio_href = audio_tag.get('href', '')
            if audio_href.startswith('sound://'):
                example['audio'] = audio_href.replace('sound://', '', 1)

        if example.get('en') or example.get('cn'):  # skip examples without any text
            examples.append(Example(**example))
    return examples

def _parse_sense(self, sense_tag: Tag, images_info: List[Dict]) -> Optional[Any]:
    """Build one ``Sense``; returns ``None`` when it has no definition and no example."""
    sense_id = sense_tag.get('id', '')
    sense: Dict[str, Any] = {'id': sense_id}

    sensenum_tag = sense_tag.find(class_='sensenum')
    if sensenum_tag:
        sense['number'] = sensenum_tag.get_text(strip=True)

    gram_tag = sense_tag.find(class_='GRAM')
    if gram_tag:
        terms = self._bracketed_terms(gram_tag)  # e.g. [countable, uncountable]
        if terms is not None:
            sense['countability'] = terms

    cross_references = self._parse_cross_references(sense_tag, sense_id, images_info)
    if cross_references:
        sense['cross_references'] = cross_references

    signpost_tag = sense_tag.find(class_='SIGNPOST')
    if signpost_tag:
        # The English signpost is everything except the nested cn_txt span.
        parts = []
        for content in signpost_tag.contents:
            if isinstance(content, str):
                parts.append(content.strip())
            elif content.name != 'span' or 'cn_txt' not in content.get('class', []):
                parts.append(content.get_text(strip=True))
        sense['signpost_en'] = ' '.join(filter(None, parts))
        cn_signpost_tag = signpost_tag.find(class_='cn_txt')
        if cn_signpost_tag:
            sense['signpost_cn'] = cn_signpost_tag.get_text(strip=True)

    definitions = self._parse_definitions(sense_tag)
    if definitions:
        sense['definitions'] = definitions
    examples = self._parse_examples(sense_tag)
    if examples:
        sense['examples'] = examples

    if sense.get('definitions') or sense.get('examples'):
        return Sense(**sense)
    return None

def _parse_dict_entry(self, dict_entry: Tag, images_info: List[Dict]) -> Dict[str, Any]:
    """Turn one ``span.dictentry`` into the dict stored in ``dict_list``."""
    entry: Dict[str, Any] = {}

    head_tag = dict_entry.find(class_='Head')
    if head_tag:
        head_gram_tag = head_tag.find(class_='GRAM')
        if head_gram_tag:
            terms = self._bracketed_terms(head_gram_tag)
            if terms is not None:
                entry['transitive'] = terms

    hwd_tag = dict_entry.find(class_='HWD')
    if hwd_tag:
        entry['headword'] = hwd_tag.get_text(strip=True)

    homnum_tag = dict_entry.find(class_='HOMNUM')
    if homnum_tag:
        try:
            entry['homograph_number'] = int(homnum_tag.get_text(strip=True))
        except ValueError:
            pass  # a non-numeric homograph marker is ignored

    pos_tag = dict_entry.find(class_='lm5pp_POS')  # only the first one is used
    if pos_tag:
        entry['part_of_speech'] = pos_tag.get_text(strip=True)

    pron = self._parse_pronunciations(dict_entry)
    if pron:
        entry['pronunciations'] = Pronunciation(**pron)

    freq = self._parse_frequency(dict_entry)
    if freq:
        entry['frequency'] = Frequency(**freq)

    topics = []
    for topic_tag in dict_entry.find_all('a', class_='topic'):
        topic_text = topic_tag.get_text(strip=True)
        if topic_text:
            topics.append(Topic(name=topic_text, href=topic_tag.get('href', '')))
    if topics:
        entry['topics'] = topics

    families = self._parse_word_family(dict_entry)
    if families:
        entry['word_family'] = families

    senses = []
    # Sense spans may carry extra classes (e.g. 'newline'), hence the substring test.
    for sense_tag in dict_entry.find_all('span', class_=lambda x: x and 'Sense' in x):
        if not isinstance(sense_tag, Tag):
            continue
        sense = self._parse_sense(sense_tag, images_info)
        if sense is not None:
            senses.append(sense)
    if senses:
        entry['senses'] = senses
    return entry

def _parse_etymology(self, soup: Any) -> Optional[Any]:
    """Build the ``Etymology`` from the first ``span.etym``, or ``None`` if absent."""
    etym_tag = soup.find('span', class_='etym')
    if not etym_tag:
        return None
    etym_map: Dict[str, Any] = {'item': []}
    asset_intro = etym_tag.find('span', class_='asset_intro')
    if asset_intro:
        etym_map['intro'] = asset_intro.get_text(strip=True)
    head_tag = etym_tag.find('span', class_='Head')
    if head_tag:
        hw_tag = head_tag.find('span', class_='HWD')
        if hw_tag:
            etym_map['headword'] = hw_tag.get_text(strip=True)
        hom_tag = head_tag.find('span', class_='HOMNUM')
        if hom_tag:
            etym_map['hom_num'] = hom_tag.get_text(strip=True)
    for sense_tag in etym_tag.find_all('span', class_='Sense'):
        item: Dict[str, Any] = {}
        lang_tag = sense_tag.find('span', class_='LANG')
        if lang_tag:
            item['language'] = lang_tag.get_text(strip=True).strip()
        origin_tag = sense_tag.find('span', class_='ORIGIN')
        if origin_tag:
            item['origin'] = origin_tag.get_text(strip=True).strip()
        etym_map['item'].append(EtymologyItem(**item))
    return Etymology(**etym_map)
def _extract_text_with_links(self, tag: Tag) -> str:
    """Flatten a tag to plain text, keeping the words of inner links.

    Each direct child contributes one piece: a string is stripped, an
    ``a.defRef`` link contributes its text, and any other tag is flattened
    recursively.  Non-empty pieces are joined with single spaces.

    Example: 'a hard round <a href="...">fruit</a>' -> 'a hard round fruit'
    """
    if not tag:
        return ""
    pieces = []
    for node in tag.contents:
        if isinstance(node, str):
            piece = node.strip()
        elif not hasattr(node, 'name'):
            # Neither text nor a tag: nothing to extract.
            continue
        elif node.name == 'a' and 'defRef' in node.get('class', []):
            piece = node.get_text(strip=True)
        else:
            piece = self._extract_text_with_links(node)
        if piece:
            pieces.append(piece)
    return ' '.join(pieces)
def save_entry_images(self, entry_id: int, word: str, images_info: List[Dict]) -> None:
    """Store the images of one entry in ``dict_media`` and commit.

    Each image is written inside its own savepoint, so an image that fails
    is rolled back and reported without discarding the others or leaving
    the transaction aborted.  An image that already has a row for the same
    file name and entry is skipped.  When no binary data is available the
    ``src`` reference text is stored in ``file_data`` instead.

    Failures are printed, never raised.

    Args:
        entry_id: ``dict_entry`` id the images belong to.
        word: Headword, recorded in the row details and in messages.
        images_info: Image dicts from ``extract_images_from_definition`` or
            ``parse_definition_to_metadata``.
    """
    from psycopg2.extras import Json

    cursor = self.conn.cursor()
    try:
        for img_info in images_info:
            # Assumes the connection is not in autocommit mode (the default),
            # otherwise SAVEPOINT is rejected outside a transaction block.
            cursor.execute('SAVEPOINT entry_image')
            try:
                image_data = None
                if 'crossref_full_base64' in img_info:
                    try:
                        b64_string = img_info['crossref_full_base64']
                        if b64_string.startswith('data:'):
                            # data:image/jpeg;base64,<payload> -> <payload>
                            b64_data = b64_string.split(',', 1)[1]
                        else:
                            b64_data = b64_string
                        image_data = base64.b64decode(b64_data)
                    except Exception as e:
                        # Keep going: the image is stored as a reference instead.
                        print(
                            f"解码 crossref_full_base64 数据失败 (文件名: {img_info.get('filename', 'unknown')}): {e}")
                # Fall back to bytes already decoded by extract_images_from_definition.
                if image_data is None and 'image_data' in img_info:
                    image_data = img_info['image_data']

                filename = img_info['filename']
                src = img_info['src']
                file_type = img_info['type']
                details = {
                    'sense_id': img_info.get('sense_id'),
                    'src': src,
                    'word': word,
                    'mime_type': img_info.get('mime_type'),
                    'show_id': img_info.get('crossref_showid'),
                    'crossref_title': img_info.get('crossref_title'),
                }
                details = {k: v for k, v in details.items() if v is not None}

                cursor.execute('''
                    SELECT id
                    FROM dict_media
                    WHERE file_name = %s
                      AND dict_id = %s
                ''', (filename, entry_id))
                if cursor.fetchone() is None:
                    if image_data:
                        payload = image_data
                    elif src:
                        # No binary data: keep the path/reference text itself.
                        payload = src.encode('utf-8')
                    else:
                        payload = None  # neither data nor a reference: nothing to store
                    if payload is not None:
                        file_hash = hashlib.sha256(payload).hexdigest()
                        cursor.execute('''
                            INSERT INTO dict_media (dict_id, file_name, file_type, file_data, file_hash, details)
                            VALUES (%s, %s, %s, %s, %s, %s)
                        ''', (entry_id, filename, file_type, psycopg2.Binary(payload), file_hash,
                              Json(details)))
            except Exception as e:
                # Undo only this image; earlier work in the transaction survives.
                cursor.execute('ROLLBACK TO SAVEPOINT entry_image')
                print(f"保存词条 '{word}' 的图片信息时出错: {e}")
            else:
                cursor.execute('RELEASE SAVEPOINT entry_image')
        self.conn.commit()
    except Exception as e:
        # Savepoint handling or the commit itself failed: leave a clean transaction.
        self.conn.rollback()
        print(f"保存词条 '{word}' 的图片信息时出错: {e}")
    finally:
        cursor.close()
def close(self):
    """Close the database connection if one is held."""
    connection = self.conn
    if not connection:
        return
    connection.close()
# Usage example
def main():
    """Import ``./LDOCE5.mdx`` (and ``./LDOCE5.mdd`` when present) into PostgreSQL.

    Connection settings come from the standard ``PGHOST``, ``PGDATABASE``,
    ``PGUSER``, ``PGPASSWORD`` and ``PGPORT`` environment variables, so
    credentials need not live in the source; the previous hard-coded values
    remain the defaults.
    """
    db_config = {
        'host': os.environ.get('PGHOST', 'localhost'),
        'database': os.environ.get('PGDATABASE', 'postgres'),
        'user': os.environ.get('PGUSER', 'root'),
        'password': os.environ.get('PGPASSWORD', 'root'),
        'port': int(os.environ.get('PGPORT', '5432')),
    }
    mdx_path = './LDOCE5.mdx'
    mdd_path = './LDOCE5.mdd'  # optional

    parser = DictionaryParser(db_config)
    try:
        parser.parse_mdx_mdd(mdx_path, mdd_path)
        # Optional: parser.export_media_files('./exported_media') writes the
        # stored media back to a local directory.
    except Exception as e:
        print(f"解析过程中出现错误: {e}")
    finally:
        parser.close()


if __name__ == "__main__":
    main()