from celery import shared_task
from loguru import logger
from django.core.mail import EmailMessage, EmailMultiAlternatives
from django.core.mail.backends.smtp import EmailBackend
from django.utils import timezone
from datetime import timedelta
import traceback
import os
import re
from django.db.models import F
@shared_task(bind=True, max_retries=2, default_retry_delay=30, time_limit=600, soft_time_limit=500)
def celery_send_test_email(self, test_mode=True):
"""
Celery异步发送测试邮件任务
支持重试机制,适合生产环境使用
"""
task_id = self.request.id if hasattr(self, 'request') else 'unknown'
logger.info(f"[任务 {task_id}] 开始执行异步邮件发送任务")
try:
# 从数据库获取邮件配置
from core.models import SystemConfig
config = SystemConfig.get_config()
# 优先使用sender_email,其次使用smtp_username作为发件人
from_email = config.sender_email or config.smtp_username
if not from_email:
raise ValueError("未配置发件邮箱")
# 验证发件人邮箱格式
import re
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_pattern, from_email):
raise ValueError(f"发件人邮箱格式不正确: {from_email}")
to_email = config.recipient_email or from_email
# 验证收件人邮箱格式
if not re.match(email_pattern, to_email):
raise ValueError(f"收件人邮箱格式不正确: {to_email}")
recipient_list = [to_email]
# 获取SMTP配置
host = config.smtp_server or 'localhost'
port = config.smtp_port or 587
username = config.smtp_username or ''
password = config.smtp_password or ''
use_tls = True # 默认使用TLS
# 创建邮件后端
backend = EmailBackend(
host=host,
port=port,
username=username,
password=password,
use_tls=use_tls,
fail_silently=False
)
# 创建邮件内容
subject = f"[Celery测试] 异步邮件发送成功 - {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}"
if test_mode:
body = f"""
这是一封通过Celery异步发送的测试邮件。
✅ 邮件发送成功!
任务信息:
- 任务ID: {task_id}
- 发送时间: {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}
- 发送方式: Celery异步任务
- 执行重试次数: {self.request.retries}
邮件配置:
- 发件服务器: {host}:{port}
- 使用TLS: {use_tls}
---
家庭日报系统
自动发送(Celery异步任务)
"""
else:
body = f"""
家庭日报系统测试邮件
发送时间: {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}
发送方式: Celery异步任务
任务ID: {task_id}
这是一封测试邮件,用于验证Celery异步邮件发送功能。
"""
# 创建邮件
email = EmailMessage(
subject=subject,
body=body,
from_email=from_email,
to=recipient_list,
connection=backend
)
email.content_subtype = 'plain'
email.encoding = 'utf-8'
# 发送邮件
logger.info(f"[任务 {task_id}] 正在发送邮件...")
sent_count = email.send(fail_silently=False)
if sent_count > 0:
logger.success(f"[任务 {task_id}] 邮件发送成功!发送给 {len(recipient_list)} 个收件人")
return {
'status': 'success',
'task_id': task_id,
'sent_to': len(recipient_list),
'timestamp': timezone.now().isoformat()
}
else:
raise Exception("邮件发送返回0,未能发送邮件")
except Exception as e:
error_msg = str(e)
error_traceback = traceback.format_exc()
logger.error(f"[任务 {task_id}] 邮件发送失败: {error_msg}")
logger.error(f"[任务 {task_id}] 错误详情:\n{error_traceback}")
# 检查是否应该重试
if self.request.retries < self.max_retries:
logger.info(f"[任务 {task_id}] 准备第 {self.request.retries + 1} 次重试...")
# 重试延迟指数增长
retry_delay = 60 * (2 ** self.request.retries)
logger.info(f"[任务 {task_id}] {retry_delay}秒后进行重试")
raise self.retry(exc=e, countdown=retry_delay)
else:
logger.error(f"[任务 {task_id}] 已达到最大重试次数 {self.max_retries},放弃重试")
# 返回错误信息而不是抛出异常
return {
'status': 'failed',
'task_id': task_id,
'error': error_msg,
'retries': self.request.retries,
'timestamp': timezone.now().isoformat()
}
@shared_task(bind=True)
def celery_send_html_report_email(self, include_attachment=False):
"""
Celery异步发送HTML报告邮件
模拟每日报告发送功能
"""
task_id = self.request.id if hasattr(self, 'request') else 'unknown'
logger.info(f"[任务 {task_id}] 开始执行HTML报告邮件发送任务")
try:
# 从数据库获取邮件配置
from core.models import SystemConfig
config = SystemConfig.get_config()
# 优先使用sender_email,其次使用smtp_username作为发件人
from_email = config.sender_email or config.smtp_username
if not from_email:
raise ValueError("未配置发件邮箱")
# 验证发件人邮箱格式
import re
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_pattern, from_email):
raise ValueError(f"发件人邮箱格式不正确: {from_email}")
to_email = config.recipient_email or from_email
# 验证收件人邮箱格式
if not re.match(email_pattern, to_email):
raise ValueError(f"收件人邮箱格式不正确: {to_email}")
if isinstance(to_email, list):
recipient_list = to_email
else:
recipient_list = [to_email]
# 获取SMTP配置
host = config.smtp_server or 'localhost'
port = config.smtp_port or 587
username = config.smtp_username or ''
password = config.smtp_password or ''
use_tls = True # 默认使用TLS
# 准备报告数据
today = timezone.now().date()
yesterday = today - timedelta(days=1)
report_data = {
'today': today.strftime('%Y年%m月%d日'),
'yesterday': yesterday.strftime('%Y年%m月%d日'),
'tasks': [],
'reading': {'count': 0, 'items': []},
'insights': {'count': 0, 'items': []},
}
# 创建HTML邮件内容(使用普通字符串拼接,避免f-string中的%语法问题)
html_content = '''
✅ 邮件发送测试
这是一封通过 Celery异步任务 发送的HTML格式邮件。
任务ID: {task_id}
发送时间: {send_time}
状态: 发送成功
📈 统计信息
阅读记录: {reading_count} 篇
感悟记录: {insights_count} 条
家庭事项: {tasks_count} 项
📋 说明
此邮件由Celery异步任务自动发送。
如需取消订阅,请联系系统管理员。
'''.format(
today=report_data['today'],
task_id=task_id,
send_time=timezone.now().strftime('%Y-%m-%d %H:%M:%S'),
reading_count=report_data['reading']['count'],
insights_count=report_data['insights']['count'],
tasks_count=len(report_data['tasks'])
)
# 创建邮件后端
backend = EmailBackend(
host=host,
port=port,
username=username,
password=password,
use_tls=use_tls,
fail_silently=False
)
# 创建邮件
subject = f"[Celery] 家庭日报 {report_data['today']} - 测试报告"
email = EmailMultiAlternatives(
subject=subject,
body=html_content,
from_email=from_email,
to=recipient_list,
connection=backend
)
email.attach_alternative(html_content, "text/html")
email.encoding = 'utf-8'
# 添加附件(如果需要)
if include_attachment:
attachment_content = '''家庭日报测试报告
发送时间: {send_time}
任务ID: {task_id}
发送方式: Celery异步任务
这是一份自动生成的测试报告
'''.format(
send_time=timezone.now().isoformat(),
task_id=task_id
)
email.attach('daily_report_test.txt', attachment_content, 'text/plain')
logger.info(f"[任务 {task_id}] 已添加测试附件")
# 发送邮件
logger.info(f"[任务 {task_id}] 正在发送HTML报告邮件...")
sent_count = email.send(fail_silently=False)
if sent_count > 0:
logger.success(f"[任务 {task_id}] HTML报告邮件发送成功!")
return {
'status': 'success',
'task_id': task_id,
'type': 'html_report',
'sent_to': len(recipient_list),
'has_attachment': include_attachment,
'timestamp': timezone.now().isoformat()
}
else:
raise Exception("邮件发送返回0")
except Exception as e:
error_msg = str(e)
error_traceback = traceback.format_exc()
logger.error(f"[任务 {task_id}] HTML报告邮件发送失败: {error_msg}")
logger.error(f"[任务 {task_id}] 错误详情:\n{error_traceback}")
return {
'status': 'failed',
'task_id': task_id,
'type': 'html_report',
'error': error_msg,
'timestamp': timezone.now().isoformat()
}
@shared_task(bind=True, max_retries=2, default_retry_delay=30, time_limit=600, soft_time_limit=500)
def celery_send_pdf_report_email(self):
"""
Celery异步生成PDF报告并发送邮件
1. 检查WeasyPrint是否可用
2. 生成PDF报告
3. 发送包含PDF附件的邮件
"""
task_id = self.request.id if hasattr(self, 'request') else 'unknown'
logger.info(f"[任务 {task_id}] 开始执行PDF报告邮件发送任务")
logger.info(f"[任务 {task_id}] 重试次数: {self.request.retries if hasattr(self, 'request') else 0}")
try:
# 检查WeasyPrint是否可用
weasyprint_available = False
try:
from weasyprint import HTML
weasyprint_available = True
logger.info(f"[任务 {task_id}] ✅ WeasyPrint库导入成功")
except ImportError as e:
logger.error(f"[任务 {task_id}] ❌ WeasyPrint库无法导入: {e}")
raise ValueError("WeasyPrint库无法导入,PDF功能将不可用")
# 从数据库获取邮件配置
from core.models import SystemConfig, ReadingRecord, InsightRecord, TodayPlan, FamilyTask
from django.conf import settings
from django.template.loader import render_to_string
config = SystemConfig.get_config()
logger.info(f"[任务 {task_id}] ✅ 成功获取系统配置")
# 优先使用sender_email,其次使用smtp_username作为发件人
from_email = config.sender_email or config.smtp_username
if not from_email:
logger.error(f"[任务 {task_id}] ❌ 未配置发件邮箱")
raise ValueError("未配置发件邮箱")
# 验证发件人邮箱格式
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_pattern, from_email):
logger.error(f"[任务 {task_id}] ❌ 发件人邮箱格式不正确: {from_email}")
raise ValueError(f"发件人邮箱格式不正确: {from_email}")
to_email = config.recipient_email or from_email
# 验证收件人邮箱格式
if not re.match(email_pattern, to_email):
logger.error(f"[任务 {task_id}] ❌ 收件人邮箱格式不正确: {to_email}")
raise ValueError(f"收件人邮箱格式不正确: {to_email}")
recipient_list = [to_email] if isinstance(to_email, str) else to_email
logger.info(f"[任务 {task_id}] ✅ 邮箱配置验证通过,发件人: {from_email}, 收件人: {recipient_list}")
# 获取SMTP配置
host = config.smtp_server or 'localhost'
port = config.smtp_port or 587
username = config.smtp_username or ''
password = config.smtp_password or ''
use_tls = True # 默认使用TLS
logger.info(f"[任务 {task_id}] ✅ SMTP配置: 服务器={host}, 端口={port}, 用户名={username}")
# 准备报告数据
today = timezone.now().date()
yesterday = today - timedelta(days=1)
logger.info(f"[任务 {task_id}] ✅ 报告日期: {today}, 昨日: {yesterday}")
# 获取昨日记录
logger.info(f"[任务 {task_id}] 📊 获取昨日阅读记录...")
yesterday_reading = ReadingRecord.objects.filter(date=yesterday)
logger.info(f"[任务 {task_id}] ✅ 昨日阅读记录: {yesterday_reading.count()} 条")
logger.info(f"[任务 {task_id}] 📊 获取昨日感悟记录...")
yesterday_insight = InsightRecord.objects.filter(date=yesterday)
logger.info(f"[任务 {task_id}] ✅ 昨日感悟记录: {yesterday_insight.count()} 条")
# 获取今日计划
logger.info(f"[任务 {task_id}] 📋 获取今日计划...")
today_plan = TodayPlan.objects.filter(date=today)
logger.info(f"[任务 {task_id}] ✅ 今日计划: {today_plan.count()} 项")
# 获取家庭事项统计
logger.info(f"[任务 {task_id}] 📊 获取家庭事项统计...")
from django.db.models import Count
family_task_stats = FamilyTask.objects.values('type').annotate(count=Count('id'))
logger.info(f"[任务 {task_id}] ✅ 家庭事项统计: {len(family_task_stats)} 个类型")
# 准备上下文数据
context = {
'today': today,
'yesterday': yesterday,
'yesterday_reading': yesterday_reading,
'yesterday_insight': yesterday_insight,
'today_plan': today_plan,
'family_task_stats': family_task_stats,
}
# 渲染HTML模板
logger.info(f"[任务 {task_id}] 🎨 渲染HTML模板...")
html_string = render_to_string('core/report_pdf.html', context)
logger.info(f"[任务 {task_id}] ✅ HTML模板渲染成功")
# 生成PDF
today_str = today.strftime('%Y-%m-%d')
pdf_file = f"report_{today_str}.pdf"
pdf_path = os.path.join(settings.REPORTS_ROOT, pdf_file)
logger.info(f"[任务 {task_id}] 📄 准备生成PDF,保存路径: {pdf_path}")
# 确保报告目录存在
os.makedirs(settings.REPORTS_ROOT, exist_ok=True)
logger.info(f"[任务 {task_id}] ✅ 报告目录已准备好")
# 使用WeasyPrint生成PDF,设置超时
logger.info(f"[任务 {task_id}] 📄 正在生成PDF...")
HTML(string=html_string).write_pdf(pdf_path, timeout=60) # 设置60秒超时
logger.info(f"[任务 {task_id}] ✅ PDF报告生成成功: {pdf_path}")
# 检查PDF文件大小
file_size = 0
if os.path.exists(pdf_path):
file_size = os.path.getsize(pdf_path) / (1024 * 1024) # MB
logger.info(f"[任务 {task_id}] 📄 PDF文件大小: {file_size:.2f} MB")
# 创建邮件后端,设置超时
logger.info(f"[任务 {task_id}] 📧 正在创建SMTP连接...")
backend = EmailBackend(
host=host,
port=port,
username=username,
password=password,
use_tls=use_tls,
fail_silently=False,
timeout=30, # 设置30秒连接超时
ssl_certfile=None,
ssl_keyfile=None
)
logger.info(f"[任务 {task_id}] ✅ SMTP连接创建成功")
# 创建邮件
subject = f"[Celery] 家庭日报 {today_str} - PDF报告"
body = f"""
这是一封包含PDF报告的邮件。
报告日期: {today_str}
发送时间: {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}
任务ID: {task_id}
报告包含以下内容:
- 昨日阅读记录 ({yesterday_reading.count()} 条)
- 昨日感悟记录 ({yesterday_insight.count()} 条)
- 今日计划 ({today_plan.count()} 项)
- 家庭事项统计
请查看附件获取完整报告。
---
家庭日报系统
自动发送(Celery异步任务)
"""
email = EmailMessage(
subject=subject,
body=body,
from_email=from_email,
to=recipient_list,
connection=backend
)
email.content_subtype = 'plain'
email.encoding = 'utf-8'
logger.info(f"[任务 {task_id}] ✅ 邮件对象创建成功")
# 添加PDF附件
logger.info(f"[任务 {task_id}] 📎 正在添加PDF附件...")
with open(pdf_path, 'rb') as f:
email.attach(pdf_file, f.read(), 'application/pdf')
logger.info(f"[任务 {task_id}] ✅ PDF附件添加成功")
# 发送邮件
logger.info(f"[任务 {task_id}] 📧 正在发送邮件...")
logger.info(f"[任务 {task_id}] 发件人: {from_email}")
logger.info(f"[任务 {task_id}] 收件人: {recipient_list}")
logger.info(f"[任务 {task_id}] 主题: {subject}")
sent_count = email.send(fail_silently=False)
logger.info(f"[任务 {task_id}] 📧 邮件发送返回结果: {sent_count}")
if sent_count > 0:
logger.success(f"[任务 {task_id}] ✅ 包含PDF附件的邮件发送成功!")
# 清理生成的PDF文件
if os.path.exists(pdf_path):
os.remove(pdf_path)
logger.info(f"[任务 {task_id}] 🧹 已清理生成的PDF文件: {pdf_path}")
return {
'status': 'success',
'task_id': task_id,
'type': 'pdf_report',
'sent_to': len(recipient_list),
'pdf_file': pdf_file,
'pdf_size': file_size,
'timestamp': timezone.now().isoformat()
}
else:
logger.error(f"[任务 {task_id}] ❌ 邮件发送返回0")
raise Exception("邮件发送返回0")
except Exception as e:
error_msg = str(e)
error_traceback = traceback.format_exc()
logger.error(f"[任务 {task_id}] ❌ PDF报告邮件发送失败: {error_msg}")
logger.error(f"[任务 {task_id}] 📋 错误详情:\n{error_traceback}")
# 检查是否应该重试
if hasattr(self, 'request') and self.request.retries < self.max_retries:
logger.info(f"[任务 {task_id}] 🔄 准备第 {self.request.retries + 1} 次重试...")
# 重试延迟指数增长
retry_delay = 30 * (2 ** self.request.retries)
logger.info(f"[任务 {task_id}] ⏱️ {retry_delay}秒后进行重试")
raise self.retry(exc=e, countdown=retry_delay)
else:
logger.error(f"[任务 {task_id}] 💥 已达到最大重试次数 {self.max_retries},放弃重试")
# 返回错误信息而不是抛出异常
return {
'status': 'failed',
'task_id': task_id,
'type': 'pdf_report',
'error': error_msg,
'error_detail': error_traceback[:500] if error_traceback else '无',
'retries': self.request.retries if hasattr(self, 'request') else 0,
'timestamp': timezone.now().isoformat()
}
@shared_task(bind=True)
def cleanup_expired_temp_files(self):
"""
清理过期的临时文件
每小时执行一次,删除已过期的临时文件及其物理文件
"""
task_id = self.request.id if hasattr(self, 'request') else 'unknown'
logger.info(f"[任务 {task_id}] 开始执行清理过期临时文件任务")
try:
from core.models import PublicContent
expired_files = PublicContent.objects.filter(
is_temp_file=True,
expire_at__lte=timezone.now()
)
deleted_count = 0
for temp_file in expired_files:
try:
if temp_file.file:
file_path = temp_file.file.path
if os.path.exists(file_path):
os.remove(file_path)
logger.info(f"[任务 {task_id}] 已删除物理文件: {file_path}")
file_title = temp_file.title
temp_file.delete()
deleted_count += 1
logger.info(f"[任务 {task_id}] 已删除临时文件记录: {file_title}")
except Exception as e:
logger.error(f"[任务 {task_id}] 删除临时文件失败: {str(e)}")
logger.success(f"[任务 {task_id}] 清理完成,共删除 {deleted_count} 个过期临时文件")
return {
'status': 'success',
'task_id': task_id,
'deleted_count': deleted_count,
'timestamp': timezone.now().isoformat()
}
except Exception as e:
error_msg = str(e)
error_traceback = traceback.format_exc()
logger.error(f"[任务 {task_id}] 清理过期临时文件失败: {error_msg}")
logger.error(f"[任务 {task_id}] 错误详情:\n{error_traceback}")
return {
'status': 'failed',
'task_id': task_id,
'error': error_msg,
'timestamp': timezone.now().isoformat()
}
@shared_task(bind=True)
def cleanup_expired_messages(self):
"""
清理过期的临时留言
每5分钟执行一次,删除已过期的留言
"""
task_id = self.request.id if hasattr(self, 'request') else 'unknown'
logger.info(f"[任务 {task_id}] 开始执行清理过期留言任务")
try:
from core.models import TempMessage
expired_messages = TempMessage.objects.filter(expire_at__lte=timezone.now())
deleted_count = 0
for message in expired_messages:
try:
msg_content = message.content[:20]
message.delete()
deleted_count += 1
logger.info(f"[任务 {task_id}] 已删除过期留言: {msg_content}...")
except Exception as e:
logger.error(f"[任务 {task_id}] 删除留言失败: {str(e)}")
logger.success(f"[任务 {task_id}] 清理完成,共删除 {deleted_count} 条过期留言")
return {
'status': 'success',
'task_id': task_id,
'deleted_count': deleted_count,
'timestamp': timezone.now().isoformat()
}
except Exception as e:
error_msg = str(e)
error_traceback = traceback.format_exc()
logger.error(f"[任务 {task_id}] 清理过期留言失败: {error_msg}")
logger.error(f"[任务 {task_id}] 错误详情:\n{error_traceback}")
return {
'status': 'failed',
'task_id': task_id,
'error': error_msg,
'timestamp': timezone.now().isoformat()
}