Files
diary-family/test_redis_celery.py
xiaji 4200249398 feat(test): 添加Celery和Redis集成测试脚本
添加两个测试脚本用于验证生产环境配置:
1. test_celery.py - 测试Celery任务执行和连接
2. test_redis_celery.py - 测试Redis连接和Celery集成

同时更新.gitignore以排除其他测试文件但保留这两个测试脚本
2026-01-17 20:56:38 +08:00

337 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python
"""
Redis和Celery连接测试脚本
用于验证生产环境Redis和Celery配置是否正确
"""
import os
import sys
import time
from loguru import logger
# 添加项目路径到Python路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
def test_redis_connection():
"""测试Redis连接是否正常"""
logger.info("开始测试Redis连接...")
try:
import redis
# 从Django设置中获取Redis配置
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
import django
django.setup()
from django.conf import settings
# 解析Redis URL
redis_url = settings.CELERY_BROKER_URL
logger.info(f"Redis配置URL: {redis_url}")
# 提取连接参数
if redis_url.startswith('redis://'):
# 移除redis://前缀
redis_url = redis_url[8:]
# 检查是否有密码
if '@' in redis_url:
# 格式: password@host:port/db
password_part, host_part = redis_url.split('@')
password = password_part
host_port_db = host_part
else:
# 格式: host:port/db
password = None
host_port_db = redis_url
# 解析主机、端口和数据库
if ':' in host_port_db:
host_port, db_str = host_port_db.split('/')
host, port = host_port.split(':')
db = int(db_str) if db_str else 0
else:
host = 'localhost'
port = 6379
db = 0
port = int(port)
else:
# 默认配置
host = 'localhost'
port = 6379
db = 0
password = None
logger.info(f"Redis连接参数: host={host}, port={port}, db={db}, password={'***' if password else 'None'}")
# 测试连接
client = redis.Redis(
host=host,
port=port,
db=db,
password=password,
socket_connect_timeout=5,
socket_timeout=5
)
# 测试ping
response = client.ping()
logger.info(f"Redis ping响应: {response}")
# 测试基本操作
test_key = f"test_{int(time.time())}"
client.set(test_key, "test_value", ex=10) # 10秒后过期
value = client.get(test_key)
logger.info(f"Redis set/get测试: key={test_key}, value={value}")
# 删除测试键
client.delete(test_key)
# 获取Redis信息
info = client.info()
logger.info(f"Redis版本: {info.get('redis_version', '未知')}")
logger.info(f"已用内存: {info.get('used_memory_human', '未知')}")
logger.info(f"连接数: {info.get('connected_clients', '未知')}")
client.close()
logger.success("Redis连接测试通过")
return True
except Exception as e:
logger.error(f"Redis连接测试失败: {e}")
logger.error("请检查:")
logger.error("1. Redis服务是否运行: sudo systemctl status redis-server")
logger.error("2. Redis配置是否正确: /etc/redis/redis.conf")
logger.error("3. 防火墙是否允许连接: sudo ufw status")
logger.error("4. Django settings.py中的CELERY_BROKER_URL配置")
return False
def test_celery_redis_integration():
"""测试Celery与Redis集成"""
logger.info("开始测试Celery与Redis集成...")
try:
# 初始化Django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
import django
django.setup()
from diary_family.celery import app
# 测试Celery连接
logger.info("测试Celery连接...")
try:
result = app.control.ping(timeout=10)
logger.info(f"Celery连接测试结果: {result}")
except Exception as e:
logger.warning(f"Celery连接测试失败可能是worker未运行: {e}")
logger.info("这可能是正常的如果只测试Redis连接的话")
# 测试任务队列
logger.info("测试任务队列...")
try:
# 获取worker状态
stats = app.control.inspect().stats()
if stats:
logger.info(f"找到 {len(stats)} 个worker")
for worker, info in stats.items():
logger.info(f"Worker {worker}: {info.get('pool', {}).get('max-concurrency', '未知')} 并发")
else:
logger.warning("未找到运行的worker请启动Celery worker")
except Exception as e:
logger.warning(f"获取worker状态失败: {e}")
# 测试发送简单任务
logger.info("测试发送简单任务到Redis队列...")
try:
from core.tasks import debug_task
# 发送调试任务
task_result = debug_task.delay()
logger.info(f"调试任务已发送任务ID: {task_result.id}")
# 等待任务完成最多5秒
try:
result = task_result.get(timeout=5)
logger.info(f"调试任务执行结果: {result}")
except Exception as e:
logger.warning(f"获取任务结果超时或失败: {e}")
logger.info("这可能是正常的如果没有worker处理任务")
except Exception as e:
logger.error(f"发送任务失败: {e}")
return False
logger.success("Celery与Redis集成测试通过")
return True
except Exception as e:
logger.error(f"Celery与Redis集成测试失败: {e}")
return False
def test_redis_performance():
"""测试Redis性能"""
logger.info("开始测试Redis性能...")
try:
import redis
import time
# 从Django设置中获取Redis配置
from django.conf import settings
redis_url = settings.CELERY_BROKER_URL
if redis_url.startswith('redis://'):
redis_url = redis_url[8:]
if '@' in redis_url:
password_part, host_part = redis_url.split('@')
password = password_part
host_port_db = host_part
else:
password = None
host_port_db = redis_url
if ':' in host_port_db:
host_port, db_str = host_port_db.split('/')
host, port = host_port.split(':')
db = int(db_str) if db_str else 0
else:
host = 'localhost'
port = 6379
db = 0
port = int(port)
else:
host = 'localhost'
port = 6379
db = 0
password = None
client = redis.Redis(
host=host,
port=port,
db=db,
password=password,
socket_connect_timeout=5,
socket_timeout=5
)
# 性能测试
test_count = 100
start_time = time.time()
for i in range(test_count):
key = f"perf_test_{i}"
client.set(key, f"value_{i}", ex=60)
set_time = time.time() - start_time
logger.info(f"设置 {test_count} 个键耗时: {set_time:.3f}秒, 平均: {set_time/test_count*1000:.2f}毫秒/个")
start_time = time.time()
for i in range(test_count):
key = f"perf_test_{i}"
client.get(key)
get_time = time.time() - start_time
logger.info(f"获取 {test_count} 个键耗时: {get_time:.3f}秒, 平均: {get_time/test_count*1000:.2f}毫秒/个")
# 清理测试数据
for i in range(test_count):
key = f"perf_test_{i}"
client.delete(key)
client.close()
# 性能评估
avg_set_time = set_time / test_count * 1000 # 毫秒
avg_get_time = get_time / test_count * 1000 # 毫秒
if avg_set_time < 1 and avg_get_time < 1:
logger.success("Redis性能优秀")
elif avg_set_time < 5 and avg_get_time < 5:
logger.info("Redis性能良好")
elif avg_set_time < 10 and avg_get_time < 10:
logger.warning("Redis性能一般建议优化")
else:
logger.error("Redis性能较差请检查服务器负载和Redis配置")
return True
except Exception as e:
logger.error(f"Redis性能测试失败: {e}")
return False
def main():
"""主测试函数"""
logger.info("=== Redis和Celery生产环境配置测试开始 ===")
logger.info("测试环境: Ubuntu + Redis + Celery")
logger.info("=" * 50)
tests_passed = 0
total_tests = 3
# 测试1: Redis连接
logger.info("\n[测试1] Redis连接测试")
if test_redis_connection():
tests_passed += 1
else:
logger.error("Redis连接测试失败后续测试可能也会失败")
# 测试2: Celery与Redis集成
logger.info("\n[测试2] Celery与Redis集成测试")
if test_celery_redis_integration():
tests_passed += 1
# 测试3: Redis性能
logger.info("\n[测试3] Redis性能测试")
if test_redis_performance():
tests_passed += 1
# 测试总结
logger.info("\n" + "=" * 50)
logger.info("测试总结:")
logger.info(f"通过测试: {tests_passed}/{total_tests}")
if tests_passed == total_tests:
logger.success("所有测试通过生产环境Redis和Celery配置正确。")
logger.info("\n部署建议:")
logger.info("1. ✅ Redis服务运行正常")
logger.info("2. ✅ Celery可以连接到Redis")
logger.info("3. ✅ Redis性能满足要求")
logger.info("4. 建议配置Redis持久化和备份")
logger.info("5. 建议监控Redis内存使用情况")
return 0
elif tests_passed >= 2:
logger.warning("部分测试通过,生产环境基本可用。")
logger.info("\n需要检查:")
logger.info("1. 确保Redis服务正常运行")
logger.info("2. 检查Celery worker配置")
logger.info("3. 参考README中的故障排除指南")
return 1
else:
logger.error("多数测试失败,生产环境可能无法正常工作。")
logger.info("\n紧急处理:")
logger.info("1. ❌ 检查Redis服务状态: sudo systemctl status redis-server")
logger.info("2. ❌ 检查Redis配置: /etc/redis/redis.conf")
logger.info("3. ❌ 检查Django settings.py中的Celery配置")
logger.info("4. 参考README中的Redis部署章节重新配置")
return 1
if __name__ == "__main__":
# 配置日志
logger.remove()
logger.add(
sys.stdout,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
level="INFO"
)
try:
exit_code = main()
sys.exit(exit_code)
except KeyboardInterrupt:
logger.warning("测试被用户中断")
sys.exit(1)
except Exception as e:
logger.error(f"测试过程中发生未预期错误: {e}")
sys.exit(1)