diff --git a/.gitignore b/.gitignore index c73453d..b76673d 100644 --- a/.gitignore +++ b/.gitignore @@ -94,5 +94,4 @@ test*.py *test.py !test_redis_celery.py !test_celery.py -!test_email.py !test_celery_email.py \ No newline at end of file diff --git a/core/tasks.py b/core/tasks.py index daad141..5b189d1 100644 --- a/core/tasks.py +++ b/core/tasks.py @@ -1,149 +1,328 @@ from celery import shared_task +from loguru import logger +from django.core.mail import EmailMessage, EmailMultiAlternatives, EmailBackend from django.utils import timezone from datetime import timedelta -from django.core.mail import EmailMessage -from django.conf import settings -from django.shortcuts import render -import os -from loguru import logger +import traceback -# WeasyPrint可用性标记,初始为None -WEASYPRINT_AVAILABLE = None -# 获取WeasyPrint可用性 -def is_weasyprint_available(): - """检查WeasyPrint是否可用""" - global WEASYPRINT_AVAILABLE - if WEASYPRINT_AVAILABLE is None: - try: - from weasyprint import HTML - WEASYPRINT_AVAILABLE = True - except ImportError: - logger.warning("WeasyPrint库无法导入,PDF功能将不可用") - WEASYPRINT_AVAILABLE = False - return WEASYPRINT_AVAILABLE +@shared_task(bind=True, max_retries=3, default_retry_delay=60) +def celery_send_test_email(self, test_mode=True): + """ + Celery异步发送测试邮件任务 + 支持重试机制,适合生产环境使用 + """ + task_id = self.request.id if hasattr(self, 'request') else 'unknown' + logger.info(f"[任务 {task_id}] 开始执行异步邮件发送任务") -from .models import ( - ReadingRecord, - InsightRecord, - FamilyTask, - TodayPlan, - SystemConfig -) - -@shared_task -def generate_daily_pdf_report(): - """生成每日PDF报告""" - logger.info("开始执行每日PDF报告生成任务") - - # 检查WeasyPrint是否可用 - if not is_weasyprint_available(): - logger.error("WeasyPrint库不可用,无法生成PDF报告") - return False - - today = timezone.now().date() - today_str = today.strftime('%Y-%m-%d') - - # 生成报告数据 - report_date = today - yesterday = report_date - timedelta(days=1) - - # 获取昨日记录 - yesterday_reading = ReadingRecord.objects.filter(date=yesterday) - yesterday_insight = InsightRecord.objects.filter(date=yesterday) - - # 获取今日计划 - today_plan = TodayPlan.objects.filter(date=report_date) - - # 获取家庭事项统计 - from django.db.models import Count - family_task_stats = FamilyTask.objects.values('type').annotate(count=Count('id')) - - # 准备上下文 - context = { - 'today': report_date, - 'yesterday': yesterday, - 'yesterday_reading': yesterday_reading, - 'yesterday_insight': yesterday_insight, - 'today_plan': today_plan, - 'family_task_stats': family_task_stats, - } - - # 渲染HTML模板 - html_string = render(None, 'core/report_pdf.html', context).content.decode('utf-8') - - # 生成PDF报告 - pdf_file = f"report_{today_str}.pdf" - pdf_path = os.path.join(settings.REPORTS_ROOT, pdf_file) - - # 确保报告目录存在 - os.makedirs(settings.REPORTS_ROOT, exist_ok=True) - - # 生成PDF - 动态导入WeasyPrint try: - from weasyprint import HTML - HTML(string=html_string).write_pdf(pdf_path) - logger.info(f"PDF报告生成成功: {pdf_path}") - return True - except Exception as e: - logger.error(f"PDF报告生成失败: {str(e)}") - return False + # 从数据库获取邮件配置 + from core.models import SystemConfig + config = SystemConfig.get_config() + # 优先使用sender_email,其次使用smtp_username作为发件人 + from_email = config.sender_email or config.smtp_username + if not from_email: + raise ValueError("未配置发件邮箱") -@shared_task -def send_daily_report(): - """发送每日报告""" - logger.info("开始执行每日报告发送任务") - - # 先生成PDF报告 - pdf_generated = generate_daily_pdf_report() - if not pdf_generated: - logger.error("PDF报告生成失败,无法发送邮件") - return False - - today = timezone.now().date() - today_str = today.strftime('%Y-%m-%d') - - # 获取系统配置 - config = SystemConfig.get_config() - - # 检查邮件配置是否完整 - if not all([config.smtp_server, config.smtp_username, config.smtp_password, config.recipient_email]): - logger.error("邮件配置不完整,无法发送邮件") - return False - - # 发送邮件 - subject = f"家庭日报 - {today_str}" - message = f"这是您的家庭日报,日期:{today_str}" - from_email = config.smtp_username - recipient_list = [config.recipient_email] - - # PDF文件路径 - pdf_file = f"report_{today_str}.pdf" - pdf_path = os.path.join(settings.REPORTS_ROOT, pdf_file) - - try: + # 验证发件人邮箱格式 + import re + email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' + if not re.match(email_pattern, from_email): + raise ValueError(f"发件人邮箱格式不正确: {from_email}") + + to_email = config.recipient_email or from_email + # 验证收件人邮箱格式 + if not re.match(email_pattern, to_email): + raise ValueError(f"收件人邮箱格式不正确: {to_email}") + recipient_list = [to_email] + + # 获取SMTP配置 + host = config.smtp_server or 'localhost' + port = config.smtp_port or 587 + username = config.smtp_username or '' + password = config.smtp_password or '' + use_tls = True # 默认使用TLS + + # 创建邮件后端 + backend = EmailBackend( + host=host, + port=port, + username=username, + password=password, + use_tls=use_tls, + fail_silently=False + ) + + # 创建邮件内容 + subject = f"[Celery测试] 异步邮件发送成功 - {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}" + + if test_mode: + body = f""" +这是一封通过Celery异步发送的测试邮件。 + +✅ 邮件发送成功! + +任务信息: +- 任务ID: {task_id} +- 发送时间: {timezone.now().strftime('%Y-%m-%d %H:%M:%S')} +- 发送方式: Celery异步任务 +- 执行重试次数: {self.request.retries} + +邮件配置: +- 发件服务器: {host}:{port} +- 使用TLS: {use_tls} + +--- +家庭日报系统 +自动发送(Celery异步任务) + """ + else: + body = f""" +家庭日报系统测试邮件 + +发送时间: {timezone.now().strftime('%Y-%m-%d %H:%M:%S')} +发送方式: Celery异步任务 +任务ID: {task_id} + +这是一封测试邮件,用于验证Celery异步邮件发送功能。 + """ + + # 创建邮件 email = EmailMessage( subject=subject, - body=message, + body=body, from_email=from_email, to=recipient_list, + connection=backend ) - - # 添加附件 - with open(pdf_path, 'rb') as f: - email.attach(pdf_file, f.read(), 'application/pdf') - - # 发送邮件 - email.send() - logger.info(f"邮件发送成功,收件人:{config.recipient_email}") - return True - except Exception as e: - logger.error(f"邮件发送失败:{str(e)}") - return False + email.content_subtype = 'plain' + email.encoding = 'utf-8' -@shared_task -def debug_task(): - """调试任务,用于测试Celery和Redis连接""" - from loguru import logger - logger.info("调试任务执行成功") - return {"status": "success", "message": "Celery任务执行正常", "timestamp": timezone.now().isoformat()} \ No newline at end of file + # 发送邮件 + logger.info(f"[任务 {task_id}] 正在发送邮件...") + sent_count = email.send(fail_silently=False) + + if sent_count > 0: + logger.success(f"[任务 {task_id}] 邮件发送成功!发送给 {len(recipient_list)} 个收件人") + return { + 'status': 'success', + 'task_id': task_id, + 'sent_to': len(recipient_list), + 'timestamp': timezone.now().isoformat() + } + else: + raise Exception("邮件发送返回0,未能发送邮件") + + except Exception as e: + error_msg = str(e) + error_traceback = traceback.format_exc() + + logger.error(f"[任务 {task_id}] 邮件发送失败: {error_msg}") + logger.error(f"[任务 {task_id}] 错误详情:\n{error_traceback}") + + # 检查是否应该重试 + if self.request.retries < self.max_retries: + logger.info(f"[任务 {task_id}] 准备第 {self.request.retries + 1} 次重试...") + # 重试延迟指数增长 + retry_delay = 60 * (2 ** self.request.retries) + logger.info(f"[任务 {task_id}] {retry_delay}秒后进行重试") + + raise self.retry(exc=e, countdown=retry_delay) + else: + logger.error(f"[任务 {task_id}] 已达到最大重试次数 {self.max_retries},放弃重试") + + # 返回错误信息而不是抛出异常 + return { + 'status': 'failed', + 'task_id': task_id, + 'error': error_msg, + 'retries': self.request.retries, + 'timestamp': timezone.now().isoformat() + } + + +@shared_task(bind=True) +def celery_send_html_report_email(self, include_attachment=False): + """ + Celery异步发送HTML报告邮件 + 模拟每日报告发送功能 + """ + task_id = self.request.id if hasattr(self, 'request') else 'unknown' + logger.info(f"[任务 {task_id}] 开始执行HTML报告邮件发送任务") + + try: + # 从数据库获取邮件配置 + from core.models import SystemConfig + config = SystemConfig.get_config() + # 优先使用sender_email,其次使用smtp_username作为发件人 + from_email = config.sender_email or config.smtp_username + if not from_email: + raise ValueError("未配置发件邮箱") + + # 验证发件人邮箱格式 + import re + email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' + if not re.match(email_pattern, from_email): + raise ValueError(f"发件人邮箱格式不正确: {from_email}") + + to_email = config.recipient_email or from_email + # 验证收件人邮箱格式 + if not re.match(email_pattern, to_email): + raise ValueError(f"收件人邮箱格式不正确: {to_email}") + if isinstance(to_email, list): + recipient_list = to_email + else: + recipient_list = [to_email] + + # 获取SMTP配置 + host = config.smtp_server or 'localhost' + port = config.smtp_port or 587 + username = config.smtp_username or '' + password = config.smtp_password or '' + use_tls = True # 默认使用TLS + + # 准备报告数据 + today = timezone.now().date() + yesterday = today - timedelta(days=1) + + report_data = { + 'today': today.strftime('%Y年%m月%d日'), + 'yesterday': yesterday.strftime('%Y年%m月%d日'), + 'tasks': [], + 'reading': {'count': 0, 'items': []}, + 'insights': {'count': 0, 'items': []}, + } + + # 创建HTML邮件内容(使用普通字符串拼接,避免f-string中的%语法问题) + html_content = ''' + + + + + + + +
+

📊 家庭日报

+
{today}
+
+ +
+
+

✅ 邮件发送测试

+

这是一封通过 Celery异步任务 发送的HTML格式邮件。

+

任务ID: {task_id}

+

发送时间: {send_time}

+

状态: 发送成功

+
+ +
+

📈 统计信息

+

阅读记录: {reading_count} 篇

+

感悟记录: {insights_count} 条

+

家庭事项: {tasks_count} 项

+
+ +
+

📋 说明

+

此邮件由Celery异步任务自动发送。

+

如需取消订阅,请联系系统管理员。

+
+
+ + + + + '''.format( + today=report_data['today'], + task_id=task_id, + send_time=timezone.now().strftime('%Y-%m-%d %H:%M:%S'), + reading_count=report_data['reading']['count'], + insights_count=report_data['insights']['count'], + tasks_count=len(report_data['tasks']) + ) + + # 创建邮件后端 + backend = EmailBackend( + host=host, + port=port, + username=username, + password=password, + use_tls=use_tls, + fail_silently=False + ) + + # 创建邮件 + subject = f"[Celery] 家庭日报 {report_data['today']} - 测试报告" + + email = EmailMultiAlternatives( + subject=subject, + body=html_content, + from_email=from_email, + to=recipient_list, + connection=backend + ) + email.attach_alternative(html_content, "text/html") + email.encoding = 'utf-8' + + # 添加附件(如果需要) + if include_attachment: + attachment_content = '''家庭日报测试报告 +发送时间: {send_time} +任务ID: {task_id} +发送方式: Celery异步任务 + +这是一份自动生成的测试报告 + '''.format( + send_time=timezone.now().isoformat(), + task_id=task_id + ) + email.attach('daily_report_test.txt', attachment_content, 'text/plain') + logger.info(f"[任务 {task_id}] 已添加测试附件") + + # 发送邮件 + logger.info(f"[任务 {task_id}] 正在发送HTML报告邮件...") + sent_count = email.send(fail_silently=False) + + if sent_count > 0: + logger.success(f"[任务 {task_id}] HTML报告邮件发送成功!") + return { + 'status': 'success', + 'task_id': task_id, + 'type': 'html_report', + 'sent_to': len(recipient_list), + 'has_attachment': include_attachment, + 'timestamp': timezone.now().isoformat() + } + else: + raise Exception("邮件发送返回0") + + except Exception as e: + error_msg = str(e) + error_traceback = traceback.format_exc() + + logger.error(f"[任务 {task_id}] HTML报告邮件发送失败: {error_msg}") + logger.error(f"[任务 {task_id}] 错误详情:\n{error_traceback}") + + return { + 'status': 'failed', + 'task_id': task_id, + 'type': 'html_report', + 'error': error_msg, + 'timestamp': timezone.now().isoformat() + } diff --git a/test_celery_email.py b/test_celery_email.py index ff76cf8..d38f486 100644 --- a/test_celery_email.py +++ b/test_celery_email.py @@ -9,8 +9,6 @@ import sys import time from pathlib import Path from loguru import logger -from celery import shared_task -from celery.exceptions import Retry def test_celery_email_config(): @@ -183,342 +181,6 @@ def test_celery_worker_status(): return False -@shared_task(bind=True, max_retries=3, default_retry_delay=60) -def celery_send_test_email(self, test_mode=True): - """ - Celery异步发送测试邮件任务 - 支持重试机制,适合生产环境使用 - """ - from django.conf import settings - from django.core.mail import EmailMessage - from django.utils import timezone - import traceback - - task_id = self.request.id if hasattr(self, 'request') else 'unknown' - logger.info(f"[任务 {task_id}] 开始执行异步邮件发送任务") - - try: - # 从数据库获取邮件配置 - from core.models import SystemConfig - from django.core.mail.backends.smtp import EmailBackend - config = SystemConfig.get_config() - # 优先使用sender_email,其次使用smtp_username作为发件人 - from_email = config.sender_email or config.smtp_username - if not from_email: - raise ValueError("未配置发件邮箱") - - # 验证发件人邮箱格式 - import re - email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' - if not re.match(email_pattern, from_email): - raise ValueError(f"发件人邮箱格式不正确: {from_email}") - - to_email = config.recipient_email or from_email - # 验证收件人邮箱格式 - if not re.match(email_pattern, to_email): - raise ValueError(f"收件人邮箱格式不正确: {to_email}") - recipient_list = [to_email] - - # 获取SMTP配置 - host = config.smtp_server or 'localhost' - port = config.smtp_port or 587 - username = config.smtp_username or '' - password = config.smtp_password or '' - use_tls = True # 默认使用TLS - - # 创建邮件后端 - backend = EmailBackend( - host=host, - port=port, - username=username, - password=password, - use_tls=use_tls, - fail_silently=False - ) - - # 创建邮件内容 - subject = f"[Celery测试] 异步邮件发送成功 - {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}" - - if test_mode: - body = f""" -这是一封通过Celery异步发送的测试邮件。 - -✅ 邮件发送成功! - -任务信息: -- 任务ID: {task_id} -- 发送时间: {timezone.now().strftime('%Y-%m-%d %H:%M:%S')} -- 发送方式: Celery异步任务 -- 执行重试次数: {self.request.retries} - -邮件配置: -- 发件服务器: {host}:{port} -- 使用TLS: {use_tls} - ---- -家庭日报系统 -自动发送(Celery异步任务) - """ - else: - body = f""" -家庭日报系统测试邮件 - -发送时间: {timezone.now().strftime('%Y-%m-%d %H:%M:%S')} -发送方式: Celery异步任务 -任务ID: {task_id} - -这是一封测试邮件,用于验证Celery异步邮件发送功能。 - """ - - # 创建邮件 - email = EmailMessage( - subject=subject, - body=body, - from_email=from_email, - to=recipient_list, - connection=backend - ) - email.content_subtype = 'plain' - email.encoding = 'utf-8' - - # 发送邮件 - logger.info(f"[任务 {task_id}] 正在发送邮件...") - sent_count = email.send(fail_silently=False) - - if sent_count > 0: - logger.success(f"[任务 {task_id}] 邮件发送成功!发送给 {len(recipient_list)} 个收件人") - return { - 'status': 'success', - 'task_id': task_id, - 'sent_to': len(recipient_list), - 'timestamp': timezone.now().isoformat() - } - else: - raise Exception("邮件发送返回0,未能发送邮件") - - except Exception as e: - error_msg = str(e) - error_traceback = traceback.format_exc() - - logger.error(f"[任务 {task_id}] 邮件发送失败: {error_msg}") - logger.error(f"[任务 {task_id}] 错误详情:\n{error_traceback}") - - # 检查是否应该重试 - if self.request.retries < self.max_retries: - logger.info(f"[任务 {task_id}] 准备第 {self.request.retries + 1} 次重试...") - # 重试延迟指数增长 - retry_delay = 60 * (2 ** self.request.retries) - logger.info(f"[任务 {task_id}] {retry_delay}秒后进行重试") - - raise self.retry(exc=e, countdown=retry_delay) - else: - logger.error(f"[任务 {task_id}] 已达到最大重试次数 {self.max_retries},放弃重试") - - # 返回错误信息而不是抛出异常 - return { - 'status': 'failed', - 'task_id': task_id, - 'error': error_msg, - 'retries': self.request.retries, - 'timestamp': timezone.now().isoformat() - } - - -@shared_task(bind=True) -def celery_send_html_report_email(self, include_attachment=False): - """ - Celery异步发送HTML报告邮件 - 模拟每日报告发送功能 - """ - from django.conf import settings - from django.core.mail import EmailMessage, EmailMultiAlternatives - from django.template.loader import render_to_string - from django.utils import timezone - from datetime import timedelta - import traceback - - task_id = self.request.id if hasattr(self, 'request') else 'unknown' - logger.info(f"[任务 {task_id}] 开始执行HTML报告邮件发送任务") - - try: - # 从数据库获取邮件配置 - from core.models import SystemConfig - from django.core.mail.backends.smtp import EmailBackend - config = SystemConfig.get_config() - # 优先使用sender_email,其次使用smtp_username作为发件人 - from_email = config.sender_email or config.smtp_username - if not from_email: - raise ValueError("未配置发件邮箱") - - # 验证发件人邮箱格式 - import re - email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' - if not re.match(email_pattern, from_email): - raise ValueError(f"发件人邮箱格式不正确: {from_email}") - - to_email = config.recipient_email or from_email - # 验证收件人邮箱格式 - if not re.match(email_pattern, to_email): - raise ValueError(f"收件人邮箱格式不正确: {to_email}") - if isinstance(to_email, list): - recipient_list = to_email - else: - recipient_list = [to_email] - - # 获取SMTP配置 - host = config.smtp_server or 'localhost' - port = config.smtp_port or 587 - username = config.smtp_username or '' - password = config.smtp_password or '' - use_tls = True # 默认使用TLS - - # 准备报告数据 - today = timezone.now().date() - yesterday = today - timedelta(days=1) - - report_data = { - 'today': today.strftime('%Y年%m月%d日'), - 'yesterday': yesterday.strftime('%Y年%m月%d日'), - 'tasks': [], - 'reading': {'count': 0, 'items': []}, - 'insights': {'count': 0, 'items': []}, - } - - # 创建HTML邮件内容(使用普通字符串拼接,避免f-string中的%语法问题) - html_content = ''' - - - - - - - -
-

📊 家庭日报

-
{today}
-
- -
-
-

✅ 邮件发送测试

-

这是一封通过 Celery异步任务 发送的HTML格式邮件。

-

任务ID: {task_id}

-

发送时间: {send_time}

-

状态: 发送成功

-
- -
-

📈 统计信息

-

阅读记录: {reading_count} 篇

-

感悟记录: {insights_count} 条

-

家庭事项: {tasks_count} 项

-
- -
-

📋 说明

-

此邮件由Celery异步任务自动发送。

-

如需取消订阅,请联系系统管理员。

-
-
- - - - - '''.format( - today=report_data['today'], - task_id=task_id, - send_time=timezone.now().strftime('%Y-%m-%d %H:%M:%S'), - reading_count=report_data['reading']['count'], - insights_count=report_data['insights']['count'], - tasks_count=len(report_data['tasks']) - ) - - # 创建邮件后端 - backend = EmailBackend( - host=host, - port=port, - username=username, - password=password, - use_tls=use_tls, - fail_silently=False - ) - - # 创建邮件 - subject = f"[Celery] 家庭日报 {report_data['today']} - 测试报告" - - email = EmailMultiAlternatives( - subject=subject, - body=html_content, - from_email=from_email, - to=recipient_list, - connection=backend - ) - email.attach_alternative(html_content, "text/html") - email.encoding = 'utf-8' - - # 添加附件(如果需要) - if include_attachment: - attachment_content = '''家庭日报测试报告 -发送时间: {send_time} -任务ID: {task_id} -发送方式: Celery异步任务 - -这是一份自动生成的测试报告 - '''.format( - send_time=timezone.now().isoformat(), - task_id=task_id - ) - email.attach('daily_report_test.txt', attachment_content, 'text/plain') - logger.info(f"[任务 {task_id}] 已添加测试附件") - - # 发送邮件 - logger.info(f"[任务 {task_id}] 正在发送HTML报告邮件...") - sent_count = email.send(fail_silently=False) - - if sent_count > 0: - logger.success(f"[任务 {task_id}] HTML报告邮件发送成功!") - return { - 'status': 'success', - 'task_id': task_id, - 'type': 'html_report', - 'sent_to': len(recipient_list), - 'has_attachment': include_attachment, - 'timestamp': timezone.now().isoformat() - } - else: - raise Exception("邮件发送返回0") - - except Exception as e: - error_msg = str(e) - error_traceback = traceback.format_exc() - - logger.error(f"[任务 {task_id}] HTML报告邮件发送失败: {error_msg}") - logger.error(f"[任务 {task_id}] 错误详情:\n{error_traceback}") - - return { - 'status': 'failed', - 'task_id': task_id, - 'type': 'html_report', - 'error': error_msg, - 'timestamp': timezone.now().isoformat() - } - - def test_celery_email_task(): """测试Celery异步邮件任务""" logger.info("开始测试Celery异步邮件任务...") @@ -529,6 +191,7 @@ def test_celery_email_task(): django.setup() from django.utils import timezone + from core.tasks import celery_send_test_email, celery_send_html_report_email # 发送测试任务 logger.info("发送Celery邮件任务到队列...")