Personal Blog

develop

Celery Task高级用法

介绍Celery Task的高级用法

摘要

本文系统梳理了 Celery Task 的三类高级玩法:

  1. 超时控制——用 soft_time_limit / time_limit 实现“先告警、后强杀”的两级兜底,兼顾优雅收尾与资源保护;
  2. 速率限制——通过 rate_limit 把“入队”与“消费”解耦,精准平滑任务流量,保护下游;
  3. 定时调度——借助 Celery Beat 以“秒级间隔 / Crontab / 一次性”三种策略提前编排周期任务,让 Worker 专注执行。

掌握这三板斧,即可在分布式场景中做到“该快的快、该慢的慢、该来的准时来”,既防失控又保弹性。

Task超时时间

在 Celery 中,可以为任务设置总执行超时时间,当任务运行超过指定时间后,会被强制终止并标记为失败。主要通过以下两个参数配置,适用于不同场景:

参数作用说明适用场景
time_limit硬超时时间(秒)。超过此时间,任务会被强制终止(发送 SIGKILL 信号),无论是否在执行中。需要严格限制任务执行时间,不允许超时运行。
soft_time_limit软超时时间(秒)。超过此时间,会抛出 SoftTimeLimitExceeded 异常,允许任务捕获异常并执行清理逻辑(如释放资源),若未捕获则任务失败。希望超时前有机会处理收尾工作(如关闭连接)

1. 代码示例

  1. 任务定义
     @app.task(
         bind=True, 
         time_limit=10,         # 硬超时:10秒后强制终止
         soft_time_limit=8,     # 软超时:8秒后抛出异常
         name='timeout_task'
     )
     def timeout_task(self, duration):
         try:
             print(f"任务开始,预计运行 {duration} 秒...")
             # 模拟耗时操作(如处理数据、网络请求等)
             time.sleep(duration)
             return f"任务成功完成,实际运行 {duration}"
            
         except SoftTimeLimitExceeded:
             # 捕获软超时异常,执行清理逻辑
             print("触发软超时!执行紧急清理...")
             # 例如:关闭数据库连接、保存中间结果等
             time.sleep(4)
             print("紧急清理完成")
             raise  # 重新抛出异常,标记任务为失败
            
         finally:
             print("任务结束(无论成功/失败)")
    
  2. 任务调用
     # 触发超时任务
     async_result = timeout_task.delay(15)
     print("超时任务ID:", async_result.id)  # 输出任务ID
     try:
         task_result = async_result.get() # retry次数小于2次时,会抛出异常,否则会返回成功
         print(f"超时任务执行成功: {task_result}")
     except Exception as e:
         print(f"超时任务执行失败: {str(e)}")
    

2. 控制台输出结果

  1. 任务控制台输出

     超时任务ID: 000bc8eb-ec18-4cf3-a842-63df86ceb87c
     超时任务执行失败: TimeLimitExceeded(10,)
    
  2. Celery worker输出

     [14:59:32,167: INFO/MainProcess] Task timeout_task[000bc8eb-ec18-4cf3-a842-63df86ceb87c] received
     [14:59:32,169: WARNING/ForkPoolWorker-59] 任务开始,预计运行 15 秒...
     [14:59:40,173: WARNING/MainProcess] Soft time limit (8s) exceeded for timeout_task[000bc8eb-ec18-4cf3-a842-63df86ceb87c]
     [14:59:40,174: WARNING/ForkPoolWorker-59] 触发软超时!执行紧急清理...
     [14:59:42,181: ERROR/MainProcess] Task handler raised error: TimeLimitExceeded(10)
     Traceback (most recent call last):
     File "/root/.pyenv/versions/3.11.9/envs/ssytest/lib/python3.11/site-packages/billiard/pool.py", line 684, in on_hard_timeout
         raise TimeLimitExceeded(job._timeout)
     billiard.einfo.ExceptionWithTraceback: 
     """
     Traceback (most recent call last):
     File "/root/.pyenv/versions/3.11.9/envs/ssytest/lib/python3.11/site-packages/billiard/pool.py", line 684, in on_hard_timeout
         raise TimeLimitExceeded(job._timeout)
     billiard.exceptions.TimeLimitExceeded: TimeLimitExceeded(10,)
     """
     [14:59:42,182: ERROR/MainProcess] Hard time limit (10s) exceeded for timeout_task[000bc8eb-ec18-4cf3-a842-63df86ceb87c]
     [14:59:42,183: WARNING/ForkPoolWorker-59] 任务结束(无论成功/失败)
     [14:59:42,288: ERROR/MainProcess] Process 'ForkPoolWorker-59' pid:3916642 exited with 'signal 9 (SIGKILL)'
    

3. 输出结果解读

  1. 通过分析任务控制台输出和 Celery worker 日志,可以发现以下超时行为:
    • ID 为 000bc8eb-ec18-4cf3-a842-63df86ceb87c 的任务遇到了超时问题。
    • 在第 8 秒时触发了软超时,超过了预先设置的 8 秒限制。
    • 在第 8 秒时抛出了 SoftTimeLimitExceeded 异常,将任务标记为失败。
    • 在第 10 秒时触发了硬超时,超过了预先设置的 10 秒限制。
    • 在第 10 秒时发送了 SIGKILL 信号,强制终止任务进程。

4. 总结

  1. SoftTimeLimitExceeded 异常是一个软超时异常,任务可以捕获并处理该异常,执行清理逻辑。
  2. HardTimeLimitExceeded 异常是一个硬超时异常,任务不能捕获该异常,会被强制终止。
  3. 要预留足够的时间给任务完成清理逻辑,避免硬超时导致任务失败(比如例子所示)。

Task速率限制

rate_limit 用于限制任务在单位时间内的最大执行次数,本质是通过控制 Worker 处理任务的频率,避免 “任务洪峰” 对系统或依赖服务造成压力。例如: 调用第三方 API 时,对方限制每秒最多 10 次请求 → 可设置 rate_limit=”10/s”; 生成报表的任务很耗 CPU,希望每分钟最多执行 5 次 → 可设置 rate_limit=”5/m”。

语法规则:

  • 单位时间:秒(s)、分钟(m)、小时(h)
  • 格式:{数量}/{单位},如 10/s 表示 10 个任务/秒

原理: rate_limit 不限制任务入队(即 delay() 或 apply_async() 会正常将任务放入队列),仅限制 Worker 从队列中取出任务执行的频率。若队列中积累了大量任务,Worker 会按 rate_limit 逐步消费,避免瞬间执行过多。

1. 代码示例

  1. 全局级别,所有任务生效:在 Celery 配置文件(如 celeryconfig.py)中添加以下配置:

     # 配置任务速率限制为 10 个任务/秒
     app = Celery('basic_tasks')
     app.conf.task_rate_limit = '10/s'    
    
  2. 任务级别:只在当前任务生效,定义中添加 rate_limit 参数:

     # 定义任务时设置默认速率:每秒最多 1 次
     @app.task(rate_limit="1/s", name="rate_limit_task")
     def rate_limit_task():
         """速率限制任务"""
         print(f"任务开始时间: {time.time()}")
         return f"速率限制任务执行成功: {time.time()}"
    
  3. 任务调用

     for i in range(3):
         async_result = rate_limit_task.delay()
         print(f"{i+1}次速率限制任务ID:", async_result.id)  # 输出任务ID
         task_result = async_result.get() # 每秒最多执行1次
         print(f"{i+1}次速率限制任务执行成功: {task_result}")
    

2. 控制台输出结果

  1. 任务控制台输出

    第1次速率限制任务ID: 68b79c80-477f-466f-933a-4a655e838dca
    第1次速率限制任务执行成功: 速率限制任务执行成功: 1760949254.7125618
    第2次速率限制任务ID: 6e8c58c9-2bec-4203-8cda-382080a6f2ce
    第2次速率限制任务执行成功: 速率限制任务执行成功: 1760949255.713834
    第3次速率限制任务ID: 0c76b08a-4b54-4fb6-b192-5c148739ee97
    第3次速率限制任务执行成功: 速率限制任务执行成功: 1760949256.7162948
    
  2. Celery worker输出:

    [16:34:14,710: INFO/MainProcess] Task rate_limit_task[68b79c80-477f-466f-933a-4a655e838dca] received
    [16:34:14,712: WARNING/ForkPoolWorker-65] 任务开始时间: 1760949254.7123966
    [16:34:14,713: INFO/ForkPoolWorker-65] Task rate_limit_task[68b79c80-477f-466f-933a-4a655e838dca] succeeded in 0.0011626780033111572s: '速率限制任务执行成功: 1760949254.7125618'
    [16:34:14,719: INFO/MainProcess] Task rate_limit_task[6e8c58c9-2bec-4203-8cda-382080a6f2ce] received
    [16:34:15,713: WARNING/ForkPoolWorker-65] 任务开始时间: 1760949255.7136571
    [16:34:15,714: INFO/ForkPoolWorker-65] Task rate_limit_task[6e8c58c9-2bec-4203-8cda-382080a6f2ce] succeeded in 0.0012042250018566847s: '速率限制任务执行成功: 1760949255.713834'
    [16:34:15,720: INFO/MainProcess] Task rate_limit_task[0c76b08a-4b54-4fb6-b192-5c148739ee97] received
    [16:34:16,716: WARNING/ForkPoolWorker-65] 任务开始时间: 1760949256.71612
    [16:34:16,717: INFO/ForkPoolWorker-65] Task rate_limit_task[0c76b08a-4b54-4fb6-b192-5c148739ee97] succeeded in 0.0012061879970133305s: '速率限制任务执行成功: 1760949256.7162948'
    

3. 输出结果解读

  1. 任务控制台输出:每个任务的 ID 和执行成功的时间戳,按顺序展示。
  2. Celery worker 输出:每个任务的 ID、开始时间、执行时间和成功/失败状态,按顺序展示。

4. 总结

  1. 从结果可知,任务执行时间间隔符合速率限制配置,即每秒最多 1 个任务。
  2. 通过worker的日志可知,worker是每秒收到一个任务,任务的限速是Broker来实现的。

定时任务

Celery Beat 是 Celery 内置的定时任务调度器,用于周期性地生成任务并将其发送到消息队列(由 Worker 消费执行)。它解决了 “如何按固定时间间隔或特定时间点自动触发任务” 的问题。

Celery Beat 本身不执行任务,而是作为调度中心

  1. 按预设的scheduler时间规则生成任务,支持3种格式:
    1. 固定间隔(float/int):直接指定秒数,表示 “每隔 N 秒执行一次”,适合简单的周期性任务。
    2. Cron 表达式(crontab 对象):通过 celery.schedules.crontab 实现类 Unix cron 的时间规则,支持按 “分、时、日、月、周” 精确配置,适合复杂时间场景【如:每年 12 月的最后一天 23:59 执行crontab(hour=23, minute=59, month_of_year=12, day_of_month=”last”))】,crontab 的常用参数如下:

      参数名含义取值范围 / 示例
      minute分钟0-59(如 30 表示 30 分;*/10 表示每 10 分)
      hour小时0-23(如 2 表示 2 点;8-18 表示 8-18 点)
      day_of_month每月的哪几天1-31(如 1 表示 1 号;last 表示最后一天)
      month_of_year每年的哪几个月1-12 或 Jan-Dec(如 1,3 表示 1 月和 3 月)
      day_of_week每周的哪几天0-6 或 Sun-Sat(0 = 周日,1 = 周一;1-5 表示工作日)
    3. datetime/timedelta:
      1. datetime:指定一个精确的 UTC 时间(仅执行一次);
      2. timedelta:从当前时间开始,延迟一段时间后执行(仅执行一次)
  2. 将生成的任务发送到消息代理(Broker,如 Redis、RabbitMQ);
  3. 由 Celery Worker 从消息代理中获取并执行任务。

Beat原生只支持提前配置静态定时任务,而不支持动态定时任务的管理。

1. 代码示例

  1. 在 Celery 配置文件中添加定时任务的触发规则:

     # 配置 Celery Beat 任务调度器
     app = Celery('basic_tasks')
     app.conf.beat_schedule = {
         'cron_task': {
             'task': 'cron_task',  # 任务函数路径
             'schedule': 10.0,  # 每 10 秒执行一次
             'args': (),  # 任务参数
         },
     }
    
  2. 任务定义

     @app.task(name="cron_task")
     def cron_task():
         """定时任务"""
         print(f"定时任务执行时间: {time.time()}")
         return f"定时任务执行成功: {time.time()}"
    
  3. 启动Beat

     celery -A basic_tasks beat --loglevel=info
    

2. 控制台输出结果

  1. Beat 控制台输出

     celery beat v5.5.1 (immunity) is starting.
     __    -    ... __   -        _
     LocalTime -> 10:09:44
     Configuration ->
         . broker -> redis://localhost:26379/4
         . loader -> celery.loaders.app.AppLoader
         . scheduler -> celery.beat.PersistentScheduler
         . db -> celerybeat-schedule
         . logfile -> [stderr]@%INFO
         . maxinterval -> 5.00 minutes (300s)
     [10:09:44,691: INFO/MainProcess] beat: Starting...
     [10:09:54,726: INFO/MainProcess] Scheduler: Sending due task cron_task (cron_task)
     [10:10:04,723: INFO/MainProcess] Scheduler: Sending due task cron_task (cron_task)
    
  2. Worker控制台输出

     [10:09:56,751: INFO/MainProcess] Task cron_task[7b4ca853-c3d9-4053-baaf-945450eb28d6] received
     [10:09:56,754: WARNING/ForkPoolWorker-59] 定时任务执行时间: 1761012596.7545104
     [10:09:56,758: INFO/ForkPoolWorker-59] Task cron_task[7b4ca853-c3d9-4053-baaf-945450eb28d6] succeeded in 0.004147124942392111s: '定时任务执行成功: 1761012596.7548997'
     [10:10:04,725: INFO/MainProcess] Task cron_task[5f07fd80-cd41-4170-8f9f-7691652b24fc] received
     [10:10:04,728: WARNING/ForkPoolWorker-59] 定时任务执行时间: 1761012604.7281864
     [10:10:04,729: INFO/ForkPoolWorker-59] Task cron_task[5f07fd80-cd41-4170-8f9f-7691652b24fc] succeeded in 0.0012831370113417506s: '定时任务执行成功: 1761012604.728391'
    

3. 输出结果解读

  1. Beat 控制台输出了相关信息,需要注意的是:
    1. scheduler使用的是默认的PersistentScheduler。
    2. Beat启动后,任务 cron_task 每 10 秒被调度一次,添加到Broker的队列中。
  2. Worker 控制台输出:
    1. 展示了 Celery Worker 接收和执行定时任务的详细信息,包括任务 ID、开始时间、执行时间和成功/失败状态。
    2. 从时间点也可以对得上,worker每隔10s收到一次任务 cron_task,然后执行成功,打印信息并返回结果。

4. 总结

  1. 定时任务需要提前配置在Celery App中,Beat启动后,才能按照规则调度。
  2. Beat 仅负责 发送任务,不处理任务执行失败。异常处理的内容,在之前的文章中已经介绍过,这里不再赘述。

DEVELOP · CELERY
develop python3 celery