Personal Blog

develop

Celery安装及使用

介绍Celery的安装和使用

摘要

本文介绍了Celery的安装及使用方法。安装方面,提供了基础安装命令和包含Redis依赖的安装命令。使用部分详细说明了Celery的配置和操作流程,包括:

  1. 定义Celery应用实例并配置消息代理、结果后端、序列化和时区等参数。
  2. 定义简单的加法和乘法任务。
  3. 启动Celery Worker,展示启动命令、控制台输出及输出解读。
  4. 启动Celery Flower监控工具,给出启动命令、输出结果和界面访问方式。
  5. 演示如何触发任务,介绍异步执行模型,并给出任务触发代码、输出结果及解读。

通过本篇文章,可以对celery的使用有一个清晰的认知,最主要的是如何通过task的触发来实现异步执行,然后通过异步结果对象进行任务状态、属性和结果的获取。

Celery安装

安装 Celery

pip install celery

安装 Celery同时安装依赖

我们使用redis作为消息代理节点(broker)和存储代理节点(Backend),这里也同时安装redis的相关依赖

pip install celery[redis]

如何使用

1. 首先定义Celery的app

# 创建 Celery 应用实例
app = Celery('basic_tasks')

# 配置消息代理和结果后端
app.conf.broker_url = 'redis://localhost:6379/0'
app.conf.result_backend = 'redis://localhost:6379/1'

# 任务序列化配置
app.conf.task_serializer = 'json'
app.conf.accept_content = ['json']
app.conf.result_serializer = 'json'

# 时区配置
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = True

2. 定义任务

@app.task(name='add')
def add(x, y):
    """
    简单的加法任务
    演示最基本的任务定义
    """
    print(f"执行加法任务: {x} + {y}")
    return x + y

@app.task(name='multiply')
def multiply(x, y):
    """
    乘法任务
    演示带参数的任务
    """
    print(f"执行乘法任务: {x} * {y}")
    return x * y

@app.task(name='sum_all')
def sum_all(data_list):
    """
    求和任务
    演示求和任务,传入一个列表,返回列表中所有元素的和
    """
    print(f"执行求和任务: {data_list}")
    return sum(data_list)    

3. 启动Celery Worker

1. 启动命令

celery -A basic_tasks worker --loglevel=info -E

2. 控制台输出结果

[root@localhost online]# celery -A basic_tasks worker --loglevel=info -E

User information: uid=0 euid=0 gid=0 egid=0

warnings.warn(SecurityWarning(ROOT_DISCOURAGED.format(

-------------- celery@localhost.localdomain v5.5.1 (immunity)
--- ***** ----- 
-- ******* ---- Linux-5.10.0-182.0.0.95.oe2203sp3.x86_64-x86_64-with-glibc2.34 19:04:56
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         basic_tasks:0x7f7954b096d0
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost:6379/0
- *** --- * --- .> concurrency: 64 (prefork)
-- ******* ---- .> task events: ON
--- ***** ----- 
-------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
. add
. multiply

[19:04:59,073: INFO/MainProcess] Connected to redis://localhost:6379/0
[19:04:59,077: INFO/MainProcess] mingle: searching for neighbors
[19:05:00,088: INFO/MainProcess] mingle: all alone
[19:05:00,106: INFO/MainProcess] celery@localhost.localdomain ready.

3. 输出结果解读

  1. 输出结果中包含了Celery的版本信息为v5.5.1
  2. config部分说明了Celery的配置信息,包括app名称、消息代理和结果后端、任务序列化配置、时区配置等。
  3. queue部分说明Celery使用的队列信息,有一个direct类型的exchange,key为celery,用于接收默认的任务队列。
  4. tasks部分列出了Celery应用中我们自定义的任务,包括了add和multiply两个任务。

4. 启动Celery Flower

1. 启动命令

celery -A basic_tasks flower --loglevel=info

2. 控制台输出结果

[root@localhost online]# celery -A basic_tasks flower --loglevel=info
[I 19:13:54 command:168] Visit me at http://0.0.0.0:5555
[I 19:13:54 command:176] Broker: redis://localhost:6379/0
[I 19:13:54 command:177] Registered tasks: 
    ['add',
    'celery.accumulate',
    'celery.backend_cleanup',
    'celery.chain',
    'celery.chord',
    'celery.chord_unlock',
    'celery.chunks',
    'celery.group',
    'celery.map',
    'celery.starmap',
    'multiply']
[I 19:13:54 mixins:228] Connected to redis://localhost:6379/0

3. 输出结果解读

  1. 可以通过访问http://【服务器IP】:5555,来查看Celery Flower的监控界面,包括任务队列、任务状态、任务统计等信息。
  2. Broker部分说明Celery使用的消息代理节点为redis://localhost:6379/0
  3. Registered tasks部分列出了Celery应用中注册的任务,包括了我们自定义的add和multiply任务,以及Celery提供的一些默认任务。

4. 访问Celery Flower监控界面

  1. 打开浏览器,访问http://【服务器IP】:5555
  2. 可以在监控界面查看任务队列、任务状态、任务统计等信息。
    1. worker信息 worker_info
    2. 任务信息 task_info
    3. add任务的具体信息 specific_task_info

5. 触发生成任务

1. 触发任务

from basic_tasks import add, multiply

# 触发加法任务
async_result = add.delay(2, 3)
print(async_result.id)  # 输出任务ID
print(async_result.get())

# 触发乘法任务
async_result = multiply.delay(4, 5)
print(async_result.id)  # 输出任务ID
print(async_result.get())

2. 控制台输出结果

e650fd68-40b2-4204-b08d-347f170b824a # 加法任务ID
5 # 2 + 3
f56e5599-68b5-4218-a202-180ea4dc32b6 # 乘法任务ID
20 # 4 * 5

3. 输出结果解读

  1. 首先import相关的任务定义,这里是add和multiply
  2. 然后通过调用delay方法来触发任务,这里是add.delay(2, 3)和multiply.delay(4, 5)
  3. 返回的result对象不是对应任务的结果,而是一个任务状态对象,用于查询任务的执行状态和结果。这就是Celery的异步执行模型的关键
    1. 通过调用result.id可以获取任务的ID,另外还有很多其他属性,比如state、status、info等,用于查询任务的执行状态和结果。
    2. 通过调用result.get()可以阻塞等待任务执行完成,并返回任务的结果。如果任务还未完成,会一直阻塞等待,直到任务完成。
    3. 从输出结果可知:
      1. 加法任务ID为e650fd68-40b2-4204-b08d-347f170b824a,结果为5,说明2 + 3 = 5。
      2. 乘法任务ID为f56e5599-68b5-4218-a202-180ea4dc32b6,结果为20,说明4 * 5 = 20。

DEVELOP · CELERY
develop python3 celery