"""Celery application bootstrap for the ``diary_family`` Django project.

Creates the module-level Celery ``app``, loads its configuration from the
Django settings, registers the periodic (beat) schedule, and defines a small
debugging task.
"""

import os

from celery import Celery
from celery.schedules import crontab
from django.conf import settings

# Point Django at the project's settings module unless the environment
# already specifies one; this has to happen before the settings are read.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'diary_family.settings')

# Create the Celery application instance.
app = Celery('diary_family')

# Read configuration from the Django settings object. With the 'CELERY'
# namespace, only settings whose names start with ``CELERY_`` are used.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Discover task modules in the installed Django apps. The lambda defers
# reading INSTALLED_APPS until Celery actually performs the discovery.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

# Celery Beat periodic task schedule.
app.conf.beat_schedule = {
    'cleanup-expired-temp-files': {
        'task': 'core.tasks.cleanup_expired_temp_files',
        'schedule': crontab(minute=0),  # at minute 0 of every hour
    },
    'cleanup-expired-messages': {
        'task': 'core.tasks.cleanup_expired_messages',
        'schedule': crontab(minute='*/5'),  # every 5 minutes
    },
}


@app.task(bind=True)
def debug_task(self):
    """Print the current task request; intended for debugging.

    The task is bound (``bind=True``), so ``self`` is the task instance and
    ``self.request`` is the request being executed.
    """
    print(f'Request: {self.request!r}')