Files
diary-family/test_celery.py

84 lines
2.5 KiB
Python
Raw Normal View History

#!/usr/bin/env python
"""
Celery任务测试脚本
用于验证Celery任务是否可以正常运行
"""
import os
import sys
from loguru import logger
# 添加项目路径到Python路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# 初始化Django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
import django
django.setup()
# 导入Celery应用和任务
from diary_family.celery import app
from core.tasks import generate_daily_pdf_report
def test_celery_connection():
"""测试Celery连接是否正常"""
logger.info("开始测试Celery连接...")
try:
# 测试Celery是否可以正常连接
result = app.control.ping(timeout=5)
logger.info(f"Celery连接测试结果: {result}")
return True
except Exception as e:
logger.error(f"Celery连接测试失败: {e}")
return False
def test_task_execution():
"""测试任务执行是否正常"""
logger.info("开始测试Celery任务执行...")
try:
# 直接调用任务函数(同步执行,用于测试)
result = generate_daily_pdf_report()
logger.info(f"任务执行结果: {result}")
return result
except Exception as e:
logger.error(f"任务执行失败: {e}")
return False
def test_task_async_execution():
"""测试异步任务执行是否正常"""
logger.info("开始测试Celery异步任务执行...")
try:
# 异步执行任务
task = generate_daily_pdf_report.delay()
logger.info(f"异步任务ID: {task.id}")
# 等待任务完成最多等待30秒
result = task.get(timeout=30)
logger.info(f"异步任务执行结果: {result}")
return result
except Exception as e:
logger.error(f"异步任务执行失败: {e}")
return False
if __name__ == "__main__":
logger.info("=== Celery任务测试开始 ===")
# 测试连接
connection_ok = test_celery_connection()
# 测试同步任务执行
sync_ok = test_task_execution()
# 测试异步任务执行
async_ok = test_task_async_execution()
logger.info("=== Celery任务测试结束 ===")
logger.info(f"测试结果: 连接={connection_ok}, 同步执行={sync_ok}, 异步执行={async_ok}")
if connection_ok and sync_ok and async_ok:
logger.success("所有测试通过Celery任务可以正常运行。")
sys.exit(0)
else:
logger.error("测试失败请检查Celery配置和任务代码。")
sys.exit(1)