Personal Blog

develop

Celery异常处理

介绍Celery的异常处理方法,包括异常捕获和异常重试

摘要

本文介绍了 Celery 的异常处理方法,主要内容包括:

  1. 异常捕获:通过对 async_result.get() 方法进行异常捕获,可在任务调用处得知任务执行状态和失败原因,处理方式与普通函数异常捕获类似。
  2. 异常重试:当任务因临时外部依赖问题失败时,可设置重试逻辑。任务定义需使用 bind=True 以访问上下文信息,通过 self.retry() 函数设置重试间隔和最大重试次数等参数。还介绍了 retry_backoffretry_backoff_maxretry_jitterdont_autoretry_for 等相关参数。
  3. Worker 异常退出:Worker 意外退出时,Broker 会在 visibility_timeout 后将任务重新标记为可见,实现任务重试。需注意设置合适的 visibility_timeout,确保任务幂等性,且 acks_late 参数默认为 False

异常处理

前面系列文章中的任务只是简单的做加法和乘法,但是想要真正的在生产中使用celery,还有一些事情不得不考虑:

  1. 任务抛出异常怎么办?
  2. 调用任务的地方如何知道异常的发生?
  3. 如果异常是因为临时的外部原因引起的,是否可以对任务进行重试以使得任务更加健壮?
  4. 如果worker本身在执行task的过程中异常退出,正在执行中的task又会怎么样?

一、异常捕获

1. 代码示例

  1. 任务定义

     @app.task(name='exception_demo')
     def exception_demo():
         """
         异常任务
         演示异常任务
         """
         print(f"执行抛出异常任务")
         raise Exception(f"执行抛出异常任务")
    
  2. 任务调用

     # 触发异常任务
     async_result = exception_demo.delay()
     print(async_result.id)  # 输出任务ID
     try:
         task_result = async_result.get() # 不可能执行成功
     except Exception as e:
         print(f"异常任务失败: {str(e)}")
         print(f"异常任务状态: {async_result.status}")
         print(f"异常任务信息: {async_result.info}")
    

2. 控制台输出结果

268e6861-8ed1-404a-a459-62cafa8db5ed
异常任务失败: 执行抛出异常任务
异常任务状态: FAILURE
异常任务信息: 执行抛出异常任务

3. 输出结果解读

  1. 通过对async_result的get()方法进行异常捕获,成功捕获了任务抛出的异常,使用方法和对普通函数的异常捕获是一样的。
  2. 抛出的Exception异常信息为”执行抛出异常任务”,和任务中定义的一样。
  3. 异常任务的状态为FAILURE,说明任务执行失败。

4. 总结

可见异常会正常进行传递被捕获,我们就可以在任务调用的地方知道任务是否执行成功,以及任务执行失败的原因,处理起来就像我们对普通函数的异常捕获一样。

二、异常重试

如果是因为网络抖动或其他临时的外部依赖的原因导致任务执行失败,稍后等外部依赖恢复正常后,再次执行任务就可以成功。所以此时为了避免人为的再次触发,我们可以改造一下任务,在任务中设置重试逻辑。

1. 代码示例

  1. 任务定义

     @app.task(bind=True, name='exception_recover')
     def exception_recover(self):
         """
         异常恢复任务
         演示异常恢复机制
         """
         try:
             if self.request.retries < 2:
                 raise Exception(f"执行第{self.request.retries+1}次抛出异常任务")
             else:
                 return "异常恢复任务成功"
         except Exception as e:
             print(f"异常恢复任务失败: {str(e)}")
             raise self.retry(exc=e, countdown=10, max_retries=3) # 重试3次,每次间隔10秒
    
  2. 任务调用

     # 触发异常恢复任务
     async_result = exception_recover.delay()
     print(async_result.id)  # 输出任务ID
     try:
         task_result = async_result.get() # retry次数小于2次时,会抛出异常,否则会返回成功
         print(f"异常恢复任务成功: {task_result}")
     except Exception as e:
         print(f"异常恢复任务失败: {str(e)}")
    

2. 控制台输出结果

  1. 任务触发控制台的log

     d0b98e05-6ffa-4409-95e6-c0ae776b2f82
     异常恢复任务成功: 异常恢复任务成功
    
  2. Celery Worker的log

     [16:31:44,708: INFO/MainProcess] Task exception_recover[d0b98e05-6ffa-4409-95e6-c0ae776b2f82] received
     [16:31:44,710: WARNING/ForkPoolWorker-59] 异常恢复任务失败: 执行第1次抛出异常任务
     [16:31:44,722: INFO/MainProcess] Task exception_recover[d0b98e05-6ffa-4409-95e6-c0ae776b2f82] received
     [16:31:44,726: INFO/ForkPoolWorker-59] Task exception_recover[d0b98e05-6ffa-4409-95e6-c0ae776b2f82] retry: Retry in 10s: Exception('执行第1次抛出异常任务')
     [16:31:54,716: WARNING/ForkPoolWorker-59] 异常恢复任务失败: 执行第2次抛出异常任务
     [16:31:54,721: INFO/MainProcess] Task exception_recover[d0b98e05-6ffa-4409-95e6-c0ae776b2f82] received
     [16:31:54,724: INFO/ForkPoolWorker-59] Task exception_recover[d0b98e05-6ffa-4409-95e6-c0ae776b2f82] retry: Retry in 10s: Exception('执行第2次抛出异常任务')
     [16:32:04,721: INFO/ForkPoolWorker-59] Task exception_recover[d0b98e05-6ffa-4409-95e6-c0ae776b2f82] succeeded in 0.0013180049718357623s: '异常恢复任务成功'
    

3. 输出结果解读

  1. 任务触发控制台的log中,输出了任务ID和任务最终执行成功的信息。
  2. Celery Worker的log中,首先收到了任务异常的通知,打印出了异常信息”执行第1次抛出异常任务”。
  3. 然后根据任务定义中的重试逻辑,Celery Worker在10秒后重试了任务,打印出了异常信息”执行第2次抛出异常任务”。
  4. 最后在20秒后,任务成功执行,打印出了任务执行成功的信息”异常恢复任务成功”。

4. 总结

  1. 任务的定义中,使用了bind=True的参数,这是为了在任务执行过程中,能够访问到任务的上下文信息,比如任务ID、重试次数等。self.request.retries就需要bind之后才可以访问到。
  2. retry的countdown参数指定了每次重试的间隔时间为10秒;max_retries参数指定了最大重试次数为3次。
  3. 任务总共执行了3次,包括初始执行和2次重试,第二次重试成功。所以没有超过最大重试次数的3次。
  4. 可见重试的核心是raise self.retry(exc=e, countdown=10, max_retries=3)函数,里面定义了重试的基本逻辑,包括异常信息、重试间隔时间和最大重试次数。
  5. 其他相关的参数可以参考Celery官方文档
    1. retry_backoff: 布尔值或数字。
      1. 如果此选项设置为 True,自动重试将按照指数退避规则进行延迟。第一次重试将延迟 1 秒,第二次重试将延迟 2 秒,第三次延迟 4 秒,第四次延迟 8 秒,依此类推。(不过,如果启用了 retry_jitter,此延迟值会被修改。)
      2. 如果此选项设置为一个数字,则该数字将用作延迟因子。例如,如果此选项设置为 3,第一次重试将延迟 3 秒,第二次延迟 6 秒,第三次延迟 12 秒,第四次延迟 24 秒,依此类推。默认情况下,此选项设置为 False,自动重试不会延迟。
    2. retry_backoff_max:一个数字。如果启用了 retry_backoff,此选项将设置任务自动重试之间的最大延迟时间(以秒为单位)。默认情况下,此选项设置为 600,即 10 分钟。
    3. retry_jitter:一个布尔值。Jitter 用于在指数退避延迟中引入随机性,以防止队列中的所有任务同时执行。如果此选项设置为 True,则 retry_backoff 计算出的延迟值将被视为最大值,实际的延迟值将是介于 0 和该最大值之间的随机数。默认情况下,此选项设置为 True。
    4. dont_autoretry_for: 一个异常类的列表/元组。这些异常不会被自动重试。

三、worker异常退出

1. 退出的场景

某些任务可能需要比较长的时间来运行,比如3min。在这期间,除了上述提到的任务的外部依赖异常外,celery worker也可能因为特殊因为被异常退出,比如操作系统将触发OOM后,刚好将worker选为被杀死的进程。

2. 重试的原理

当 Worker 从 Broker(如 Redis、RabbitMQ)获取任务后,任务会被标记为 “不可见”(避免被其他 Worker 重复消费获取)。如果 Worker 在执行过程中意外退出(未发送确认信号 ack),Broker 会在 visibility_timeout 时间后,将该 “不可见” 的任务重新标记为 “可见”,允许新的 Worker 重新获取并执行。 通过这种方式实现了worker异常退出后的任务重试机制,确保任务不会因为worker异常退出而丢失。

3. 配置

因为可见与不可见的状态是Broker来维护的,所以在配置Celery App的Broker时,需要对Broker进行设置。

app = Celery('basic_tasks')
app.conf.broker_transport_options = {'visibility_timeout': 3600}

visibility_timeout参数,默认值为5秒。这个参数表示任务在被worker获取后,在未被确认前的可见时间。如果任务在这个时间内没有被确认,Broker会将任务重新标记为可见,允许其他worker获取并执行。

4. 注意事项

  1. visibility_timeout 时间的设置: visibility_timeout 是 Broker 用于控制任务可见性的时间。如果设置得过短,可能会导致任务在 Worker 退出后立即被其他 Worker 获取并执行,从而导致重复执行。
  2. 任务的幂等性: 由于任务可能会被重试,因此在设计任务时需要确保任务是幂等的。如果任务不是幂等的,可能会导致重复执行产生副作用。
  3. 确保acks_late参数为False,也是默认的行为;
    1. 在task的装饰器中可以设置acks_late的参数(默认False)
    2. 在app的定义中设置全局的task_acks_late参数(默认False)

DEVELOP · CELERY
develop python3 celery