Personal Blog

develop

Celery Task Flow使用介绍

介绍Celery Task的Flow使用

摘要

本文系统梳理了 Celery 的 Canvas API,帮助开发者用“搭积木”的方式把独立任务组装成复杂工作流。
先对比 delay() 与 apply_async(),指出后者通过 countdown、queue、priority 等参数提供更高阶的调度能力;随后引入 Signature 概念,展示如何用 s()/si() 把任务调用序列化、复用或设为不可变。
在此基础上,逐一演示 6 种原语:

  • chain 用“|”串行传递结果;
  • group 并行批量执行并返回列表;
  • chord 在 group 全部完成后触发回调汇总;
  • map/starmap 单消息顺序处理参数列表;
  • chunks 把超长列表分块再并行,降低消息开销。

每个原语均给出可运行代码、控制台输出与结果解读,读者可直接复制验证。掌握这些模式后,即可按需组合,实现定时、依赖、异常处理、高吞吐等生产级任务流。

介绍

在 Celery 中,“任务编排”(Task Orchestration)指的是将多个独立任务按照特定逻辑(如串行、并行、依赖关系、条件分支等)组合成一个完整工作流的过程。Celery 提供了一套名为 Canvas API 的工具集,专门用于实现任务编排,同时也支持结合业务逻辑实现更复杂的自定义编排。

任务的调用方式

之前的内容中,我们接触了Celery通过delay()方法来触发任务的执行。

其实,Celery还提供了apply_async()方法来触发任务的执行,它比delay()方法更加强大,支持更多的参数配置。

有时在任务的编排中,apply_async()方法给我们提供了更多的灵活性,比如设置任务的延迟执行时间、指定任务的队列、设置任务的优先级等。所以我们这里要单独介绍一下apply_async()的使用方法。

delay方法 VS apply_async方法

简单举例说明:

方法描述示例
delay()简单触发任务执行,不支持参数配置add.delay(2,2)
apply_async()触发任务执行,支持更多参数配置add.apply_async((2,2), countdown=10)

apply_async方法的参数

具体apply_async的完整参数官网文档可以参考这里,以下是几个常用的参数说明:

参数名类型默认值描述
argstuple()位置参数,按顺序传递给任务函数
kwargsdict{}关键字参数,传递给任务函数的命名参数
countdownint0任务延迟执行时间(秒),用于实现定时任务
etadatetimeNone任务指定执行时间,用于实现定时任务
expiresintNone任务过期时间(秒),任务执行时间超过该值后自动取消
queuestrNone任务指定队列,用于将任务分配到不同的 worker 处理
routing_keystrNone任务指定路由键,用于将任务分配到不同的 worker 处理
priorityintNone任务优先级,数值越大优先级越高,需 broker 支持
linkSignature/listNone任务成功后触发的回调签名,支持单签名或列表
link_errorSignature/listNone任务失败后触发的回调签名,支持单签名或列表

Signatures

有时你希望将任务调用的签名传递给另一个进程,或作为另一个函数的参数。 此时就需要Signatures,它是任务调用的“蓝图”,封装了单个任务的位置参数(args)、关键字参数(kwargs)以及执行选项(如 countdown、queue),可以序列化后传递到其他进程或作为函数参数,从而解决任务调用逻辑的复用与跨进程传递问题。

创建方式

创建方式语法示例特点
函数式signature('tasks.add', (2,2), countdown=10)需指定任务名,适用于任务未直接导入场景
方法式add.signature((2,2), countdown=10)基于任务对象创建,直观易懂
快捷式add.s(2,2)简洁高效,不可直接设 options(需set()补充,如add.s(2,2).set(countdown=1)

关键特性

  1. Partial Signatures:基于已有签名新增参数,一般是需要父任务的结果作为当前任务参数的场景,规则如下
    1. 位置参数:新增 args前置到原有 args(如add.s(2).delay(4) → 实际执行add(4,2));
    2. 关键字参数:新增 kwargs覆盖原有 kwargs(如add.s(2,2).delay(debug=True) → 实际执行add(2,2,debug=True));
    3. 执行选项:新增 options覆盖原有 options(如add.signature((2,2), countdown=10).apply_async(countdown=1) → 最终countdown=1)。
  2. Immutability Signature:避免签名被 Partial 扩展,适用于当前任务无需父任务结果传递的场景
    1. 创建方式:
      1. add.signature((2,2), immutable=True);
      2. 快捷方法add.si(2,2);
    2. 限制:仅可修改执行选项(如countdown),不可添加 / 修改 args/kwargs。
  3. Callback Signatures:用于在任务完成后触发其他任务,一般是需要当前任务的成功执行后,将结果作为其他任务参数的场景,规则如下:
    1. 如预期所示,这将首先启动一个任务计算 2 + 2,然后启动另一个任务计算 4 + 8
       add.apply_async((2, 2), link=add.s(8))
      
    2. 如果父任务发生异常,可以使用link_error参数指定回调任务,用于处理异常情况
       add.apply_async((2, 2), 
       link=add.s(8) 
       link_error=some_exception_handle_func.s())
      

原语

原语(Primitives)是组合复杂工作流的基础单元,所有原语均为 Signatures 实例,支持嵌套组合

原语功能描述执行方式结果/特性典型场景
chain串行执行签名,前任务结果传递给后任务串行支持 | 运算符,parent 属性查中间结果依赖前任务结果的流程(如 a→b→c)
group并行执行多个签名任务并行返回 GroupResult,支持 ready()/get()无依赖的批量并行任务(如批量计算)
chordgroup + 回调,组任务完成后执行回调并行 + 串行回调需 result_backend,触发 ChordError需汇总组结果(如并行计算后求和)
map串行处理参数列表,1 个任务消息串行单参数传入(如 tsum.map([range(10), range(100)]))单任务批量处理单参数数据
starmap串行处理参数列表,参数以 *args 传入串行多参数传入(如 add.starmap([(2,2),(4,4)]))单任务批量处理多参数数据
chunks拆分长参数列表为块,减少消息开销并行(块级)支持转为 group,skew() 设倒计时偏移超大量参数任务(如 1000 条分 10 块)

1. chain 原语

功能描述

chain 原语用于串行执行多个签名任务,前一个任务的结果会作为后一个任务的参数传递。

代码示例

chain定义

from celery import chain

# chain示例
workflow = chain(
    add.s(10, 20),
    multiply.s(2),
)
# 或者使用 | 运算符,效果一致
workflow = chain(
    add.s(10, 20) | multiply.s(2)
)
result = workflow.apply_async()
final_result = result.get()
print(f"链式任务最终结果: {final_result}")

控制台输出

链式任务最终结果: 60

输出结果解读

  1. 任务1(add.s(10, 20))执行,结果为30
  2. 任务2(multiply.s(2))执行,相当于执行multiply(30, 2),结果为60

2. group 原语

功能描述

group 原语用于并行执行多个签名任务,所有任务会同时开始执行,返回 GroupResult 对象,可用于查询任务是否完成或获取任务结果。

注意:同时执行多个任务之间无依赖关系。

代码示例

group定义

from celery import group

# group示例
workflow = group(
    add.s(10, 20),
    multiply.s(10, 20)
)
workflow_result = workflow.apply_async()
final_result = workflow_result.get()
print(f"组式任务结果: {final_result}")

控制台输出

组式任务结果: [30, 200]

输出结果解读

  1. 任务1(add.s(10, 20))和任务2(multiply.s(10, 20))会同时开始执行
  2. 任务1执行结果为30,任务2执行结果为200
  3. 最终结果为[30, 200],顺序与定义顺序一致

3. chord 原语

功能描述

chord 原语用于并行执行多个签名任务,所有任务会同时开始执行,任务完成后触发回调任务,回调任务的参数为并行任务的结果列表。

代码示例

chord定义

# chord示例
group_workflow = group(
    add.s(10, 20),
    multiply.s(10, 20)
)
chord_workflow = chord(
    group_workflow,
    sum_all.s()
)
workflow_result = chord_workflow.apply_async()
final_result = workflow_result.get()
print(f"弦式任务结果: {final_result}")

控制台输出

  1. 任务控制台输出

     弦式任务结果: 230
    
  2. worker 控制台输出

     [17:59:57,987: INFO/MainProcess] Task add[5ebabfbf-8654-4e36-ba9d-26894b1f3363] received
     [17:59:57,990: WARNING/ForkPoolWorker-59] 执行加法任务: 10 + 20
     [17:59:57,994: INFO/MainProcess] Task multiply[7270d9cf-dd99-4dde-9266-089feb0e118e] received
     [17:59:57,994: INFO/ForkPoolWorker-59] Task add[5ebabfbf-8654-4e36-ba9d-26894b1f3363] succeeded in 0.00507375702727586s: 30
     [17:59:57,996: WARNING/ForkPoolWorker-59] 执行乘法任务: 10 * 20
     [17:59:58,008: INFO/ForkPoolWorker-59] Task multiply[7270d9cf-dd99-4dde-9266-089feb0e118e] succeeded in 0.011834581033326685s: 200
     [17:59:58,008: INFO/MainProcess] Task sum_all[08f99cfd-55fa-442a-a948-4983966418d3] received
     [17:59:58,010: WARNING/ForkPoolWorker-59] 执行求和任务: [30, 200]
     [17:59:58,011: INFO/ForkPoolWorker-59] Task sum_all[08f99cfd-55fa-442a-a948-4983966418d3] succeeded in 0.0008721619378775358s: 230
    

输出结果解读

  1. 任务1(add.s(10, 20))和任务2(multiply.s(10, 20))会同时开始执行
  2. 任务1执行结果为30,任务2执行结果为200
  3. 回调任务sum_all.s()会接收到[30, 200]作为参数,执行sum([30, 200]),结果为230

4. map & starmap 原语

功能描述

首先,python3内置的map和starmap函数:

  • map(func, iterable)单个参数 依次喂给函数,返回迭代器;
  • itertools.starmap(func, iterable_of_tuples)元组拆包成多参数 后喂给函数。

Celery的map与starmap的语义和使用场景完全类似于python3内置的map和starmap。

Celery的map 和 starmap 用于串行处理参数列表,用于对序列中的每个元素依次调用指定任务。

它们与 Celery group 的区别在于:

  • 仅发送一条任务消息;
  • 操作按顺序串行执行。
  • map 用于单参数传入,starmap 用于多参数传入。

因为map和startmap的使用方法类似,所以以下仅以map的使用方法为例进行说明。

代码示例

map定义

    # map示例
    map_workflow = sum_all.map([[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]])
    workflow_result = map_workflow.apply_async()
    final_result = workflow_result.get()
    print(f"map式任务结果: {final_result}")

控制台输出

  1. 任务控制台输出

     map式任务结果: [15, 40]
    
  2. worker 控制台输出

     [19:09:19,306: INFO/MainProcess] Task celery.map[b3985aec-3bba-4e23-9d78-c7785bee14e0] received
     [19:09:19,309: WARNING/ForkPoolWorker-59] 执行求和任务: [1, 2, 3, 4, 5]
     [19:09:19,309: WARNING/ForkPoolWorker-59] 执行求和任务: [6, 7, 8, 9, 10]
     [19:09:19,313: INFO/ForkPoolWorker-59] Task celery.map[b3985aec-3bba-4e23-9d78-c7785bee14e0] succeeded in 0.005126871052198112s: [15, 40]
    

    输出结果解读

  3. 任务1(sum_all.s([1, 2, 3, 4, 5]))执行,结果为15
  4. 任务2(sum_all.s([6, 7, 8, 9, 10]))执行,结果为40
  5. 最终结果为[15, 40],顺序与定义顺序一致

5. chunk 原语

功能描述

用于将参数列表分成多个子列表,每个子列表作为一个任务进行处理。 chunk 原语允许你将一个可迭代的工作集拆分成若干片段,例如当你有一百万个对象时,可以创建 10 个任务,每个任务处理十万个对象。 有人担心分块会降低并行度,但在繁忙的集群中这几乎不会发生;实际上,由于避免了消息传递的开销,分块往往会显著提升性能。

代码示例

chunk定义

    #chunk示例
    chunk_workflow = add.chunks(
        zip(range(6), range(6)),
        3
    )
    workflow_result = chunk_workflow.apply_async()
    final_result = workflow_result.get()
    print(f"chunk式任务结果: {final_result}")

控制台输出

  1. 任务控制台输出

     chunk式任务结果: [[0, 2, 4], [6, 8, 10]]
    
  2. worker 控制台输出

     [19:24:15,314: INFO/MainProcess] Task celery.starmap[e9bd74e0-0f62-47e1-a767-e1651596efbb] received
     [19:24:15,317: WARNING/ForkPoolWorker-59] 执行加法任务: 0 + 0
     [19:24:15,317: WARNING/ForkPoolWorker-59] 执行加法任务: 1 + 1
     [19:24:15,317: WARNING/ForkPoolWorker-59] 执行加法任务: 2 + 2
     [19:24:15,318: INFO/ForkPoolWorker-59] Task celery.starmap[e9bd74e0-0f62-47e1-a767-e1651596efbb] succeeded in 0.0017218650318682194s: [0, 2, 4]
     [19:24:15,326: INFO/MainProcess] Task celery.starmap[a1c7a3ce-9d20-48af-bfab-a809ed477725] received
     [19:24:15,328: WARNING/ForkPoolWorker-59] 执行加法任务: 3 + 3
     [19:24:15,328: WARNING/ForkPoolWorker-59] 执行加法任务: 4 + 4
     [19:24:15,328: WARNING/ForkPoolWorker-59] 执行加法任务: 5 + 5
     [19:24:15,329: INFO/ForkPoolWorker-59] Task celery.starmap[a1c7a3ce-9d20-48af-bfab-a809ed477725] succeeded in 0.0013732200022786856s: [6, 8, 10]
    

输出结果解读

  1. 任务1(add.s(0, 0), add.s(1, 1), add.s(2, 2))执行,结果为[0, 2, 4]
  2. 任务2(add.s(3, 3), add.s(4, 4), add.s(5, 5))执行,结果为[6, 8, 10]
  3. 最终结果为[[0, 2, 4], [6, 8, 10]],顺序与定义顺序一致

DEVELOP · CELERY
develop python3 celery