Files
diary-family/core/tasks.py

569 lines
22 KiB
Python
Raw Normal View History

2026-01-04 19:17:33 +08:00
from celery import shared_task
from loguru import logger
from django.core.mail import EmailMessage, EmailMultiAlternatives
from django.core.mail.backends.smtp import EmailBackend
2026-01-04 19:17:33 +08:00
from django.utils import timezone
from datetime import timedelta
import traceback
import os
import re
from django.db.models import F
2026-01-04 19:17:33 +08:00
@shared_task(bind=True, max_retries=2, default_retry_delay=30, time_limit=600, soft_time_limit=500)
def celery_send_test_email(self, test_mode=True):
"""
Celery异步发送测试邮件任务
支持重试机制适合生产环境使用
"""
task_id = self.request.id if hasattr(self, 'request') else 'unknown'
logger.info(f"[任务 {task_id}] 开始执行异步邮件发送任务")
2026-01-04 19:17:33 +08:00
try:
# 从数据库获取邮件配置
from core.models import SystemConfig
config = SystemConfig.get_config()
# 优先使用sender_email其次使用smtp_username作为发件人
from_email = config.sender_email or config.smtp_username
if not from_email:
raise ValueError("未配置发件邮箱")
# 验证发件人邮箱格式
import re
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_pattern, from_email):
raise ValueError(f"发件人邮箱格式不正确: {from_email}")
to_email = config.recipient_email or from_email
# 验证收件人邮箱格式
if not re.match(email_pattern, to_email):
raise ValueError(f"收件人邮箱格式不正确: {to_email}")
recipient_list = [to_email]
# 获取SMTP配置
host = config.smtp_server or 'localhost'
port = config.smtp_port or 587
username = config.smtp_username or ''
password = config.smtp_password or ''
use_tls = True # 默认使用TLS
# 创建邮件后端
backend = EmailBackend(
host=host,
port=port,
username=username,
password=password,
use_tls=use_tls,
fail_silently=False
)
# 创建邮件内容
subject = f"[Celery测试] 异步邮件发送成功 - {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}"
if test_mode:
body = f"""
这是一封通过Celery异步发送的测试邮件
邮件发送成功
任务信息:
- 任务ID: {task_id}
- 发送时间: {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}
- 发送方式: Celery异步任务
- 执行重试次数: {self.request.retries}
邮件配置:
- 发件服务器: {host}:{port}
- 使用TLS: {use_tls}
---
家庭日报系统
自动发送Celery异步任务
"""
else:
body = f"""
家庭日报系统测试邮件
发送时间: {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}
发送方式: Celery异步任务
任务ID: {task_id}
这是一封测试邮件用于验证Celery异步邮件发送功能
"""
# 创建邮件
2026-01-04 19:17:33 +08:00
email = EmailMessage(
subject=subject,
body=body,
2026-01-04 19:17:33 +08:00
from_email=from_email,
to=recipient_list,
connection=backend
2026-01-04 19:17:33 +08:00
)
email.content_subtype = 'plain'
email.encoding = 'utf-8'
# 发送邮件
logger.info(f"[任务 {task_id}] 正在发送邮件...")
sent_count = email.send(fail_silently=False)
if sent_count > 0:
logger.success(f"[任务 {task_id}] 邮件发送成功!发送给 {len(recipient_list)} 个收件人")
return {
'status': 'success',
'task_id': task_id,
'sent_to': len(recipient_list),
'timestamp': timezone.now().isoformat()
}
else:
raise Exception("邮件发送返回0未能发送邮件")
except Exception as e:
error_msg = str(e)
error_traceback = traceback.format_exc()
2026-01-04 19:17:33 +08:00
logger.error(f"[任务 {task_id}] 邮件发送失败: {error_msg}")
logger.error(f"[任务 {task_id}] 错误详情:\n{error_traceback}")
# 检查是否应该重试
if self.request.retries < self.max_retries:
logger.info(f"[任务 {task_id}] 准备第 {self.request.retries + 1} 次重试...")
# 重试延迟指数增长
retry_delay = 60 * (2 ** self.request.retries)
logger.info(f"[任务 {task_id}] {retry_delay}秒后进行重试")
raise self.retry(exc=e, countdown=retry_delay)
else:
logger.error(f"[任务 {task_id}] 已达到最大重试次数 {self.max_retries},放弃重试")
# 返回错误信息而不是抛出异常
return {
'status': 'failed',
'task_id': task_id,
'error': error_msg,
'retries': self.request.retries,
'timestamp': timezone.now().isoformat()
}
@shared_task(bind=True)
def celery_send_html_report_email(self, include_attachment=False):
"""
Celery异步发送HTML报告邮件
模拟每日报告发送功能
"""
task_id = self.request.id if hasattr(self, 'request') else 'unknown'
logger.info(f"[任务 {task_id}] 开始执行HTML报告邮件发送任务")
try:
# 从数据库获取邮件配置
from core.models import SystemConfig
config = SystemConfig.get_config()
# 优先使用sender_email其次使用smtp_username作为发件人
from_email = config.sender_email or config.smtp_username
if not from_email:
raise ValueError("未配置发件邮箱")
# 验证发件人邮箱格式
import re
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_pattern, from_email):
raise ValueError(f"发件人邮箱格式不正确: {from_email}")
to_email = config.recipient_email or from_email
# 验证收件人邮箱格式
if not re.match(email_pattern, to_email):
raise ValueError(f"收件人邮箱格式不正确: {to_email}")
if isinstance(to_email, list):
recipient_list = to_email
else:
recipient_list = [to_email]
# 获取SMTP配置
host = config.smtp_server or 'localhost'
port = config.smtp_port or 587
username = config.smtp_username or ''
password = config.smtp_password or ''
use_tls = True # 默认使用TLS
# 准备报告数据
today = timezone.now().date()
yesterday = today - timedelta(days=1)
report_data = {
'today': today.strftime('%Y年%m月%d'),
'yesterday': yesterday.strftime('%Y年%m月%d'),
'tasks': [],
'reading': {'count': 0, 'items': []},
'insights': {'count': 0, 'items': []},
}
# 创建HTML邮件内容使用普通字符串拼接避免f-string中的%语法问题)
html_content = '''
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<style>
body { font-family: 'Microsoft YaHei', Arial, sans-serif; line-height: 1.6; color: #333; max-width: 600px; margin: 0 auto; }
.header { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 30px 20px; text-align: center; }
.header h1 { margin: 0; font-size: 24px; }
.header .date { font-size: 14px; opacity: 0.9; margin-top: 10px; }
.content { padding: 20px; background: #f9f9f9; }
.section { background: white; border-radius: 8px; padding: 20px; margin-bottom: 20px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
.section h2 { color: #667eea; font-size: 18px; margin-top: 0; border-bottom: 2px solid #667eea; padding-bottom: 10px; }
.stat { display: inline-block; background: #667eea; color: white; padding: 5px 15px; border-radius: 20px; margin: 5px; }
.footer { text-align: center; padding: 20px; color: #666; font-size: 12px; }
.success-badge { background: #28a745; color: white; padding: 10px 20px; border-radius: 5px; display: inline-block; }
</style>
</head>
<body>
<div class="header">
<h1>📊 家庭日报</h1>
<div class="date">{today}</div>
</div>
<div class="content">
<div class="section">
<h2> 邮件发送测试</h2>
<p>这是一封通过 <strong>Celery异步任务</strong> 发送的HTML格式邮件</p>
<p><strong>任务ID:</strong> {task_id}</p>
<p><strong>发送时间:</strong> {send_time}</p>
<p><strong>状态:</strong> <span class="success-badge">发送成功</span></p>
</div>
2026-01-04 19:17:33 +08:00
<div class="section">
<h2>📈 统计信息</h2>
<p>阅读记录: <span class="stat">{reading_count} </span></p>
<p>感悟记录: <span class="stat">{insights_count} </span></p>
<p>家庭事项: <span class="stat">{tasks_count} </span></p>
</div>
<div class="section">
<h2>📋 说明</h2>
<p>此邮件由Celery异步任务自动发送</p>
<p>如需取消订阅请联系系统管理员</p>
</div>
</div>
<div class="footer">
<p>---</p>
<p>家庭日报系统 | 自动发送</p>
<p>任务ID: {task_id}</p>
</div>
</body>
</html>
'''.format(
today=report_data['today'],
task_id=task_id,
send_time=timezone.now().strftime('%Y-%m-%d %H:%M:%S'),
reading_count=report_data['reading']['count'],
insights_count=report_data['insights']['count'],
tasks_count=len(report_data['tasks'])
)
# 创建邮件后端
backend = EmailBackend(
host=host,
port=port,
username=username,
password=password,
use_tls=use_tls,
fail_silently=False
)
# 创建邮件
subject = f"[Celery] 家庭日报 {report_data['today']} - 测试报告"
email = EmailMultiAlternatives(
subject=subject,
body=html_content,
from_email=from_email,
to=recipient_list,
connection=backend
)
email.attach_alternative(html_content, "text/html")
email.encoding = 'utf-8'
# 添加附件(如果需要)
if include_attachment:
attachment_content = '''家庭日报测试报告
发送时间: {send_time}
任务ID: {task_id}
发送方式: Celery异步任务
这是一份自动生成的测试报告
'''.format(
send_time=timezone.now().isoformat(),
task_id=task_id
)
email.attach('daily_report_test.txt', attachment_content, 'text/plain')
logger.info(f"[任务 {task_id}] 已添加测试附件")
2026-01-04 19:17:33 +08:00
# 发送邮件
logger.info(f"[任务 {task_id}] 正在发送HTML报告邮件...")
sent_count = email.send(fail_silently=False)
if sent_count > 0:
logger.success(f"[任务 {task_id}] HTML报告邮件发送成功")
return {
'status': 'success',
'task_id': task_id,
'type': 'html_report',
'sent_to': len(recipient_list),
'has_attachment': include_attachment,
'timestamp': timezone.now().isoformat()
}
else:
raise Exception("邮件发送返回0")
2026-01-04 19:17:33 +08:00
except Exception as e:
error_msg = str(e)
error_traceback = traceback.format_exc()
logger.error(f"[任务 {task_id}] HTML报告邮件发送失败: {error_msg}")
logger.error(f"[任务 {task_id}] 错误详情:\n{error_traceback}")
return {
'status': 'failed',
'task_id': task_id,
'type': 'html_report',
'error': error_msg,
'timestamp': timezone.now().isoformat()
}
@shared_task(bind=True, max_retries=2, default_retry_delay=30, time_limit=600, soft_time_limit=500)
def celery_send_pdf_report_email(self):
"""
Celery异步生成PDF报告并发送邮件
1. 检查WeasyPrint是否可用
2. 生成PDF报告
3. 发送包含PDF附件的邮件
"""
task_id = self.request.id if hasattr(self, 'request') else 'unknown'
logger.info(f"[任务 {task_id}] 开始执行PDF报告邮件发送任务")
logger.info(f"[任务 {task_id}] 重试次数: {self.request.retries if hasattr(self, 'request') else 0}")
try:
# 检查WeasyPrint是否可用
weasyprint_available = False
try:
from weasyprint import HTML
weasyprint_available = True
logger.info(f"[任务 {task_id}] ✅ WeasyPrint库导入成功")
except ImportError as e:
logger.error(f"[任务 {task_id}] ❌ WeasyPrint库无法导入: {e}")
raise ValueError("WeasyPrint库无法导入PDF功能将不可用")
# 从数据库获取邮件配置
from core.models import SystemConfig, ReadingRecord, InsightRecord, TodayPlan, FamilyTask
from django.conf import settings
from django.template.loader import render_to_string
config = SystemConfig.get_config()
logger.info(f"[任务 {task_id}] ✅ 成功获取系统配置")
# 优先使用sender_email其次使用smtp_username作为发件人
from_email = config.sender_email or config.smtp_username
if not from_email:
logger.error(f"[任务 {task_id}] ❌ 未配置发件邮箱")
raise ValueError("未配置发件邮箱")
# 验证发件人邮箱格式
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_pattern, from_email):
logger.error(f"[任务 {task_id}] ❌ 发件人邮箱格式不正确: {from_email}")
raise ValueError(f"发件人邮箱格式不正确: {from_email}")
to_email = config.recipient_email or from_email
# 验证收件人邮箱格式
if not re.match(email_pattern, to_email):
logger.error(f"[任务 {task_id}] ❌ 收件人邮箱格式不正确: {to_email}")
raise ValueError(f"收件人邮箱格式不正确: {to_email}")
recipient_list = [to_email] if isinstance(to_email, str) else to_email
logger.info(f"[任务 {task_id}] ✅ 邮箱配置验证通过,发件人: {from_email}, 收件人: {recipient_list}")
# 获取SMTP配置
host = config.smtp_server or 'localhost'
port = config.smtp_port or 587
username = config.smtp_username or ''
password = config.smtp_password or ''
use_tls = True # 默认使用TLS
logger.info(f"[任务 {task_id}] ✅ SMTP配置: 服务器={host}, 端口={port}, 用户名={username}")
# 准备报告数据
today = timezone.now().date()
yesterday = today - timedelta(days=1)
logger.info(f"[任务 {task_id}] ✅ 报告日期: {today}, 昨日: {yesterday}")
# 获取昨日记录
logger.info(f"[任务 {task_id}] 📊 获取昨日阅读记录...")
yesterday_reading = ReadingRecord.objects.filter(date=yesterday)
logger.info(f"[任务 {task_id}] ✅ 昨日阅读记录: {yesterday_reading.count()}")
logger.info(f"[任务 {task_id}] 📊 获取昨日感悟记录...")
yesterday_insight = InsightRecord.objects.filter(date=yesterday)
logger.info(f"[任务 {task_id}] ✅ 昨日感悟记录: {yesterday_insight.count()}")
# 获取今日计划
logger.info(f"[任务 {task_id}] 📋 获取今日计划...")
today_plan = TodayPlan.objects.filter(date=today)
logger.info(f"[任务 {task_id}] ✅ 今日计划: {today_plan.count()}")
# 获取家庭事项统计
logger.info(f"[任务 {task_id}] 📊 获取家庭事项统计...")
from django.db.models import Count
family_task_stats = FamilyTask.objects.values('type').annotate(count=Count('id'))
logger.info(f"[任务 {task_id}] ✅ 家庭事项统计: {len(family_task_stats)} 个类型")
# 准备上下文数据
context = {
'today': today,
'yesterday': yesterday,
'yesterday_reading': yesterday_reading,
'yesterday_insight': yesterday_insight,
'today_plan': today_plan,
'family_task_stats': family_task_stats,
}
# 渲染HTML模板
logger.info(f"[任务 {task_id}] 🎨 渲染HTML模板...")
html_string = render_to_string('core/report_pdf.html', context)
logger.info(f"[任务 {task_id}] ✅ HTML模板渲染成功")
# 生成PDF
today_str = today.strftime('%Y-%m-%d')
pdf_file = f"report_{today_str}.pdf"
pdf_path = os.path.join(settings.REPORTS_ROOT, pdf_file)
logger.info(f"[任务 {task_id}] 📄 准备生成PDF保存路径: {pdf_path}")
# 确保报告目录存在
os.makedirs(settings.REPORTS_ROOT, exist_ok=True)
logger.info(f"[任务 {task_id}] ✅ 报告目录已准备好")
# 使用WeasyPrint生成PDF设置超时
logger.info(f"[任务 {task_id}] 📄 正在生成PDF...")
HTML(string=html_string).write_pdf(pdf_path, timeout=60) # 设置60秒超时
logger.info(f"[任务 {task_id}] ✅ PDF报告生成成功: {pdf_path}")
# 检查PDF文件大小
file_size = 0
if os.path.exists(pdf_path):
file_size = os.path.getsize(pdf_path) / (1024 * 1024) # MB
logger.info(f"[任务 {task_id}] 📄 PDF文件大小: {file_size:.2f} MB")
# 创建邮件后端,设置超时
logger.info(f"[任务 {task_id}] 📧 正在创建SMTP连接...")
backend = EmailBackend(
host=host,
port=port,
username=username,
password=password,
use_tls=use_tls,
fail_silently=False,
timeout=30, # 设置30秒连接超时
ssl_certfile=None,
ssl_keyfile=None
)
logger.info(f"[任务 {task_id}] ✅ SMTP连接创建成功")
# 创建邮件
subject = f"[Celery] 家庭日报 {today_str} - PDF报告"
body = f"""
这是一封包含PDF报告的邮件
报告日期: {today_str}
发送时间: {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}
任务ID: {task_id}
报告包含以下内容:
- 昨日阅读记录 ({yesterday_reading.count()} )
- 昨日感悟记录 ({yesterday_insight.count()} )
- 今日计划 ({today_plan.count()} )
- 家庭事项统计
请查看附件获取完整报告
---
家庭日报系统
自动发送Celery异步任务
"""
email = EmailMessage(
subject=subject,
body=body,
from_email=from_email,
to=recipient_list,
connection=backend
)
email.content_subtype = 'plain'
email.encoding = 'utf-8'
logger.info(f"[任务 {task_id}] ✅ 邮件对象创建成功")
# 添加PDF附件
logger.info(f"[任务 {task_id}] 📎 正在添加PDF附件...")
with open(pdf_path, 'rb') as f:
email.attach(pdf_file, f.read(), 'application/pdf')
logger.info(f"[任务 {task_id}] ✅ PDF附件添加成功")
# 发送邮件
logger.info(f"[任务 {task_id}] 📧 正在发送邮件...")
logger.info(f"[任务 {task_id}] 发件人: {from_email}")
logger.info(f"[任务 {task_id}] 收件人: {recipient_list}")
logger.info(f"[任务 {task_id}] 主题: {subject}")
sent_count = email.send(fail_silently=False)
logger.info(f"[任务 {task_id}] 📧 邮件发送返回结果: {sent_count}")
if sent_count > 0:
logger.success(f"[任务 {task_id}] ✅ 包含PDF附件的邮件发送成功")
# 清理生成的PDF文件
if os.path.exists(pdf_path):
os.remove(pdf_path)
logger.info(f"[任务 {task_id}] 🧹 已清理生成的PDF文件: {pdf_path}")
return {
'status': 'success',
'task_id': task_id,
'type': 'pdf_report',
'sent_to': len(recipient_list),
'pdf_file': pdf_file,
'pdf_size': file_size,
'timestamp': timezone.now().isoformat()
}
else:
logger.error(f"[任务 {task_id}] ❌ 邮件发送返回0")
raise Exception("邮件发送返回0")
except Exception as e:
error_msg = str(e)
error_traceback = traceback.format_exc()
logger.error(f"[任务 {task_id}] ❌ PDF报告邮件发送失败: {error_msg}")
logger.error(f"[任务 {task_id}] 📋 错误详情:\n{error_traceback}")
# 检查是否应该重试
if hasattr(self, 'request') and self.request.retries < self.max_retries:
logger.info(f"[任务 {task_id}] 🔄 准备第 {self.request.retries + 1} 次重试...")
# 重试延迟指数增长
retry_delay = 30 * (2 ** self.request.retries)
logger.info(f"[任务 {task_id}] ⏱️ {retry_delay}秒后进行重试")
raise self.retry(exc=e, countdown=retry_delay)
else:
logger.error(f"[任务 {task_id}] 💥 已达到最大重试次数 {self.max_retries},放弃重试")
# 返回错误信息而不是抛出异常
return {
'status': 'failed',
'task_id': task_id,
'type': 'pdf_report',
'error': error_msg,
'error_detail': error_traceback[:500] if error_traceback else '',
'retries': self.request.retries if hasattr(self, 'request') else 0,
'timestamp': timezone.now().isoformat()
}