feat(tasks): 添加调试任务用于测试Celery和Redis连接
refactor(test_redis_celery): 使用urllib解析Redis URL并改进错误处理
This commit is contained in:
@@ -140,3 +140,10 @@ def send_daily_report():
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"邮件发送失败:{str(e)}")
|
logger.error(f"邮件发送失败:{str(e)}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@shared_task
|
||||||
|
def debug_task():
|
||||||
|
"""调试任务,用于测试Celery和Redis连接"""
|
||||||
|
from loguru import logger
|
||||||
|
logger.info("调试任务执行成功")
|
||||||
|
return {"status": "success", "message": "Celery任务执行正常", "timestamp": timezone.now().isoformat()}
|
||||||
@@ -17,6 +17,7 @@ def test_redis_connection():
|
|||||||
logger.info("开始测试Redis连接...")
|
logger.info("开始测试Redis连接...")
|
||||||
try:
|
try:
|
||||||
import redis
|
import redis
|
||||||
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
# 从Django设置中获取Redis配置
|
# 从Django设置中获取Redis配置
|
||||||
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
|
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')
|
||||||
@@ -25,43 +26,34 @@ def test_redis_connection():
|
|||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
|
|
||||||
# 解析Redis URL
|
# 获取Redis配置URL
|
||||||
redis_url = settings.CELERY_BROKER_URL
|
redis_url = settings.CELERY_BROKER_URL
|
||||||
logger.info(f"Redis配置URL: {redis_url}")
|
logger.info(f"Redis配置URL: {redis_url}")
|
||||||
|
|
||||||
|
# 使用urllib解析URL(更健壮的方法)
|
||||||
|
parsed = urlparse(redis_url)
|
||||||
|
|
||||||
# 提取连接参数
|
# 提取连接参数
|
||||||
if redis_url.startswith('redis://'):
|
host = parsed.hostname or 'localhost'
|
||||||
# 移除redis://前缀
|
port = parsed.port or 6379
|
||||||
redis_url = redis_url[8:]
|
db = 0
|
||||||
|
|
||||||
# 检查是否有密码
|
# 解析数据库编号
|
||||||
if '@' in redis_url:
|
if parsed.path and len(parsed.path) > 1:
|
||||||
# 格式: password@host:port/db
|
try:
|
||||||
password_part, host_part = redis_url.split('@')
|
db = int(parsed.path[1:]) # 移除开头的'/'
|
||||||
password = password_part
|
except ValueError:
|
||||||
host_port_db = host_part
|
|
||||||
else:
|
|
||||||
# 格式: host:port/db
|
|
||||||
password = None
|
|
||||||
host_port_db = redis_url
|
|
||||||
|
|
||||||
# 解析主机、端口和数据库
|
|
||||||
if ':' in host_port_db:
|
|
||||||
host_port, db_str = host_port_db.split('/')
|
|
||||||
host, port = host_port.split(':')
|
|
||||||
db = int(db_str) if db_str else 0
|
|
||||||
else:
|
|
||||||
host = 'localhost'
|
|
||||||
port = 6379
|
|
||||||
db = 0
|
db = 0
|
||||||
|
|
||||||
port = int(port)
|
# 解析密码(注意:密码可能在netloc中的username部分)
|
||||||
else:
|
password = None
|
||||||
# 默认配置
|
if parsed.password:
|
||||||
host = 'localhost'
|
# URL解码密码(处理特殊字符)
|
||||||
port = 6379
|
import urllib.parse
|
||||||
db = 0
|
password = urllib.parse.unquote(parsed.password)
|
||||||
password = None
|
elif parsed.username:
|
||||||
|
# 有些配置可能把密码放在username位置
|
||||||
|
password = urllib.parse.unquote(parsed.username)
|
||||||
|
|
||||||
logger.info(f"Redis连接参数: host={host}, port={port}, db={db}, password={'***' if password else 'None'}")
|
logger.info(f"Redis连接参数: host={host}, port={port}, db={db}, password={'***' if password else 'None'}")
|
||||||
|
|
||||||
@@ -72,7 +64,8 @@ def test_redis_connection():
|
|||||||
db=db,
|
db=db,
|
||||||
password=password,
|
password=password,
|
||||||
socket_connect_timeout=5,
|
socket_connect_timeout=5,
|
||||||
socket_timeout=5
|
socket_timeout=5,
|
||||||
|
decode_responses=True
|
||||||
)
|
)
|
||||||
|
|
||||||
# 测试ping
|
# 测试ping
|
||||||
@@ -99,12 +92,40 @@ def test_redis_connection():
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Redis连接测试失败: {e}")
|
error_msg = str(e)
|
||||||
logger.error("请检查:")
|
logger.error(f"Redis连接测试失败: {error_msg}")
|
||||||
logger.error("1. Redis服务是否运行: sudo systemctl status redis-server")
|
|
||||||
logger.error("2. Redis配置是否正确: /etc/redis/redis.conf")
|
# 根据错误类型提供具体的解决方案
|
||||||
logger.error("3. 防火墙是否允许连接: sudo ufw status")
|
if "invalid username-password pair" in error_msg or "Authentication required" in error_msg:
|
||||||
logger.error("4. Django settings.py中的CELERY_BROKER_URL配置")
|
logger.error("🔐 密码验证失败,可能的原因:")
|
||||||
|
logger.error("1. Redis密码错误或未设置密码")
|
||||||
|
logger.error("2. 密码包含特殊字符需要URL编码")
|
||||||
|
logger.error("3. Redis配置中requirepass设置不正确")
|
||||||
|
logger.error("")
|
||||||
|
logger.error("💡 解决方案:")
|
||||||
|
logger.error("1. 检查Redis密码: sudo grep '^requirepass' /etc/redis/redis.conf")
|
||||||
|
logger.error("2. 测试密码连接: redis-cli -a '你的密码' ping")
|
||||||
|
logger.error("3. 如果密码包含特殊字符,在Django配置中使用URL编码:")
|
||||||
|
logger.error(" 例如: 'xjjq1234!' 编码为 'xjjq1234%21'")
|
||||||
|
logger.error(" CELERY_BROKER_URL = 'redis://:xjjq1234%21@localhost:6379/0'")
|
||||||
|
logger.error("4. 临时禁用密码测试: 在redis.conf中注释掉requirepass行并重启")
|
||||||
|
elif "Connection refused" in error_msg:
|
||||||
|
logger.error("🔌 连接被拒绝,可能的原因:")
|
||||||
|
logger.error("1. Redis服务未运行")
|
||||||
|
logger.error("2. Redis绑定到其他IP地址")
|
||||||
|
logger.error("3. 防火墙阻止连接")
|
||||||
|
logger.error("")
|
||||||
|
logger.error("💡 解决方案:")
|
||||||
|
logger.error("1. 启动Redis: sudo systemctl start redis-server")
|
||||||
|
logger.error("2. 检查绑定IP: sudo grep '^bind' /etc/redis/redis.conf")
|
||||||
|
logger.error("3. 检查端口监听: sudo netstat -tlnp | grep 6379")
|
||||||
|
else:
|
||||||
|
logger.error("请检查:")
|
||||||
|
logger.error("1. Redis服务状态: sudo systemctl status redis-server")
|
||||||
|
logger.error("2. Redis配置文件: /etc/redis/redis.conf")
|
||||||
|
logger.error("3. 防火墙设置: sudo ufw status")
|
||||||
|
logger.error("4. Django settings.py中的CELERY_BROKER_URL配置")
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def test_celery_redis_integration():
|
def test_celery_redis_integration():
|
||||||
@@ -144,22 +165,45 @@ def test_celery_redis_integration():
|
|||||||
# 测试发送简单任务
|
# 测试发送简单任务
|
||||||
logger.info("测试发送简单任务到Redis队列...")
|
logger.info("测试发送简单任务到Redis队列...")
|
||||||
try:
|
try:
|
||||||
from core.tasks import debug_task
|
# 尝试导入debug_task,如果不存在则创建简单的测试任务
|
||||||
|
try:
|
||||||
|
from core.tasks import debug_task
|
||||||
|
test_task = debug_task
|
||||||
|
logger.info("使用core.tasks中的debug_task")
|
||||||
|
except ImportError:
|
||||||
|
# 如果debug_task不存在,创建一个简单的测试任务
|
||||||
|
logger.warning("debug_task未找到,创建临时测试任务")
|
||||||
|
from celery import shared_task
|
||||||
|
|
||||||
|
@shared_task
|
||||||
|
def temp_debug_task():
|
||||||
|
return {"status": "success", "message": "临时测试任务执行成功"}
|
||||||
|
|
||||||
|
test_task = temp_debug_task
|
||||||
|
|
||||||
# 发送调试任务
|
# 发送调试任务
|
||||||
task_result = debug_task.delay()
|
task_result = test_task.delay()
|
||||||
logger.info(f"调试任务已发送,任务ID: {task_result.id}")
|
logger.info(f"调试任务已发送,任务ID: {task_result.id}")
|
||||||
|
|
||||||
# 等待任务完成(最多5秒)
|
# 等待任务完成(最多10秒)
|
||||||
try:
|
try:
|
||||||
result = task_result.get(timeout=5)
|
result = task_result.get(timeout=10)
|
||||||
logger.info(f"调试任务执行结果: {result}")
|
logger.info(f"调试任务执行结果: {result}")
|
||||||
|
logger.success("任务成功执行并返回结果")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"获取任务结果超时或失败: {e}")
|
logger.warning(f"获取任务结果超时或失败: {e}")
|
||||||
logger.info("这可能是正常的,如果没有worker处理任务")
|
logger.info("这可能是因为:")
|
||||||
|
logger.info("1. Celery worker未运行或未处理任务")
|
||||||
|
logger.info("2. 任务执行时间超过10秒")
|
||||||
|
logger.info("3. 任务执行出错")
|
||||||
|
logger.info("但任务已成功发送到Redis队列,说明Celery-Redis连接正常")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"发送任务失败: {e}")
|
logger.error(f"发送任务失败: {e}")
|
||||||
|
logger.error("这可能表示:")
|
||||||
|
logger.error("1. Celery配置错误")
|
||||||
|
logger.error("2. Redis连接问题")
|
||||||
|
logger.error("3. 任务序列化问题")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
logger.success("Celery与Redis集成测试通过!")
|
logger.success("Celery与Redis集成测试通过!")
|
||||||
@@ -175,37 +219,35 @@ def test_redis_performance():
|
|||||||
try:
|
try:
|
||||||
import redis
|
import redis
|
||||||
import time
|
import time
|
||||||
|
from urllib.parse import urlparse
|
||||||
|
import urllib.parse
|
||||||
|
|
||||||
# 从Django设置中获取Redis配置
|
# 从Django设置中获取Redis配置
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
|
|
||||||
redis_url = settings.CELERY_BROKER_URL
|
redis_url = settings.CELERY_BROKER_URL
|
||||||
if redis_url.startswith('redis://'):
|
|
||||||
redis_url = redis_url[8:]
|
|
||||||
|
|
||||||
if '@' in redis_url:
|
# 使用urllib解析URL
|
||||||
password_part, host_part = redis_url.split('@')
|
parsed = urlparse(redis_url)
|
||||||
password = password_part
|
|
||||||
host_port_db = host_part
|
|
||||||
else:
|
|
||||||
password = None
|
|
||||||
host_port_db = redis_url
|
|
||||||
|
|
||||||
if ':' in host_port_db:
|
# 提取连接参数
|
||||||
host_port, db_str = host_port_db.split('/')
|
host = parsed.hostname or 'localhost'
|
||||||
host, port = host_port.split(':')
|
port = parsed.port or 6379
|
||||||
db = int(db_str) if db_str else 0
|
db = 0
|
||||||
else:
|
|
||||||
host = 'localhost'
|
# 解析数据库编号
|
||||||
port = 6379
|
if parsed.path and len(parsed.path) > 1:
|
||||||
|
try:
|
||||||
|
db = int(parsed.path[1:]) # 移除开头的'/'
|
||||||
|
except ValueError:
|
||||||
db = 0
|
db = 0
|
||||||
|
|
||||||
port = int(port)
|
# 解析密码
|
||||||
else:
|
password = None
|
||||||
host = 'localhost'
|
if parsed.password:
|
||||||
port = 6379
|
password = urllib.parse.unquote(parsed.password)
|
||||||
db = 0
|
elif parsed.username:
|
||||||
password = None
|
password = urllib.parse.unquote(parsed.username)
|
||||||
|
|
||||||
client = redis.Redis(
|
client = redis.Redis(
|
||||||
host=host,
|
host=host,
|
||||||
@@ -213,7 +255,8 @@ def test_redis_performance():
|
|||||||
db=db,
|
db=db,
|
||||||
password=password,
|
password=password,
|
||||||
socket_connect_timeout=5,
|
socket_connect_timeout=5,
|
||||||
socket_timeout=5
|
socket_timeout=5,
|
||||||
|
decode_responses=True
|
||||||
)
|
)
|
||||||
|
|
||||||
# 性能测试
|
# 性能测试
|
||||||
|
|||||||
Reference in New Issue
Block a user