From 42002493981c0a0f3b99423c86738aaa64792d41 Mon Sep 17 00:00:00 2001 From: xiaji Date: Sat, 17 Jan 2026 20:56:38 +0800 Subject: [PATCH] =?UTF-8?q?feat(test):=20=E6=B7=BB=E5=8A=A0Celery=E5=92=8C?= =?UTF-8?q?Redis=E9=9B=86=E6=88=90=E6=B5=8B=E8=AF=95=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 添加两个测试脚本用于验证生产环境配置: 1. test_celery.py - 测试Celery任务执行和连接 2. test_redis_celery.py - 测试Redis连接和Celery集成 同时更新.gitignore以排除其他测试文件但保留这两个测试脚本 --- .gitignore | 4 +- test_celery.py | 84 +++++++++++ test_redis_celery.py | 337 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 424 insertions(+), 1 deletion(-) create mode 100644 test_celery.py create mode 100644 test_redis_celery.py diff --git a/.gitignore b/.gitignore index 94323d1..1517683 100644 --- a/.gitignore +++ b/.gitignore @@ -90,5 +90,7 @@ supervisor.conf *.log # 测试文件 -test*.py* +test*.py *test.py +!test_redis_celery.py +!test_celery.py diff --git a/test_celery.py b/test_celery.py new file mode 100644 index 0000000..b47d4b7 --- /dev/null +++ b/test_celery.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python +""" +Celery任务测试脚本 +用于验证Celery任务是否可以正常运行 +""" + +import os +import sys +from loguru import logger + +# 添加项目路径到Python路径 +sys.path.append(os.path.dirname(os.path.abspath(__file__))) + +# 初始化Django环境 +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings') + +import django +django.setup() + +# 导入Celery应用和任务 +from diary_family.celery import app +from core.tasks import generate_daily_pdf_report + +def test_celery_connection(): + """测试Celery连接是否正常""" + logger.info("开始测试Celery连接...") + try: + # 测试Celery是否可以正常连接 + result = app.control.ping(timeout=5) + logger.info(f"Celery连接测试结果: {result}") + return True + except Exception as e: + logger.error(f"Celery连接测试失败: {e}") + return False + +def test_task_execution(): + """测试任务执行是否正常""" + logger.info("开始测试Celery任务执行...") + try: + # 直接调用任务函数(同步执行,用于测试) + result = generate_daily_pdf_report() + logger.info(f"任务执行结果: {result}") + return result + except Exception as e: + logger.error(f"任务执行失败: {e}") + return False + +def test_task_async_execution(): + """测试异步任务执行是否正常""" + logger.info("开始测试Celery异步任务执行...") + try: + # 异步执行任务 + task = generate_daily_pdf_report.delay() + logger.info(f"异步任务ID: {task.id}") + + # 等待任务完成(最多等待30秒) + result = task.get(timeout=30) + logger.info(f"异步任务执行结果: {result}") + return result + except Exception as e: + logger.error(f"异步任务执行失败: {e}") + return False + +if __name__ == "__main__": + logger.info("=== Celery任务测试开始 ===") + + # 测试连接 + connection_ok = test_celery_connection() + + # 测试同步任务执行 + sync_ok = test_task_execution() + + # 测试异步任务执行 + async_ok = test_task_async_execution() + + logger.info("=== Celery任务测试结束 ===") + logger.info(f"测试结果: 连接={connection_ok}, 同步执行={sync_ok}, 异步执行={async_ok}") + + if connection_ok and sync_ok and async_ok: + logger.success("所有测试通过!Celery任务可以正常运行。") + sys.exit(0) + else: + logger.error("测试失败!请检查Celery配置和任务代码。") + sys.exit(1) \ No newline at end of file diff --git a/test_redis_celery.py b/test_redis_celery.py new file mode 100644 index 0000000..b3cb94a --- /dev/null +++ b/test_redis_celery.py @@ -0,0 +1,337 @@ +#!/usr/bin/env python +""" +Redis和Celery连接测试脚本 +用于验证生产环境Redis和Celery配置是否正确 +""" + +import os +import sys +import time +from loguru import logger + +# 添加项目路径到Python路径 +sys.path.append(os.path.dirname(os.path.abspath(__file__))) + +def test_redis_connection(): + """测试Redis连接是否正常""" + logger.info("开始测试Redis连接...") + try: + import redis + + # 从Django设置中获取Redis配置 + os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings') + import django + django.setup() + + from django.conf import settings + + # 解析Redis URL + redis_url = settings.CELERY_BROKER_URL + logger.info(f"Redis配置URL: {redis_url}") + + # 提取连接参数 + if redis_url.startswith('redis://'): + # 移除redis://前缀 + redis_url = redis_url[8:] + + # 检查是否有密码 + if '@' in redis_url: + # 格式: password@host:port/db + password_part, host_part = redis_url.split('@') + password = password_part + host_port_db = host_part + else: + # 格式: host:port/db + password = None + host_port_db = redis_url + + # 解析主机、端口和数据库 + if ':' in host_port_db: + host_port, db_str = host_port_db.split('/') + host, port = host_port.split(':') + db = int(db_str) if db_str else 0 + else: + host = 'localhost' + port = 6379 + db = 0 + + port = int(port) + else: + # 默认配置 + host = 'localhost' + port = 6379 + db = 0 + password = None + + logger.info(f"Redis连接参数: host={host}, port={port}, db={db}, password={'***' if password else 'None'}") + + # 测试连接 + client = redis.Redis( + host=host, + port=port, + db=db, + password=password, + socket_connect_timeout=5, + socket_timeout=5 + ) + + # 测试ping + response = client.ping() + logger.info(f"Redis ping响应: {response}") + + # 测试基本操作 + test_key = f"test_{int(time.time())}" + client.set(test_key, "test_value", ex=10) # 10秒后过期 + value = client.get(test_key) + logger.info(f"Redis set/get测试: key={test_key}, value={value}") + + # 删除测试键 + client.delete(test_key) + + # 获取Redis信息 + info = client.info() + logger.info(f"Redis版本: {info.get('redis_version', '未知')}") + logger.info(f"已用内存: {info.get('used_memory_human', '未知')}") + logger.info(f"连接数: {info.get('connected_clients', '未知')}") + + client.close() + logger.success("Redis连接测试通过!") + return True + + except Exception as e: + logger.error(f"Redis连接测试失败: {e}") + logger.error("请检查:") + logger.error("1. Redis服务是否运行: sudo systemctl status redis-server") + logger.error("2. Redis配置是否正确: /etc/redis/redis.conf") + logger.error("3. 防火墙是否允许连接: sudo ufw status") + logger.error("4. Django settings.py中的CELERY_BROKER_URL配置") + return False + +def test_celery_redis_integration(): + """测试Celery与Redis集成""" + logger.info("开始测试Celery与Redis集成...") + try: + # 初始化Django环境 + os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings') + import django + django.setup() + + from diary_family.celery import app + + # 测试Celery连接 + logger.info("测试Celery连接...") + try: + result = app.control.ping(timeout=10) + logger.info(f"Celery连接测试结果: {result}") + except Exception as e: + logger.warning(f"Celery连接测试失败(可能是worker未运行): {e}") + logger.info("这可能是正常的,如果只测试Redis连接的话") + + # 测试任务队列 + logger.info("测试任务队列...") + try: + # 获取worker状态 + stats = app.control.inspect().stats() + if stats: + logger.info(f"找到 {len(stats)} 个worker") + for worker, info in stats.items(): + logger.info(f"Worker {worker}: {info.get('pool', {}).get('max-concurrency', '未知')} 并发") + else: + logger.warning("未找到运行的worker,请启动Celery worker") + except Exception as e: + logger.warning(f"获取worker状态失败: {e}") + + # 测试发送简单任务 + logger.info("测试发送简单任务到Redis队列...") + try: + from core.tasks import debug_task + + # 发送调试任务 + task_result = debug_task.delay() + logger.info(f"调试任务已发送,任务ID: {task_result.id}") + + # 等待任务完成(最多5秒) + try: + result = task_result.get(timeout=5) + logger.info(f"调试任务执行结果: {result}") + except Exception as e: + logger.warning(f"获取任务结果超时或失败: {e}") + logger.info("这可能是正常的,如果没有worker处理任务") + + except Exception as e: + logger.error(f"发送任务失败: {e}") + return False + + logger.success("Celery与Redis集成测试通过!") + return True + + except Exception as e: + logger.error(f"Celery与Redis集成测试失败: {e}") + return False + +def test_redis_performance(): + """测试Redis性能""" + logger.info("开始测试Redis性能...") + try: + import redis + import time + + # 从Django设置中获取Redis配置 + from django.conf import settings + + redis_url = settings.CELERY_BROKER_URL + if redis_url.startswith('redis://'): + redis_url = redis_url[8:] + + if '@' in redis_url: + password_part, host_part = redis_url.split('@') + password = password_part + host_port_db = host_part + else: + password = None + host_port_db = redis_url + + if ':' in host_port_db: + host_port, db_str = host_port_db.split('/') + host, port = host_port.split(':') + db = int(db_str) if db_str else 0 + else: + host = 'localhost' + port = 6379 + db = 0 + + port = int(port) + else: + host = 'localhost' + port = 6379 + db = 0 + password = None + + client = redis.Redis( + host=host, + port=port, + db=db, + password=password, + socket_connect_timeout=5, + socket_timeout=5 + ) + + # 性能测试 + test_count = 100 + start_time = time.time() + + for i in range(test_count): + key = f"perf_test_{i}" + client.set(key, f"value_{i}", ex=60) + + set_time = time.time() - start_time + logger.info(f"设置 {test_count} 个键耗时: {set_time:.3f}秒, 平均: {set_time/test_count*1000:.2f}毫秒/个") + + start_time = time.time() + for i in range(test_count): + key = f"perf_test_{i}" + client.get(key) + + get_time = time.time() - start_time + logger.info(f"获取 {test_count} 个键耗时: {get_time:.3f}秒, 平均: {get_time/test_count*1000:.2f}毫秒/个") + + # 清理测试数据 + for i in range(test_count): + key = f"perf_test_{i}" + client.delete(key) + + client.close() + + # 性能评估 + avg_set_time = set_time / test_count * 1000 # 毫秒 + avg_get_time = get_time / test_count * 1000 # 毫秒 + + if avg_set_time < 1 and avg_get_time < 1: + logger.success("Redis性能优秀!") + elif avg_set_time < 5 and avg_get_time < 5: + logger.info("Redis性能良好") + elif avg_set_time < 10 and avg_get_time < 10: + logger.warning("Redis性能一般,建议优化") + else: + logger.error("Redis性能较差,请检查服务器负载和Redis配置") + + return True + + except Exception as e: + logger.error(f"Redis性能测试失败: {e}") + return False + +def main(): + """主测试函数""" + logger.info("=== Redis和Celery生产环境配置测试开始 ===") + logger.info("测试环境: Ubuntu + Redis + Celery") + logger.info("=" * 50) + + tests_passed = 0 + total_tests = 3 + + # 测试1: Redis连接 + logger.info("\n[测试1] Redis连接测试") + if test_redis_connection(): + tests_passed += 1 + else: + logger.error("Redis连接测试失败,后续测试可能也会失败") + + # 测试2: Celery与Redis集成 + logger.info("\n[测试2] Celery与Redis集成测试") + if test_celery_redis_integration(): + tests_passed += 1 + + # 测试3: Redis性能 + logger.info("\n[测试3] Redis性能测试") + if test_redis_performance(): + tests_passed += 1 + + # 测试总结 + logger.info("\n" + "=" * 50) + logger.info("测试总结:") + logger.info(f"通过测试: {tests_passed}/{total_tests}") + + if tests_passed == total_tests: + logger.success("所有测试通过!生产环境Redis和Celery配置正确。") + logger.info("\n部署建议:") + logger.info("1. ✅ Redis服务运行正常") + logger.info("2. ✅ Celery可以连接到Redis") + logger.info("3. ✅ Redis性能满足要求") + logger.info("4. 建议配置Redis持久化和备份") + logger.info("5. 建议监控Redis内存使用情况") + return 0 + elif tests_passed >= 2: + logger.warning("部分测试通过,生产环境基本可用。") + logger.info("\n需要检查:") + logger.info("1. 确保Redis服务正常运行") + logger.info("2. 检查Celery worker配置") + logger.info("3. 参考README中的故障排除指南") + return 1 + else: + logger.error("多数测试失败,生产环境可能无法正常工作。") + logger.info("\n紧急处理:") + logger.info("1. ❌ 检查Redis服务状态: sudo systemctl status redis-server") + logger.info("2. ❌ 检查Redis配置: /etc/redis/redis.conf") + logger.info("3. ❌ 检查Django settings.py中的Celery配置") + logger.info("4. 参考README中的Redis部署章节重新配置") + return 1 + +if __name__ == "__main__": + # 配置日志 + logger.remove() + logger.add( + sys.stdout, + format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}", + level="INFO" + ) + + try: + exit_code = main() + sys.exit(exit_code) + except KeyboardInterrupt: + logger.warning("测试被用户中断") + sys.exit(1) + except Exception as e: + logger.error(f"测试过程中发生未预期错误: {e}") + sys.exit(1) \ No newline at end of file