✅ 邮件发送测试
这是一封通过 Celery异步任务 发送的HTML格式邮件。
任务ID: {task_id}
发送时间: {send_time}
状态: 发送成功
📈 统计信息
阅读记录: {reading_count} 篇
感悟记录: {insights_count} 条
家庭事项: {tasks_count} 项
📋 说明
此邮件由Celery异步任务自动发送。
如需取消订阅,请联系系统管理员。
#!/usr/bin/env python """ Celery邮件发送测试脚本 用于在生产服务器上测试通过Celery异步发送邮件功能 """ import os import sys import time from pathlib import Path from loguru import logger from celery import shared_task from celery.exceptions import Retry def test_celery_email_config(): """测试Celery邮件配置""" logger.info("开始测试Celery邮件配置...") try: # 初始化Django环境 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings') import django django.setup() from django.conf import settings from diary_family.celery import app from core.models import SystemConfig # 检查Celery配置 celery_config = { 'CELERY_BROKER_URL': getattr(settings, 'CELERY_BROKER_URL', None), 'CELERY_RESULT_BACKEND': getattr(settings, 'CELERY_RESULT_BACKEND', None), 'CELERY_TIMEZONE': getattr(settings, 'CELERY_TIMEZONE', None), } logger.info("Celery配置信息:") for key, value in celery_config.items(): if value is None: logger.warning(f" {key}: 未配置") else: # 只显示URL的协议和主机部分,隐藏密码 if 'redis://' in str(value): # 隐藏密码 display_value = str(value).split('@')[0] + '@***' logger.info(f" {key}: {display_value}") else: logger.info(f" {key}: {value}") # 从数据库获取邮件配置 config = SystemConfig.get_config() email_config = { 'smtp_server': config.smtp_server, 'smtp_port': config.smtp_port, 'smtp_username': config.smtp_username, 'sender_email': config.sender_email, 'recipient_email': config.recipient_email, } logger.info("邮件配置信息:") for key, value in email_config.items(): if value is None: logger.warning(f" {key}: 未配置") else: logger.info(f" {key}: {value}") # 验证必要配置 missing_configs = [] if not getattr(settings, 'CELERY_BROKER_URL', None): missing_configs.append('CELERY_BROKER_URL') if not config.smtp_server: missing_configs.append('smtp_server') if not config.smtp_username: missing_configs.append('smtp_username') if not config.smtp_password: missing_configs.append('smtp_password') if missing_configs: logger.error(f"缺少必要的配置: {', '.join(missing_configs)}") return False # 验证邮箱格式 import re email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' # 验证发件人邮箱格式(优先使用sender_email,其次使用smtp_username) sender_email = config.sender_email or config.smtp_username if sender_email and not re.match(email_pattern, sender_email): logger.error(f"发件人邮箱格式不正确: {sender_email}") logger.error("请在系统配置页面输入有效的邮箱地址") return False # 验证收件人邮箱格式(如果已配置) if config.recipient_email and not re.match(email_pattern, config.recipient_email): logger.error(f"收件人邮箱格式不正确: {config.recipient_email}") logger.error("请在系统配置页面输入有效的邮箱地址") return False # 测试Celery连接 logger.info("测试Celery连接...") try: result = app.control.ping(timeout=10) logger.info(f"Celery连接结果: {result}") if result: logger.success("Celery连接成功!") else: logger.warning("Celery连接测试未返回结果,worker可能未运行") except Exception as e: logger.warning(f"Celery连接测试失败: {e}") logger.info("这可能表示worker未运行,但测试任务仍可发送到队列") logger.success("Celery邮件配置测试通过!") return True except Exception as e: logger.error(f"Celery邮件配置测试失败: {e}") return False def test_celery_worker_status(): """测试Celery Worker状态""" logger.info("检查Celery Worker状态...") try: # 初始化Django环境 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings') import django django.setup() from diary_family.celery import app # 获取Worker统计 try: stats = app.control.inspect().stats() if stats: logger.success(f"找到 {len(stats)} 个运行中的worker:") for worker, info in stats.items(): pool = info.get('pool', {}) concurrency = pool.get('max-concurrency', '未知') prefetch = pool.get('prefetch_size', '默认') logger.info(f" Worker: {worker}") logger.info(f" 并发数: {concurrency}") logger.info(f" 预取大小: {prefetch}") logger.info(f" 任务超时: {info.get('task_timeout', '默认')}") else: logger.warning("未找到运行中的Celery worker") logger.info("请启动Celery worker:") logger.info(" celery -A diary_family worker -l info") logger.info(" 或使用supervisor管理:") logger.info(" sudo supervisorctl start celery_worker") except Exception as e: logger.warning(f"获取Worker状态失败: {e}") # 获取活跃任务 try: active = app.control.inspect().active() if active: total_tasks = sum(len(tasks) for tasks in active.values()) logger.info(f"当前活跃任务数: {total_tasks}") for worker, tasks in active.items(): if tasks: logger.info(f" Worker {worker}:") for task in tasks: logger.info(f" - {task.get('name', '未知')}: {task.get('id', '未知')}") else: logger.info("当前没有活跃任务") except Exception as e: logger.warning(f"获取活跃任务失败: {e}") # 获取计划任务 try: scheduled = app.control.inspect().scheduled() if scheduled: total_scheduled = sum(len(tasks) for tasks in scheduled.values()) logger.info(f"计划任务数: {total_scheduled}") else: logger.info("当前没有计划任务") except Exception as e: logger.warning(f"获取计划任务失败: {e}") return True except Exception as e: logger.error(f"检查Worker状态失败: {e}") return False @shared_task(bind=True, max_retries=3, default_retry_delay=60) def celery_send_test_email(self, test_mode=True): """ Celery异步发送测试邮件任务 支持重试机制,适合生产环境使用 """ from django.conf import settings from django.core.mail import EmailMessage from django.utils import timezone import traceback task_id = self.request.id if hasattr(self, 'request') else 'unknown' logger.info(f"[任务 {task_id}] 开始执行异步邮件发送任务") try: # 从数据库获取邮件配置 from core.models import SystemConfig from django.core.mail.backends.smtp import EmailBackend config = SystemConfig.get_config() # 优先使用sender_email,其次使用smtp_username作为发件人 from_email = config.sender_email or config.smtp_username if not from_email: raise ValueError("未配置发件邮箱") # 验证发件人邮箱格式 import re email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' if not re.match(email_pattern, from_email): raise ValueError(f"发件人邮箱格式不正确: {from_email}") to_email = config.recipient_email or from_email # 验证收件人邮箱格式 if not re.match(email_pattern, to_email): raise ValueError(f"收件人邮箱格式不正确: {to_email}") recipient_list = [to_email] # 获取SMTP配置 host = config.smtp_server or 'localhost' port = config.smtp_port or 587 username = config.smtp_username or '' password = config.smtp_password or '' use_tls = True # 默认使用TLS # 创建邮件后端 backend = EmailBackend( host=host, port=port, username=username, password=password, use_tls=use_tls, fail_silently=False ) # 创建邮件内容 subject = f"[Celery测试] 异步邮件发送成功 - {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}" if test_mode: body = f""" 这是一封通过Celery异步发送的测试邮件。 ✅ 邮件发送成功! 任务信息: - 任务ID: {task_id} - 发送时间: {timezone.now().strftime('%Y-%m-%d %H:%M:%S')} - 发送方式: Celery异步任务 - 执行重试次数: {self.request.retries} 邮件配置: - 发件服务器: {host}:{port} - 使用TLS: {use_tls} --- 家庭日报系统 自动发送(Celery异步任务) """ else: body = f""" 家庭日报系统测试邮件 发送时间: {timezone.now().strftime('%Y-%m-%d %H:%M:%S')} 发送方式: Celery异步任务 任务ID: {task_id} 这是一封测试邮件,用于验证Celery异步邮件发送功能。 """ # 创建邮件 email = EmailMessage( subject=subject, body=body, from_email=from_email, to=recipient_list, connection=backend ) email.content_subtype = 'plain' email.encoding = 'utf-8' # 发送邮件 logger.info(f"[任务 {task_id}] 正在发送邮件...") sent_count = email.send(fail_silently=False) if sent_count > 0: logger.success(f"[任务 {task_id}] 邮件发送成功!发送给 {len(recipient_list)} 个收件人") return { 'status': 'success', 'task_id': task_id, 'sent_to': len(recipient_list), 'timestamp': timezone.now().isoformat() } else: raise Exception("邮件发送返回0,未能发送邮件") except Exception as e: error_msg = str(e) error_traceback = traceback.format_exc() logger.error(f"[任务 {task_id}] 邮件发送失败: {error_msg}") logger.error(f"[任务 {task_id}] 错误详情:\n{error_traceback}") # 检查是否应该重试 if self.request.retries < self.max_retries: logger.info(f"[任务 {task_id}] 准备第 {self.request.retries + 1} 次重试...") # 重试延迟指数增长 retry_delay = 60 * (2 ** self.request.retries) logger.info(f"[任务 {task_id}] {retry_delay}秒后进行重试") raise self.retry(exc=e, countdown=retry_delay) else: logger.error(f"[任务 {task_id}] 已达到最大重试次数 {self.max_retries},放弃重试") # 返回错误信息而不是抛出异常 return { 'status': 'failed', 'task_id': task_id, 'error': error_msg, 'retries': self.request.retries, 'timestamp': timezone.now().isoformat() } @shared_task(bind=True) def celery_send_html_report_email(self, include_attachment=False): """ Celery异步发送HTML报告邮件 模拟每日报告发送功能 """ from django.conf import settings from django.core.mail import EmailMessage, EmailMultiAlternatives from django.template.loader import render_to_string from django.utils import timezone from datetime import timedelta import traceback task_id = self.request.id if hasattr(self, 'request') else 'unknown' logger.info(f"[任务 {task_id}] 开始执行HTML报告邮件发送任务") try: # 从数据库获取邮件配置 from core.models import SystemConfig from django.core.mail.backends.smtp import EmailBackend config = SystemConfig.get_config() # 优先使用sender_email,其次使用smtp_username作为发件人 from_email = config.sender_email or config.smtp_username if not from_email: raise ValueError("未配置发件邮箱") # 验证发件人邮箱格式 import re email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' if not re.match(email_pattern, from_email): raise ValueError(f"发件人邮箱格式不正确: {from_email}") to_email = config.recipient_email or from_email # 验证收件人邮箱格式 if not re.match(email_pattern, to_email): raise ValueError(f"收件人邮箱格式不正确: {to_email}") if isinstance(to_email, list): recipient_list = to_email else: recipient_list = [to_email] # 获取SMTP配置 host = config.smtp_server or 'localhost' port = config.smtp_port or 587 username = config.smtp_username or '' password = config.smtp_password or '' use_tls = True # 默认使用TLS # 准备报告数据 today = timezone.now().date() yesterday = today - timedelta(days=1) report_data = { 'today': today.strftime('%Y年%m月%d日'), 'yesterday': yesterday.strftime('%Y年%m月%d日'), 'tasks': [], 'reading': {'count': 0, 'items': []}, 'insights': {'count': 0, 'items': []}, } # 创建HTML邮件内容(使用普通字符串拼接,避免f-string中的%语法问题) html_content = '''
这是一封通过 Celery异步任务 发送的HTML格式邮件。
任务ID: {task_id}
发送时间: {send_time}
状态: 发送成功
阅读记录: {reading_count} 篇
感悟记录: {insights_count} 条
家庭事项: {tasks_count} 项
此邮件由Celery异步任务自动发送。
如需取消订阅,请联系系统管理员。