fix code
This commit is contained in:
@@ -1,20 +0,0 @@
|
||||
import urllib.request
|
||||
import ssl
|
||||
|
||||
API_URL = "https://dict.youdao.com/jsonapi?q=word"
|
||||
|
||||
|
||||
def main():
|
||||
# 创建不验证 SSL 的上下文
|
||||
context = ssl._create_unverified_context()
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(API_URL, context=context) as response:
|
||||
body = response.read().decode('utf-8')
|
||||
print(body)
|
||||
except Exception as e:
|
||||
print(f"请求出错: {e}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,469 +0,0 @@
|
||||
import base64
|
||||
import os
|
||||
import re
|
||||
import mysql.connector
|
||||
import hashlib
|
||||
from typing import List, Tuple, Dict, Optional, Any
|
||||
from readmdict import MDX, MDD
|
||||
from bs4 import BeautifulSoup, Tag
|
||||
import json
|
||||
from mysql.connector import Error
|
||||
|
||||
from backend.app.admin.schema.dict import Example, Frequency, Pronunciation, FamilyItem, WordFamily, \
|
||||
WordMetaData, Sense, Definition, Topic, CrossReference, DictEntry, Etymology, EtymologyItem
|
||||
|
||||
|
||||
class DictionaryParser:
|
||||
def __init__(self, db_config: Dict):
|
||||
"""初始化数据库连接"""
|
||||
self.db_config = db_config
|
||||
self.conn = None
|
||||
self.connect_db()
|
||||
|
||||
def connect_db(self):
|
||||
"""连接到MySQL数据库"""
|
||||
try:
|
||||
self.conn = mysql.connector.connect(**self.db_config)
|
||||
except Error as e:
|
||||
print(f"数据库连接失败: {e}")
|
||||
raise
|
||||
|
||||
def parse_mdx_mdd(self, mdx_path: str, mdd_path: str = None) -> None:
|
||||
"""解析MDX和MDD文件"""
|
||||
try:
|
||||
# 解析MDX文件
|
||||
entries, media_references = self.parse_mdx_file_mdict(mdx_path)
|
||||
|
||||
# 保存词汇条目
|
||||
entry_ids = self.save_entries(entries)
|
||||
|
||||
# 如果有MDD文件,解析媒体文件
|
||||
if mdd_path and os.path.exists(mdd_path):
|
||||
self.parse_mdd_file(mdd_path, media_references, entry_ids)
|
||||
else:
|
||||
print("未提供MDD文件或文件不存在")
|
||||
|
||||
print(f"解析完成,共处理 {len(entries)} 个词汇条目")
|
||||
|
||||
except Error as e:
|
||||
print(f"解析词典文件失败: {e}")
|
||||
raise
|
||||
|
||||
def parse_mdx_file_mdict(self, mdx_path: str) -> Tuple[List[Tuple[str, str]], List[Dict]]:
|
||||
"""使用 mdict_reader 解析 MDX 文件"""
|
||||
print(f"正在解析MDX文件: {mdx_path}")
|
||||
|
||||
try:
|
||||
mdx = MDX(mdx_path)
|
||||
entries = []
|
||||
media_references = []
|
||||
|
||||
for key, value in mdx.items():
|
||||
word = key.decode('utf-8') if isinstance(key, bytes) else str(key)
|
||||
definition = value.decode('utf-8') if isinstance(value, bytes) else str(value)
|
||||
|
||||
if word and definition:
|
||||
entries.append((word, definition))
|
||||
# 提取媒体文件引用
|
||||
media_refs = self.extract_media_references(definition, word)
|
||||
media_references.extend(media_refs)
|
||||
|
||||
return entries, media_references
|
||||
|
||||
except Error as e:
|
||||
print(f"解析MDX文件失败: {e}")
|
||||
raise
|
||||
|
||||
def parse_mdd_file(self, mdd_path: str, media_references: List[Dict], entry_ids: Dict[str, int]) -> None:
|
||||
"""解析MDD文件中的媒体资源 - 使用 mdict_reader"""
|
||||
print(f"正在解析MDD文件: {mdd_path}")
|
||||
|
||||
try:
|
||||
# 使用 mdict_reader 解析 MDD 文件
|
||||
mdd = MDD(mdd_path)
|
||||
|
||||
# 创建文件名到媒体数据的映射
|
||||
dict_media = {}
|
||||
for key, value in mdd.items():
|
||||
filename = key.decode('utf-8') if isinstance(key, bytes) else str(key)
|
||||
# 确保文件名格式统一
|
||||
filename = filename.replace('\\', '/').lstrip('/')
|
||||
dict_media[filename] = value
|
||||
|
||||
# 保存媒体文件
|
||||
self.save_dict_media(dict_media, media_references, entry_ids)
|
||||
|
||||
except Error as e:
|
||||
print(f"解析MDD文件失败: {e}")
|
||||
raise
|
||||
|
||||
def extract_media_references(self, definition: str, word: str) -> List[Dict]:
|
||||
"""从定义中提取媒体文件引用"""
|
||||
media_refs = []
|
||||
|
||||
# 提取音频文件引用 - 更通用的模式,匹配 sound:// 或 href="sound://..."
|
||||
# 这个模式应该能覆盖 aeroplane.txt 中的 sound://media/english/... 链接
|
||||
audio_patterns = [
|
||||
r'sound://([^"\s>]+\.mp3)', # 直接 sound:// 开头,后跟非空格/"/>字符直到 .mp3
|
||||
r'href\s*=\s*["\']sound://([^"\'>]+\.mp3)["\']', # href="sound://..."
|
||||
r'href\s*=\s*["\']sound://([^"\'>]+)["\']', # 更宽松的 href="sound://...",不一定以.mp3结尾
|
||||
r'data-src-mp3\s*=\s*["\']sound://([^"\'>]+\.mp3)["\']', # data-src-mp3="sound://..."
|
||||
r'data-src-mp3\s*=\s*["\']([^"\'>]+\.mp3)["\']', # data-src-mp3="..." (相对路径)
|
||||
r'audio\s*=\s*["\']([^"\']+)["\']', # audio="..."
|
||||
]
|
||||
|
||||
for pattern in audio_patterns:
|
||||
matches = re.findall(pattern, definition, re.IGNORECASE)
|
||||
for match in matches:
|
||||
# 清理可能的多余字符(如结尾的引号或空格,虽然正则应该已经避免了)
|
||||
clean_filename = match.strip()#.rstrip('"\'')
|
||||
if clean_filename:
|
||||
media_refs.append({
|
||||
'filename': clean_filename,
|
||||
'type': 'audio',
|
||||
'word': word
|
||||
})
|
||||
|
||||
# 提取图片文件引用
|
||||
image_patterns = [
|
||||
r'<img[^>]*src\s*=\s*["\']([^"\']+\.(?:jpg|jpeg|png|gif|bmp))["\']', # src="..."
|
||||
r'\[image:([^\]]+\.(?:jpg|jpeg|png|gif|bmp))\]', # [image:...]
|
||||
r'src\s*=\s*["\']([^"\']+\.(?:jpg|jpeg|png|gif|bmp))["\']' # 更宽松的 src="..."
|
||||
]
|
||||
|
||||
for pattern in image_patterns:
|
||||
matches = re.findall(pattern, definition, re.IGNORECASE)
|
||||
for match in matches:
|
||||
# 清理可能的多余字符
|
||||
clean_filename = match.strip()#.rstrip('"\'')
|
||||
if clean_filename:
|
||||
media_refs.append({
|
||||
'filename': clean_filename,
|
||||
'type': 'image',
|
||||
'word': word
|
||||
})
|
||||
|
||||
return media_refs
|
||||
|
||||
def save_entries(self, entries: List[Tuple[str, str]]) -> Dict[str, int]:
|
||||
"""保存词汇条目到数据库,并更新 details 字段"""
|
||||
from mysql.connector import Error
|
||||
import hashlib
|
||||
|
||||
cursor = self.conn.cursor(dictionary=True)
|
||||
entry_ids = {}
|
||||
|
||||
for word, definition in entries:
|
||||
try:
|
||||
# 检查数据库中是否已存在该词条
|
||||
cursor.execute('SELECT id, definition, details FROM dict_entry WHERE word = %s', (word,))
|
||||
existing_record = cursor.fetchone()
|
||||
|
||||
metadata = None
|
||||
existing_details = None
|
||||
final_definition = definition # 默认使用当前 definition
|
||||
|
||||
# 如果存在现有记录
|
||||
if existing_record:
|
||||
entry_id, existing_definition, existing_details_json = existing_record['id'], existing_record['definition'], existing_record['details']
|
||||
|
||||
# 获取现有的 details
|
||||
if existing_details_json:
|
||||
try:
|
||||
existing_details = WordMetaData(**existing_details_json)
|
||||
except:
|
||||
existing_details = None
|
||||
|
||||
# 如果当前 definition 是以 @@@ 开头的引用链接
|
||||
if definition.startswith('@@@'):
|
||||
# 保留现有的 definition,只更新 details 中的 ref_link
|
||||
final_definition = existing_definition # 保持原有的 definition
|
||||
|
||||
# 提取新的 @@@ 链接
|
||||
lines = definition.split('\n')
|
||||
new_ref_links = []
|
||||
for line in lines:
|
||||
if line.startswith('@@@'):
|
||||
link = line[3:].strip()
|
||||
if link:
|
||||
new_ref_links.append(link)
|
||||
else:
|
||||
break
|
||||
|
||||
# 合并链接信息
|
||||
if new_ref_links:
|
||||
if existing_details:
|
||||
# 如果已有 details,合并 ref_link
|
||||
if existing_details.ref_link:
|
||||
# 合并现有链接和新链接,去重但保持顺序
|
||||
combined_links = existing_details.ref_link[:]
|
||||
for link in new_ref_links:
|
||||
if link not in combined_links:
|
||||
combined_links.append(link)
|
||||
else:
|
||||
combined_links = new_ref_links
|
||||
else:
|
||||
combined_links = new_ref_links
|
||||
|
||||
# 更新 details
|
||||
if existing_details:
|
||||
metadata = existing_details.model_copy(update={"ref_link": combined_links})
|
||||
else:
|
||||
metadata = WordMetaData(ref_link=combined_links)
|
||||
|
||||
# 如果是新词条或需要更新 details
|
||||
if not existing_record or metadata:
|
||||
# 如果是新词条,创建默认 metadata
|
||||
if not existing_record:
|
||||
metadata = WordMetaData()
|
||||
|
||||
# 准备 details 数据
|
||||
details_dict = metadata.model_dump() if metadata else None
|
||||
|
||||
if existing_record:
|
||||
# 更新现有记录
|
||||
cursor.execute('''
|
||||
UPDATE dict_entry
|
||||
SET definition = %s, details = %s
|
||||
WHERE word = %s
|
||||
''', (final_definition, json.dumps(details_dict) if details_dict else None, word))
|
||||
entry_id = existing_record['id']
|
||||
else:
|
||||
# 插入新记录
|
||||
cursor.execute('''
|
||||
INSERT INTO dict_entry (word, definition, details)
|
||||
VALUES (%s, %s, %s)
|
||||
''', (word, final_definition, json.dumps(details_dict) if details_dict else None))
|
||||
entry_id = cursor.lastrowid
|
||||
|
||||
else:
|
||||
# 保持现有记录不变
|
||||
entry_id = existing_record['id']
|
||||
|
||||
entry_ids[word] = entry_id
|
||||
|
||||
except Error as e:
|
||||
print(f"保存词条 '{word}' 时出错: {e}")
|
||||
|
||||
self.conn.commit()
|
||||
cursor.close()
|
||||
return entry_ids
|
||||
|
||||
def parse_definition_to_metadata(self, html_str: str) -> Tuple[Dict, List[Dict]]:
|
||||
"""解析HTML定义字符串,提取元数据"""
|
||||
soup = BeautifulSoup(html_str, 'html.parser')
|
||||
|
||||
# 提取发音
|
||||
pronunciations = []
|
||||
pron_links = soup.find_all('a', class_='pronounce')
|
||||
for link in pron_links:
|
||||
pron_type = link.get('data-rel', '')
|
||||
pron_url = link.get('href', '')
|
||||
if pron_type and pron_url:
|
||||
pronunciations.append(Pronunciation(type=pron_type, url=pron_url))
|
||||
|
||||
# 提取词性
|
||||
pos_elements = soup.find_all('span', class_='pos')
|
||||
pos_list = [pos.get_text().strip() for pos in pos_elements]
|
||||
|
||||
# 提取释义
|
||||
definitions = []
|
||||
sense_elements = soup.find_all('span', class_='def')
|
||||
for sense in sense_elements:
|
||||
definition_text = sense.get_text().strip()
|
||||
if definition_text:
|
||||
definitions.append(Definition(text=definition_text))
|
||||
|
||||
# 提取例句
|
||||
examples = []
|
||||
example_elements = soup.find_all('span', class_='example')
|
||||
for example in example_elements:
|
||||
example_text = example.get_text().strip()
|
||||
if example_text:
|
||||
examples.append(Example(text=example_text))
|
||||
|
||||
# 构建元数据
|
||||
metadata = {
|
||||
"pronunciations": [p.model_dump() for p in pronunciations],
|
||||
"parts_of_speech": pos_list,
|
||||
"definitions": [d.model_dump() for d in definitions],
|
||||
"examples": [e.model_dump() for e in examples]
|
||||
}
|
||||
|
||||
# 提取媒体信息
|
||||
media_info = []
|
||||
img_elements = soup.find_all('img')
|
||||
for img in img_elements:
|
||||
src = img.get('src', '')
|
||||
if src:
|
||||
media_info.append({
|
||||
'type': 'image',
|
||||
'src': src
|
||||
})
|
||||
|
||||
return metadata, media_info
|
||||
|
||||
def save_dict_media(self, dict_media: Dict[str, bytes], media_references: List[Dict], entry_ids: Dict[str, int]) -> None:
|
||||
"""保存词典媒体文件到数据库"""
|
||||
from mysql.connector import Error
|
||||
import hashlib
|
||||
|
||||
cursor = self.conn.cursor(dictionary=True)
|
||||
|
||||
try:
|
||||
for media_ref in media_references:
|
||||
word = media_ref['word']
|
||||
filename = media_ref['filename']
|
||||
file_type = media_ref['type']
|
||||
|
||||
# 查找对应的 entry_id
|
||||
entry_id = entry_ids.get(word)
|
||||
if not entry_id:
|
||||
continue
|
||||
|
||||
# 查找文件数据
|
||||
# 处理文件名,确保与 dict_media 中的键匹配
|
||||
normalized_filename = filename.replace('\\', '/').lstrip('/')
|
||||
file_data = dict_media.get(normalized_filename)
|
||||
if not file_data:
|
||||
# 尝试其他可能的文件名变体
|
||||
alt_filename = filename.lstrip('/')
|
||||
file_data = dict_media.get(alt_filename)
|
||||
if not file_data:
|
||||
print(f"警告: 找不到媒体文件 {filename} 的数据")
|
||||
continue
|
||||
|
||||
# 计算文件哈希
|
||||
file_hash = hashlib.md5(file_data).hexdigest()
|
||||
|
||||
# 检查数据库中是否已存在相同的文件
|
||||
cursor.execute('''
|
||||
SELECT id FROM dict_media
|
||||
WHERE file_hash = %s AND file_type = %s
|
||||
''', (file_hash, file_type))
|
||||
existing_record = cursor.fetchone()
|
||||
|
||||
if existing_record:
|
||||
# 如果文件已存在,只需关联到当前词条
|
||||
media_id = existing_record['id']
|
||||
cursor.execute('''
|
||||
INSERT IGNORE INTO dict_entry_media (entry_id, media_id)
|
||||
VALUES (%s, %s)
|
||||
''', (entry_id, media_id))
|
||||
else:
|
||||
# 插入新文件记录
|
||||
cursor.execute('''
|
||||
INSERT INTO dict_media (filename, file_type, file_data, file_hash)
|
||||
VALUES (%s, %s, %s, %s)
|
||||
''', (filename, file_type, file_data, file_hash))
|
||||
media_id = cursor.lastrowid
|
||||
|
||||
# 关联到词条
|
||||
cursor.execute('''
|
||||
INSERT IGNORE INTO dict_entry_media (entry_id, media_id)
|
||||
VALUES (%s, %s)
|
||||
''', (entry_id, media_id))
|
||||
|
||||
# 提取详细信息(如果有的话)
|
||||
details = {}
|
||||
if file_type == 'image':
|
||||
# 对于图片,可以提取一些基本信息
|
||||
details['size'] = len(file_data)
|
||||
# 这里可以添加更多图片处理逻辑
|
||||
|
||||
# 更新媒体记录的详细信息
|
||||
if details:
|
||||
cursor.execute('''
|
||||
UPDATE dict_media
|
||||
SET details = %s
|
||||
WHERE id = %s
|
||||
''', (json.dumps(details), media_id))
|
||||
|
||||
except Error as e:
|
||||
print(f"保存媒体文件时出错: {e}")
|
||||
|
||||
self.conn.commit()
|
||||
cursor.close()
|
||||
|
||||
def export_media_files(self, export_dir: str) -> None:
|
||||
"""导出媒体文件到本地目录"""
|
||||
cursor = self.conn.cursor(dictionary=True)
|
||||
|
||||
try:
|
||||
# 创建导出目录
|
||||
os.makedirs(export_dir, exist_ok=True)
|
||||
|
||||
# 查询所有媒体文件
|
||||
cursor.execute('SELECT id, filename, file_type, file_data FROM dict_media')
|
||||
media_records = cursor.fetchall()
|
||||
|
||||
for record in media_records:
|
||||
media_id, filename, file_type, file_data = record['id'], record['filename'], record['file_type'], record['file_data']
|
||||
if file_data:
|
||||
# 确保文件名安全
|
||||
safe_filename = self._sanitize_filename(filename)
|
||||
file_path = os.path.join(export_dir, safe_filename)
|
||||
|
||||
# 写入文件
|
||||
with open(file_path, 'wb') as f:
|
||||
f.write(file_data)
|
||||
|
||||
print(f"导出媒体文件: {file_path}")
|
||||
|
||||
except Error as e:
|
||||
print(f"导出媒体文件时出错: {e}")
|
||||
|
||||
cursor.close()
|
||||
|
||||
def _sanitize_filename(self, filename: str) -> str:
|
||||
"""清理文件名,确保安全"""
|
||||
# 移除或替换不安全的字符
|
||||
unsafe_chars = '<>:"/\\|?*'
|
||||
for char in unsafe_chars:
|
||||
filename = filename.replace(char, '_')
|
||||
return filename
|
||||
|
||||
def close(self):
|
||||
"""关闭数据库连接"""
|
||||
if self.conn:
|
||||
self.conn.close()
|
||||
|
||||
|
||||
# 使用示例
|
||||
def main():
|
||||
# 数据库配置
|
||||
db_config = {
|
||||
'host': 'localhost',
|
||||
'database': 'mysql',
|
||||
'user': 'root',
|
||||
'password': 'root',
|
||||
'port': 3306
|
||||
}
|
||||
|
||||
# 文件路径
|
||||
mdx_path = './LDOCE5.mdx'
|
||||
mdd_path = './LDOCE5.mdd' # 可选
|
||||
|
||||
# 创建解析器实例
|
||||
parser = DictionaryParser(db_config)
|
||||
|
||||
try:
|
||||
# with open('./exported_media/kernel.html', 'r', encoding='utf-8') as file:
|
||||
# html_str = file.read()
|
||||
# de,image_info = parser.parse_definition_to_metadata(html_str)
|
||||
# print(de)
|
||||
|
||||
# 解析词典文件
|
||||
parser.parse_mdx_mdd(mdx_path, mdd_path)
|
||||
|
||||
# 可选:导出媒体文件到本地目录
|
||||
# parser.export_media_files('./exported_media')
|
||||
|
||||
except Error as e:
|
||||
print(f"解析过程中出现错误: {e}")
|
||||
finally:
|
||||
parser.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,179 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Script to generate and save coupons to the database
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import random
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
# Add the backend directory to the path so we can import modules
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
||||
|
||||
# Import required modules
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
|
||||
from backend.app.admin.model.coupon import Coupon
|
||||
from backend.utils.snowflake import snowflake
|
||||
from backend.core.conf import settings, get_db_uri
|
||||
|
||||
|
||||
def generate_coupon_codes(prefix: str, quantity: int):
|
||||
"""
|
||||
Generate coupon codes with specified prefix and quantity.
|
||||
|
||||
Format: [PREFIX][NUMBER] - Total 6 characters
|
||||
Example: A12345, TEST0, XYZ999
|
||||
|
||||
Args:
|
||||
prefix (str): The letter prefix for the coupon codes (should be uppercase)
|
||||
quantity (int): Number of coupon codes to generate
|
||||
|
||||
Returns:
|
||||
list: List of generated coupon codes
|
||||
"""
|
||||
if not prefix.isalpha() or not prefix.isupper():
|
||||
raise ValueError("Prefix must be uppercase letters only")
|
||||
|
||||
if len(prefix) == 0 or len(prefix) > 5:
|
||||
raise ValueError("Prefix must be 1-5 characters long")
|
||||
|
||||
if quantity <= 0:
|
||||
raise ValueError("Quantity must be greater than 0")
|
||||
|
||||
# Calculate number of digits based on prefix length (total 6 characters)
|
||||
num_digits = 6 - len(prefix)
|
||||
|
||||
# Maximum possible combinations
|
||||
max_combinations = 10 ** num_digits
|
||||
|
||||
if quantity > max_combinations:
|
||||
raise ValueError(f"With prefix '{prefix}' (length {len(prefix)}), can only generate {max_combinations} unique codes (0 to {max_combinations - 1})")
|
||||
|
||||
codes = []
|
||||
# Generate incremental numbers starting from 0
|
||||
for i in range(quantity):
|
||||
# Format with leading zeros to make it the required number of digits
|
||||
formatted_number = f"{i:0{num_digits}d}"
|
||||
# Combine prefix with formatted number
|
||||
coupon_code = f"{prefix}{formatted_number}"
|
||||
codes.append(coupon_code)
|
||||
|
||||
return codes
|
||||
|
||||
|
||||
def save_coupons_to_db(prefix: str, quantity: int, coupon_type: str, points: int, expire_days: int = None):
|
||||
"""
|
||||
Generate and save coupons to the database.
|
||||
|
||||
Coupon codes are always 6 characters total:
|
||||
- 1-letter prefix: 5 digits (up to 100000 codes: A00000-A99999)
|
||||
- 4-letter prefix: 2 digits (up to 100 codes: TEST00-TEST99)
|
||||
- 5-letter prefix: 1 digit (up to 10 codes: ABCDE0-ABCDE9)
|
||||
|
||||
Args:
|
||||
prefix (str): The letter prefix for the coupon codes
|
||||
quantity (int): Number of coupon codes to generate
|
||||
coupon_type (str): Type of the coupons
|
||||
points (int): Points value of the coupons
|
||||
expire_days (int, optional): Days until expiration. If None, no expiration.
|
||||
"""
|
||||
# Create database engine and session
|
||||
db_url = get_db_uri(settings)
|
||||
# Replace asyncmy with mysql+mysqlconnector for synchronous connection
|
||||
sync_db_url = db_url.replace('mysql+asyncmy', 'mysql+mysqlconnector')
|
||||
|
||||
try:
|
||||
engine = create_engine(sync_db_url, echo=False)
|
||||
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
||||
db = SessionLocal()
|
||||
|
||||
# Generate coupon codes
|
||||
codes = generate_coupon_codes(prefix, quantity)
|
||||
|
||||
# Create coupon objects
|
||||
coupons = []
|
||||
for code in codes:
|
||||
# Generate snowflake ID
|
||||
coupon_id = snowflake.generate()
|
||||
|
||||
# Calculate expiration date if needed
|
||||
expires_at = None
|
||||
if expire_days is not None and expire_days > 0:
|
||||
expires_at = datetime.now() + timedelta(days=expire_days)
|
||||
|
||||
# Create coupon object
|
||||
# Note: id is auto-generated by snowflake, but we want to use our own snowflake generator
|
||||
coupon = Coupon(
|
||||
code=code,
|
||||
type=coupon_type,
|
||||
points=points,
|
||||
expires_at=expires_at
|
||||
)
|
||||
# Set the id manually after creation
|
||||
coupon.id = coupon_id
|
||||
coupons.append(coupon)
|
||||
|
||||
# Bulk insert coupons
|
||||
db.add_all(coupons)
|
||||
db.commit()
|
||||
|
||||
print(f"Successfully saved {len(coupons)} coupons to the database.")
|
||||
print(f"Prefix: {prefix}, Type: {coupon_type}, Points: {points}")
|
||||
if expire_days:
|
||||
print(f"Expires in: {expire_days} days")
|
||||
|
||||
# Display first 5 coupons as examples
|
||||
print("\nSample coupons generated:")
|
||||
for coupon in coupons[:5]:
|
||||
print(f" ID: {coupon.id}, Code: {coupon.code}")
|
||||
|
||||
db.close()
|
||||
|
||||
except SQLAlchemyError as e:
|
||||
print(f"Database error: {e}")
|
||||
if 'db' in locals():
|
||||
db.rollback()
|
||||
db.close()
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
if 'db' in locals():
|
||||
db.close()
|
||||
|
||||
|
||||
def main():
|
||||
"""Main function to demonstrate usage"""
|
||||
print("Coupon Generator and Database Saver")
|
||||
print("=" * 40)
|
||||
|
||||
# Example: Generate and save coupons with different prefixes
|
||||
try:
|
||||
# Single character prefix (5 digits, incremental from 00000)
|
||||
# print("Generating coupons with single character prefix 'A'...")
|
||||
# save_coupons_to_db('A', 5, 'NORMAL', 100, 30)
|
||||
# print("\n" + "-" * 40 + "\n")
|
||||
|
||||
# 4-character prefix (2 digits, incremental from 00)
|
||||
print("Generating coupons with 4-character prefix 'TEST'...")
|
||||
save_coupons_to_db('VIP', 5, 'test', 1000, 60)
|
||||
print("\n" + "-" * 40 + "\n")
|
||||
|
||||
# 3-character prefix (3 digits, incremental from 000)
|
||||
# print("Generating coupons with 3-character prefix 'XYZ'...")
|
||||
# save_coupons_to_db('XYZ', 3, 'SPECIAL', 500, 15)
|
||||
# print("\n" + "-" * 40 + "\n")
|
||||
|
||||
# 5-character prefix (1 digit, incremental from 0)
|
||||
# print("Generating coupons with 5-character prefix 'ABCDE'...")
|
||||
# save_coupons_to_db('ABCDE', 5, 'PREMIUM', 2000, 90)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error in main: {e}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user