#!/usr/bin/env python
"""
Celery邮件发送测试脚本
用于在生产服务器上测试通过Celery异步发送邮件功能
"""
import os
import sys
import time
from pathlib import Path
from loguru import logger
from celery import shared_task
from celery.exceptions import Retry
def test_celery_email_config():
"""测试Celery邮件配置"""
logger.info("开始测试Celery邮件配置...")
try:
# 初始化Django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
import django
django.setup()
from django.conf import settings
from diary_family.celery import app
# 检查Celery配置
celery_config = {
'CELERY_BROKER_URL': getattr(settings, 'CELERY_BROKER_URL', None),
'CELERY_RESULT_BACKEND': getattr(settings, 'CELERY_RESULT_BACKEND', None),
'CELERY_TIMEZONE': getattr(settings, 'CELERY_TIMEZONE', None),
}
logger.info("Celery配置信息:")
for key, value in celery_config.items():
if value is None:
logger.warning(f" {key}: 未配置")
else:
# 只显示URL的协议和主机部分,隐藏密码
if 'redis://' in str(value):
# 隐藏密码
display_value = str(value).split('@')[0] + '@***'
logger.info(f" {key}: {display_value}")
else:
logger.info(f" {key}: {value}")
# 检查邮件配置
email_config = {
'EMAIL_BACKEND': getattr(settings, 'EMAIL_BACKEND', None),
'EMAIL_HOST': getattr(settings, 'EMAIL_HOST', None),
'EMAIL_PORT': getattr(settings, 'EMAIL_PORT', None),
'EMAIL_HOST_USER': getattr(settings, 'EMAIL_HOST_USER', None),
}
logger.info("邮件配置信息:")
for key, value in email_config.items():
if value is None:
logger.warning(f" {key}: 未配置")
else:
if 'password' in key.lower():
logger.info(f" {key}: ***")
else:
logger.info(f" {key}: {value}")
# 验证必要配置
missing_configs = []
if not getattr(settings, 'CELERY_BROKER_URL', None):
missing_configs.append('CELERY_BROKER_URL')
if not getattr(settings, 'EMAIL_HOST', None):
missing_configs.append('EMAIL_HOST')
if not getattr(settings, 'EMAIL_HOST_USER', None):
missing_configs.append('EMAIL_HOST_USER')
if missing_configs:
logger.error(f"缺少必要的配置: {', '.join(missing_configs)}")
return False
# 测试Celery连接
logger.info("测试Celery连接...")
try:
result = app.control.ping(timeout=10)
logger.info(f"Celery连接结果: {result}")
if result:
logger.success("Celery连接成功!")
else:
logger.warning("Celery连接测试未返回结果,worker可能未运行")
except Exception as e:
logger.warning(f"Celery连接测试失败: {e}")
logger.info("这可能表示worker未运行,但测试任务仍可发送到队列")
logger.success("Celery邮件配置测试通过!")
return True
except Exception as e:
logger.error(f"Celery邮件配置测试失败: {e}")
return False
def test_celery_worker_status():
"""测试Celery Worker状态"""
logger.info("检查Celery Worker状态...")
try:
# 初始化Django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
import django
django.setup()
from diary_family.celery import app
# 获取Worker统计
try:
stats = app.control.inspect().stats()
if stats:
logger.success(f"找到 {len(stats)} 个运行中的worker:")
for worker, info in stats.items():
pool = info.get('pool', {})
concurrency = pool.get('max-concurrency', '未知')
prefetch = pool.get('prefetch_size', '默认')
logger.info(f" Worker: {worker}")
logger.info(f" 并发数: {concurrency}")
logger.info(f" 预取大小: {prefetch}")
logger.info(f" 任务超时: {info.get('task_timeout', '默认')}")
else:
logger.warning("未找到运行中的Celery worker")
logger.info("请启动Celery worker:")
logger.info(" celery -A diary_family worker -l info")
logger.info(" 或使用supervisor管理:")
logger.info(" sudo supervisorctl start celery_worker")
except Exception as e:
logger.warning(f"获取Worker状态失败: {e}")
# 获取活跃任务
try:
active = app.control.inspect().active()
if active:
total_tasks = sum(len(tasks) for tasks in active.values())
logger.info(f"当前活跃任务数: {total_tasks}")
for worker, tasks in active.items():
if tasks:
logger.info(f" Worker {worker}:")
for task in tasks:
logger.info(f" - {task.get('name', '未知')}: {task.get('id', '未知')}")
else:
logger.info("当前没有活跃任务")
except Exception as e:
logger.warning(f"获取活跃任务失败: {e}")
# 获取计划任务
try:
scheduled = app.control.inspect().scheduled()
if scheduled:
total_scheduled = sum(len(tasks) for tasks in scheduled.values())
logger.info(f"计划任务数: {total_scheduled}")
else:
logger.info("当前没有计划任务")
except Exception as e:
logger.warning(f"获取计划任务失败: {e}")
return True
except Exception as e:
logger.error(f"检查Worker状态失败: {e}")
return False
@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def celery_send_test_email(self, test_mode=True):
"""
Celery异步发送测试邮件任务
支持重试机制,适合生产环境使用
"""
from django.conf import settings
from django.core.mail import EmailMessage
from django.utils import timezone
import traceback
task_id = self.request.id if hasattr(self, 'request') else 'unknown'
logger.info(f"[任务 {task_id}] 开始执行异步邮件发送任务")
try:
# 获取邮件配置
from_email = getattr(settings, 'EMAIL_HOST_USER', None)
if not from_email:
raise ValueError("未配置发件邮箱 (EMAIL_HOST_USER)")
to_email = getattr(settings, 'EMAIL_HOST_USER', from_email)
if isinstance(to_email, list):
recipient_list = to_email
else:
recipient_list = [to_email]
# 创建邮件内容
subject = f"[Celery测试] 异步邮件发送成功 - {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}"
if test_mode:
body = f"""
这是一封通过Celery异步发送的测试邮件。
✅ 邮件发送成功!
任务信息:
- 任务ID: {task_id}
- 发送时间: {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}
- 发送方式: Celery异步任务
- 执行重试次数: {self.request.retries}
邮件配置:
- 发件服务器: {getattr(settings, 'EMAIL_HOST', '未知')}:{getattr(settings, 'EMAIL_PORT', '未知')}
- 使用TLS: {getattr(settings, 'EMAIL_USE_TLS', '未知')}
---
家庭日报系统
自动发送(Celery异步任务)
"""
else:
body = f"""
家庭日报系统测试邮件
发送时间: {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}
发送方式: Celery异步任务
任务ID: {task_id}
这是一封测试邮件,用于验证Celery异步邮件发送功能。
"
# 创建邮件
email = EmailMessage(
subject=subject,
body=body,
from_email=from_email,
to=recipient_list,
)
email.content_subtype = 'plain'
email.encoding = 'utf-8'
# 发送邮件
logger.info(f"[任务 {task_id}] 正在发送邮件...")
sent_count = email.send(fail_silently=False)
if sent_count > 0:
logger.success(f"[任务 {task_id}] 邮件发送成功!发送给 {len(recipient_list)} 个收件人")
return {
'status': 'success',
'task_id': task_id,
'sent_to': len(recipient_list),
'timestamp': timezone.now().isoformat()
}
else:
raise Exception("邮件发送返回0,未能发送邮件")
except Exception as e:
error_msg = str(e)
error_traceback = traceback.format_exc()
logger.error(f"[任务 {task_id}] 邮件发送失败: {error_msg}")
logger.error(f"[任务 {task_id}] 错误详情:\n{error_traceback}")
# 检查是否应该重试
if self.request.retries < self.max_retries:
logger.info(f"[任务 {task_id}] 准备第 {self.request.retries + 1} 次重试...")
# 重试延迟指数增长
retry_delay = 60 * (2 ** self.request.retries)
logger.info(f"[任务 {task_id}] {retry_delay}秒后进行重试")
raise self.retry(exc=e, countdown=retry_delay)
else:
logger.error(f"[任务 {task_id}] 已达到最大重试次数 {self.max_retries},放弃重试")
# 返回错误信息而不是抛出异常
return {
'status': 'failed',
'task_id': task_id,
'error': error_msg,
'retries': self.request.retries,
'timestamp': timezone.now().isoformat()
}
@shared_task(bind=True)
def celery_send_html_report_email(self, include_attachment=False):
"""
Celery异步发送HTML报告邮件
模拟每日报告发送功能
"""
from django.conf import settings
from django.core.mail import EmailMessage, EmailMultiAlternatives
from django.template.loader import render_to_string
from django.utils import timezone
from datetime import timedelta
import traceback
task_id = self.request.id if hasattr(self, 'request') else 'unknown'
logger.info(f"[任务 {task_id}] 开始执行HTML报告邮件发送任务")
try:
# 获取邮件配置
from_email = getattr(settings, 'EMAIL_HOST_USER', None)
if not from_email:
raise ValueError("未配置发件邮箱")
to_email = getattr(settings, 'EMAIL_HOST_USER', from_email)
if isinstance(to_email, list):
recipient_list = to_email
else:
recipient_list = [to_email]
# 准备报告数据
today = timezone.now().date()
yesterday = today - timedelta(days=1)
report_data = {
'today': today.strftime('%Y年%m月%d日'),
'yesterday': yesterday.strftime('%Y年%m月%d日'),
'tasks': [],
'reading': {'count': 0, 'items': []},
'insights': {'count': 0, 'items': []},
}
# 创建HTML邮件内容
html_content = f"""
✅ 邮件发送测试
这是一封通过 Celery异步任务 发送的HTML格式邮件。
任务ID: {task_id}
发送时间: {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}
状态: 发送成功
📈 统计信息
阅读记录: {report_data['reading']['count']} 篇
感悟记录: {report_data['insights']['count']} 条
家庭事项: {len(report_data['tasks'])} 项
📋 说明
此邮件由Celery异步任务自动发送。
如需取消订阅,请联系系统管理员。
"""
# 创建邮件
subject = f"[Celery] 家庭日报 {report_data['today']} - 测试报告"
email = EmailMultiAlternatives(
subject=subject,
body=html_content,
from_email=from_email,
to=recipient_list,
)
email.attach_alternative(html_content, "text/html")
email.encoding = 'utf-8'
# 添加附件(如果需要)
if include_attachment:
attachment_content = f"""家庭日报测试报告
发送时间: {timezone.now().isoformat()}
任务ID: {task_id}
发送方式: Celery异步任务
这是一份自动生成的测试报告。
"""
email.attach('daily_report_test.txt', attachment_content, 'text/plain')
logger.info(f"[任务 {task_id}] 已添加测试附件")
# 发送邮件
logger.info(f"[任务 {task_id}] 正在发送HTML报告邮件...")
sent_count = email.send(fail_silently=False)
if sent_count > 0:
logger.success(f"[任务 {task_id}] HTML报告邮件发送成功!")
return {
'status': 'success',
'task_id': task_id,
'type': 'html_report',
'sent_to': len(recipient_list),
'has_attachment': include_attachment,
'timestamp': timezone.now().isoformat()
}
else:
raise Exception("邮件发送返回0")
except Exception as e:
error_msg = str(e)
error_traceback = traceback.format_exc()
logger.error(f"[任务 {task_id}] HTML报告邮件发送失败: {error_msg}")
logger.error(f"[任务 {task_id}] 错误详情:\n{error_traceback}")
return {
'status': 'failed',
'task_id': task_id,
'type': 'html_report',
'error': error_msg,
'timestamp': timezone.now().isoformat()
}
def test_celery_email_task():
"""测试Celery异步邮件任务"""
logger.info("开始测试Celery异步邮件任务...")
try:
# 初始化Django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
import django
django.setup()
from django.utils import timezone
# 发送测试任务
logger.info("发送Celery邮件任务到队列...")
# 发送简单文本邮件任务
task1 = celery_send_test_email.delay(test_mode=True)
logger.info(f"简单邮件任务已发送: {task1.id}")
# 发送HTML报告邮件任务
task2 = celery_send_html_report_email.delay(include_attachment=False)
logger.info(f"HTML报告邮件任务已发送: {task2.id}")
# 等待任务完成
logger.info("等待任务执行(最多30秒)...")
results = []
tasks = [
('简单邮件', task1),
('HTML报告邮件', task2)
]
for name, task in tasks:
try:
# 等待任务完成
result = task.get(timeout=30)
logger.info(f"{name}任务结果: {result}")
results.append((name, 'success', result))
except Exception as e:
logger.warning(f"{name}任务获取结果失败(可能是超时): {e}")
# 检查任务状态
try:
state = task.state
logger.info(f"{name}任务状态: {state}")
results.append((name, state, None))
except Exception as state_e:
logger.error(f"获取任务状态失败: {state_e}")
results.append((name, 'unknown', None))
# 统计结果
success_count = sum(1 for _, status, _ in results if status == 'success')
logger.info(f"\n任务执行结果: {success_count}/{len(tasks)} 成功")
return success_count == len(tasks)
except Exception as e:
logger.error(f"Celery邮件任务测试失败: {e}")
logger.info("可能的原因:")
logger.info("1. Celery worker未运行")
logger.info("2. Redis连接问题")
logger.info("3. 任务执行超时")
return False
def main():
"""主测试函数"""
logger.info("=" * 60)
logger.info("=== Celery邮件发送功能测试开始 ===")
logger.info("测试环境: Ubuntu + Celery + Redis + SMTP")
logger.info("=" * 60)
tests_passed = 0
total_tests = 4
# 测试1: Celery邮件配置
logger.info("\n[测试1] Celery邮件配置测试")
if test_celery_email_config():
tests_passed += 1
# 测试2: Worker状态
logger.info("\n[测试2] Celery Worker状态检查")
if test_celery_worker_status():
tests_passed += 1
# 测试3: Celery邮件任务
logger.info("\n[测试3] Celery异步邮件任务测试")
if test_celery_email_task():
tests_passed += 1
# 测试4: 发送单个测试邮件(同步)
logger.info("\n[测试4] 同步发送测试邮件")
try:
from django.conf import settings
from django.core.mail import EmailMessage
from django.utils import timezone
from_email = getattr(settings, 'EMAIL_HOST_USER', None)
to_email = getattr(settings, 'EMAIL_HOST_USER', from_email)
subject = f"[直接测试] Celery邮件测试 - {timezone.now().strftime('%H:%M:%S')}"
body = f"""
这是直接发送的测试邮件,用于验证SMTP配置。
发送时间: {timezone.now().isoformat()}
发送方式: 直接发送(非Celery)
如果Celery异步任务测试失败,但此测试成功,
说明SMTP配置正确,问题可能在Celery或Redis配置。
---
家庭日报系统
"""
email = EmailMessage(
subject=subject,
body=body,
from_email=from_email,
to=[to_email] if isinstance(to_email, str) else to_email,
)
sent = email.send(fail_silently=False)
if sent > 0:
logger.success("同步邮件发送成功!")
tests_passed += 1
else:
logger.error("同步邮件发送失败")
except Exception as e:
logger.error(f"同步邮件测试失败: {e}")
# 测试总结
logger.info("\n" + "=" * 60)
logger.info("测试总结:")
logger.info(f"通过测试: {tests_passed}/{total_tests}")
logger.info("=" * 60)
if tests_passed == total_tests:
logger.success("所有测试通过!Celery邮件功能正常。")
logger.info("\n✅ Celery配置正确")
logger.info("✅ Redis连接正常")
logger.info("✅ SMTP配置正常")
logger.info("✅ 异步邮件任务正常")
logger.info("\n建议:")
logger.info("1. 定期检查Celery worker状态")
logger.info("2. 监控任务队列长度")
logger.info("3. 设置任务超时和重试机制")
logger.info("4. 配置任务监控(如Flower)")
return 0
elif tests_passed >= 2:
logger.warning("部分测试通过,Celery邮件功能基本可用。")
logger.info("\n需要检查:")
logger.info("1. Celery worker是否运行")
logger.info("2. Redis服务是否正常")
logger.info("3. SMTP配置是否正确")
return 1
else:
logger.error("多数测试失败,Celery邮件功能异常。")
logger.info("\n紧急处理:")
logger.info("1. ❌ 检查Celery worker: sudo supervisorctl status celery_worker")
logger.info("2. ❌ 检查Redis: sudo systemctl status redis-server")
logger.info("3. ❌ 检查SMTP配置")
logger.info("4. ❌ 查看日志: tail -f /var/log/celery/worker.log")
return 1
if __name__ == "__main__":
# 配置日志
logger.remove()
# 创建日志目录
log_dir = Path("/var/log/celery")
log_dir.mkdir(parents=True, exist_ok=True)
log_dir.chmod(0o755)
# 添加控制台输出
logger.add(
sys.stdout,
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
level="INFO"
)
# 添加日志文件输出
log_file = log_dir / "test_celery_email.log"
logger.add(
log_file,
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
level="INFO",
rotation="1 day",
retention="7 days",
encoding="utf-8"
)
logger.info(f"Celery邮件测试日志将同时输出到控制台和 {log_file}")
logger.info("=" * 60)
try:
exit_code = main()
sys.exit(exit_code)
except KeyboardInterrupt:
logger.warning("测试被用户中断")
sys.exit(1)
except Exception as e:
logger.error(f"测试过程中发生未预期错误: {e}")
sys.exit(1)