from celery import shared_task from loguru import logger from django.core.mail import EmailMessage, EmailMultiAlternatives, EmailBackend from django.utils import timezone from datetime import timedelta import traceback @shared_task(bind=True, max_retries=3, default_retry_delay=60) def celery_send_test_email(self, test_mode=True): """ Celery异步发送测试邮件任务 支持重试机制,适合生产环境使用 """ task_id = self.request.id if hasattr(self, 'request') else 'unknown' logger.info(f"[任务 {task_id}] 开始执行异步邮件发送任务") try: # 从数据库获取邮件配置 from core.models import SystemConfig config = SystemConfig.get_config() # 优先使用sender_email,其次使用smtp_username作为发件人 from_email = config.sender_email or config.smtp_username if not from_email: raise ValueError("未配置发件邮箱") # 验证发件人邮箱格式 import re email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' if not re.match(email_pattern, from_email): raise ValueError(f"发件人邮箱格式不正确: {from_email}") to_email = config.recipient_email or from_email # 验证收件人邮箱格式 if not re.match(email_pattern, to_email): raise ValueError(f"收件人邮箱格式不正确: {to_email}") recipient_list = [to_email] # 获取SMTP配置 host = config.smtp_server or 'localhost' port = config.smtp_port or 587 username = config.smtp_username or '' password = config.smtp_password or '' use_tls = True # 默认使用TLS # 创建邮件后端 backend = EmailBackend( host=host, port=port, username=username, password=password, use_tls=use_tls, fail_silently=False ) # 创建邮件内容 subject = f"[Celery测试] 异步邮件发送成功 - {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}" if test_mode: body = f""" 这是一封通过Celery异步发送的测试邮件。 ✅ 邮件发送成功! 任务信息: - 任务ID: {task_id} - 发送时间: {timezone.now().strftime('%Y-%m-%d %H:%M:%S')} - 发送方式: Celery异步任务 - 执行重试次数: {self.request.retries} 邮件配置: - 发件服务器: {host}:{port} - 使用TLS: {use_tls} --- 家庭日报系统 自动发送(Celery异步任务) """ else: body = f""" 家庭日报系统测试邮件 发送时间: {timezone.now().strftime('%Y-%m-%d %H:%M:%S')} 发送方式: Celery异步任务 任务ID: {task_id} 这是一封测试邮件,用于验证Celery异步邮件发送功能。 """ # 创建邮件 email = EmailMessage( subject=subject, body=body, from_email=from_email, to=recipient_list, connection=backend ) email.content_subtype = 'plain' email.encoding = 'utf-8' # 发送邮件 logger.info(f"[任务 {task_id}] 正在发送邮件...") sent_count = email.send(fail_silently=False) if sent_count > 0: logger.success(f"[任务 {task_id}] 邮件发送成功!发送给 {len(recipient_list)} 个收件人") return { 'status': 'success', 'task_id': task_id, 'sent_to': len(recipient_list), 'timestamp': timezone.now().isoformat() } else: raise Exception("邮件发送返回0,未能发送邮件") except Exception as e: error_msg = str(e) error_traceback = traceback.format_exc() logger.error(f"[任务 {task_id}] 邮件发送失败: {error_msg}") logger.error(f"[任务 {task_id}] 错误详情:\n{error_traceback}") # 检查是否应该重试 if self.request.retries < self.max_retries: logger.info(f"[任务 {task_id}] 准备第 {self.request.retries + 1} 次重试...") # 重试延迟指数增长 retry_delay = 60 * (2 ** self.request.retries) logger.info(f"[任务 {task_id}] {retry_delay}秒后进行重试") raise self.retry(exc=e, countdown=retry_delay) else: logger.error(f"[任务 {task_id}] 已达到最大重试次数 {self.max_retries},放弃重试") # 返回错误信息而不是抛出异常 return { 'status': 'failed', 'task_id': task_id, 'error': error_msg, 'retries': self.request.retries, 'timestamp': timezone.now().isoformat() } @shared_task(bind=True) def celery_send_html_report_email(self, include_attachment=False): """ Celery异步发送HTML报告邮件 模拟每日报告发送功能 """ task_id = self.request.id if hasattr(self, 'request') else 'unknown' logger.info(f"[任务 {task_id}] 开始执行HTML报告邮件发送任务") try: # 从数据库获取邮件配置 from core.models import SystemConfig config = SystemConfig.get_config() # 优先使用sender_email,其次使用smtp_username作为发件人 from_email = config.sender_email or config.smtp_username if not from_email: raise ValueError("未配置发件邮箱") # 验证发件人邮箱格式 import re email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' if not re.match(email_pattern, from_email): raise ValueError(f"发件人邮箱格式不正确: {from_email}") to_email = config.recipient_email or from_email # 验证收件人邮箱格式 if not re.match(email_pattern, to_email): raise ValueError(f"收件人邮箱格式不正确: {to_email}") if isinstance(to_email, list): recipient_list = to_email else: recipient_list = [to_email] # 获取SMTP配置 host = config.smtp_server or 'localhost' port = config.smtp_port or 587 username = config.smtp_username or '' password = config.smtp_password or '' use_tls = True # 默认使用TLS # 准备报告数据 today = timezone.now().date() yesterday = today - timedelta(days=1) report_data = { 'today': today.strftime('%Y年%m月%d日'), 'yesterday': yesterday.strftime('%Y年%m月%d日'), 'tasks': [], 'reading': {'count': 0, 'items': []}, 'insights': {'count': 0, 'items': []}, } # 创建HTML邮件内容(使用普通字符串拼接,避免f-string中的%语法问题) html_content = '''

📊 家庭日报

{today}

✅ 邮件发送测试

这是一封通过 Celery异步任务 发送的HTML格式邮件。

任务ID: {task_id}

发送时间: {send_time}

状态: 发送成功

📈 统计信息

阅读记录: {reading_count} 篇

感悟记录: {insights_count} 条

家庭事项: {tasks_count} 项

📋 说明

此邮件由Celery异步任务自动发送。

如需取消订阅,请联系系统管理员。

'''.format( today=report_data['today'], task_id=task_id, send_time=timezone.now().strftime('%Y-%m-%d %H:%M:%S'), reading_count=report_data['reading']['count'], insights_count=report_data['insights']['count'], tasks_count=len(report_data['tasks']) ) # 创建邮件后端 backend = EmailBackend( host=host, port=port, username=username, password=password, use_tls=use_tls, fail_silently=False ) # 创建邮件 subject = f"[Celery] 家庭日报 {report_data['today']} - 测试报告" email = EmailMultiAlternatives( subject=subject, body=html_content, from_email=from_email, to=recipient_list, connection=backend ) email.attach_alternative(html_content, "text/html") email.encoding = 'utf-8' # 添加附件(如果需要) if include_attachment: attachment_content = '''家庭日报测试报告 发送时间: {send_time} 任务ID: {task_id} 发送方式: Celery异步任务 这是一份自动生成的测试报告 '''.format( send_time=timezone.now().isoformat(), task_id=task_id ) email.attach('daily_report_test.txt', attachment_content, 'text/plain') logger.info(f"[任务 {task_id}] 已添加测试附件") # 发送邮件 logger.info(f"[任务 {task_id}] 正在发送HTML报告邮件...") sent_count = email.send(fail_silently=False) if sent_count > 0: logger.success(f"[任务 {task_id}] HTML报告邮件发送成功!") return { 'status': 'success', 'task_id': task_id, 'type': 'html_report', 'sent_to': len(recipient_list), 'has_attachment': include_attachment, 'timestamp': timezone.now().isoformat() } else: raise Exception("邮件发送返回0") except Exception as e: error_msg = str(e) error_traceback = traceback.format_exc() logger.error(f"[任务 {task_id}] HTML报告邮件发送失败: {error_msg}") logger.error(f"[任务 {task_id}] 错误详情:\n{error_traceback}") return { 'status': 'failed', 'task_id': task_id, 'type': 'html_report', 'error': error_msg, 'timestamp': timezone.now().isoformat() }