#!/usr/bin/env python """ Redis和Celery连接测试脚本 用于验证生产环境Redis和Celery配置是否正确 """ import os import sys import time from loguru import logger # 添加项目路径到Python路径 sys.path.append(os.path.dirname(os.path.abspath(__file__))) def test_redis_connection(): """测试Redis连接是否正常""" logger.info("开始测试Redis连接...") try: import redis from urllib.parse import urlparse # 从Django设置中获取Redis配置 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings') import django django.setup() from django.conf import settings # 获取Redis配置URL redis_url = settings.CELERY_BROKER_URL logger.info(f"Redis配置URL: {redis_url}") # 使用urllib解析URL(更健壮的方法) parsed = urlparse(redis_url) # 提取连接参数 host = parsed.hostname or 'localhost' port = parsed.port or 6379 db = 0 # 解析数据库编号 if parsed.path and len(parsed.path) > 1: try: db = int(parsed.path[1:]) # 移除开头的'/' except ValueError: db = 0 # 解析密码(注意:密码可能在netloc中的username部分) password = None if parsed.password: # URL解码密码(处理特殊字符) import urllib.parse password = urllib.parse.unquote(parsed.password) elif parsed.username: # 有些配置可能把密码放在username位置 password = urllib.parse.unquote(parsed.username) logger.info(f"Redis连接参数: host={host}, port={port}, db={db}, password={'***' if password else 'None'}") # 测试连接 client = redis.Redis( host=host, port=port, db=db, password=password, socket_connect_timeout=5, socket_timeout=5, decode_responses=True ) # 测试ping response = client.ping() logger.info(f"Redis ping响应: {response}") # 测试基本操作 test_key = f"test_{int(time.time())}" client.set(test_key, "test_value", ex=10) # 10秒后过期 value = client.get(test_key) logger.info(f"Redis set/get测试: key={test_key}, value={value}") # 删除测试键 client.delete(test_key) # 获取Redis信息 info = client.info() logger.info(f"Redis版本: {info.get('redis_version', '未知')}") logger.info(f"已用内存: {info.get('used_memory_human', '未知')}") logger.info(f"连接数: {info.get('connected_clients', '未知')}") client.close() logger.success("Redis连接测试通过!") return True except Exception as e: error_msg = str(e) logger.error(f"Redis连接测试失败: {error_msg}") # 根据错误类型提供具体的解决方案 if "invalid username-password pair" in error_msg or "Authentication required" in error_msg: logger.error("🔐 密码验证失败,可能的原因:") logger.error("1. Redis密码错误或未设置密码") logger.error("2. 密码包含特殊字符需要URL编码") logger.error("3. Redis配置中requirepass设置不正确") logger.error("") logger.error("💡 解决方案:") logger.error("1. 检查Redis密码: sudo grep '^requirepass' /etc/redis/redis.conf") logger.error("2. 测试密码连接: redis-cli -a '你的密码' ping") logger.error("3. 如果密码包含特殊字符,在Django配置中使用URL编码:") logger.error(" 例如: 'xjjq1234!' 编码为 'xjjq1234%21'") logger.error(" CELERY_BROKER_URL = 'redis://:xjjq1234%21@localhost:6379/0'") logger.error("4. 临时禁用密码测试: 在redis.conf中注释掉requirepass行并重启") elif "Connection refused" in error_msg: logger.error("🔌 连接被拒绝,可能的原因:") logger.error("1. Redis服务未运行") logger.error("2. Redis绑定到其他IP地址") logger.error("3. 防火墙阻止连接") logger.error("") logger.error("💡 解决方案:") logger.error("1. 启动Redis: sudo systemctl start redis-server") logger.error("2. 检查绑定IP: sudo grep '^bind' /etc/redis/redis.conf") logger.error("3. 检查端口监听: sudo netstat -tlnp | grep 6379") else: logger.error("请检查:") logger.error("1. Redis服务状态: sudo systemctl status redis-server") logger.error("2. Redis配置文件: /etc/redis/redis.conf") logger.error("3. 防火墙设置: sudo ufw status") logger.error("4. Django settings.py中的CELERY_BROKER_URL配置") return False def test_celery_redis_integration(): """测试Celery与Redis集成""" logger.info("开始测试Celery与Redis集成...") try: # 初始化Django环境 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings') import django django.setup() from diary_family.celery import app # 测试Celery连接 logger.info("测试Celery连接...") try: result = app.control.ping(timeout=10) logger.info(f"Celery连接测试结果: {result}") except Exception as e: logger.warning(f"Celery连接测试失败(可能是worker未运行): {e}") logger.info("这可能是正常的,如果只测试Redis连接的话") # 测试任务队列 logger.info("测试任务队列...") try: # 获取worker状态 stats = app.control.inspect().stats() if stats: logger.info(f"找到 {len(stats)} 个worker") for worker, info in stats.items(): logger.info(f"Worker {worker}: {info.get('pool', {}).get('max-concurrency', '未知')} 并发") else: logger.warning("未找到运行的worker,请启动Celery worker") except Exception as e: logger.warning(f"获取worker状态失败: {e}") # 测试发送简单任务 logger.info("测试发送简单任务到Redis队列...") try: # 尝试导入debug_task,如果不存在则创建简单的测试任务 try: from core.tasks import debug_task test_task = debug_task logger.info("使用core.tasks中的debug_task") except ImportError: # 如果debug_task不存在,创建一个简单的测试任务 logger.warning("debug_task未找到,创建临时测试任务") from celery import shared_task @shared_task def temp_debug_task(): return {"status": "success", "message": "临时测试任务执行成功"} test_task = temp_debug_task # 发送调试任务 task_result = test_task.delay() logger.info(f"调试任务已发送,任务ID: {task_result.id}") # 等待任务完成(最多10秒) try: result = task_result.get(timeout=10) logger.info(f"调试任务执行结果: {result}") logger.success("任务成功执行并返回结果") except Exception as e: logger.warning(f"获取任务结果超时或失败: {e}") logger.info("这可能是因为:") logger.info("1. Celery worker未运行或未处理任务") logger.info("2. 任务执行时间超过10秒") logger.info("3. 任务执行出错") logger.info("但任务已成功发送到Redis队列,说明Celery-Redis连接正常") except Exception as e: logger.error(f"发送任务失败: {e}") logger.error("这可能表示:") logger.error("1. Celery配置错误") logger.error("2. Redis连接问题") logger.error("3. 任务序列化问题") return False logger.success("Celery与Redis集成测试通过!") return True except Exception as e: logger.error(f"Celery与Redis集成测试失败: {e}") return False def test_redis_performance(): """测试Redis性能""" logger.info("开始测试Redis性能...") try: import redis import time from urllib.parse import urlparse import urllib.parse # 从Django设置中获取Redis配置 from django.conf import settings redis_url = settings.CELERY_BROKER_URL # 使用urllib解析URL parsed = urlparse(redis_url) # 提取连接参数 host = parsed.hostname or 'localhost' port = parsed.port or 6379 db = 0 # 解析数据库编号 if parsed.path and len(parsed.path) > 1: try: db = int(parsed.path[1:]) # 移除开头的'/' except ValueError: db = 0 # 解析密码 password = None if parsed.password: password = urllib.parse.unquote(parsed.password) elif parsed.username: password = urllib.parse.unquote(parsed.username) client = redis.Redis( host=host, port=port, db=db, password=password, socket_connect_timeout=5, socket_timeout=5, decode_responses=True ) # 性能测试 test_count = 100 start_time = time.time() for i in range(test_count): key = f"perf_test_{i}" client.set(key, f"value_{i}", ex=60) set_time = time.time() - start_time logger.info(f"设置 {test_count} 个键耗时: {set_time:.3f}秒, 平均: {set_time/test_count*1000:.2f}毫秒/个") start_time = time.time() for i in range(test_count): key = f"perf_test_{i}" client.get(key) get_time = time.time() - start_time logger.info(f"获取 {test_count} 个键耗时: {get_time:.3f}秒, 平均: {get_time/test_count*1000:.2f}毫秒/个") # 清理测试数据 for i in range(test_count): key = f"perf_test_{i}" client.delete(key) client.close() # 性能评估 avg_set_time = set_time / test_count * 1000 # 毫秒 avg_get_time = get_time / test_count * 1000 # 毫秒 if avg_set_time < 1 and avg_get_time < 1: logger.success("Redis性能优秀!") elif avg_set_time < 5 and avg_get_time < 5: logger.info("Redis性能良好") elif avg_set_time < 10 and avg_get_time < 10: logger.warning("Redis性能一般,建议优化") else: logger.error("Redis性能较差,请检查服务器负载和Redis配置") return True except Exception as e: logger.error(f"Redis性能测试失败: {e}") return False def test_celery_redis_email(): """测试通过Celery和Redis发送邮件功能""" logger.info("开始测试通过Celery和Redis发送邮件...") try: from core.tasks import celery_send_test_email # 发送测试邮件任务 logger.info("发送Celery邮件测试任务到Redis队列...") task = celery_send_test_email.delay(test_mode=True) logger.info(f"邮件测试任务已发送,任务ID: {task.id}") # 等待任务完成 logger.info("等待任务执行(最多30秒)...") result = task.get(timeout=30) logger.info(f"邮件任务执行结果: {result}") # 检查结果状态 if result.get('status') == 'success': logger.success("通过Celery和Redis发送邮件测试成功!") return True else: logger.error(f"邮件发送失败: {result.get('error', '未知错误')}") return False except Exception as e: error_msg = str(e) logger.error(f"通过Celery和Redis发送邮件测试失败: {error_msg}") # 提供具体的解决方案 if "No such file or directory" in error_msg: logger.error("📁 文件路径错误,可能的原因:") logger.error("1. 配置文件路径不存在") logger.error("2. 权限问题导致无法访问文件") elif "Connection refused" in error_msg: logger.error("🔌 连接被拒绝,可能的原因:") logger.error("1. Celery worker未运行") logger.error("2. Redis服务未运行") logger.error("3. 防火墙阻止连接") else: logger.error("请检查:") logger.error("1. Celery worker是否运行: sudo supervisorctl status celery_worker") logger.error("2. Redis服务状态: sudo systemctl status redis-server") logger.error("3. 系统配置中的邮件设置是否正确") logger.error("4. SMTP服务器配置是否正确") return False def test_celery_redis_pdf_email(): """测试通过Celery和Redis发送包含PDF附件的邮件功能""" logger.info("开始测试通过Celery和Redis发送包含PDF附件的邮件...") try: from core.tasks import celery_send_pdf_report_email # 发送PDF报告邮件任务 logger.info("发送Celery PDF报告邮件测试任务到Redis队列...") task = celery_send_pdf_report_email.delay() logger.info(f"PDF报告邮件测试任务已发送,任务ID: {task.id}") # 等待任务完成 logger.info("等待任务执行(最多60秒)...") result = task.get(timeout=60) logger.info(f"PDF报告邮件任务执行结果: {result}") # 检查结果状态 if result.get('status') == 'success': logger.success("通过Celery和Redis发送包含PDF附件的邮件测试成功!") return True else: error_msg = result.get('error', '未知错误') logger.error(f"包含PDF附件的邮件发送失败: {error_msg}") # 提供具体的解决方案 if "WeasyPrint" in error_msg: logger.error("📄 WeasyPrint相关错误,可能的原因:") logger.error("1. WeasyPrint库未安装") logger.error("2. WeasyPrint依赖项缺失") logger.error("3. 权限问题导致无法生成PDF") logger.error("解决方案: 运行 'pip install weasyprint' 安装WeasyPrint库") return False except Exception as e: error_msg = str(e) logger.error(f"通过Celery和Redis发送包含PDF附件的邮件测试失败: {error_msg}") # 提供具体的解决方案 if "No such file or directory" in error_msg: logger.error("📁 文件路径错误,可能的原因:") logger.error("1. 报告目录不存在") logger.error("2. 权限问题导致无法访问目录") elif "Connection refused" in error_msg: logger.error("🔌 连接被拒绝,可能的原因:") logger.error("1. Celery worker未运行") logger.error("2. Redis服务未运行") logger.error("3. 防火墙阻止连接") else: logger.error("请检查:") logger.error("1. Celery worker是否运行: sudo supervisorctl status celery_worker") logger.error("2. Redis服务状态: sudo systemctl status redis-server") logger.error("3. 系统配置中的邮件设置是否正确") logger.error("4. SMTP服务器配置是否正确") logger.error("5. WeasyPrint库是否已正确安装") return False def check_logs_config(): """检查Gunicorn、Celery、Redis的日志是否写入同一个文件""" logger.info("开始检查日志配置...") try: from django.conf import settings # 检查Django日志配置 if hasattr(settings, 'LOGGING'): logging_config = settings.LOGGING logger.info("Django日志配置:") # 检查日志处理器 handlers = logging_config.get('handlers', {}) file_handlers = [name for name, handler in handlers.items() if handler.get('class') == 'logging.handlers.RotatingFileHandler'] if file_handlers: for handler_name in file_handlers: handler = handlers[handler_name] log_file = handler.get('filename', '未知') logger.info(f" 文件日志处理器 '{handler_name}' 写入到: {log_file}") # 检查日志器配置 loggers = logging_config.get('loggers', {}) logger_names = [] for logger_name, logger_config in loggers.items(): if handler_name in logger_config.get('handlers', []): logger_names.append(logger_name) if logger_names: logger.info(f" 使用该处理器的日志器: {', '.join(logger_names)}") # 检查是否包含celery日志 if 'celery' in logger_names: logger.info(" ✅ Celery日志将写入该文件") else: logger.warning(" ⚠️ Celery日志未配置使用该处理器") else: logger.warning(" 未找到文件日志处理器配置") else: logger.warning(" 未配置Django LOGGING设置") # 检查Celery日志配置 if hasattr(settings, 'CELERY_LOG_FILE'): celery_log_file = settings.CELERY_LOG_FILE logger.info(f"Celery工作进程日志配置: {celery_log_file}") else: logger.warning("未配置CELERY_LOG_FILE") # 检查Gunicorn日志配置(通常在Gunicorn配置文件中,这里检查环境变量或常见配置) logger.info("Gunicorn日志配置:") logger.info(" 注意: Gunicorn日志通常在其配置文件或启动命令中设置") logger.info(" 建议配置Gunicorn日志写入与Django/Celery相同的日志文件") # 检查Redis日志配置 logger.info("Redis日志配置:") logger.info(" 注意: Redis日志通常在/etc/redis/redis.conf中配置") logger.info(" 建议配置Redis日志写入与其他服务相同的日志目录") logger.info("\n日志配置检查完成!") logger.info("建议:") logger.info("1. 确保Django、Celery、Gunicorn、Redis的日志都写入同一个文件") logger.info("2. 配置日志轮转和保留策略,避免日志文件过大") logger.info("3. 定期检查日志文件,及时发现问题") return True except Exception as e: logger.error(f"日志配置检查失败: {e}") return False def main(): """主测试函数""" logger.info("=== Redis和Celery生产环境配置测试开始 ===") logger.info("测试环境: Ubuntu + Redis + Celery") logger.info("=" * 50) tests_passed = 0 total_tests = 6 # 测试1: Redis连接 logger.info("\n[测试1] Redis连接测试") if test_redis_connection(): tests_passed += 1 else: logger.error("Redis连接测试失败,后续测试可能也会失败") # 测试2: Celery与Redis集成 logger.info("\n[测试2] Celery与Redis集成测试") if test_celery_redis_integration(): tests_passed += 1 # 测试3: Redis性能 logger.info("\n[测试3] Redis性能测试") if test_redis_performance(): tests_passed += 1 # 测试4: 通过Celery和Redis发送邮件 logger.info("\n[测试4] 通过Celery和Redis发送邮件测试") if test_celery_redis_email(): tests_passed += 1 # 测试5: 通过Celery和Redis发送包含PDF附件的邮件 logger.info("\n[测试5] 通过Celery和Redis发送包含PDF附件的邮件测试") if test_celery_redis_pdf_email(): tests_passed += 1 # 测试6: 检查日志配置 logger.info("\n[测试6] 检查日志配置") if check_logs_config(): tests_passed += 1 # 测试总结 logger.info("\n" + "=" * 50) logger.info("测试总结:") logger.info(f"通过测试: {tests_passed}/{total_tests}") if tests_passed == total_tests: logger.success("所有测试通过!生产环境Redis和Celery配置正确。") logger.info("\n部署建议:") logger.info("1. ✅ Redis服务运行正常") logger.info("2. ✅ Celery可以连接到Redis") logger.info("3. ✅ Redis性能满足要求") logger.info("4. ✅ 可以通过Celery和Redis发送邮件") logger.info("5. ✅ 可以通过Celery和Redis发送包含PDF附件的邮件") logger.info("6. ✅ 日志配置检查完成") logger.info("7. 建议配置Redis持久化和备份") logger.info("8. 建议监控Redis内存使用情况") logger.info("9. 确保Gunicorn、Celery、Redis日志写入同一个文件") return 0 elif tests_passed >= 4: logger.warning("部分测试通过,生产环境基本可用。") logger.info("\n需要检查:") logger.info("1. 确保Redis服务正常运行") logger.info("2. 检查Celery worker配置") logger.info("3. 参考README中的故障排除指南") logger.info("4. 检查邮件配置是否正确") logger.info("5. 检查WeasyPrint库是否已正确安装") logger.info("6. 检查日志配置是否符合要求") return 1 else: logger.error("多数测试失败,生产环境可能无法正常工作。") logger.info("\n紧急处理:") logger.info("1. ❌ 检查Redis服务状态: sudo systemctl status redis-server") logger.info("2. ❌ 检查Redis配置: /etc/redis/redis.conf") logger.info("3. ❌ 检查Django settings.py中的Celery配置") logger.info("4. ❌ 检查Celery worker是否运行") logger.info("5. ❌ 检查系统配置中的邮件设置") logger.info("6. ❌ 检查WeasyPrint库是否已正确安装") logger.info("7. ❌ 参考README中的Redis部署章节重新配置") return 1 if __name__ == "__main__": import os from pathlib import Path # 配置日志 logger.remove() # 创建日志目录 log_dir = Path("/var/log/celery") log_dir.mkdir(parents=True, exist_ok=True) # 设置日志文件权限 log_dir.chmod(0o755) # 添加控制台输出 logger.add( sys.stdout, format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}", level="INFO" ) # 添加日志文件输出 log_file = log_dir / "test_redis_celery.log" logger.add( log_file, format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}", level="INFO", rotation="1 day", retention="7 days", encoding="utf-8" ) logger.info(f"测试日志将同时输出到控制台和 {log_file}") logger.info("=" * 60) try: exit_code = main() sys.exit(exit_code) except KeyboardInterrupt: logger.warning("测试被用户中断") sys.exit(1) except Exception as e: logger.error(f"测试过程中发生未预期错误: {e}") sys.exit(1)