84 lines
2.5 KiB
Python
84 lines
2.5 KiB
Python
|
|
#!/usr/bin/env python
|
|||
|
|
"""
|
|||
|
|
Celery任务测试脚本
|
|||
|
|
用于验证Celery任务是否可以正常运行
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import os
|
|||
|
|
import sys
|
|||
|
|
from loguru import logger
|
|||
|
|
|
|||
|
|
# 添加项目路径到Python路径
|
|||
|
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
|||
|
|
|
|||
|
|
# 初始化Django环境
|
|||
|
|
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
|
|||
|
|
|
|||
|
|
import django
|
|||
|
|
django.setup()
|
|||
|
|
|
|||
|
|
# 导入Celery应用和任务
|
|||
|
|
from diary_family.celery import app
|
|||
|
|
from core.tasks import generate_daily_pdf_report
|
|||
|
|
|
|||
|
|
def test_celery_connection():
|
|||
|
|
"""测试Celery连接是否正常"""
|
|||
|
|
logger.info("开始测试Celery连接...")
|
|||
|
|
try:
|
|||
|
|
# 测试Celery是否可以正常连接
|
|||
|
|
result = app.control.ping(timeout=5)
|
|||
|
|
logger.info(f"Celery连接测试结果: {result}")
|
|||
|
|
return True
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"Celery连接测试失败: {e}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
def test_task_execution():
|
|||
|
|
"""测试任务执行是否正常"""
|
|||
|
|
logger.info("开始测试Celery任务执行...")
|
|||
|
|
try:
|
|||
|
|
# 直接调用任务函数(同步执行,用于测试)
|
|||
|
|
result = generate_daily_pdf_report()
|
|||
|
|
logger.info(f"任务执行结果: {result}")
|
|||
|
|
return result
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"任务执行失败: {e}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
def test_task_async_execution():
|
|||
|
|
"""测试异步任务执行是否正常"""
|
|||
|
|
logger.info("开始测试Celery异步任务执行...")
|
|||
|
|
try:
|
|||
|
|
# 异步执行任务
|
|||
|
|
task = generate_daily_pdf_report.delay()
|
|||
|
|
logger.info(f"异步任务ID: {task.id}")
|
|||
|
|
|
|||
|
|
# 等待任务完成(最多等待30秒)
|
|||
|
|
result = task.get(timeout=30)
|
|||
|
|
logger.info(f"异步任务执行结果: {result}")
|
|||
|
|
return result
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"异步任务执行失败: {e}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
logger.info("=== Celery任务测试开始 ===")
|
|||
|
|
|
|||
|
|
# 测试连接
|
|||
|
|
connection_ok = test_celery_connection()
|
|||
|
|
|
|||
|
|
# 测试同步任务执行
|
|||
|
|
sync_ok = test_task_execution()
|
|||
|
|
|
|||
|
|
# 测试异步任务执行
|
|||
|
|
async_ok = test_task_async_execution()
|
|||
|
|
|
|||
|
|
logger.info("=== Celery任务测试结束 ===")
|
|||
|
|
logger.info(f"测试结果: 连接={connection_ok}, 同步执行={sync_ok}, 异步执行={async_ok}")
|
|||
|
|
|
|||
|
|
if connection_ok and sync_ok and async_ok:
|
|||
|
|
logger.success("所有测试通过!Celery任务可以正常运行。")
|
|||
|
|
sys.exit(0)
|
|||
|
|
else:
|
|||
|
|
logger.error("测试失败!请检查Celery配置和任务代码。")
|
|||
|
|
sys.exit(1)
|