develop
Celery Task Flow使用介绍
摘要
本文系统梳理了 Celery 的 Canvas API,帮助开发者用“搭积木”的方式把独立任务组装成复杂工作流。
先对比 delay() 与 apply_async(),指出后者通过 countdown、queue、priority 等参数提供更高阶的调度能力;随后引入 Signature 概念,展示如何用 s()/si() 把任务调用序列化、复用或设为不可变。
在此基础上,逐一演示 6 种原语:
- chain 用
“|”串行传递结果; - group 并行批量执行并返回列表;
- chord 在 group 全部完成后触发回调汇总;
- map/starmap 单消息顺序处理参数列表;
- chunks 把超长列表分块再并行,降低消息开销。
每个原语均给出可运行代码、控制台输出与结果解读,读者可直接复制验证。掌握这些模式后,即可按需组合,实现定时、依赖、异常处理、高吞吐等生产级任务流。
介绍
在 Celery 中,“任务编排”(Task Orchestration)指的是将多个独立任务按照特定逻辑(如串行、并行、依赖关系、条件分支等)组合成一个完整工作流的过程。Celery 提供了一套名为 Canvas API 的工具集,专门用于实现任务编排,同时也支持结合业务逻辑实现更复杂的自定义编排。
任务的调用方式
之前的内容中,我们接触了Celery通过delay()方法来触发任务的执行。
其实,Celery还提供了apply_async()方法来触发任务的执行,它比delay()方法更加强大,支持更多的参数配置。
有时在任务的编排中,apply_async()方法给我们提供了更多的灵活性,比如设置任务的延迟执行时间、指定任务的队列、设置任务的优先级等。所以我们这里要单独介绍一下apply_async()的使用方法。
delay方法 VS apply_async方法
简单举例说明:
| 方法 | 描述 | 示例 |
|---|---|---|
| delay() | 简单触发任务执行,不支持参数配置 | add.delay(2,2) |
| apply_async() | 触发任务执行,支持更多参数配置 | add.apply_async((2,2), countdown=10) |
apply_async方法的参数
具体apply_async的完整参数官网文档可以参考这里,以下是几个常用的参数说明:
| 参数名 | 类型 | 默认值 | 描述 |
|---|---|---|---|
| args | tuple | () | 位置参数,按顺序传递给任务函数 |
| kwargs | dict | {} | 关键字参数,传递给任务函数的命名参数 |
| countdown | int | 0 | 任务延迟执行时间(秒),用于实现定时任务 |
| eta | datetime | None | 任务指定执行时间,用于实现定时任务 |
| expires | int | None | 任务过期时间(秒),任务执行时间超过该值后自动取消 |
| queue | str | None | 任务指定队列,用于将任务分配到不同的 worker 处理 |
| routing_key | str | None | 任务指定路由键,用于将任务分配到不同的 worker 处理 |
| priority | int | None | 任务优先级,数值越大优先级越高,需 broker 支持 |
| link | Signature/list | None | 任务成功后触发的回调签名,支持单签名或列表 |
| link_error | Signature/list | None | 任务失败后触发的回调签名,支持单签名或列表 |
Signatures
有时你希望将任务调用的签名传递给另一个进程,或作为另一个函数的参数。 此时就需要Signatures,它是任务调用的“蓝图”,封装了单个任务的位置参数(args)、关键字参数(kwargs)以及执行选项(如 countdown、queue),可以序列化后传递到其他进程或作为函数参数,从而解决任务调用逻辑的复用与跨进程传递问题。
创建方式
| 创建方式 | 语法示例 | 特点 |
|---|---|---|
| 函数式 | signature('tasks.add', (2,2), countdown=10) | 需指定任务名,适用于任务未直接导入场景 |
| 方法式 | add.signature((2,2), countdown=10) | 基于任务对象创建,直观易懂 |
| 快捷式 | add.s(2,2) | 简洁高效,不可直接设 options(需set()补充,如add.s(2,2).set(countdown=1)) |
关键特性
- Partial Signatures:基于已有签名新增参数,一般是需要父任务的结果作为当前任务参数的场景,规则如下
- 位置参数:新增 args前置到原有 args(如add.s(2).delay(4) → 实际执行add(4,2));
- 关键字参数:新增 kwargs覆盖原有 kwargs(如add.s(2,2).delay(debug=True) → 实际执行add(2,2,debug=True));
- 执行选项:新增 options覆盖原有 options(如add.signature((2,2), countdown=10).apply_async(countdown=1) → 最终countdown=1)。
- Immutability Signature:避免签名被 Partial 扩展,适用于当前任务无需父任务结果传递的场景
- 创建方式:
- add.signature((2,2), immutable=True);
- 快捷方法add.si(2,2);
- 限制:仅可修改执行选项(如countdown),不可添加 / 修改 args/kwargs。
- 创建方式:
- Callback Signatures:用于在任务完成后触发其他任务,一般是需要当前任务的成功执行后,将结果作为其他任务参数的场景,规则如下:
- 如预期所示,这将首先启动一个任务计算 2 + 2,然后启动另一个任务计算 4 + 8
add.apply_async((2, 2), link=add.s(8)) - 如果父任务发生异常,可以使用link_error参数指定回调任务,用于处理异常情况
add.apply_async((2, 2), link=add.s(8), link_error=some_exception_handle_func.s())
- 如预期所示,这将首先启动一个任务计算 2 + 2,然后启动另一个任务计算 4 + 8
原语
原语(Primitives)是组合复杂工作流的基础单元,所有原语均为 Signatures 实例,支持嵌套组合
| 原语 | 功能描述 | 执行方式 | 结果/特性 | 典型场景 |
|---|---|---|---|---|
| chain | 串行执行签名,前任务结果传递给后任务 | 串行 | 支持 | 运算符,parent 属性查中间结果 | 依赖前任务结果的流程(如 a→b→c) |
| group | 并行执行多个签名任务 | 并行 | 返回 GroupResult,支持 ready()/get() | 无依赖的批量并行任务(如批量计算) |
| chord | group + 回调,组任务完成后执行回调 | 并行 + 串行回调 | 需 result_backend,触发 ChordError | 需汇总组结果(如并行计算后求和) |
| map | 串行处理参数列表,1 个任务消息 | 串行 | 单参数传入(如 tsum.map([range(10), range(100)])) | 单任务批量处理单参数数据 |
| starmap | 串行处理参数列表,参数以 *args 传入 | 串行 | 多参数传入(如 add.starmap([(2,2),(4,4)])) | 单任务批量处理多参数数据 |
| chunks | 拆分长参数列表为块,减少消息开销 | 并行(块级) | 支持转为 group,skew() 设倒计时偏移 | 超大量参数任务(如 1000 条分 10 块) |
1. chain 原语
功能描述
chain 原语用于串行执行多个签名任务,前一个任务的结果会作为后一个任务的参数传递。
代码示例
chain定义
from celery import chain
# chain示例
workflow = chain(
add.s(10, 20),
multiply.s(2),
)
# 或者使用 | 运算符,效果一致
workflow = chain(
add.s(10, 20) | multiply.s(2)
)
result = workflow.apply_async()
final_result = result.get()
print(f"链式任务最终结果: {final_result}")
控制台输出
链式任务最终结果: 60
输出结果解读
- 任务1(add.s(10, 20))执行,结果为30
- 任务2(multiply.s(2))执行,相当于执行multiply(30, 2),结果为60
2. group 原语
功能描述
group 原语用于并行执行多个签名任务,所有任务会同时开始执行,返回 GroupResult 对象,可用于查询任务是否完成或获取任务结果。
注意:同时执行多个任务之间无依赖关系。
代码示例
group定义
from celery import group
# group示例
workflow = group(
add.s(10, 20),
multiply.s(10, 20)
)
workflow_result = workflow.apply_async()
final_result = workflow_result.get()
print(f"组式任务结果: {final_result}")
控制台输出
组式任务结果: [30, 200]
输出结果解读
- 任务1(add.s(10, 20))和任务2(multiply.s(10, 20))会同时开始执行
- 任务1执行结果为30,任务2执行结果为200
- 最终结果为[30, 200],顺序与定义顺序一致
3. chord 原语
功能描述
chord 原语用于并行执行多个签名任务,所有任务会同时开始执行,任务完成后触发回调任务,回调任务的参数为并行任务的结果列表。
代码示例
chord定义
# chord示例
group_workflow = group(
add.s(10, 20),
multiply.s(10, 20)
)
chord_workflow = chord(
group_workflow,
sum_all.s()
)
workflow_result = chord_workflow.apply_async()
final_result = workflow_result.get()
print(f"弦式任务结果: {final_result}")
控制台输出
任务控制台输出
弦式任务结果: 230worker 控制台输出
[17:59:57,987: INFO/MainProcess] Task add[5ebabfbf-8654-4e36-ba9d-26894b1f3363] received [17:59:57,990: WARNING/ForkPoolWorker-59] 执行加法任务: 10 + 20 [17:59:57,994: INFO/MainProcess] Task multiply[7270d9cf-dd99-4dde-9266-089feb0e118e] received [17:59:57,994: INFO/ForkPoolWorker-59] Task add[5ebabfbf-8654-4e36-ba9d-26894b1f3363] succeeded in 0.00507375702727586s: 30 [17:59:57,996: WARNING/ForkPoolWorker-59] 执行乘法任务: 10 * 20 [17:59:58,008: INFO/ForkPoolWorker-59] Task multiply[7270d9cf-dd99-4dde-9266-089feb0e118e] succeeded in 0.011834581033326685s: 200 [17:59:58,008: INFO/MainProcess] Task sum_all[08f99cfd-55fa-442a-a948-4983966418d3] received [17:59:58,010: WARNING/ForkPoolWorker-59] 执行求和任务: [30, 200] [17:59:58,011: INFO/ForkPoolWorker-59] Task sum_all[08f99cfd-55fa-442a-a948-4983966418d3] succeeded in 0.0008721619378775358s: 230
输出结果解读
- 任务1(add.s(10, 20))和任务2(multiply.s(10, 20))会同时开始执行
- 任务1执行结果为30,任务2执行结果为200
- 回调任务sum_all.s()会接收到[30, 200]作为参数,执行sum([30, 200]),结果为230
4. map & starmap 原语
功能描述
首先,python3内置的map和starmap函数:
map(func, iterable)把 单个参数 依次喂给函数,返回迭代器;itertools.starmap(func, iterable_of_tuples)把 元组拆包成多参数 后喂给函数。
Celery的map与starmap的语义和使用场景完全类似于python3内置的map和starmap。
Celery的map 和 starmap 用于串行处理参数列表,用于对序列中的每个元素依次调用指定任务。
它们与 Celery group 的区别在于:
- 仅发送一条任务消息;
- 操作按顺序串行执行。
- map 用于单参数传入,starmap 用于多参数传入。
因为map和startmap的使用方法类似,所以以下仅以map的使用方法为例进行说明。
代码示例
map定义
# map示例
map_workflow = sum_all.map([[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]])
workflow_result = map_workflow.apply_async()
final_result = workflow_result.get()
print(f"map式任务结果: {final_result}")
控制台输出
任务控制台输出
map式任务结果: [15, 40]worker 控制台输出
[19:09:19,306: INFO/MainProcess] Task celery.map[b3985aec-3bba-4e23-9d78-c7785bee14e0] received [19:09:19,309: WARNING/ForkPoolWorker-59] 执行求和任务: [1, 2, 3, 4, 5] [19:09:19,309: WARNING/ForkPoolWorker-59] 执行求和任务: [6, 7, 8, 9, 10] [19:09:19,313: INFO/ForkPoolWorker-59] Task celery.map[b3985aec-3bba-4e23-9d78-c7785bee14e0] succeeded in 0.005126871052198112s: [15, 40]输出结果解读
- 任务1(sum_all.s([1, 2, 3, 4, 5]))执行,结果为15
- 任务2(sum_all.s([6, 7, 8, 9, 10]))执行,结果为40
- 最终结果为[15, 40],顺序与定义顺序一致
5. chunk 原语
功能描述
用于将参数列表分成多个子列表,每个子列表作为一个任务进行处理。 chunk 原语允许你将一个可迭代的工作集拆分成若干片段,例如当你有一百万个对象时,可以创建 10 个任务,每个任务处理十万个对象。 有人担心分块会降低并行度,但在繁忙的集群中这几乎不会发生;实际上,由于避免了消息传递的开销,分块往往会显著提升性能。
代码示例
chunk定义
#chunk示例
chunk_workflow = add.chunks(
zip(range(6), range(6)),
3
)
workflow_result = chunk_workflow.apply_async()
final_result = workflow_result.get()
print(f"chunk式任务结果: {final_result}")
控制台输出
任务控制台输出
chunk式任务结果: [[0, 2, 4], [6, 8, 10]]worker 控制台输出
[19:24:15,314: INFO/MainProcess] Task celery.starmap[e9bd74e0-0f62-47e1-a767-e1651596efbb] received [19:24:15,317: WARNING/ForkPoolWorker-59] 执行加法任务: 0 + 0 [19:24:15,317: WARNING/ForkPoolWorker-59] 执行加法任务: 1 + 1 [19:24:15,317: WARNING/ForkPoolWorker-59] 执行加法任务: 2 + 2 [19:24:15,318: INFO/ForkPoolWorker-59] Task celery.starmap[e9bd74e0-0f62-47e1-a767-e1651596efbb] succeeded in 0.0017218650318682194s: [0, 2, 4] [19:24:15,326: INFO/MainProcess] Task celery.starmap[a1c7a3ce-9d20-48af-bfab-a809ed477725] received [19:24:15,328: WARNING/ForkPoolWorker-59] 执行加法任务: 3 + 3 [19:24:15,328: WARNING/ForkPoolWorker-59] 执行加法任务: 4 + 4 [19:24:15,328: WARNING/ForkPoolWorker-59] 执行加法任务: 5 + 5 [19:24:15,329: INFO/ForkPoolWorker-59] Task celery.starmap[a1c7a3ce-9d20-48af-bfab-a809ed477725] succeeded in 0.0013732200022786856s: [6, 8, 10]
输出结果解读
- 任务1(add.s(0, 0), add.s(1, 1), add.s(2, 2))执行,结果为[0, 2, 4]
- 任务2(add.s(3, 3), add.s(4, 4), add.s(5, 5))执行,结果为[6, 8, 10]
- 最终结果为[[0, 2, 4], [6, 8, 10]],顺序与定义顺序一致
DEVELOP · CELERY
develop python3 celery
