Files
diary-family/test_redis_celery.py
xiaji e3c9d6f17d fix(celery): 优化PDF邮件任务配置并增强测试
调整Celery任务的retry和timeout配置,增加详细的日志记录
增强测试脚本的错误处理和诊断信息
2026-01-19 21:58:26 +08:00

682 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python
"""
Redis和Celery连接测试脚本
用于验证生产环境Redis和Celery配置是否正确
"""
import os
import sys
import time
from loguru import logger
# 添加项目路径到Python路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
def test_redis_connection():
"""测试Redis连接是否正常"""
logger.info("开始测试Redis连接...")
try:
import redis
from urllib.parse import urlparse
# 从Django设置中获取Redis配置
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
import django
django.setup()
from django.conf import settings
# 获取Redis配置URL
redis_url = settings.CELERY_BROKER_URL
logger.info(f"Redis配置URL: {redis_url}")
# 使用urllib解析URL更健壮的方法
parsed = urlparse(redis_url)
# 提取连接参数
host = parsed.hostname or 'localhost'
port = parsed.port or 6379
db = 0
# 解析数据库编号
if parsed.path and len(parsed.path) > 1:
try:
db = int(parsed.path[1:]) # 移除开头的'/'
except ValueError:
db = 0
# 解析密码注意密码可能在netloc中的username部分
password = None
if parsed.password:
# URL解码密码处理特殊字符
import urllib.parse
password = urllib.parse.unquote(parsed.password)
elif parsed.username:
# 有些配置可能把密码放在username位置
password = urllib.parse.unquote(parsed.username)
logger.info(f"Redis连接参数: host={host}, port={port}, db={db}, password={'***' if password else 'None'}")
# 测试连接
client = redis.Redis(
host=host,
port=port,
db=db,
password=password,
socket_connect_timeout=5,
socket_timeout=5,
decode_responses=True
)
# 测试ping
response = client.ping()
logger.info(f"Redis ping响应: {response}")
# 测试基本操作
test_key = f"test_{int(time.time())}"
client.set(test_key, "test_value", ex=10) # 10秒后过期
value = client.get(test_key)
logger.info(f"Redis set/get测试: key={test_key}, value={value}")
# 删除测试键
client.delete(test_key)
# 获取Redis信息
info = client.info()
logger.info(f"Redis版本: {info.get('redis_version', '未知')}")
logger.info(f"已用内存: {info.get('used_memory_human', '未知')}")
logger.info(f"连接数: {info.get('connected_clients', '未知')}")
client.close()
logger.success("Redis连接测试通过")
return True
except Exception as e:
error_msg = str(e)
logger.error(f"Redis连接测试失败: {error_msg}")
# 根据错误类型提供具体的解决方案
if "invalid username-password pair" in error_msg or "Authentication required" in error_msg:
logger.error("🔐 密码验证失败,可能的原因:")
logger.error("1. Redis密码错误或未设置密码")
logger.error("2. 密码包含特殊字符需要URL编码")
logger.error("3. Redis配置中requirepass设置不正确")
logger.error("")
logger.error("💡 解决方案:")
logger.error("1. 检查Redis密码: sudo grep '^requirepass' /etc/redis/redis.conf")
logger.error("2. 测试密码连接: redis-cli -a '你的密码' ping")
logger.error("3. 如果密码包含特殊字符在Django配置中使用URL编码:")
logger.error(" 例如: 'xjjq1234!' 编码为 'xjjq1234%21'")
logger.error(" CELERY_BROKER_URL = 'redis://:xjjq1234%21@localhost:6379/0'")
logger.error("4. 临时禁用密码测试: 在redis.conf中注释掉requirepass行并重启")
elif "Connection refused" in error_msg:
logger.error("🔌 连接被拒绝,可能的原因:")
logger.error("1. Redis服务未运行")
logger.error("2. Redis绑定到其他IP地址")
logger.error("3. 防火墙阻止连接")
logger.error("")
logger.error("💡 解决方案:")
logger.error("1. 启动Redis: sudo systemctl start redis-server")
logger.error("2. 检查绑定IP: sudo grep '^bind' /etc/redis/redis.conf")
logger.error("3. 检查端口监听: sudo netstat -tlnp | grep 6379")
else:
logger.error("请检查:")
logger.error("1. Redis服务状态: sudo systemctl status redis-server")
logger.error("2. Redis配置文件: /etc/redis/redis.conf")
logger.error("3. 防火墙设置: sudo ufw status")
logger.error("4. Django settings.py中的CELERY_BROKER_URL配置")
return False
def test_celery_redis_integration():
"""测试Celery与Redis集成"""
logger.info("开始测试Celery与Redis集成...")
try:
# 初始化Django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
import django
django.setup()
from diary_family.celery import app
# 测试Celery连接
logger.info("测试Celery连接...")
try:
result = app.control.ping(timeout=10)
logger.info(f"Celery连接测试结果: {result}")
except Exception as e:
logger.warning(f"Celery连接测试失败可能是worker未运行: {e}")
logger.info("这可能是正常的如果只测试Redis连接的话")
# 测试任务队列
logger.info("测试任务队列...")
try:
# 获取worker状态
stats = app.control.inspect().stats()
if stats:
logger.info(f"找到 {len(stats)} 个worker")
for worker, info in stats.items():
logger.info(f"Worker {worker}: {info.get('pool', {}).get('max-concurrency', '未知')} 并发")
else:
logger.warning("未找到运行的worker请启动Celery worker")
except Exception as e:
logger.warning(f"获取worker状态失败: {e}")
# 测试发送简单任务
logger.info("测试发送简单任务到Redis队列...")
try:
# 尝试导入debug_task如果不存在则创建简单的测试任务
try:
from core.tasks import debug_task
test_task = debug_task
logger.info("使用core.tasks中的debug_task")
except ImportError:
# 如果debug_task不存在创建一个简单的测试任务
logger.warning("debug_task未找到创建临时测试任务")
from celery import shared_task
@shared_task
def temp_debug_task():
return {"status": "success", "message": "临时测试任务执行成功"}
test_task = temp_debug_task
# 发送调试任务
task_result = test_task.delay()
logger.info(f"调试任务已发送任务ID: {task_result.id}")
# 等待任务完成最多10秒
try:
result = task_result.get(timeout=10)
logger.info(f"调试任务执行结果: {result}")
logger.success("任务成功执行并返回结果")
except Exception as e:
logger.warning(f"获取任务结果超时或失败: {e}")
logger.info("这可能是因为:")
logger.info("1. Celery worker未运行或未处理任务")
logger.info("2. 任务执行时间超过10秒")
logger.info("3. 任务执行出错")
logger.info("但任务已成功发送到Redis队列说明Celery-Redis连接正常")
except Exception as e:
logger.error(f"发送任务失败: {e}")
logger.error("这可能表示:")
logger.error("1. Celery配置错误")
logger.error("2. Redis连接问题")
logger.error("3. 任务序列化问题")
return False
logger.success("Celery与Redis集成测试通过")
return True
except Exception as e:
logger.error(f"Celery与Redis集成测试失败: {e}")
return False
def test_redis_performance():
"""测试Redis性能"""
logger.info("开始测试Redis性能...")
try:
import redis
import time
from urllib.parse import urlparse
import urllib.parse
# 从Django设置中获取Redis配置
from django.conf import settings
redis_url = settings.CELERY_BROKER_URL
# 使用urllib解析URL
parsed = urlparse(redis_url)
# 提取连接参数
host = parsed.hostname or 'localhost'
port = parsed.port or 6379
db = 0
# 解析数据库编号
if parsed.path and len(parsed.path) > 1:
try:
db = int(parsed.path[1:]) # 移除开头的'/'
except ValueError:
db = 0
# 解析密码
password = None
if parsed.password:
password = urllib.parse.unquote(parsed.password)
elif parsed.username:
password = urllib.parse.unquote(parsed.username)
client = redis.Redis(
host=host,
port=port,
db=db,
password=password,
socket_connect_timeout=5,
socket_timeout=5,
decode_responses=True
)
# 性能测试
test_count = 100
start_time = time.time()
for i in range(test_count):
key = f"perf_test_{i}"
client.set(key, f"value_{i}", ex=60)
set_time = time.time() - start_time
logger.info(f"设置 {test_count} 个键耗时: {set_time:.3f}秒, 平均: {set_time/test_count*1000:.2f}毫秒/个")
start_time = time.time()
for i in range(test_count):
key = f"perf_test_{i}"
client.get(key)
get_time = time.time() - start_time
logger.info(f"获取 {test_count} 个键耗时: {get_time:.3f}秒, 平均: {get_time/test_count*1000:.2f}毫秒/个")
# 清理测试数据
for i in range(test_count):
key = f"perf_test_{i}"
client.delete(key)
client.close()
# 性能评估
avg_set_time = set_time / test_count * 1000 # 毫秒
avg_get_time = get_time / test_count * 1000 # 毫秒
if avg_set_time < 1 and avg_get_time < 1:
logger.success("Redis性能优秀")
elif avg_set_time < 5 and avg_get_time < 5:
logger.info("Redis性能良好")
elif avg_set_time < 10 and avg_get_time < 10:
logger.warning("Redis性能一般建议优化")
else:
logger.error("Redis性能较差请检查服务器负载和Redis配置")
return True
except Exception as e:
logger.error(f"Redis性能测试失败: {e}")
return False
def test_celery_redis_email():
"""测试通过Celery和Redis发送邮件功能"""
logger.info("开始测试通过Celery和Redis发送邮件...")
try:
from core.tasks import celery_send_test_email
# 发送测试邮件任务
logger.info("发送Celery邮件测试任务到Redis队列...")
task = celery_send_test_email.delay(test_mode=True)
logger.info(f"邮件测试任务已发送任务ID: {task.id}")
# 等待任务完成
logger.info("等待任务执行最多30秒...")
result = task.get(timeout=30)
logger.info(f"邮件任务执行结果: {result}")
# 检查结果状态
if result.get('status') == 'success':
logger.success("通过Celery和Redis发送邮件测试成功")
return True
else:
logger.error(f"邮件发送失败: {result.get('error', '未知错误')}")
return False
except Exception as e:
error_msg = str(e)
logger.error(f"通过Celery和Redis发送邮件测试失败: {error_msg}")
# 提供具体的解决方案
if "No such file or directory" in error_msg:
logger.error("📁 文件路径错误,可能的原因:")
logger.error("1. 配置文件路径不存在")
logger.error("2. 权限问题导致无法访问文件")
elif "Connection refused" in error_msg:
logger.error("🔌 连接被拒绝,可能的原因:")
logger.error("1. Celery worker未运行")
logger.error("2. Redis服务未运行")
logger.error("3. 防火墙阻止连接")
else:
logger.error("请检查:")
logger.error("1. Celery worker是否运行: sudo supervisorctl status celery_worker")
logger.error("2. Redis服务状态: sudo systemctl status redis-server")
logger.error("3. 系统配置中的邮件设置是否正确")
logger.error("4. SMTP服务器配置是否正确")
return False
def test_celery_redis_pdf_email():
"""测试通过Celery和Redis发送包含PDF附件的邮件功能"""
logger.info("开始测试通过Celery和Redis发送包含PDF附件的邮件...")
try:
from core.tasks import celery_send_pdf_report_email
# 检查Celery worker状态
logger.info("检查Celery worker状态...")
from diary_family.celery import app
try:
# 检查worker是否运行
stats = app.control.inspect().stats()
if stats:
logger.info(f"✅ 找到 {len(stats)} 个运行中的worker")
else:
logger.warning("⚠️ 未找到运行中的worker任务可能无法执行")
logger.warning("请确保Celery worker正在运行: sudo supervisorctl status celery_worker")
except Exception as e:
logger.warning(f"⚠️ 检查worker状态失败: {e}")
# 发送PDF报告邮件任务
logger.info("发送Celery PDF报告邮件测试任务到Redis队列...")
task = celery_send_pdf_report_email.delay()
logger.info(f"PDF报告邮件测试任务已发送任务ID: {task.id}")
# 等待任务完成
logger.info("等待任务执行最多120秒...")
try:
result = task.get(timeout=120)
logger.info(f"PDF报告邮件任务执行结果: {result}")
# 检查结果状态
if result.get('status') == 'success':
logger.success("✅ 通过Celery和Redis发送包含PDF附件的邮件测试成功")
return True
else:
error_msg = result.get('error', '未知错误')
logger.error(f"❌ 包含PDF附件的邮件发送失败: {error_msg}")
# 提供具体的解决方案
if "WeasyPrint" in error_msg:
logger.error("📄 WeasyPrint相关错误可能的原因:")
logger.error("1. WeasyPrint库未安装")
logger.error("2. WeasyPrint依赖项缺失")
logger.error("3. 权限问题导致无法生成PDF")
logger.error("解决方案: 运行 'pip install weasyprint' 安装WeasyPrint库")
elif "SMTP" in error_msg or "smtp" in error_msg:
logger.error("📧 SMTP相关错误可能的原因:")
logger.error("1. SMTP服务器地址或端口错误")
logger.error("2. SMTP用户名或密码错误")
logger.error("3. SMTP服务器不允许从当前IP地址连接")
logger.error("4. SMTP服务器超时")
elif "Redis" in error_msg or "redis" in error_msg:
logger.error("🔴 Redis相关错误可能的原因:")
logger.error("1. Redis服务未运行")
logger.error("2. Redis密码错误")
logger.error("3. Redis连接超时")
return False
except Exception as e:
error_msg = str(e)
logger.error(f"❌ 等待任务完成时发生错误: {error_msg}")
# 检查任务状态
try:
state = task.state
logger.info(f"任务当前状态: {state}")
if state == 'PENDING':
logger.error("🔴 任务仍处于待处理状态,可能的原因:")
logger.error("1. Celery worker未运行")
logger.error("2. 任务队列已满")
logger.error("3. Redis连接问题")
elif state == 'RETRY':
logger.error("🔄 任务正在重试,可能的原因:")
logger.error("1. 任务执行过程中发生错误")
logger.error("2. SMTP服务器临时不可用")
elif state == 'FAILURE':
logger.error("💥 任务执行失败,可能的原因:")
logger.error("1. 任务代码中存在错误")
logger.error("2. WeasyPrint库未正确安装")
logger.error("3. 邮件配置错误")
except Exception as state_e:
logger.error(f"获取任务状态失败: {state_e}")
# 提供具体的解决方案
if "timeout" in error_msg.lower():
logger.error("⏱️ 超时错误,可能的原因:")
logger.error("1. Celery worker未运行任务无法执行")
logger.error("2. 任务执行时间过长超过了120秒的超时限制")
logger.error("3. SMTP服务器响应缓慢")
logger.error("4. Redis连接超时")
# 检查系统资源
logger.error("请检查系统资源使用情况:")
logger.error(" sudo top -bn1 | head -20")
logger.error(" sudo systemctl status redis-server")
logger.error(" sudo supervisorctl status celery_worker")
return False
except Exception as e:
error_msg = str(e)
logger.error(f"❌ 通过Celery和Redis发送包含PDF附件的邮件测试失败: {error_msg}")
# 提供具体的解决方案
if "No module named" in error_msg:
module_name = error_msg.split("'"[1])
logger.error(f"📦 缺少模块: {module_name}")
logger.error(f"解决方案: 运行 'pip install {module_name}'")
elif "No such file or directory" in error_msg:
logger.error("📁 文件路径错误,可能的原因:")
logger.error("1. 报告目录不存在")
logger.error("2. 权限问题导致无法访问目录")
logger.error("3. 检查settings.py中的REPORTS_ROOT配置")
elif "Connection refused" in error_msg:
logger.error("🔌 连接被拒绝,可能的原因:")
logger.error("1. Redis服务未运行")
logger.error("2. Celery worker未运行")
logger.error("3. 防火墙阻止连接")
else:
logger.error("请检查:")
logger.error("1. Celery worker是否运行: sudo supervisorctl status celery_worker")
logger.error("2. Redis服务状态: sudo systemctl status redis-server")
logger.error("3. 系统配置中的邮件设置是否正确")
logger.error("4. SMTP服务器配置是否正确")
logger.error("5. WeasyPrint库是否已正确安装: pip show weasyprint")
logger.error("6. 检查Celery日志获取更多信息: sudo tail -f /var/log/celery/worker.log")
return False
def check_logs_config():
"""检查Gunicorn、Celery、Redis的日志是否写入同一个文件"""
logger.info("开始检查日志配置...")
try:
from django.conf import settings
# 检查Django日志配置
if hasattr(settings, 'LOGGING'):
logging_config = settings.LOGGING
logger.info("Django日志配置:")
# 检查日志处理器
handlers = logging_config.get('handlers', {})
file_handlers = [name for name, handler in handlers.items() if handler.get('class') == 'logging.handlers.RotatingFileHandler']
if file_handlers:
for handler_name in file_handlers:
handler = handlers[handler_name]
log_file = handler.get('filename', '未知')
logger.info(f" 文件日志处理器 '{handler_name}' 写入到: {log_file}")
# 检查日志器配置
loggers = logging_config.get('loggers', {})
logger_names = []
for logger_name, logger_config in loggers.items():
if handler_name in logger_config.get('handlers', []):
logger_names.append(logger_name)
if logger_names:
logger.info(f" 使用该处理器的日志器: {', '.join(logger_names)}")
# 检查是否包含celery日志
if 'celery' in logger_names:
logger.info(" ✅ Celery日志将写入该文件")
else:
logger.warning(" ⚠️ Celery日志未配置使用该处理器")
else:
logger.warning(" 未找到文件日志处理器配置")
else:
logger.warning(" 未配置Django LOGGING设置")
# 检查Celery日志配置
if hasattr(settings, 'CELERY_LOG_FILE'):
celery_log_file = settings.CELERY_LOG_FILE
logger.info(f"Celery工作进程日志配置: {celery_log_file}")
else:
logger.warning("未配置CELERY_LOG_FILE")
# 检查Gunicorn日志配置通常在Gunicorn配置文件中这里检查环境变量或常见配置
logger.info("Gunicorn日志配置:")
logger.info(" 注意: Gunicorn日志通常在其配置文件或启动命令中设置")
logger.info(" 建议配置Gunicorn日志写入与Django/Celery相同的日志文件")
# 检查Redis日志配置
logger.info("Redis日志配置:")
logger.info(" 注意: Redis日志通常在/etc/redis/redis.conf中配置")
logger.info(" 建议配置Redis日志写入与其他服务相同的日志目录")
logger.info("\n日志配置检查完成!")
logger.info("建议:")
logger.info("1. 确保Django、Celery、Gunicorn、Redis的日志都写入同一个文件")
logger.info("2. 配置日志轮转和保留策略,避免日志文件过大")
logger.info("3. 定期检查日志文件,及时发现问题")
return True
except Exception as e:
logger.error(f"日志配置检查失败: {e}")
return False
def main():
"""主测试函数"""
logger.info("=== Redis和Celery生产环境配置测试开始 ===")
logger.info("测试环境: Ubuntu + Redis + Celery")
logger.info("=" * 50)
tests_passed = 0
total_tests = 6
# 测试1: Redis连接
logger.info("\n[测试1] Redis连接测试")
if test_redis_connection():
tests_passed += 1
else:
logger.error("Redis连接测试失败后续测试可能也会失败")
# 测试2: Celery与Redis集成
logger.info("\n[测试2] Celery与Redis集成测试")
if test_celery_redis_integration():
tests_passed += 1
# 测试3: Redis性能
logger.info("\n[测试3] Redis性能测试")
if test_redis_performance():
tests_passed += 1
# 测试4: 通过Celery和Redis发送邮件
logger.info("\n[测试4] 通过Celery和Redis发送邮件测试")
if test_celery_redis_email():
tests_passed += 1
# 测试5: 通过Celery和Redis发送包含PDF附件的邮件
logger.info("\n[测试5] 通过Celery和Redis发送包含PDF附件的邮件测试")
if test_celery_redis_pdf_email():
tests_passed += 1
# 测试6: 检查日志配置
logger.info("\n[测试6] 检查日志配置")
if check_logs_config():
tests_passed += 1
# 测试总结
logger.info("\n" + "=" * 50)
logger.info("测试总结:")
logger.info(f"通过测试: {tests_passed}/{total_tests}")
if tests_passed == total_tests:
logger.success("所有测试通过生产环境Redis和Celery配置正确。")
logger.info("\n部署建议:")
logger.info("1. ✅ Redis服务运行正常")
logger.info("2. ✅ Celery可以连接到Redis")
logger.info("3. ✅ Redis性能满足要求")
logger.info("4. ✅ 可以通过Celery和Redis发送邮件")
logger.info("5. ✅ 可以通过Celery和Redis发送包含PDF附件的邮件")
logger.info("6. ✅ 日志配置检查完成")
logger.info("7. 建议配置Redis持久化和备份")
logger.info("8. 建议监控Redis内存使用情况")
logger.info("9. 确保Gunicorn、Celery、Redis日志写入同一个文件")
return 0
elif tests_passed >= 4:
logger.warning("部分测试通过,生产环境基本可用。")
logger.info("\n需要检查:")
logger.info("1. 确保Redis服务正常运行")
logger.info("2. 检查Celery worker配置")
logger.info("3. 参考README中的故障排除指南")
logger.info("4. 检查邮件配置是否正确")
logger.info("5. 检查WeasyPrint库是否已正确安装")
logger.info("6. 检查日志配置是否符合要求")
return 1
else:
logger.error("多数测试失败,生产环境可能无法正常工作。")
logger.info("\n紧急处理:")
logger.info("1. ❌ 检查Redis服务状态: sudo systemctl status redis-server")
logger.info("2. ❌ 检查Redis配置: /etc/redis/redis.conf")
logger.info("3. ❌ 检查Django settings.py中的Celery配置")
logger.info("4. ❌ 检查Celery worker是否运行")
logger.info("5. ❌ 检查系统配置中的邮件设置")
logger.info("6. ❌ 检查WeasyPrint库是否已正确安装")
logger.info("7. ❌ 参考README中的Redis部署章节重新配置")
return 1
if __name__ == "__main__":
import os
from pathlib import Path
# 配置日志
logger.remove()
# 创建日志目录
log_dir = Path("/var/log/celery")
log_dir.mkdir(parents=True, exist_ok=True)
# 设置日志文件权限
log_dir.chmod(0o755)
# 添加控制台输出
logger.add(
sys.stdout,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
level="INFO"
)
# 添加日志文件输出
log_file = log_dir / "test_redis_celery.log"
logger.add(
log_file,
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
level="INFO",
rotation="1 day",
retention="7 days",
encoding="utf-8"
)
logger.info(f"测试日志将同时输出到控制台和 {log_file}")
logger.info("=" * 60)
try:
exit_code = main()
sys.exit(exit_code)
except KeyboardInterrupt:
logger.warning("测试被用户中断")
sys.exit(1)
except Exception as e:
logger.error(f"测试过程中发生未预期错误: {e}")
sys.exit(1)