Files
diary-family/test_celery_email.py
xiaji 34b03c0e64 fix: 修复邮件后端导入并添加Django环境初始化
修复了core/tasks.py中错误的EmailBackend导入方式,改为从正确路径导入。同时在测试文件中添加了Django环境初始化代码,确保测试能够正确运行。
2026-01-18 21:32:34 +08:00

420 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python
"""
Celery邮件发送测试脚本
用于在生产服务器上测试通过Celery异步发送邮件功能
"""
import os
import sys
import time
from pathlib import Path
from loguru import logger
def test_celery_email_config():
"""测试Celery邮件配置"""
logger.info("开始测试Celery邮件配置...")
try:
# 初始化Django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
import django
django.setup()
from django.conf import settings
from diary_family.celery import app
from core.models import SystemConfig
# 检查Celery配置
celery_config = {
'CELERY_BROKER_URL': getattr(settings, 'CELERY_BROKER_URL', None),
'CELERY_RESULT_BACKEND': getattr(settings, 'CELERY_RESULT_BACKEND', None),
'CELERY_TIMEZONE': getattr(settings, 'CELERY_TIMEZONE', None),
}
logger.info("Celery配置信息:")
for key, value in celery_config.items():
if value is None:
logger.warning(f" {key}: 未配置")
else:
# 只显示URL的协议和主机部分隐藏密码
if 'redis://' in str(value):
# 隐藏密码
display_value = str(value).split('@')[0] + '@***'
logger.info(f" {key}: {display_value}")
else:
logger.info(f" {key}: {value}")
# 从数据库获取邮件配置
config = SystemConfig.get_config()
email_config = {
'smtp_server': config.smtp_server,
'smtp_port': config.smtp_port,
'smtp_username': config.smtp_username,
'sender_email': config.sender_email,
'recipient_email': config.recipient_email,
}
logger.info("邮件配置信息:")
for key, value in email_config.items():
if value is None:
logger.warning(f" {key}: 未配置")
else:
logger.info(f" {key}: {value}")
# 验证必要配置
missing_configs = []
if not getattr(settings, 'CELERY_BROKER_URL', None):
missing_configs.append('CELERY_BROKER_URL')
if not config.smtp_server:
missing_configs.append('smtp_server')
if not config.smtp_username:
missing_configs.append('smtp_username')
if not config.smtp_password:
missing_configs.append('smtp_password')
if missing_configs:
logger.error(f"缺少必要的配置: {', '.join(missing_configs)}")
return False
# 验证邮箱格式
import re
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
# 验证发件人邮箱格式优先使用sender_email其次使用smtp_username
sender_email = config.sender_email or config.smtp_username
if sender_email and not re.match(email_pattern, sender_email):
logger.error(f"发件人邮箱格式不正确: {sender_email}")
logger.error("请在系统配置页面输入有效的邮箱地址")
return False
# 验证收件人邮箱格式(如果已配置)
if config.recipient_email and not re.match(email_pattern, config.recipient_email):
logger.error(f"收件人邮箱格式不正确: {config.recipient_email}")
logger.error("请在系统配置页面输入有效的邮箱地址")
return False
# 测试Celery连接
logger.info("测试Celery连接...")
try:
result = app.control.ping(timeout=10)
logger.info(f"Celery连接结果: {result}")
if result:
logger.success("Celery连接成功")
else:
logger.warning("Celery连接测试未返回结果worker可能未运行")
except Exception as e:
logger.warning(f"Celery连接测试失败: {e}")
logger.info("这可能表示worker未运行但测试任务仍可发送到队列")
logger.success("Celery邮件配置测试通过")
return True
except Exception as e:
logger.error(f"Celery邮件配置测试失败: {e}")
return False
def test_celery_worker_status():
"""测试Celery Worker状态"""
logger.info("检查Celery Worker状态...")
try:
# 初始化Django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
import django
django.setup()
from diary_family.celery import app
# 获取Worker统计
try:
stats = app.control.inspect().stats()
if stats:
logger.success(f"找到 {len(stats)} 个运行中的worker:")
for worker, info in stats.items():
pool = info.get('pool', {})
concurrency = pool.get('max-concurrency', '未知')
prefetch = pool.get('prefetch_size', '默认')
logger.info(f" Worker: {worker}")
logger.info(f" 并发数: {concurrency}")
logger.info(f" 预取大小: {prefetch}")
logger.info(f" 任务超时: {info.get('task_timeout', '默认')}")
else:
logger.warning("未找到运行中的Celery worker")
logger.info("请启动Celery worker:")
logger.info(" celery -A diary_family worker -l info")
logger.info(" 或使用supervisor管理:")
logger.info(" sudo supervisorctl start celery_worker")
except Exception as e:
logger.warning(f"获取Worker状态失败: {e}")
# 获取活跃任务
try:
active = app.control.inspect().active()
if active:
total_tasks = sum(len(tasks) for tasks in active.values())
logger.info(f"当前活跃任务数: {total_tasks}")
for worker, tasks in active.items():
if tasks:
logger.info(f" Worker {worker}:")
for task in tasks:
logger.info(f" - {task.get('name', '未知')}: {task.get('id', '未知')}")
else:
logger.info("当前没有活跃任务")
except Exception as e:
logger.warning(f"获取活跃任务失败: {e}")
# 获取计划任务
try:
scheduled = app.control.inspect().scheduled()
if scheduled:
total_scheduled = sum(len(tasks) for tasks in scheduled.values())
logger.info(f"计划任务数: {total_scheduled}")
else:
logger.info("当前没有计划任务")
except Exception as e:
logger.warning(f"获取计划任务失败: {e}")
return True
except Exception as e:
logger.error(f"检查Worker状态失败: {e}")
return False
def test_celery_email_task():
"""测试Celery异步邮件任务"""
logger.info("开始测试Celery异步邮件任务...")
try:
# 初始化Django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
import django
django.setup()
from django.utils import timezone
from core.tasks import celery_send_test_email, celery_send_html_report_email
# 发送测试任务
logger.info("发送Celery邮件任务到队列...")
# 发送简单文本邮件任务
task1 = celery_send_test_email.delay(test_mode=True)
logger.info(f"简单邮件任务已发送: {task1.id}")
# 发送HTML报告邮件任务
task2 = celery_send_html_report_email.delay(include_attachment=False)
logger.info(f"HTML报告邮件任务已发送: {task2.id}")
# 等待任务完成
logger.info("等待任务执行最多30秒...")
results = []
tasks = [
('简单邮件', task1),
('HTML报告邮件', task2)
]
for name, task in tasks:
try:
# 等待任务完成
result = task.get(timeout=30)
logger.info(f"{name}任务结果: {result}")
results.append((name, 'success', result))
except Exception as e:
logger.warning(f"{name}任务获取结果失败(可能是超时): {e}")
# 检查任务状态
try:
state = task.state
logger.info(f"{name}任务状态: {state}")
results.append((name, state, None))
except Exception as state_e:
logger.error(f"获取任务状态失败: {state_e}")
results.append((name, 'unknown', None))
# 统计结果
success_count = sum(1 for _, status, _ in results if status == 'success')
logger.info(f"\n任务执行结果: {success_count}/{len(tasks)} 成功")
return success_count == len(tasks)
except Exception as e:
logger.error(f"Celery邮件任务测试失败: {e}")
logger.info("可能的原因:")
logger.info("1. Celery worker未运行")
logger.info("2. Redis连接问题")
logger.info("3. 任务执行超时")
return False
def main():
"""主测试函数"""
logger.info("=" * 60)
logger.info("=== Celery邮件发送功能测试开始 ===")
logger.info("测试环境: Ubuntu + Celery + Redis + SMTP")
logger.info("=" * 60)
tests_passed = 0
total_tests = 4
# 测试1: Celery邮件配置
logger.info("\n[测试1] Celery邮件配置测试")
if test_celery_email_config():
tests_passed += 1
# 测试2: Worker状态
logger.info("\n[测试2] Celery Worker状态检查")
if test_celery_worker_status():
tests_passed += 1
# 测试3: Celery邮件任务
logger.info("\n[测试3] Celery异步邮件任务测试")
if test_celery_email_task():
tests_passed += 1
# 测试4: 发送单个测试邮件(同步)
logger.info("\n[测试4] 同步发送测试邮件")
try:
# 初始化Django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
import django
django.setup()
from django.conf import settings
from django.core.mail import EmailMessage
from django.utils import timezone
from core.models import SystemConfig
from django.core.mail.backends.smtp import EmailBackend
# 从数据库获取邮件配置
config = SystemConfig.get_config()
# 优先使用sender_email其次使用smtp_username作为发件人
from_email = config.sender_email or config.smtp_username
if not from_email:
raise ValueError("未配置发件邮箱")
to_email = config.recipient_email or from_email
recipient_list = [to_email] if isinstance(to_email, str) else to_email
# 获取SMTP配置
host = config.smtp_server or 'localhost'
port = config.smtp_port or 587
username = config.smtp_username or ''
password = config.smtp_password or ''
use_tls = True # 默认使用TLS
# 创建邮件后端
backend = EmailBackend(
host=host,
port=port,
username=username,
password=password,
use_tls=use_tls,
fail_silently=False
)
subject = "[直接测试] Celery邮件测试 - {}".format(timezone.now().strftime('%H:%M:%S'))
body = '''
这是直接发送的测试邮件用于验证SMTP配置
发送时间: {send_time}
发送方式: 直接发送非Celery
如果Celery异步任务测试失败但此测试成功
说明SMTP配置正确问题可能在Celery或Redis配置
---
家庭日报系统
'''.format(
send_time=timezone.now().isoformat()
)
email = EmailMessage(
subject=subject,
body=body,
from_email=from_email,
to=recipient_list,
connection=backend
)
sent = email.send(fail_silently=False)
if sent > 0:
logger.success("同步邮件发送成功!")
tests_passed += 1
else:
logger.error("同步邮件发送失败")
except Exception as e:
logger.error(f"同步邮件测试失败: {e}")
# 测试总结
logger.info("\n" + "=" * 60)
logger.info("测试总结:")
logger.info(f"通过测试: {tests_passed}/{total_tests}")
logger.info("=" * 60)
if tests_passed == total_tests:
logger.success("所有测试通过Celery邮件功能正常。")
logger.info("\n✅ Celery配置正确")
logger.info("✅ Redis连接正常")
logger.info("✅ SMTP配置正常")
logger.info("✅ 异步邮件任务正常")
logger.info("\n建议:")
logger.info("1. 定期检查Celery worker状态")
logger.info("2. 监控任务队列长度")
logger.info("3. 设置任务超时和重试机制")
logger.info("4. 配置任务监控如Flower")
return 0
elif tests_passed >= 2:
logger.warning("部分测试通过Celery邮件功能基本可用。")
logger.info("\n需要检查:")
logger.info("1. Celery worker是否运行")
logger.info("2. Redis服务是否正常")
logger.info("3. SMTP配置是否正确")
return 1
else:
logger.error("多数测试失败Celery邮件功能异常。")
logger.info("\n紧急处理:")
logger.info("1. ❌ 检查Celery worker: sudo supervisorctl status celery_worker")
logger.info("2. ❌ 检查Redis: sudo systemctl status redis-server")
logger.info("3. ❌ 检查SMTP配置")
logger.info("4. ❌ 查看日志: tail -f /var/log/celery/worker.log")
return 1
if __name__ == "__main__":
# 配置日志
logger.remove()
# 创建日志目录
log_dir = Path("/var/log/celery")
log_dir.mkdir(parents=True, exist_ok=True)
log_dir.chmod(0o755)
# 添加控制台输出
logger.add(
sys.stdout,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
level="INFO"
)
# 添加日志文件输出
log_file = log_dir / "test_celery_email.log"
logger.add(
log_file,
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
level="INFO",
rotation="1 day",
retention="7 days",
encoding="utf-8"
)
logger.info(f"Celery邮件测试日志将同时输出到控制台和 {log_file}")
logger.info("=" * 60)
try:
exit_code = main()
sys.exit(exit_code)
except KeyboardInterrupt:
logger.warning("测试被用户中断")
sys.exit(1)
except Exception as e:
logger.error(f"测试过程中发生未预期错误: {e}")
sys.exit(1)