From 2921ab8ddadcf48121ee2903b15ce6e40d211379 Mon Sep 17 00:00:00 2001 From: xiaji Date: Sat, 17 Jan 2026 21:35:34 +0800 Subject: [PATCH] =?UTF-8?q?feat(tasks):=20=E6=B7=BB=E5=8A=A0=E8=B0=83?= =?UTF-8?q?=E8=AF=95=E4=BB=BB=E5=8A=A1=E7=94=A8=E4=BA=8E=E6=B5=8B=E8=AF=95?= =?UTF-8?q?Celery=E5=92=8CRedis=E8=BF=9E=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit refactor(test_redis_celery): 使用urllib解析Redis URL并改进错误处理 --- core/tasks.py | 9 ++- test_redis_celery.py | 183 ++++++++++++++++++++++++++----------------- 2 files changed, 121 insertions(+), 71 deletions(-) diff --git a/core/tasks.py b/core/tasks.py index 69c470e..daad141 100644 --- a/core/tasks.py +++ b/core/tasks.py @@ -139,4 +139,11 @@ def send_daily_report(): return True except Exception as e: logger.error(f"邮件发送失败:{str(e)}") - return False \ No newline at end of file + return False + +@shared_task +def debug_task(): + """调试任务,用于测试Celery和Redis连接""" + from loguru import logger + logger.info("调试任务执行成功") + return {"status": "success", "message": "Celery任务执行正常", "timestamp": timezone.now().isoformat()} \ No newline at end of file diff --git a/test_redis_celery.py b/test_redis_celery.py index b3cb94a..0f33743 100644 --- a/test_redis_celery.py +++ b/test_redis_celery.py @@ -17,6 +17,7 @@ def test_redis_connection(): logger.info("开始测试Redis连接...") try: import redis + from urllib.parse import urlparse # 从Django设置中获取Redis配置 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings') @@ -25,43 +26,34 @@ def test_redis_connection(): from django.conf import settings - # 解析Redis URL + # 获取Redis配置URL redis_url = settings.CELERY_BROKER_URL logger.info(f"Redis配置URL: {redis_url}") + # 使用urllib解析URL(更健壮的方法) + parsed = urlparse(redis_url) + # 提取连接参数 - if redis_url.startswith('redis://'): - # 移除redis://前缀 - redis_url = redis_url[8:] - - # 检查是否有密码 - if '@' in redis_url: - # 格式: password@host:port/db - password_part, host_part = redis_url.split('@') - password = password_part - host_port_db = host_part - else: - # 格式: host:port/db - password = None - host_port_db = redis_url - - # 解析主机、端口和数据库 - if ':' in host_port_db: - host_port, db_str = host_port_db.split('/') - host, port = host_port.split(':') - db = int(db_str) if db_str else 0 - else: - host = 'localhost' - port = 6379 + host = parsed.hostname or 'localhost' + port = parsed.port or 6379 + db = 0 + + # 解析数据库编号 + if parsed.path and len(parsed.path) > 1: + try: + db = int(parsed.path[1:]) # 移除开头的'/' + except ValueError: db = 0 - - port = int(port) - else: - # 默认配置 - host = 'localhost' - port = 6379 - db = 0 - password = None + + # 解析密码(注意:密码可能在netloc中的username部分) + password = None + if parsed.password: + # URL解码密码(处理特殊字符) + import urllib.parse + password = urllib.parse.unquote(parsed.password) + elif parsed.username: + # 有些配置可能把密码放在username位置 + password = urllib.parse.unquote(parsed.username) logger.info(f"Redis连接参数: host={host}, port={port}, db={db}, password={'***' if password else 'None'}") @@ -72,7 +64,8 @@ def test_redis_connection(): db=db, password=password, socket_connect_timeout=5, - socket_timeout=5 + socket_timeout=5, + decode_responses=True ) # 测试ping @@ -99,12 +92,40 @@ def test_redis_connection(): return True except Exception as e: - logger.error(f"Redis连接测试失败: {e}") - logger.error("请检查:") - logger.error("1. Redis服务是否运行: sudo systemctl status redis-server") - logger.error("2. Redis配置是否正确: /etc/redis/redis.conf") - logger.error("3. 防火墙是否允许连接: sudo ufw status") - logger.error("4. Django settings.py中的CELERY_BROKER_URL配置") + error_msg = str(e) + logger.error(f"Redis连接测试失败: {error_msg}") + + # 根据错误类型提供具体的解决方案 + if "invalid username-password pair" in error_msg or "Authentication required" in error_msg: + logger.error("🔐 密码验证失败,可能的原因:") + logger.error("1. Redis密码错误或未设置密码") + logger.error("2. 密码包含特殊字符需要URL编码") + logger.error("3. Redis配置中requirepass设置不正确") + logger.error("") + logger.error("💡 解决方案:") + logger.error("1. 检查Redis密码: sudo grep '^requirepass' /etc/redis/redis.conf") + logger.error("2. 测试密码连接: redis-cli -a '你的密码' ping") + logger.error("3. 如果密码包含特殊字符,在Django配置中使用URL编码:") + logger.error(" 例如: 'xjjq1234!' 编码为 'xjjq1234%21'") + logger.error(" CELERY_BROKER_URL = 'redis://:xjjq1234%21@localhost:6379/0'") + logger.error("4. 临时禁用密码测试: 在redis.conf中注释掉requirepass行并重启") + elif "Connection refused" in error_msg: + logger.error("🔌 连接被拒绝,可能的原因:") + logger.error("1. Redis服务未运行") + logger.error("2. Redis绑定到其他IP地址") + logger.error("3. 防火墙阻止连接") + logger.error("") + logger.error("💡 解决方案:") + logger.error("1. 启动Redis: sudo systemctl start redis-server") + logger.error("2. 检查绑定IP: sudo grep '^bind' /etc/redis/redis.conf") + logger.error("3. 检查端口监听: sudo netstat -tlnp | grep 6379") + else: + logger.error("请检查:") + logger.error("1. Redis服务状态: sudo systemctl status redis-server") + logger.error("2. Redis配置文件: /etc/redis/redis.conf") + logger.error("3. 防火墙设置: sudo ufw status") + logger.error("4. Django settings.py中的CELERY_BROKER_URL配置") + return False def test_celery_redis_integration(): @@ -144,22 +165,45 @@ def test_celery_redis_integration(): # 测试发送简单任务 logger.info("测试发送简单任务到Redis队列...") try: - from core.tasks import debug_task + # 尝试导入debug_task,如果不存在则创建简单的测试任务 + try: + from core.tasks import debug_task + test_task = debug_task + logger.info("使用core.tasks中的debug_task") + except ImportError: + # 如果debug_task不存在,创建一个简单的测试任务 + logger.warning("debug_task未找到,创建临时测试任务") + from celery import shared_task + + @shared_task + def temp_debug_task(): + return {"status": "success", "message": "临时测试任务执行成功"} + + test_task = temp_debug_task # 发送调试任务 - task_result = debug_task.delay() + task_result = test_task.delay() logger.info(f"调试任务已发送,任务ID: {task_result.id}") - # 等待任务完成(最多5秒) + # 等待任务完成(最多10秒) try: - result = task_result.get(timeout=5) + result = task_result.get(timeout=10) logger.info(f"调试任务执行结果: {result}") + logger.success("任务成功执行并返回结果") except Exception as e: logger.warning(f"获取任务结果超时或失败: {e}") - logger.info("这可能是正常的,如果没有worker处理任务") + logger.info("这可能是因为:") + logger.info("1. Celery worker未运行或未处理任务") + logger.info("2. 任务执行时间超过10秒") + logger.info("3. 任务执行出错") + logger.info("但任务已成功发送到Redis队列,说明Celery-Redis连接正常") except Exception as e: logger.error(f"发送任务失败: {e}") + logger.error("这可能表示:") + logger.error("1. Celery配置错误") + logger.error("2. Redis连接问题") + logger.error("3. 任务序列化问题") return False logger.success("Celery与Redis集成测试通过!") @@ -175,37 +219,35 @@ def test_redis_performance(): try: import redis import time + from urllib.parse import urlparse + import urllib.parse # 从Django设置中获取Redis配置 from django.conf import settings redis_url = settings.CELERY_BROKER_URL - if redis_url.startswith('redis://'): - redis_url = redis_url[8:] - - if '@' in redis_url: - password_part, host_part = redis_url.split('@') - password = password_part - host_port_db = host_part - else: - password = None - host_port_db = redis_url - - if ':' in host_port_db: - host_port, db_str = host_port_db.split('/') - host, port = host_port.split(':') - db = int(db_str) if db_str else 0 - else: - host = 'localhost' - port = 6379 + + # 使用urllib解析URL + parsed = urlparse(redis_url) + + # 提取连接参数 + host = parsed.hostname or 'localhost' + port = parsed.port or 6379 + db = 0 + + # 解析数据库编号 + if parsed.path and len(parsed.path) > 1: + try: + db = int(parsed.path[1:]) # 移除开头的'/' + except ValueError: db = 0 - - port = int(port) - else: - host = 'localhost' - port = 6379 - db = 0 - password = None + + # 解析密码 + password = None + if parsed.password: + password = urllib.parse.unquote(parsed.password) + elif parsed.username: + password = urllib.parse.unquote(parsed.username) client = redis.Redis( host=host, @@ -213,7 +255,8 @@ def test_redis_performance(): db=db, password=password, socket_connect_timeout=5, - socket_timeout=5 + socket_timeout=5, + decode_responses=True ) # 性能测试