Files
diary-family/test_redis_celery.py

682 lines
28 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python
"""
Redis和Celery连接测试脚本
用于验证生产环境Redis和Celery配置是否正确
"""
import os
import sys
import time
from loguru import logger
# 添加项目路径到Python路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
def test_redis_connection():
"""测试Redis连接是否正常"""
logger.info("开始测试Redis连接...")
try:
import redis
from urllib.parse import urlparse
# 从Django设置中获取Redis配置
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
import django
django.setup()
from django.conf import settings
# 获取Redis配置URL
redis_url = settings.CELERY_BROKER_URL
logger.info(f"Redis配置URL: {redis_url}")
# 使用urllib解析URL更健壮的方法
parsed = urlparse(redis_url)
# 提取连接参数
host = parsed.hostname or 'localhost'
port = parsed.port or 6379
db = 0
# 解析数据库编号
if parsed.path and len(parsed.path) > 1:
try:
db = int(parsed.path[1:]) # 移除开头的'/'
except ValueError:
db = 0
# 解析密码注意密码可能在netloc中的username部分
password = None
if parsed.password:
# URL解码密码处理特殊字符
import urllib.parse
password = urllib.parse.unquote(parsed.password)
elif parsed.username:
# 有些配置可能把密码放在username位置
password = urllib.parse.unquote(parsed.username)
logger.info(f"Redis连接参数: host={host}, port={port}, db={db}, password={'***' if password else 'None'}")
# 测试连接
client = redis.Redis(
host=host,
port=port,
db=db,
password=password,
socket_connect_timeout=5,
socket_timeout=5,
decode_responses=True
)
# 测试ping
response = client.ping()
logger.info(f"Redis ping响应: {response}")
# 测试基本操作
test_key = f"test_{int(time.time())}"
client.set(test_key, "test_value", ex=10) # 10秒后过期
value = client.get(test_key)
logger.info(f"Redis set/get测试: key={test_key}, value={value}")
# 删除测试键
client.delete(test_key)
# 获取Redis信息
info = client.info()
logger.info(f"Redis版本: {info.get('redis_version', '未知')}")
logger.info(f"已用内存: {info.get('used_memory_human', '未知')}")
logger.info(f"连接数: {info.get('connected_clients', '未知')}")
client.close()
logger.success("Redis连接测试通过")
return True
except Exception as e:
error_msg = str(e)
logger.error(f"Redis连接测试失败: {error_msg}")
# 根据错误类型提供具体的解决方案
if "invalid username-password pair" in error_msg or "Authentication required" in error_msg:
logger.error("🔐 密码验证失败,可能的原因:")
logger.error("1. Redis密码错误或未设置密码")
logger.error("2. 密码包含特殊字符需要URL编码")
logger.error("3. Redis配置中requirepass设置不正确")
logger.error("")
logger.error("💡 解决方案:")
logger.error("1. 检查Redis密码: sudo grep '^requirepass' /etc/redis/redis.conf")
logger.error("2. 测试密码连接: redis-cli -a '你的密码' ping")
logger.error("3. 如果密码包含特殊字符在Django配置中使用URL编码:")
logger.error(" 例如: 'xjjq1234!' 编码为 'xjjq1234%21'")
logger.error(" CELERY_BROKER_URL = 'redis://:xjjq1234%21@localhost:6379/0'")
logger.error("4. 临时禁用密码测试: 在redis.conf中注释掉requirepass行并重启")
elif "Connection refused" in error_msg:
logger.error("🔌 连接被拒绝,可能的原因:")
logger.error("1. Redis服务未运行")
logger.error("2. Redis绑定到其他IP地址")
logger.error("3. 防火墙阻止连接")
logger.error("")
logger.error("💡 解决方案:")
logger.error("1. 启动Redis: sudo systemctl start redis-server")
logger.error("2. 检查绑定IP: sudo grep '^bind' /etc/redis/redis.conf")
logger.error("3. 检查端口监听: sudo netstat -tlnp | grep 6379")
else:
logger.error("请检查:")
logger.error("1. Redis服务状态: sudo systemctl status redis-server")
logger.error("2. Redis配置文件: /etc/redis/redis.conf")
logger.error("3. 防火墙设置: sudo ufw status")
logger.error("4. Django settings.py中的CELERY_BROKER_URL配置")
return False
def test_celery_redis_integration():
"""测试Celery与Redis集成"""
logger.info("开始测试Celery与Redis集成...")
try:
# 初始化Django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
import django
django.setup()
from diary_family.celery import app
# 测试Celery连接
logger.info("测试Celery连接...")
try:
result = app.control.ping(timeout=10)
logger.info(f"Celery连接测试结果: {result}")
except Exception as e:
logger.warning(f"Celery连接测试失败可能是worker未运行: {e}")
logger.info("这可能是正常的如果只测试Redis连接的话")
# 测试任务队列
logger.info("测试任务队列...")
try:
# 获取worker状态
stats = app.control.inspect().stats()
if stats:
logger.info(f"找到 {len(stats)} 个worker")
for worker, info in stats.items():
logger.info(f"Worker {worker}: {info.get('pool', {}).get('max-concurrency', '未知')} 并发")
else:
logger.warning("未找到运行的worker请启动Celery worker")
except Exception as e:
logger.warning(f"获取worker状态失败: {e}")
# 测试发送简单任务
logger.info("测试发送简单任务到Redis队列...")
try:
# 尝试导入debug_task如果不存在则创建简单的测试任务
try:
from core.tasks import debug_task
test_task = debug_task
logger.info("使用core.tasks中的debug_task")
except ImportError:
# 如果debug_task不存在创建一个简单的测试任务
logger.warning("debug_task未找到创建临时测试任务")
from celery import shared_task
@shared_task
def temp_debug_task():
return {"status": "success", "message": "临时测试任务执行成功"}
test_task = temp_debug_task
# 发送调试任务
task_result = test_task.delay()
logger.info(f"调试任务已发送任务ID: {task_result.id}")
# 等待任务完成最多10秒
try:
result = task_result.get(timeout=10)
logger.info(f"调试任务执行结果: {result}")
logger.success("任务成功执行并返回结果")
except Exception as e:
logger.warning(f"获取任务结果超时或失败: {e}")
logger.info("这可能是因为:")
logger.info("1. Celery worker未运行或未处理任务")
logger.info("2. 任务执行时间超过10秒")
logger.info("3. 任务执行出错")
logger.info("但任务已成功发送到Redis队列说明Celery-Redis连接正常")
except Exception as e:
logger.error(f"发送任务失败: {e}")
logger.error("这可能表示:")
logger.error("1. Celery配置错误")
logger.error("2. Redis连接问题")
logger.error("3. 任务序列化问题")
return False
logger.success("Celery与Redis集成测试通过")
return True
except Exception as e:
logger.error(f"Celery与Redis集成测试失败: {e}")
return False
def test_redis_performance():
"""测试Redis性能"""
logger.info("开始测试Redis性能...")
try:
import redis
import time
from urllib.parse import urlparse
import urllib.parse
# 从Django设置中获取Redis配置
from django.conf import settings
redis_url = settings.CELERY_BROKER_URL
# 使用urllib解析URL
parsed = urlparse(redis_url)
# 提取连接参数
host = parsed.hostname or 'localhost'
port = parsed.port or 6379
db = 0
# 解析数据库编号
if parsed.path and len(parsed.path) > 1:
try:
db = int(parsed.path[1:]) # 移除开头的'/'
except ValueError:
db = 0
# 解析密码
password = None
if parsed.password:
password = urllib.parse.unquote(parsed.password)
elif parsed.username:
password = urllib.parse.unquote(parsed.username)
client = redis.Redis(
host=host,
port=port,
db=db,
password=password,
socket_connect_timeout=5,
socket_timeout=5,
decode_responses=True
)
# 性能测试
test_count = 100
start_time = time.time()
for i in range(test_count):
key = f"perf_test_{i}"
client.set(key, f"value_{i}", ex=60)
set_time = time.time() - start_time
logger.info(f"设置 {test_count} 个键耗时: {set_time:.3f}秒, 平均: {set_time/test_count*1000:.2f}毫秒/个")
start_time = time.time()
for i in range(test_count):
key = f"perf_test_{i}"
client.get(key)
get_time = time.time() - start_time
logger.info(f"获取 {test_count} 个键耗时: {get_time:.3f}秒, 平均: {get_time/test_count*1000:.2f}毫秒/个")
# 清理测试数据
for i in range(test_count):
key = f"perf_test_{i}"
client.delete(key)
client.close()
# 性能评估
avg_set_time = set_time / test_count * 1000 # 毫秒
avg_get_time = get_time / test_count * 1000 # 毫秒
if avg_set_time < 1 and avg_get_time < 1:
logger.success("Redis性能优秀")
elif avg_set_time < 5 and avg_get_time < 5:
logger.info("Redis性能良好")
elif avg_set_time < 10 and avg_get_time < 10:
logger.warning("Redis性能一般建议优化")
else:
logger.error("Redis性能较差请检查服务器负载和Redis配置")
return True
except Exception as e:
logger.error(f"Redis性能测试失败: {e}")
return False
def test_celery_redis_email():
"""测试通过Celery和Redis发送邮件功能"""
logger.info("开始测试通过Celery和Redis发送邮件...")
try:
from core.tasks import celery_send_test_email
# 发送测试邮件任务
logger.info("发送Celery邮件测试任务到Redis队列...")
task = celery_send_test_email.delay(test_mode=True)
logger.info(f"邮件测试任务已发送任务ID: {task.id}")
# 等待任务完成
logger.info("等待任务执行最多30秒...")
result = task.get(timeout=30)
logger.info(f"邮件任务执行结果: {result}")
# 检查结果状态
if result.get('status') == 'success':
logger.success("通过Celery和Redis发送邮件测试成功")
return True
else:
logger.error(f"邮件发送失败: {result.get('error', '未知错误')}")
return False
except Exception as e:
error_msg = str(e)
logger.error(f"通过Celery和Redis发送邮件测试失败: {error_msg}")
# 提供具体的解决方案
if "No such file or directory" in error_msg:
logger.error("📁 文件路径错误,可能的原因:")
logger.error("1. 配置文件路径不存在")
logger.error("2. 权限问题导致无法访问文件")
elif "Connection refused" in error_msg:
logger.error("🔌 连接被拒绝,可能的原因:")
logger.error("1. Celery worker未运行")
logger.error("2. Redis服务未运行")
logger.error("3. 防火墙阻止连接")
else:
logger.error("请检查:")
logger.error("1. Celery worker是否运行: sudo supervisorctl status celery_worker")
logger.error("2. Redis服务状态: sudo systemctl status redis-server")
logger.error("3. 系统配置中的邮件设置是否正确")
logger.error("4. SMTP服务器配置是否正确")
return False
def test_celery_redis_pdf_email():
"""测试通过Celery和Redis发送包含PDF附件的邮件功能"""
logger.info("开始测试通过Celery和Redis发送包含PDF附件的邮件...")
try:
from core.tasks import celery_send_pdf_report_email
# 检查Celery worker状态
logger.info("检查Celery worker状态...")
from diary_family.celery import app
try:
# 检查worker是否运行
stats = app.control.inspect().stats()
if stats:
logger.info(f"✅ 找到 {len(stats)} 个运行中的worker")
else:
logger.warning("⚠️ 未找到运行中的worker任务可能无法执行")
logger.warning("请确保Celery worker正在运行: sudo supervisorctl status celery_worker")
except Exception as e:
logger.warning(f"⚠️ 检查worker状态失败: {e}")
# 发送PDF报告邮件任务
logger.info("发送Celery PDF报告邮件测试任务到Redis队列...")
task = celery_send_pdf_report_email.delay()
logger.info(f"PDF报告邮件测试任务已发送任务ID: {task.id}")
# 等待任务完成
logger.info("等待任务执行最多120秒...")
try:
result = task.get(timeout=120)
logger.info(f"PDF报告邮件任务执行结果: {result}")
# 检查结果状态
if result.get('status') == 'success':
logger.success("✅ 通过Celery和Redis发送包含PDF附件的邮件测试成功")
return True
else:
error_msg = result.get('error', '未知错误')
logger.error(f"❌ 包含PDF附件的邮件发送失败: {error_msg}")
# 提供具体的解决方案
if "WeasyPrint" in error_msg:
logger.error("📄 WeasyPrint相关错误可能的原因:")
logger.error("1. WeasyPrint库未安装")
logger.error("2. WeasyPrint依赖项缺失")
logger.error("3. 权限问题导致无法生成PDF")
logger.error("解决方案: 运行 'pip install weasyprint' 安装WeasyPrint库")
elif "SMTP" in error_msg or "smtp" in error_msg:
logger.error("📧 SMTP相关错误可能的原因:")
logger.error("1. SMTP服务器地址或端口错误")
logger.error("2. SMTP用户名或密码错误")
logger.error("3. SMTP服务器不允许从当前IP地址连接")
logger.error("4. SMTP服务器超时")
elif "Redis" in error_msg or "redis" in error_msg:
logger.error("🔴 Redis相关错误可能的原因:")
logger.error("1. Redis服务未运行")
logger.error("2. Redis密码错误")
logger.error("3. Redis连接超时")
return False
except Exception as e:
error_msg = str(e)
logger.error(f"❌ 等待任务完成时发生错误: {error_msg}")
# 检查任务状态
try:
state = task.state
logger.info(f"任务当前状态: {state}")
if state == 'PENDING':
logger.error("🔴 任务仍处于待处理状态,可能的原因:")
logger.error("1. Celery worker未运行")
logger.error("2. 任务队列已满")
logger.error("3. Redis连接问题")
elif state == 'RETRY':
logger.error("🔄 任务正在重试,可能的原因:")
logger.error("1. 任务执行过程中发生错误")
logger.error("2. SMTP服务器临时不可用")
elif state == 'FAILURE':
logger.error("💥 任务执行失败,可能的原因:")
logger.error("1. 任务代码中存在错误")
logger.error("2. WeasyPrint库未正确安装")
logger.error("3. 邮件配置错误")
except Exception as state_e:
logger.error(f"获取任务状态失败: {state_e}")
# 提供具体的解决方案
if "timeout" in error_msg.lower():
logger.error("⏱️ 超时错误,可能的原因:")
logger.error("1. Celery worker未运行任务无法执行")
logger.error("2. 任务执行时间过长超过了120秒的超时限制")
logger.error("3. SMTP服务器响应缓慢")
logger.error("4. Redis连接超时")
# 检查系统资源
logger.error("请检查系统资源使用情况:")
logger.error(" sudo top -bn1 | head -20")
logger.error(" sudo systemctl status redis-server")
logger.error(" sudo supervisorctl status celery_worker")
return False
except Exception as e:
error_msg = str(e)
logger.error(f"❌ 通过Celery和Redis发送包含PDF附件的邮件测试失败: {error_msg}")
# 提供具体的解决方案
if "No module named" in error_msg:
module_name = error_msg.split("'"[1])
logger.error(f"📦 缺少模块: {module_name}")
logger.error(f"解决方案: 运行 'pip install {module_name}'")
elif "No such file or directory" in error_msg:
logger.error("📁 文件路径错误,可能的原因:")
logger.error("1. 报告目录不存在")
logger.error("2. 权限问题导致无法访问目录")
logger.error("3. 检查settings.py中的REPORTS_ROOT配置")
elif "Connection refused" in error_msg:
logger.error("🔌 连接被拒绝,可能的原因:")
logger.error("1. Redis服务未运行")
logger.error("2. Celery worker未运行")
logger.error("3. 防火墙阻止连接")
else:
logger.error("请检查:")
logger.error("1. Celery worker是否运行: sudo supervisorctl status celery_worker")
logger.error("2. Redis服务状态: sudo systemctl status redis-server")
logger.error("3. 系统配置中的邮件设置是否正确")
logger.error("4. SMTP服务器配置是否正确")
logger.error("5. WeasyPrint库是否已正确安装: pip show weasyprint")
logger.error("6. 检查Celery日志获取更多信息: sudo tail -f /var/log/celery/worker.log")
return False
def check_logs_config():
"""检查Gunicorn、Celery、Redis的日志是否写入同一个文件"""
logger.info("开始检查日志配置...")
try:
from django.conf import settings
# 检查Django日志配置
if hasattr(settings, 'LOGGING'):
logging_config = settings.LOGGING
logger.info("Django日志配置:")
# 检查日志处理器
handlers = logging_config.get('handlers', {})
file_handlers = [name for name, handler in handlers.items() if handler.get('class') == 'logging.handlers.RotatingFileHandler']
if file_handlers:
for handler_name in file_handlers:
handler = handlers[handler_name]
log_file = handler.get('filename', '未知')
logger.info(f" 文件日志处理器 '{handler_name}' 写入到: {log_file}")
# 检查日志器配置
loggers = logging_config.get('loggers', {})
logger_names = []
for logger_name, logger_config in loggers.items():
if handler_name in logger_config.get('handlers', []):
logger_names.append(logger_name)
if logger_names:
logger.info(f" 使用该处理器的日志器: {', '.join(logger_names)}")
# 检查是否包含celery日志
if 'celery' in logger_names:
logger.info(" ✅ Celery日志将写入该文件")
else:
logger.warning(" ⚠️ Celery日志未配置使用该处理器")
else:
logger.warning(" 未找到文件日志处理器配置")
else:
logger.warning(" 未配置Django LOGGING设置")
# 检查Celery日志配置
if hasattr(settings, 'CELERY_LOG_FILE'):
celery_log_file = settings.CELERY_LOG_FILE
logger.info(f"Celery工作进程日志配置: {celery_log_file}")
else:
logger.warning("未配置CELERY_LOG_FILE")
# 检查Gunicorn日志配置通常在Gunicorn配置文件中这里检查环境变量或常见配置
logger.info("Gunicorn日志配置:")
logger.info(" 注意: Gunicorn日志通常在其配置文件或启动命令中设置")
logger.info(" 建议配置Gunicorn日志写入与Django/Celery相同的日志文件")
# 检查Redis日志配置
logger.info("Redis日志配置:")
logger.info(" 注意: Redis日志通常在/etc/redis/redis.conf中配置")
logger.info(" 建议配置Redis日志写入与其他服务相同的日志目录")
logger.info("\n日志配置检查完成!")
logger.info("建议:")
logger.info("1. 确保Django、Celery、Gunicorn、Redis的日志都写入同一个文件")
logger.info("2. 配置日志轮转和保留策略,避免日志文件过大")
logger.info("3. 定期检查日志文件,及时发现问题")
return True
except Exception as e:
logger.error(f"日志配置检查失败: {e}")
return False
def main():
"""主测试函数"""
logger.info("=== Redis和Celery生产环境配置测试开始 ===")
logger.info("测试环境: Ubuntu + Redis + Celery")
logger.info("=" * 50)
tests_passed = 0
total_tests = 6
# 测试1: Redis连接
logger.info("\n[测试1] Redis连接测试")
if test_redis_connection():
tests_passed += 1
else:
logger.error("Redis连接测试失败后续测试可能也会失败")
# 测试2: Celery与Redis集成
logger.info("\n[测试2] Celery与Redis集成测试")
if test_celery_redis_integration():
tests_passed += 1
# 测试3: Redis性能
logger.info("\n[测试3] Redis性能测试")
if test_redis_performance():
tests_passed += 1
# 测试4: 通过Celery和Redis发送邮件
logger.info("\n[测试4] 通过Celery和Redis发送邮件测试")
if test_celery_redis_email():
tests_passed += 1
# 测试5: 通过Celery和Redis发送包含PDF附件的邮件
logger.info("\n[测试5] 通过Celery和Redis发送包含PDF附件的邮件测试")
if test_celery_redis_pdf_email():
tests_passed += 1
# 测试6: 检查日志配置
logger.info("\n[测试6] 检查日志配置")
if check_logs_config():
tests_passed += 1
# 测试总结
logger.info("\n" + "=" * 50)
logger.info("测试总结:")
logger.info(f"通过测试: {tests_passed}/{total_tests}")
if tests_passed == total_tests:
logger.success("所有测试通过生产环境Redis和Celery配置正确。")
logger.info("\n部署建议:")
logger.info("1. ✅ Redis服务运行正常")
logger.info("2. ✅ Celery可以连接到Redis")
logger.info("3. ✅ Redis性能满足要求")
logger.info("4. ✅ 可以通过Celery和Redis发送邮件")
logger.info("5. ✅ 可以通过Celery和Redis发送包含PDF附件的邮件")
logger.info("6. ✅ 日志配置检查完成")
logger.info("7. 建议配置Redis持久化和备份")
logger.info("8. 建议监控Redis内存使用情况")
logger.info("9. 确保Gunicorn、Celery、Redis日志写入同一个文件")
return 0
elif tests_passed >= 4:
logger.warning("部分测试通过,生产环境基本可用。")
logger.info("\n需要检查:")
logger.info("1. 确保Redis服务正常运行")
logger.info("2. 检查Celery worker配置")
logger.info("3. 参考README中的故障排除指南")
logger.info("4. 检查邮件配置是否正确")
logger.info("5. 检查WeasyPrint库是否已正确安装")
logger.info("6. 检查日志配置是否符合要求")
return 1
else:
logger.error("多数测试失败,生产环境可能无法正常工作。")
logger.info("\n紧急处理:")
logger.info("1. ❌ 检查Redis服务状态: sudo systemctl status redis-server")
logger.info("2. ❌ 检查Redis配置: /etc/redis/redis.conf")
logger.info("3. ❌ 检查Django settings.py中的Celery配置")
logger.info("4. ❌ 检查Celery worker是否运行")
logger.info("5. ❌ 检查系统配置中的邮件设置")
logger.info("6. ❌ 检查WeasyPrint库是否已正确安装")
logger.info("7. ❌ 参考README中的Redis部署章节重新配置")
return 1
if __name__ == "__main__":
import os
from pathlib import Path
# 配置日志
logger.remove()
# 创建日志目录
log_dir = Path("/var/log/celery")
log_dir.mkdir(parents=True, exist_ok=True)
# 设置日志文件权限
log_dir.chmod(0o755)
# 添加控制台输出
logger.add(
sys.stdout,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
level="INFO"
)
# 添加日志文件输出
log_file = log_dir / "test_redis_celery.log"
logger.add(
log_file,
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
level="INFO",
rotation="1 day",
retention="7 days",
encoding="utf-8"
)
logger.info(f"测试日志将同时输出到控制台和 {log_file}")
logger.info("=" * 60)
try:
exit_code = main()
sys.exit(exit_code)
except KeyboardInterrupt:
logger.warning("测试被用户中断")
sys.exit(1)
except Exception as e:
logger.error(f"测试过程中发生未预期错误: {e}")
sys.exit(1)