Files
diary-family/test_redis_celery.py
xiaji 3776de8477 feat(日志): 添加测试脚本日志文件输出并更新日志文档
添加测试脚本日志文件输出功能,创建日志目录并设置权限。同时更新README.md文档,详细说明生产环境中各类日志的查看方法和常见问题解决方案。
2026-01-17 22:03:37 +08:00

406 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python
"""
Redis和Celery连接测试脚本
用于验证生产环境Redis和Celery配置是否正确
"""
import os
import sys
import time
from loguru import logger
# 添加项目路径到Python路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
def test_redis_connection():
"""测试Redis连接是否正常"""
logger.info("开始测试Redis连接...")
try:
import redis
from urllib.parse import urlparse
# 从Django设置中获取Redis配置
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
import django
django.setup()
from django.conf import settings
# 获取Redis配置URL
redis_url = settings.CELERY_BROKER_URL
logger.info(f"Redis配置URL: {redis_url}")
# 使用urllib解析URL更健壮的方法
parsed = urlparse(redis_url)
# 提取连接参数
host = parsed.hostname or 'localhost'
port = parsed.port or 6379
db = 0
# 解析数据库编号
if parsed.path and len(parsed.path) > 1:
try:
db = int(parsed.path[1:]) # 移除开头的'/'
except ValueError:
db = 0
# 解析密码注意密码可能在netloc中的username部分
password = None
if parsed.password:
# URL解码密码处理特殊字符
import urllib.parse
password = urllib.parse.unquote(parsed.password)
elif parsed.username:
# 有些配置可能把密码放在username位置
password = urllib.parse.unquote(parsed.username)
logger.info(f"Redis连接参数: host={host}, port={port}, db={db}, password={'***' if password else 'None'}")
# 测试连接
client = redis.Redis(
host=host,
port=port,
db=db,
password=password,
socket_connect_timeout=5,
socket_timeout=5,
decode_responses=True
)
# 测试ping
response = client.ping()
logger.info(f"Redis ping响应: {response}")
# 测试基本操作
test_key = f"test_{int(time.time())}"
client.set(test_key, "test_value", ex=10) # 10秒后过期
value = client.get(test_key)
logger.info(f"Redis set/get测试: key={test_key}, value={value}")
# 删除测试键
client.delete(test_key)
# 获取Redis信息
info = client.info()
logger.info(f"Redis版本: {info.get('redis_version', '未知')}")
logger.info(f"已用内存: {info.get('used_memory_human', '未知')}")
logger.info(f"连接数: {info.get('connected_clients', '未知')}")
client.close()
logger.success("Redis连接测试通过")
return True
except Exception as e:
error_msg = str(e)
logger.error(f"Redis连接测试失败: {error_msg}")
# 根据错误类型提供具体的解决方案
if "invalid username-password pair" in error_msg or "Authentication required" in error_msg:
logger.error("🔐 密码验证失败,可能的原因:")
logger.error("1. Redis密码错误或未设置密码")
logger.error("2. 密码包含特殊字符需要URL编码")
logger.error("3. Redis配置中requirepass设置不正确")
logger.error("")
logger.error("💡 解决方案:")
logger.error("1. 检查Redis密码: sudo grep '^requirepass' /etc/redis/redis.conf")
logger.error("2. 测试密码连接: redis-cli -a '你的密码' ping")
logger.error("3. 如果密码包含特殊字符在Django配置中使用URL编码:")
logger.error(" 例如: 'xjjq1234!' 编码为 'xjjq1234%21'")
logger.error(" CELERY_BROKER_URL = 'redis://:xjjq1234%21@localhost:6379/0'")
logger.error("4. 临时禁用密码测试: 在redis.conf中注释掉requirepass行并重启")
elif "Connection refused" in error_msg:
logger.error("🔌 连接被拒绝,可能的原因:")
logger.error("1. Redis服务未运行")
logger.error("2. Redis绑定到其他IP地址")
logger.error("3. 防火墙阻止连接")
logger.error("")
logger.error("💡 解决方案:")
logger.error("1. 启动Redis: sudo systemctl start redis-server")
logger.error("2. 检查绑定IP: sudo grep '^bind' /etc/redis/redis.conf")
logger.error("3. 检查端口监听: sudo netstat -tlnp | grep 6379")
else:
logger.error("请检查:")
logger.error("1. Redis服务状态: sudo systemctl status redis-server")
logger.error("2. Redis配置文件: /etc/redis/redis.conf")
logger.error("3. 防火墙设置: sudo ufw status")
logger.error("4. Django settings.py中的CELERY_BROKER_URL配置")
return False
def test_celery_redis_integration():
"""测试Celery与Redis集成"""
logger.info("开始测试Celery与Redis集成...")
try:
# 初始化Django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
import django
django.setup()
from diary_family.celery import app
# 测试Celery连接
logger.info("测试Celery连接...")
try:
result = app.control.ping(timeout=10)
logger.info(f"Celery连接测试结果: {result}")
except Exception as e:
logger.warning(f"Celery连接测试失败可能是worker未运行: {e}")
logger.info("这可能是正常的如果只测试Redis连接的话")
# 测试任务队列
logger.info("测试任务队列...")
try:
# 获取worker状态
stats = app.control.inspect().stats()
if stats:
logger.info(f"找到 {len(stats)} 个worker")
for worker, info in stats.items():
logger.info(f"Worker {worker}: {info.get('pool', {}).get('max-concurrency', '未知')} 并发")
else:
logger.warning("未找到运行的worker请启动Celery worker")
except Exception as e:
logger.warning(f"获取worker状态失败: {e}")
# 测试发送简单任务
logger.info("测试发送简单任务到Redis队列...")
try:
# 尝试导入debug_task如果不存在则创建简单的测试任务
try:
from core.tasks import debug_task
test_task = debug_task
logger.info("使用core.tasks中的debug_task")
except ImportError:
# 如果debug_task不存在创建一个简单的测试任务
logger.warning("debug_task未找到创建临时测试任务")
from celery import shared_task
@shared_task
def temp_debug_task():
return {"status": "success", "message": "临时测试任务执行成功"}
test_task = temp_debug_task
# 发送调试任务
task_result = test_task.delay()
logger.info(f"调试任务已发送任务ID: {task_result.id}")
# 等待任务完成最多10秒
try:
result = task_result.get(timeout=10)
logger.info(f"调试任务执行结果: {result}")
logger.success("任务成功执行并返回结果")
except Exception as e:
logger.warning(f"获取任务结果超时或失败: {e}")
logger.info("这可能是因为:")
logger.info("1. Celery worker未运行或未处理任务")
logger.info("2. 任务执行时间超过10秒")
logger.info("3. 任务执行出错")
logger.info("但任务已成功发送到Redis队列说明Celery-Redis连接正常")
except Exception as e:
logger.error(f"发送任务失败: {e}")
logger.error("这可能表示:")
logger.error("1. Celery配置错误")
logger.error("2. Redis连接问题")
logger.error("3. 任务序列化问题")
return False
logger.success("Celery与Redis集成测试通过")
return True
except Exception as e:
logger.error(f"Celery与Redis集成测试失败: {e}")
return False
def test_redis_performance():
"""测试Redis性能"""
logger.info("开始测试Redis性能...")
try:
import redis
import time
from urllib.parse import urlparse
import urllib.parse
# 从Django设置中获取Redis配置
from django.conf import settings
redis_url = settings.CELERY_BROKER_URL
# 使用urllib解析URL
parsed = urlparse(redis_url)
# 提取连接参数
host = parsed.hostname or 'localhost'
port = parsed.port or 6379
db = 0
# 解析数据库编号
if parsed.path and len(parsed.path) > 1:
try:
db = int(parsed.path[1:]) # 移除开头的'/'
except ValueError:
db = 0
# 解析密码
password = None
if parsed.password:
password = urllib.parse.unquote(parsed.password)
elif parsed.username:
password = urllib.parse.unquote(parsed.username)
client = redis.Redis(
host=host,
port=port,
db=db,
password=password,
socket_connect_timeout=5,
socket_timeout=5,
decode_responses=True
)
# 性能测试
test_count = 100
start_time = time.time()
for i in range(test_count):
key = f"perf_test_{i}"
client.set(key, f"value_{i}", ex=60)
set_time = time.time() - start_time
logger.info(f"设置 {test_count} 个键耗时: {set_time:.3f}秒, 平均: {set_time/test_count*1000:.2f}毫秒/个")
start_time = time.time()
for i in range(test_count):
key = f"perf_test_{i}"
client.get(key)
get_time = time.time() - start_time
logger.info(f"获取 {test_count} 个键耗时: {get_time:.3f}秒, 平均: {get_time/test_count*1000:.2f}毫秒/个")
# 清理测试数据
for i in range(test_count):
key = f"perf_test_{i}"
client.delete(key)
client.close()
# 性能评估
avg_set_time = set_time / test_count * 1000 # 毫秒
avg_get_time = get_time / test_count * 1000 # 毫秒
if avg_set_time < 1 and avg_get_time < 1:
logger.success("Redis性能优秀")
elif avg_set_time < 5 and avg_get_time < 5:
logger.info("Redis性能良好")
elif avg_set_time < 10 and avg_get_time < 10:
logger.warning("Redis性能一般建议优化")
else:
logger.error("Redis性能较差请检查服务器负载和Redis配置")
return True
except Exception as e:
logger.error(f"Redis性能测试失败: {e}")
return False
def main():
"""主测试函数"""
logger.info("=== Redis和Celery生产环境配置测试开始 ===")
logger.info("测试环境: Ubuntu + Redis + Celery")
logger.info("=" * 50)
tests_passed = 0
total_tests = 3
# 测试1: Redis连接
logger.info("\n[测试1] Redis连接测试")
if test_redis_connection():
tests_passed += 1
else:
logger.error("Redis连接测试失败后续测试可能也会失败")
# 测试2: Celery与Redis集成
logger.info("\n[测试2] Celery与Redis集成测试")
if test_celery_redis_integration():
tests_passed += 1
# 测试3: Redis性能
logger.info("\n[测试3] Redis性能测试")
if test_redis_performance():
tests_passed += 1
# 测试总结
logger.info("\n" + "=" * 50)
logger.info("测试总结:")
logger.info(f"通过测试: {tests_passed}/{total_tests}")
if tests_passed == total_tests:
logger.success("所有测试通过生产环境Redis和Celery配置正确。")
logger.info("\n部署建议:")
logger.info("1. ✅ Redis服务运行正常")
logger.info("2. ✅ Celery可以连接到Redis")
logger.info("3. ✅ Redis性能满足要求")
logger.info("4. 建议配置Redis持久化和备份")
logger.info("5. 建议监控Redis内存使用情况")
return 0
elif tests_passed >= 2:
logger.warning("部分测试通过,生产环境基本可用。")
logger.info("\n需要检查:")
logger.info("1. 确保Redis服务正常运行")
logger.info("2. 检查Celery worker配置")
logger.info("3. 参考README中的故障排除指南")
return 1
else:
logger.error("多数测试失败,生产环境可能无法正常工作。")
logger.info("\n紧急处理:")
logger.info("1. ❌ 检查Redis服务状态: sudo systemctl status redis-server")
logger.info("2. ❌ 检查Redis配置: /etc/redis/redis.conf")
logger.info("3. ❌ 检查Django settings.py中的Celery配置")
logger.info("4. 参考README中的Redis部署章节重新配置")
return 1
if __name__ == "__main__":
import os
from pathlib import Path
# 配置日志
logger.remove()
# 创建日志目录
log_dir = Path("/var/log/celery")
log_dir.mkdir(parents=True, exist_ok=True)
# 设置日志文件权限
log_dir.chmod(0o755)
# 添加控制台输出
logger.add(
sys.stdout,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
level="INFO"
)
# 添加日志文件输出
log_file = log_dir / "test_redis_celery.log"
logger.add(
log_file,
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
level="INFO",
rotation="1 day",
retention="7 days",
encoding="utf-8"
)
logger.info(f"测试日志将同时输出到控制台和 {log_file}")
logger.info("=" * 60)
try:
exit_code = main()
sys.exit(exit_code)
except KeyboardInterrupt:
logger.warning("测试被用户中断")
sys.exit(1)
except Exception as e:
logger.error(f"测试过程中发生未预期错误: {e}")
sys.exit(1)