Files
diary-family/test_celery_email.py
xiaji 9215de5a3d refactor(email): 重构邮件配置从settings.py迁移到数据库
将邮件配置从Django的settings.py迁移到数据库的SystemConfig模型
更新测试文件以使用新的配置方式
添加邮件后端连接配置以提高灵活性
2026-01-18 17:10:12 +08:00

701 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python
"""
Celery邮件发送测试脚本
用于在生产服务器上测试通过Celery异步发送邮件功能
"""
import os
import sys
import time
from pathlib import Path
from loguru import logger
from celery import shared_task
from celery.exceptions import Retry
def test_celery_email_config():
"""测试Celery邮件配置"""
logger.info("开始测试Celery邮件配置...")
try:
# 初始化Django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
import django
django.setup()
from django.conf import settings
from diary_family.celery import app
from core.models import SystemConfig
# 检查Celery配置
celery_config = {
'CELERY_BROKER_URL': getattr(settings, 'CELERY_BROKER_URL', None),
'CELERY_RESULT_BACKEND': getattr(settings, 'CELERY_RESULT_BACKEND', None),
'CELERY_TIMEZONE': getattr(settings, 'CELERY_TIMEZONE', None),
}
logger.info("Celery配置信息:")
for key, value in celery_config.items():
if value is None:
logger.warning(f" {key}: 未配置")
else:
# 只显示URL的协议和主机部分隐藏密码
if 'redis://' in str(value):
# 隐藏密码
display_value = str(value).split('@')[0] + '@***'
logger.info(f" {key}: {display_value}")
else:
logger.info(f" {key}: {value}")
# 从数据库获取邮件配置
config = SystemConfig.get_config()
email_config = {
'smtp_server': config.smtp_server,
'smtp_port': config.smtp_port,
'smtp_username': config.smtp_username,
'recipient_email': config.recipient_email,
}
logger.info("邮件配置信息:")
for key, value in email_config.items():
if value is None:
logger.warning(f" {key}: 未配置")
else:
logger.info(f" {key}: {value}")
# 验证必要配置
missing_configs = []
if not getattr(settings, 'CELERY_BROKER_URL', None):
missing_configs.append('CELERY_BROKER_URL')
if not config.smtp_server:
missing_configs.append('smtp_server')
if not config.smtp_username:
missing_configs.append('smtp_username')
if not config.smtp_password:
missing_configs.append('smtp_password')
if missing_configs:
logger.error(f"缺少必要的配置: {', '.join(missing_configs)}")
return False
# 测试Celery连接
logger.info("测试Celery连接...")
try:
result = app.control.ping(timeout=10)
logger.info(f"Celery连接结果: {result}")
if result:
logger.success("Celery连接成功")
else:
logger.warning("Celery连接测试未返回结果worker可能未运行")
except Exception as e:
logger.warning(f"Celery连接测试失败: {e}")
logger.info("这可能表示worker未运行但测试任务仍可发送到队列")
logger.success("Celery邮件配置测试通过")
return True
except Exception as e:
logger.error(f"Celery邮件配置测试失败: {e}")
return False
def test_celery_worker_status():
"""测试Celery Worker状态"""
logger.info("检查Celery Worker状态...")
try:
# 初始化Django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
import django
django.setup()
from diary_family.celery import app
# 获取Worker统计
try:
stats = app.control.inspect().stats()
if stats:
logger.success(f"找到 {len(stats)} 个运行中的worker:")
for worker, info in stats.items():
pool = info.get('pool', {})
concurrency = pool.get('max-concurrency', '未知')
prefetch = pool.get('prefetch_size', '默认')
logger.info(f" Worker: {worker}")
logger.info(f" 并发数: {concurrency}")
logger.info(f" 预取大小: {prefetch}")
logger.info(f" 任务超时: {info.get('task_timeout', '默认')}")
else:
logger.warning("未找到运行中的Celery worker")
logger.info("请启动Celery worker:")
logger.info(" celery -A diary_family worker -l info")
logger.info(" 或使用supervisor管理:")
logger.info(" sudo supervisorctl start celery_worker")
except Exception as e:
logger.warning(f"获取Worker状态失败: {e}")
# 获取活跃任务
try:
active = app.control.inspect().active()
if active:
total_tasks = sum(len(tasks) for tasks in active.values())
logger.info(f"当前活跃任务数: {total_tasks}")
for worker, tasks in active.items():
if tasks:
logger.info(f" Worker {worker}:")
for task in tasks:
logger.info(f" - {task.get('name', '未知')}: {task.get('id', '未知')}")
else:
logger.info("当前没有活跃任务")
except Exception as e:
logger.warning(f"获取活跃任务失败: {e}")
# 获取计划任务
try:
scheduled = app.control.inspect().scheduled()
if scheduled:
total_scheduled = sum(len(tasks) for tasks in scheduled.values())
logger.info(f"计划任务数: {total_scheduled}")
else:
logger.info("当前没有计划任务")
except Exception as e:
logger.warning(f"获取计划任务失败: {e}")
return True
except Exception as e:
logger.error(f"检查Worker状态失败: {e}")
return False
@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def celery_send_test_email(self, test_mode=True):
"""
Celery异步发送测试邮件任务
支持重试机制,适合生产环境使用
"""
from django.conf import settings
from django.core.mail import EmailMessage
from django.utils import timezone
import traceback
task_id = self.request.id if hasattr(self, 'request') else 'unknown'
logger.info(f"[任务 {task_id}] 开始执行异步邮件发送任务")
try:
# 从数据库获取邮件配置
from core.models import SystemConfig
from django.core.mail.backends.smtp import EmailBackend
config = SystemConfig.get_config()
from_email = config.smtp_username
if not from_email:
raise ValueError("未配置发件邮箱 (smtp_username)")
to_email = config.recipient_email or from_email
recipient_list = [to_email]
# 获取SMTP配置
host = config.smtp_server or 'localhost'
port = config.smtp_port or 587
username = config.smtp_username or ''
password = config.smtp_password or ''
use_tls = True # 默认使用TLS
# 创建邮件后端
backend = EmailBackend(
host=host,
port=port,
username=username,
password=password,
use_tls=use_tls,
fail_silently=False
)
# 创建邮件内容
subject = f"[Celery测试] 异步邮件发送成功 - {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}"
if test_mode:
body = f"""
这是一封通过Celery异步发送的测试邮件。
✅ 邮件发送成功!
任务信息:
- 任务ID: {task_id}
- 发送时间: {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}
- 发送方式: Celery异步任务
- 执行重试次数: {self.request.retries}
邮件配置:
- 发件服务器: {host}:{port}
- 使用TLS: {use_tls}
---
家庭日报系统
自动发送Celery异步任务
"""
else:
body = f"""
家庭日报系统测试邮件
发送时间: {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}
发送方式: Celery异步任务
任务ID: {task_id}
这是一封测试邮件用于验证Celery异步邮件发送功能。
""
# 创建邮件
email = EmailMessage(
subject=subject,
body=body,
from_email=from_email,
to=recipient_list,
connection=backend
)
email.content_subtype = 'plain'
email.encoding = 'utf-8'
# 发送邮件
logger.info(f"[任务 {task_id}] 正在发送邮件...")
sent_count = email.send(fail_silently=False)
if sent_count > 0:
logger.success(f"[任务 {task_id}] 邮件发送成功!发送给 {len(recipient_list)} 个收件人")
return {
'status': 'success',
'task_id': task_id,
'sent_to': len(recipient_list),
'timestamp': timezone.now().isoformat()
}
else:
raise Exception("邮件发送返回0未能发送邮件")
except Exception as e:
error_msg = str(e)
error_traceback = traceback.format_exc()
logger.error(f"[任务 {task_id}] 邮件发送失败: {error_msg}")
logger.error(f"[任务 {task_id}] 错误详情:\n{error_traceback}")
# 检查是否应该重试
if self.request.retries < self.max_retries:
logger.info(f"[任务 {task_id}] 准备第 {self.request.retries + 1} 次重试...")
# 重试延迟指数增长
retry_delay = 60 * (2 ** self.request.retries)
logger.info(f"[任务 {task_id}] {retry_delay}秒后进行重试")
raise self.retry(exc=e, countdown=retry_delay)
else:
logger.error(f"[任务 {task_id}] 已达到最大重试次数 {self.max_retries},放弃重试")
# 返回错误信息而不是抛出异常
return {
'status': 'failed',
'task_id': task_id,
'error': error_msg,
'retries': self.request.retries,
'timestamp': timezone.now().isoformat()
}
@shared_task(bind=True)
def celery_send_html_report_email(self, include_attachment=False):
"""
Celery异步发送HTML报告邮件
模拟每日报告发送功能
"""
from django.conf import settings
from django.core.mail import EmailMessage, EmailMultiAlternatives
from django.template.loader import render_to_string
from django.utils import timezone
from datetime import timedelta
import traceback
task_id = self.request.id if hasattr(self, 'request') else 'unknown'
logger.info(f"[任务 {task_id}] 开始执行HTML报告邮件发送任务")
try:
# 从数据库获取邮件配置
from core.models import SystemConfig
from django.core.mail.backends.smtp import EmailBackend
config = SystemConfig.get_config()
from_email = config.smtp_username
if not from_email:
raise ValueError("未配置发件邮箱")
to_email = config.recipient_email or from_email
if isinstance(to_email, list):
recipient_list = to_email
else:
recipient_list = [to_email]
# 获取SMTP配置
host = config.smtp_server or 'localhost'
port = config.smtp_port or 587
username = config.smtp_username or ''
password = config.smtp_password or ''
use_tls = True # 默认使用TLS
# 准备报告数据
today = timezone.now().date()
yesterday = today - timedelta(days=1)
report_data = {
'today': today.strftime('%Y年%m月%d'),
'yesterday': yesterday.strftime('%Y年%m月%d'),
'tasks': [],
'reading': {'count': 0, 'items': []},
'insights': {'count': 0, 'items': []},
}
# 创建HTML邮件内容
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<style>
body {{ font-family: 'Microsoft YaHei', Arial, sans-serif; line-height: 1.6; color: #333; max-width: 600px; margin: 0 auto; }}
.header {{ background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 30px 20px; text-align: center; }}
.header h1 {{ margin: 0; font-size: 24px; }}
.header .date {{ font-size: 14px; opacity: 0.9; margin-top: 10px; }}
.content {{ padding: 20px; background: #f9f9f9; }}
.section {{ background: white; border-radius: 8px; padding: 20px; margin-bottom: 20px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }}
.section h2 {{ color: #667eea; font-size: 18px; margin-top: 0; border-bottom: 2px solid #667eea; padding-bottom: 10px; }}
.stat {{ display: inline-block; background: #667eea; color: white; padding: 5px 15px; border-radius: 20px; margin: 5px; }}
.footer {{ text-align: center; padding: 20px; color: #666; font-size: 12px; }}
.success-badge {{ background: #28a745; color: white; padding: 10px 20px; border-radius: 5px; display: inline-block; }}
</style>
</head>
<body>
<div class="header">
<h1>📊 家庭日报</h1>
<div class="date">{report_data['today']}</div>
</div>
<div class="content">
<div class="section">
<h2> 邮件发送测试</h2>
<p>这是一封通过 <strong>Celery异步任务</strong> 发送的HTML格式邮件</p>
<p><strong>任务ID:</strong> {task_id}</p>
<p><strong>发送时间:</strong> {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
<p><strong>状态:</strong> <span class="success-badge">发送成功</span></p>
</div>
<div class="section">
<h2>📈 统计信息</h2>
<p>阅读记录: <span class="stat">{report_data['reading']['count']} </span></p>
<p>感悟记录: <span class="stat">{report_data['insights']['count']} </span></p>
<p>家庭事项: <span class="stat">{len(report_data['tasks'])} </span></p>
</div>
<div class="section">
<h2>📋 说明</h2>
<p>此邮件由Celery异步任务自动发送</p>
<p>如需取消订阅请联系系统管理员</p>
</div>
</div>
<div class="footer">
<p>---</p>
<p>家庭日报系统 | 自动发送</p>
<p>任务ID: {task_id}</p>
</div>
</body>
</html>
"""
# 创建邮件后端
backend = EmailBackend(
host=host,
port=port,
username=username,
password=password,
use_tls=use_tls,
fail_silently=False
)
# 创建邮件
subject = f"[Celery] 家庭日报 {report_data['today']} - 测试报告"
email = EmailMultiAlternatives(
subject=subject,
body=html_content,
from_email=from_email,
to=recipient_list,
connection=backend
)
email.attach_alternative(html_content, "text/html")
email.encoding = 'utf-8'
# 添加附件(如果需要)
if include_attachment:
attachment_content = f"""家庭日报测试报告
发送时间: {timezone.now().isoformat()}
任务ID: {task_id}
发送方式: Celery异步任务
这是一份自动生成的测试报告
"""
email.attach('daily_report_test.txt', attachment_content, 'text/plain')
logger.info(f"[任务 {task_id}] 已添加测试附件")
# 发送邮件
logger.info(f"[任务 {task_id}] 正在发送HTML报告邮件...")
sent_count = email.send(fail_silently=False)
if sent_count > 0:
logger.success(f"[任务 {task_id}] HTML报告邮件发送成功")
return {
'status': 'success',
'task_id': task_id,
'type': 'html_report',
'sent_to': len(recipient_list),
'has_attachment': include_attachment,
'timestamp': timezone.now().isoformat()
}
else:
raise Exception("邮件发送返回0")
except Exception as e:
error_msg = str(e)
error_traceback = traceback.format_exc()
logger.error(f"[任务 {task_id}] HTML报告邮件发送失败: {error_msg}")
logger.error(f"[任务 {task_id}] 错误详情:\n{error_traceback}")
return {
'status': 'failed',
'task_id': task_id,
'type': 'html_report',
'error': error_msg,
'timestamp': timezone.now().isoformat()
}
def test_celery_email_task():
"""测试Celery异步邮件任务"""
logger.info("开始测试Celery异步邮件任务...")
try:
# 初始化Django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
import django
django.setup()
from django.utils import timezone
# 发送测试任务
logger.info("发送Celery邮件任务到队列...")
# 发送简单文本邮件任务
task1 = celery_send_test_email.delay(test_mode=True)
logger.info(f"简单邮件任务已发送: {task1.id}")
# 发送HTML报告邮件任务
task2 = celery_send_html_report_email.delay(include_attachment=False)
logger.info(f"HTML报告邮件任务已发送: {task2.id}")
# 等待任务完成
logger.info("等待任务执行最多30秒...")
results = []
tasks = [
('简单邮件', task1),
('HTML报告邮件', task2)
]
for name, task in tasks:
try:
# 等待任务完成
result = task.get(timeout=30)
logger.info(f"{name}任务结果: {result}")
results.append((name, 'success', result))
except Exception as e:
logger.warning(f"{name}任务获取结果失败(可能是超时): {e}")
# 检查任务状态
try:
state = task.state
logger.info(f"{name}任务状态: {state}")
results.append((name, state, None))
except Exception as state_e:
logger.error(f"获取任务状态失败: {state_e}")
results.append((name, 'unknown', None))
# 统计结果
success_count = sum(1 for _, status, _ in results if status == 'success')
logger.info(f"\n任务执行结果: {success_count}/{len(tasks)} 成功")
return success_count == len(tasks)
except Exception as e:
logger.error(f"Celery邮件任务测试失败: {e}")
logger.info("可能的原因:")
logger.info("1. Celery worker未运行")
logger.info("2. Redis连接问题")
logger.info("3. 任务执行超时")
return False
def main():
"""主测试函数"""
logger.info("=" * 60)
logger.info("=== Celery邮件发送功能测试开始 ===")
logger.info("测试环境: Ubuntu + Celery + Redis + SMTP")
logger.info("=" * 60)
tests_passed = 0
total_tests = 4
# 测试1: Celery邮件配置
logger.info("\n[测试1] Celery邮件配置测试")
if test_celery_email_config():
tests_passed += 1
# 测试2: Worker状态
logger.info("\n[测试2] Celery Worker状态检查")
if test_celery_worker_status():
tests_passed += 1
# 测试3: Celery邮件任务
logger.info("\n[测试3] Celery异步邮件任务测试")
if test_celery_email_task():
tests_passed += 1
# 测试4: 发送单个测试邮件(同步)
logger.info("\n[测试4] 同步发送测试邮件")
try:
from django.conf import settings
from django.core.mail import EmailMessage
from django.utils import timezone
from core.models import SystemConfig
from django.core.mail.backends.smtp import EmailBackend
# 从数据库获取邮件配置
config = SystemConfig.get_config()
from_email = config.smtp_username
if not from_email:
raise ValueError("未配置发件邮箱")
to_email = config.recipient_email or from_email
recipient_list = [to_email] if isinstance(to_email, str) else to_email
# 获取SMTP配置
host = config.smtp_server or 'localhost'
port = config.smtp_port or 587
username = config.smtp_username or ''
password = config.smtp_password or ''
use_tls = True # 默认使用TLS
# 创建邮件后端
backend = EmailBackend(
host=host,
port=port,
username=username,
password=password,
use_tls=use_tls,
fail_silently=False
)
subject = f"[直接测试] Celery邮件测试 - {timezone.now().strftime('%H:%M:%S')}"
body = f"""
这是直接发送的测试邮件用于验证SMTP配置
发送时间: {timezone.now().isoformat()}
发送方式: 直接发送非Celery
如果Celery异步任务测试失败但此测试成功
说明SMTP配置正确问题可能在Celery或Redis配置
---
家庭日报系统
"""
email = EmailMessage(
subject=subject,
body=body,
from_email=from_email,
to=recipient_list,
connection=backend
)
sent = email.send(fail_silently=False)
if sent > 0:
logger.success("同步邮件发送成功!")
tests_passed += 1
else:
logger.error("同步邮件发送失败")
except Exception as e:
logger.error(f"同步邮件测试失败: {e}")
# 测试总结
logger.info("\n" + "=" * 60)
logger.info("测试总结:")
logger.info(f"通过测试: {tests_passed}/{total_tests}")
logger.info("=" * 60)
if tests_passed == total_tests:
logger.success("所有测试通过Celery邮件功能正常。")
logger.info("\n✅ Celery配置正确")
logger.info("✅ Redis连接正常")
logger.info("✅ SMTP配置正常")
logger.info("✅ 异步邮件任务正常")
logger.info("\n建议:")
logger.info("1. 定期检查Celery worker状态")
logger.info("2. 监控任务队列长度")
logger.info("3. 设置任务超时和重试机制")
logger.info("4. 配置任务监控如Flower")
return 0
elif tests_passed >= 2:
logger.warning("部分测试通过Celery邮件功能基本可用。")
logger.info("\n需要检查:")
logger.info("1. Celery worker是否运行")
logger.info("2. Redis服务是否正常")
logger.info("3. SMTP配置是否正确")
return 1
else:
logger.error("多数测试失败Celery邮件功能异常。")
logger.info("\n紧急处理:")
logger.info("1. ❌ 检查Celery worker: sudo supervisorctl status celery_worker")
logger.info("2. ❌ 检查Redis: sudo systemctl status redis-server")
logger.info("3. ❌ 检查SMTP配置")
logger.info("4. ❌ 查看日志: tail -f /var/log/celery/worker.log")
return 1
if __name__ == "__main__":
# 配置日志
logger.remove()
# 创建日志目录
log_dir = Path("/var/log/celery")
log_dir.mkdir(parents=True, exist_ok=True)
log_dir.chmod(0o755)
# 添加控制台输出
logger.add(
sys.stdout,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
level="INFO"
)
# 添加日志文件输出
log_file = log_dir / "test_celery_email.log"
logger.add(
log_file,
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
level="INFO",
rotation="1 day",
retention="7 days",
encoding="utf-8"
)
logger.info(f"Celery邮件测试日志将同时输出到控制台和 {log_file}")
logger.info("=" * 60)
try:
exit_code = main()
sys.exit(exit_code)
except KeyboardInterrupt:
logger.warning("测试被用户中断")
sys.exit(1)
except Exception as e:
logger.error(f"测试过程中发生未预期错误: {e}")
sys.exit(1)