Files
diary-family/test_celery_email.py

420 lines
15 KiB
Python
Raw Normal View History

#!/usr/bin/env python
"""
Celery邮件发送测试脚本
用于在生产服务器上测试通过Celery异步发送邮件功能
"""
import os
import sys
import time
from pathlib import Path
from loguru import logger
def test_celery_email_config():
"""测试Celery邮件配置"""
logger.info("开始测试Celery邮件配置...")
try:
# 初始化Django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
import django
django.setup()
from django.conf import settings
from diary_family.celery import app
from core.models import SystemConfig
# 检查Celery配置
celery_config = {
'CELERY_BROKER_URL': getattr(settings, 'CELERY_BROKER_URL', None),
'CELERY_RESULT_BACKEND': getattr(settings, 'CELERY_RESULT_BACKEND', None),
'CELERY_TIMEZONE': getattr(settings, 'CELERY_TIMEZONE', None),
}
logger.info("Celery配置信息:")
for key, value in celery_config.items():
if value is None:
logger.warning(f" {key}: 未配置")
else:
# 只显示URL的协议和主机部分隐藏密码
if 'redis://' in str(value):
# 隐藏密码
display_value = str(value).split('@')[0] + '@***'
logger.info(f" {key}: {display_value}")
else:
logger.info(f" {key}: {value}")
# 从数据库获取邮件配置
config = SystemConfig.get_config()
email_config = {
'smtp_server': config.smtp_server,
'smtp_port': config.smtp_port,
'smtp_username': config.smtp_username,
'sender_email': config.sender_email,
'recipient_email': config.recipient_email,
}
logger.info("邮件配置信息:")
for key, value in email_config.items():
if value is None:
logger.warning(f" {key}: 未配置")
else:
logger.info(f" {key}: {value}")
# 验证必要配置
missing_configs = []
if not getattr(settings, 'CELERY_BROKER_URL', None):
missing_configs.append('CELERY_BROKER_URL')
if not config.smtp_server:
missing_configs.append('smtp_server')
if not config.smtp_username:
missing_configs.append('smtp_username')
if not config.smtp_password:
missing_configs.append('smtp_password')
if missing_configs:
logger.error(f"缺少必要的配置: {', '.join(missing_configs)}")
return False
# 验证邮箱格式
import re
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
# 验证发件人邮箱格式优先使用sender_email其次使用smtp_username
sender_email = config.sender_email or config.smtp_username
if sender_email and not re.match(email_pattern, sender_email):
logger.error(f"发件人邮箱格式不正确: {sender_email}")
logger.error("请在系统配置页面输入有效的邮箱地址")
return False
# 验证收件人邮箱格式(如果已配置)
if config.recipient_email and not re.match(email_pattern, config.recipient_email):
logger.error(f"收件人邮箱格式不正确: {config.recipient_email}")
logger.error("请在系统配置页面输入有效的邮箱地址")
return False
# 测试Celery连接
logger.info("测试Celery连接...")
try:
result = app.control.ping(timeout=10)
logger.info(f"Celery连接结果: {result}")
if result:
logger.success("Celery连接成功")
else:
logger.warning("Celery连接测试未返回结果worker可能未运行")
except Exception as e:
logger.warning(f"Celery连接测试失败: {e}")
logger.info("这可能表示worker未运行但测试任务仍可发送到队列")
logger.success("Celery邮件配置测试通过")
return True
except Exception as e:
logger.error(f"Celery邮件配置测试失败: {e}")
return False
def test_celery_worker_status():
"""测试Celery Worker状态"""
logger.info("检查Celery Worker状态...")
try:
# 初始化Django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
import django
django.setup()
from diary_family.celery import app
# 获取Worker统计
try:
stats = app.control.inspect().stats()
if stats:
logger.success(f"找到 {len(stats)} 个运行中的worker:")
for worker, info in stats.items():
pool = info.get('pool', {})
concurrency = pool.get('max-concurrency', '未知')
prefetch = pool.get('prefetch_size', '默认')
logger.info(f" Worker: {worker}")
logger.info(f" 并发数: {concurrency}")
logger.info(f" 预取大小: {prefetch}")
logger.info(f" 任务超时: {info.get('task_timeout', '默认')}")
else:
logger.warning("未找到运行中的Celery worker")
logger.info("请启动Celery worker:")
logger.info(" celery -A diary_family worker -l info")
logger.info(" 或使用supervisor管理:")
logger.info(" sudo supervisorctl start celery_worker")
except Exception as e:
logger.warning(f"获取Worker状态失败: {e}")
# 获取活跃任务
try:
active = app.control.inspect().active()
if active:
total_tasks = sum(len(tasks) for tasks in active.values())
logger.info(f"当前活跃任务数: {total_tasks}")
for worker, tasks in active.items():
if tasks:
logger.info(f" Worker {worker}:")
for task in tasks:
logger.info(f" - {task.get('name', '未知')}: {task.get('id', '未知')}")
else:
logger.info("当前没有活跃任务")
except Exception as e:
logger.warning(f"获取活跃任务失败: {e}")
# 获取计划任务
try:
scheduled = app.control.inspect().scheduled()
if scheduled:
total_scheduled = sum(len(tasks) for tasks in scheduled.values())
logger.info(f"计划任务数: {total_scheduled}")
else:
logger.info("当前没有计划任务")
except Exception as e:
logger.warning(f"获取计划任务失败: {e}")
return True
except Exception as e:
logger.error(f"检查Worker状态失败: {e}")
return False
def test_celery_email_task():
"""测试Celery异步邮件任务"""
logger.info("开始测试Celery异步邮件任务...")
try:
# 初始化Django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
import django
django.setup()
from django.utils import timezone
from core.tasks import celery_send_test_email, celery_send_html_report_email
# 发送测试任务
logger.info("发送Celery邮件任务到队列...")
# 发送简单文本邮件任务
task1 = celery_send_test_email.delay(test_mode=True)
logger.info(f"简单邮件任务已发送: {task1.id}")
# 发送HTML报告邮件任务
task2 = celery_send_html_report_email.delay(include_attachment=False)
logger.info(f"HTML报告邮件任务已发送: {task2.id}")
# 等待任务完成
logger.info("等待任务执行最多30秒...")
results = []
tasks = [
('简单邮件', task1),
('HTML报告邮件', task2)
]
for name, task in tasks:
try:
# 等待任务完成
result = task.get(timeout=30)
logger.info(f"{name}任务结果: {result}")
results.append((name, 'success', result))
except Exception as e:
logger.warning(f"{name}任务获取结果失败(可能是超时): {e}")
# 检查任务状态
try:
state = task.state
logger.info(f"{name}任务状态: {state}")
results.append((name, state, None))
except Exception as state_e:
logger.error(f"获取任务状态失败: {state_e}")
results.append((name, 'unknown', None))
# 统计结果
success_count = sum(1 for _, status, _ in results if status == 'success')
logger.info(f"\n任务执行结果: {success_count}/{len(tasks)} 成功")
return success_count == len(tasks)
except Exception as e:
logger.error(f"Celery邮件任务测试失败: {e}")
logger.info("可能的原因:")
logger.info("1. Celery worker未运行")
logger.info("2. Redis连接问题")
logger.info("3. 任务执行超时")
return False
def main():
"""主测试函数"""
logger.info("=" * 60)
logger.info("=== Celery邮件发送功能测试开始 ===")
logger.info("测试环境: Ubuntu + Celery + Redis + SMTP")
logger.info("=" * 60)
tests_passed = 0
total_tests = 4
# 测试1: Celery邮件配置
logger.info("\n[测试1] Celery邮件配置测试")
if test_celery_email_config():
tests_passed += 1
# 测试2: Worker状态
logger.info("\n[测试2] Celery Worker状态检查")
if test_celery_worker_status():
tests_passed += 1
# 测试3: Celery邮件任务
logger.info("\n[测试3] Celery异步邮件任务测试")
if test_celery_email_task():
tests_passed += 1
# 测试4: 发送单个测试邮件(同步)
logger.info("\n[测试4] 同步发送测试邮件")
try:
# 初始化Django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
import django
django.setup()
from django.conf import settings
from django.core.mail import EmailMessage
from django.utils import timezone
from core.models import SystemConfig
from django.core.mail.backends.smtp import EmailBackend
# 从数据库获取邮件配置
config = SystemConfig.get_config()
# 优先使用sender_email其次使用smtp_username作为发件人
from_email = config.sender_email or config.smtp_username
if not from_email:
raise ValueError("未配置发件邮箱")
to_email = config.recipient_email or from_email
recipient_list = [to_email] if isinstance(to_email, str) else to_email
# 获取SMTP配置
host = config.smtp_server or 'localhost'
port = config.smtp_port or 587
username = config.smtp_username or ''
password = config.smtp_password or ''
use_tls = True # 默认使用TLS
# 创建邮件后端
backend = EmailBackend(
host=host,
port=port,
username=username,
password=password,
use_tls=use_tls,
fail_silently=False
)
subject = "[直接测试] Celery邮件测试 - {}".format(timezone.now().strftime('%H:%M:%S'))
body = '''
这是直接发送的测试邮件用于验证SMTP配置
发送时间: {send_time}
发送方式: 直接发送非Celery
如果Celery异步任务测试失败但此测试成功
说明SMTP配置正确问题可能在Celery或Redis配置
---
家庭日报系统
'''.format(
send_time=timezone.now().isoformat()
)
email = EmailMessage(
subject=subject,
body=body,
from_email=from_email,
to=recipient_list,
connection=backend
)
sent = email.send(fail_silently=False)
if sent > 0:
logger.success("同步邮件发送成功!")
tests_passed += 1
else:
logger.error("同步邮件发送失败")
except Exception as e:
logger.error(f"同步邮件测试失败: {e}")
# 测试总结
logger.info("\n" + "=" * 60)
logger.info("测试总结:")
logger.info(f"通过测试: {tests_passed}/{total_tests}")
logger.info("=" * 60)
if tests_passed == total_tests:
logger.success("所有测试通过Celery邮件功能正常。")
logger.info("\n✅ Celery配置正确")
logger.info("✅ Redis连接正常")
logger.info("✅ SMTP配置正常")
logger.info("✅ 异步邮件任务正常")
logger.info("\n建议:")
logger.info("1. 定期检查Celery worker状态")
logger.info("2. 监控任务队列长度")
logger.info("3. 设置任务超时和重试机制")
logger.info("4. 配置任务监控如Flower")
return 0
elif tests_passed >= 2:
logger.warning("部分测试通过Celery邮件功能基本可用。")
logger.info("\n需要检查:")
logger.info("1. Celery worker是否运行")
logger.info("2. Redis服务是否正常")
logger.info("3. SMTP配置是否正确")
return 1
else:
logger.error("多数测试失败Celery邮件功能异常。")
logger.info("\n紧急处理:")
logger.info("1. ❌ 检查Celery worker: sudo supervisorctl status celery_worker")
logger.info("2. ❌ 检查Redis: sudo systemctl status redis-server")
logger.info("3. ❌ 检查SMTP配置")
logger.info("4. ❌ 查看日志: tail -f /var/log/celery/worker.log")
return 1
if __name__ == "__main__":
# 配置日志
logger.remove()
# 创建日志目录
log_dir = Path("/var/log/celery")
log_dir.mkdir(parents=True, exist_ok=True)
log_dir.chmod(0o755)
# 添加控制台输出
logger.add(
sys.stdout,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
level="INFO"
)
# 添加日志文件输出
log_file = log_dir / "test_celery_email.log"
logger.add(
log_file,
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
level="INFO",
rotation="1 day",
retention="7 days",
encoding="utf-8"
)
logger.info(f"Celery邮件测试日志将同时输出到控制台和 {log_file}")
logger.info("=" * 60)
try:
exit_code = main()
sys.exit(exit_code)
except KeyboardInterrupt:
logger.warning("测试被用户中断")
sys.exit(1)
except Exception as e:
logger.error(f"测试过程中发生未预期错误: {e}")
sys.exit(1)