最近遇到一个case, supervisor控制的celery worker异常了,没有自动拉起。
但是celery beat 还是存活状态的,这就导致它一直往redis消息队列里面塞task,导致待执行的task量很大(大部分都是MySQL的巡检任务),当启动celery worker的时候,大量的task开始执行,导致目标数据源的负载很高。
这种情况下,建议配置celery的expires任务过期机制,具体如下:
编辑 settings.py 文件
# 全局默认过期时间(针对未执行任务)
# 消息过期设置
CELERY_TASK_DEFAULT_EXPIRES = 300 # 5分钟后过期(单位:秒,建议 30~300 秒,根据业务调整)
# 启用过期任务的自动拒绝(增强可靠性)
CELERY_TASK_REJECT_ON_WORKER_LOST = True
# 其它参数
CELERY_TASK_TRACK_STARTED = True # 跟踪任务开始
CELERY_TASK_IGNORE_RESULT = True # 如果不关心结果,设为True提高性能
# 监控
CELERY_WORKER_SEND_TASK_EVENTS = True
CELERY_TASK_SEND_SENT_EVENT = True
# 结果设置
CELERY_RESULT_EXPIRES = 86400 # 24小时过期
CELERY_RESULT_EXTENDED = True # 扩展结果信息
CELERY_TASK_STORE_ERRORS_EVEN_IF_IGNORED = True # 即使忽略结果也存储错误
# 任务限流
CELERY_TASK_ANNOTATIONS = {"tasks.add": {"rate_limit": "10/s"}}
# Worker 设置
CELERY_WORKER_CONCURRENCY = 4 # 根据 CPU 核心数调整,但是会被命令行启动参数 --autoscale=3,10 覆盖掉
CELERY_WORKER_PREFETCH_MULTIPLIER = 1 # 每次只取 1 个任务
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100 # 每个 worker 子进程执行100个任务后重启
CELERY_WORKER_MAX_MEMORY_PER_CHILD = 200000 # 200MB 内存限制
# Beat 调度器
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
CELERY_BEAT_MAX_LOOP_INTERVAL = 300 # Beat 最大循环间隔
# 队列和路由
CELERY_TASK_DEFAULT_QUEUE = "default"
CELERY_TASK_QUEUES = {
"default": {
"exchange": "default",
"exchange_type": "direct",
"routing_key": "default",
},
"high_priority": {
"exchange": "high_priority",
"exchange_type": "direct",
"routing_key": "high_priority",
},
"low_priority": {
"exchange": "low_priority",
"exchange_type": "direct",
"routing_key": "low_priority",
},
}
CELERY_TASK_ROUTES = {
"*.critical": {"queue": "high_priority"},
"*.background": {"queue": "low_priority"},
}
# CELERY Backend
CELERY_RESULT_BACKEND = "django-db"
# Broker 连接设置
CELERY_BROKER_URL = configs.CELERY_BROKER_URL
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
CELERY_BROKER_CONNECTION_MAX_RETRIES = 10
CELERY_BROKER_CONNECTION_TIMEOUT = 30
CELERY_BROKER_HEARTBEAT = 120 # 心跳间隔
CELERY_BROKER_POOL_LIMIT = 10 # 连接池限制
# 时区
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_ENABLE_UTC = False
DJANGO_CELERY_BEAT_TZ_AWARE = False
# 序列化
CELERY_ACCEPT_CONTENT = ["application/json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"不要只依赖全局 CELERY_TASK_DEFAULT_EXPIRES,而是对关键任务显式设置:
# tasks.py
@app.task(expires=30) # 30秒过期
def send_sms():
...
@app.task(expires=300) # 5分钟过期
def generate_report():
...
@app.task # 使用全局默认值
def log_event():
...
@shared_task(expires=5) # 设置5秒过期
def add(x, y):
return x + y经此设置后,重启下django、celery相关的进程即可生效了。
然后开始演示:
1、在django admin添加一个add的任务(我这里设置为每隔15秒就触发一次)
2、然后停掉celery worker进程
3、等5分钟后,再把celery worker进程启动起来
4、查看flower界面,可以看到早期的任务已经是revoked状态了(这些任务都不会被执行了),新触发的任务的状态都是success的。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 [email protected] 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 [email protected] 删除。