develop
Celery Task高级用法
摘要
本文系统梳理了 Celery Task 的三类高级玩法:
- 超时控制——用
soft_time_limit/time_limit实现“先告警、后强杀”的两级兜底,兼顾优雅收尾与资源保护; - 速率限制——通过
rate_limit把“入队”与“消费”解耦,精准平滑任务流量,保护下游; - 定时调度——借助
Celery Beat以“秒级间隔 / Crontab / 一次性”三种策略提前编排周期任务,让 Worker 专注执行。
掌握这三板斧,即可在分布式场景中做到“该快的快、该慢的慢、该来的准时来”,既防失控又保弹性。
Task超时时间
在 Celery 中,可以为任务设置总执行超时时间,当任务运行超过指定时间后,会被强制终止并标记为失败。主要通过以下两个参数配置,适用于不同场景:
| 参数 | 作用说明 | 适用场景 |
|---|---|---|
| time_limit | 硬超时时间(秒)。超过此时间,任务会被强制终止(发送 SIGKILL 信号),无论是否在执行中。 | 需要严格限制任务执行时间,不允许超时运行。 |
| soft_time_limit | 软超时时间(秒)。超过此时间,会抛出 SoftTimeLimitExceeded 异常,允许任务捕获异常并执行清理逻辑(如释放资源),若未捕获则任务失败。 | 希望超时前有机会处理收尾工作(如关闭连接) |
1. 代码示例
- 任务定义
@app.task( bind=True, time_limit=10, # 硬超时:10秒后强制终止 soft_time_limit=8, # 软超时:8秒后抛出异常 name='timeout_task' ) def timeout_task(self, duration): try: print(f"任务开始,预计运行 {duration} 秒...") # 模拟耗时操作(如处理数据、网络请求等) time.sleep(duration) return f"任务成功完成,实际运行 {duration} 秒" except SoftTimeLimitExceeded: # 捕获软超时异常,执行清理逻辑 print("触发软超时!执行紧急清理...") # 例如:关闭数据库连接、保存中间结果等 time.sleep(4) print("紧急清理完成") raise # 重新抛出异常,标记任务为失败 finally: print("任务结束(无论成功/失败)") - 任务调用
# 触发超时任务 async_result = timeout_task.delay(15) print("超时任务ID:", async_result.id) # 输出任务ID try: task_result = async_result.get() # retry次数小于2次时,会抛出异常,否则会返回成功 print(f"超时任务执行成功: {task_result}") except Exception as e: print(f"超时任务执行失败: {str(e)}")
2. 控制台输出结果
任务控制台输出
超时任务ID: 000bc8eb-ec18-4cf3-a842-63df86ceb87c 超时任务执行失败: TimeLimitExceeded(10,)Celery worker输出
[14:59:32,167: INFO/MainProcess] Task timeout_task[000bc8eb-ec18-4cf3-a842-63df86ceb87c] received [14:59:32,169: WARNING/ForkPoolWorker-59] 任务开始,预计运行 15 秒... [14:59:40,173: WARNING/MainProcess] Soft time limit (8s) exceeded for timeout_task[000bc8eb-ec18-4cf3-a842-63df86ceb87c] [14:59:40,174: WARNING/ForkPoolWorker-59] 触发软超时!执行紧急清理... [14:59:42,181: ERROR/MainProcess] Task handler raised error: TimeLimitExceeded(10) Traceback (most recent call last): File "/root/.pyenv/versions/3.11.9/envs/ssytest/lib/python3.11/site-packages/billiard/pool.py", line 684, in on_hard_timeout raise TimeLimitExceeded(job._timeout) billiard.einfo.ExceptionWithTraceback: """ Traceback (most recent call last): File "/root/.pyenv/versions/3.11.9/envs/ssytest/lib/python3.11/site-packages/billiard/pool.py", line 684, in on_hard_timeout raise TimeLimitExceeded(job._timeout) billiard.exceptions.TimeLimitExceeded: TimeLimitExceeded(10,) """ [14:59:42,182: ERROR/MainProcess] Hard time limit (10s) exceeded for timeout_task[000bc8eb-ec18-4cf3-a842-63df86ceb87c] [14:59:42,183: WARNING/ForkPoolWorker-59] 任务结束(无论成功/失败) [14:59:42,288: ERROR/MainProcess] Process 'ForkPoolWorker-59' pid:3916642 exited with 'signal 9 (SIGKILL)'
3. 输出结果解读
- 通过分析任务控制台输出和 Celery worker 日志,可以发现以下超时行为:
- ID 为
000bc8eb-ec18-4cf3-a842-63df86ceb87c的任务遇到了超时问题。 - 在第 8 秒时触发了软超时,超过了预先设置的 8 秒限制。
- 在第 8 秒时抛出了
SoftTimeLimitExceeded异常,将任务标记为失败。 - 在第 10 秒时触发了硬超时,超过了预先设置的 10 秒限制。
- 在第 10 秒时发送了 SIGKILL 信号,强制终止任务进程。
- ID 为
4. 总结
- SoftTimeLimitExceeded 异常是一个软超时异常,任务可以捕获并处理该异常,执行清理逻辑。
- HardTimeLimitExceeded 异常是一个硬超时异常,任务不能捕获该异常,会被强制终止。
- 要预留足够的时间给任务完成清理逻辑,避免硬超时导致任务失败(比如例子所示)。
Task速率限制
rate_limit 用于限制任务在单位时间内的最大执行次数,本质是通过控制 Worker 处理任务的频率,避免 “任务洪峰” 对系统或依赖服务造成压力。例如: 调用第三方 API 时,对方限制每秒最多 10 次请求 → 可设置 rate_limit=”10/s”; 生成报表的任务很耗 CPU,希望每分钟最多执行 5 次 → 可设置 rate_limit=”5/m”。
语法规则:
- 单位时间:秒(s)、分钟(m)、小时(h)
- 格式:
{数量}/{单位},如10/s表示 10 个任务/秒
原理: rate_limit 不限制任务入队(即 delay() 或 apply_async() 会正常将任务放入队列),仅限制 Worker 从队列中取出任务执行的频率。若队列中积累了大量任务,Worker 会按 rate_limit 逐步消费,避免瞬间执行过多。
1. 代码示例
全局级别,所有任务生效:在 Celery 配置文件(如
celeryconfig.py)中添加以下配置:# 配置任务速率限制为 10 个任务/秒 app = Celery('basic_tasks') app.conf.task_rate_limit = '10/s'任务级别:只在当前任务生效,定义中添加
rate_limit参数:# 定义任务时设置默认速率:每秒最多 1 次 @app.task(rate_limit="1/s", name="rate_limit_task") def rate_limit_task(): """速率限制任务""" print(f"任务开始时间: {time.time()}") return f"速率限制任务执行成功: {time.time()}"任务调用
for i in range(3): async_result = rate_limit_task.delay() print(f"第{i+1}次速率限制任务ID:", async_result.id) # 输出任务ID task_result = async_result.get() # 每秒最多执行1次 print(f"第{i+1}次速率限制任务执行成功: {task_result}")
2. 控制台输出结果
任务控制台输出
第1次速率限制任务ID: 68b79c80-477f-466f-933a-4a655e838dca 第1次速率限制任务执行成功: 速率限制任务执行成功: 1760949254.7125618 第2次速率限制任务ID: 6e8c58c9-2bec-4203-8cda-382080a6f2ce 第2次速率限制任务执行成功: 速率限制任务执行成功: 1760949255.713834 第3次速率限制任务ID: 0c76b08a-4b54-4fb6-b192-5c148739ee97 第3次速率限制任务执行成功: 速率限制任务执行成功: 1760949256.7162948Celery worker输出:
[16:34:14,710: INFO/MainProcess] Task rate_limit_task[68b79c80-477f-466f-933a-4a655e838dca] received [16:34:14,712: WARNING/ForkPoolWorker-65] 任务开始时间: 1760949254.7123966 [16:34:14,713: INFO/ForkPoolWorker-65] Task rate_limit_task[68b79c80-477f-466f-933a-4a655e838dca] succeeded in 0.0011626780033111572s: '速率限制任务执行成功: 1760949254.7125618' [16:34:14,719: INFO/MainProcess] Task rate_limit_task[6e8c58c9-2bec-4203-8cda-382080a6f2ce] received [16:34:15,713: WARNING/ForkPoolWorker-65] 任务开始时间: 1760949255.7136571 [16:34:15,714: INFO/ForkPoolWorker-65] Task rate_limit_task[6e8c58c9-2bec-4203-8cda-382080a6f2ce] succeeded in 0.0012042250018566847s: '速率限制任务执行成功: 1760949255.713834' [16:34:15,720: INFO/MainProcess] Task rate_limit_task[0c76b08a-4b54-4fb6-b192-5c148739ee97] received [16:34:16,716: WARNING/ForkPoolWorker-65] 任务开始时间: 1760949256.71612 [16:34:16,717: INFO/ForkPoolWorker-65] Task rate_limit_task[0c76b08a-4b54-4fb6-b192-5c148739ee97] succeeded in 0.0012061879970133305s: '速率限制任务执行成功: 1760949256.7162948'
3. 输出结果解读
- 任务控制台输出:每个任务的 ID 和执行成功的时间戳,按顺序展示。
- Celery worker 输出:每个任务的 ID、开始时间、执行时间和成功/失败状态,按顺序展示。
4. 总结
- 从结果可知,任务执行时间间隔符合速率限制配置,即每秒最多 1 个任务。
- 通过worker的日志可知,worker是每秒收到一个任务,任务的限速是Broker来实现的。
定时任务
Celery Beat 是 Celery 内置的定时任务调度器,用于周期性地生成任务并将其发送到消息队列(由 Worker 消费执行)。它解决了 “如何按固定时间间隔或特定时间点自动触发任务” 的问题。
Celery Beat 本身不执行任务,而是作为调度中心:
- 按预设的scheduler时间规则生成任务,支持3种格式:
- 固定间隔(float/int):直接指定秒数,表示 “每隔 N 秒执行一次”,适合简单的周期性任务。
Cron 表达式(crontab 对象):通过 celery.schedules.crontab 实现类 Unix cron 的时间规则,支持按 “分、时、日、月、周” 精确配置,适合复杂时间场景【如:每年 12 月的最后一天 23:59 执行crontab(hour=23, minute=59, month_of_year=12, day_of_month=”last”))】,crontab 的常用参数如下:
参数名 含义 取值范围 / 示例 minute 分钟 0-59(如 30 表示 30 分;*/10 表示每 10 分) hour 小时 0-23(如 2 表示 2 点;8-18 表示 8-18 点) day_of_month 每月的哪几天 1-31(如 1 表示 1 号;last 表示最后一天) month_of_year 每年的哪几个月 1-12 或 Jan-Dec(如 1,3 表示 1 月和 3 月) day_of_week 每周的哪几天 0-6 或 Sun-Sat(0 = 周日,1 = 周一;1-5 表示工作日) - datetime/timedelta:
- datetime:指定一个精确的 UTC 时间(仅执行一次);
- timedelta:从当前时间开始,延迟一段时间后执行(仅执行一次)
- 将生成的任务发送到消息代理(Broker,如 Redis、RabbitMQ);
- 由 Celery Worker 从消息代理中获取并执行任务。
Beat原生只支持提前配置静态定时任务,而不支持动态定时任务的管理。
1. 代码示例
在 Celery 配置文件中添加定时任务的触发规则:
# 配置 Celery Beat 任务调度器 app = Celery('basic_tasks') app.conf.beat_schedule = { 'cron_task': { 'task': 'cron_task', # 任务函数路径 'schedule': 10.0, # 每 10 秒执行一次 'args': (), # 任务参数 }, }任务定义
@app.task(name="cron_task") def cron_task(): """定时任务""" print(f"定时任务执行时间: {time.time()}") return f"定时任务执行成功: {time.time()}"启动Beat
celery -A basic_tasks beat --loglevel=info
2. 控制台输出结果
Beat 控制台输出
celery beat v5.5.1 (immunity) is starting. __ - ... __ - _ LocalTime -> 10:09:44 Configuration -> . broker -> redis://localhost:26379/4 . loader -> celery.loaders.app.AppLoader . scheduler -> celery.beat.PersistentScheduler . db -> celerybeat-schedule . logfile -> [stderr]@%INFO . maxinterval -> 5.00 minutes (300s) [10:09:44,691: INFO/MainProcess] beat: Starting... [10:09:54,726: INFO/MainProcess] Scheduler: Sending due task cron_task (cron_task) [10:10:04,723: INFO/MainProcess] Scheduler: Sending due task cron_task (cron_task)Worker控制台输出
[10:09:56,751: INFO/MainProcess] Task cron_task[7b4ca853-c3d9-4053-baaf-945450eb28d6] received [10:09:56,754: WARNING/ForkPoolWorker-59] 定时任务执行时间: 1761012596.7545104 [10:09:56,758: INFO/ForkPoolWorker-59] Task cron_task[7b4ca853-c3d9-4053-baaf-945450eb28d6] succeeded in 0.004147124942392111s: '定时任务执行成功: 1761012596.7548997' [10:10:04,725: INFO/MainProcess] Task cron_task[5f07fd80-cd41-4170-8f9f-7691652b24fc] received [10:10:04,728: WARNING/ForkPoolWorker-59] 定时任务执行时间: 1761012604.7281864 [10:10:04,729: INFO/ForkPoolWorker-59] Task cron_task[5f07fd80-cd41-4170-8f9f-7691652b24fc] succeeded in 0.0012831370113417506s: '定时任务执行成功: 1761012604.728391'
3. 输出结果解读
- Beat 控制台输出了相关信息,需要注意的是:
- scheduler使用的是默认的PersistentScheduler。
- Beat启动后,任务 cron_task 每 10 秒被调度一次,添加到Broker的队列中。
- Worker 控制台输出:
- 展示了 Celery Worker 接收和执行定时任务的详细信息,包括任务 ID、开始时间、执行时间和成功/失败状态。
- 从时间点也可以对得上,worker每隔10s收到一次任务 cron_task,然后执行成功,打印信息并返回结果。
4. 总结
- 定时任务需要提前配置在Celery App中,Beat启动后,才能按照规则调度。
- Beat 仅负责 发送任务,不处理任务执行失败。异常处理的内容,在之前的文章中已经介绍过,这里不再赘述。
DEVELOP · CELERY
develop python3 celery
