#!/usr/bin/env python """ Celery任务测试脚本 用于验证Celery任务是否可以正常运行 """ import os import sys from loguru import logger # 添加项目路径到Python路径 sys.path.append(os.path.dirname(os.path.abspath(__file__))) # 初始化Django环境 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings') import django django.setup() # 导入Celery应用和任务 from diary_family.celery import app from core.tasks import generate_daily_pdf_report def test_celery_connection(): """测试Celery连接是否正常""" logger.info("开始测试Celery连接...") try: # 测试Celery是否可以正常连接 result = app.control.ping(timeout=5) logger.info(f"Celery连接测试结果: {result}") return True except Exception as e: logger.error(f"Celery连接测试失败: {e}") return False def test_task_execution(): """测试任务执行是否正常""" logger.info("开始测试Celery任务执行...") try: # 直接调用任务函数(同步执行,用于测试) result = generate_daily_pdf_report() logger.info(f"任务执行结果: {result}") return result except Exception as e: logger.error(f"任务执行失败: {e}") return False def test_task_async_execution(): """测试异步任务执行是否正常""" logger.info("开始测试Celery异步任务执行...") try: # 异步执行任务 task = generate_daily_pdf_report.delay() logger.info(f"异步任务ID: {task.id}") # 等待任务完成(最多等待30秒) result = task.get(timeout=30) logger.info(f"异步任务执行结果: {result}") return result except Exception as e: logger.error(f"异步任务执行失败: {e}") return False if __name__ == "__main__": logger.info("=== Celery任务测试开始 ===") # 测试连接 connection_ok = test_celery_connection() # 测试同步任务执行 sync_ok = test_task_execution() # 测试异步任务执行 async_ok = test_task_async_execution() logger.info("=== Celery任务测试结束 ===") logger.info(f"测试结果: 连接={connection_ok}, 同步执行={sync_ok}, 异步执行={async_ok}") if connection_ok and sync_ok and async_ok: logger.success("所有测试通过!Celery任务可以正常运行。") sys.exit(0) else: logger.error("测试失败!请检查Celery配置和任务代码。") sys.exit(1)