Personal Blog

develop

Celery代码整合

将Celery系列中所有例子涉及到的代码整合在一起

Table of Contents

代码

整个Celery系列涉及到的概念通过代码总结,致力于将每一个概念用最简单的代码来说明其功能,辅助读者更好的理解Celery本身的概念。

最后,恭喜你看完了本系列的所有文章,希望你能对Celery有一个更深入的理解。

最后的最后作者强调:此类文章只是快速入门,作为一个合格的开发者,能够一点点啃下官方文档才是真正的核心能力,不要被英文吓到,日拱一卒方可至。

Celery系列完结撒花~

import time
from celery import Celery, chain, chord, group
from celery.exceptions import SoftTimeLimitExceeded

# 创建 Celery 应用实例
app = Celery('basic_tasks')

# 配置消息代理和结果后端
app.conf.broker_url = 'redis://localhost:26379/4'
app.conf.result_backend = 'redis://localhost:26379/5'

# 任务序列化配置
app.conf.task_serializer = 'json'
app.conf.accept_content = ['json']
app.conf.result_serializer = 'json'

# 时区配置
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = True

app.conf.beat_schedule = {
    'cron_task': {
        'task': 'cron_task',  # 任务函数路径
        'schedule': 10.0,  # 每 10 秒执行一次
        'args': (),  # 任务参数
    },
}

@app.task(name='add')
def add(x, y):
    """
    简单的加法任务
    演示最基本的任务定义
    """
    print(f"执行加法任务: {x} + {y}")
    return x + y


@app.task(name='multiply')
def multiply(x, y):
    """
    乘法任务
    演示带参数的任务
    """
    print(f"执行乘法任务: {x} * {y}")
    return x * y

@app.task(name='sum_all')
def sum_all(data_list):
    """
    求和任务
    演示求和任务,传入一个列表,返回列表中所有元素的和
    """
    print(f"执行求和任务: {data_list}")
    return sum(data_list)

@app.task(name='exception_demo')
def exception_demo():
    """
    异常任务
    演示异常任务
    """
    print(f"执行抛出异常任务")
    raise Exception(f"执行抛出异常任务")

@app.task(bind=True, name='exception_recover')
def exception_recover(self):
    """
    异常恢复任务
    演示异常恢复机制
    """
    try:
        if self.request.retries < 2:
            raise Exception(f"执行第{self.request.retries+1}次抛出异常任务")
        else:
            return "异常恢复任务成功"
    except Exception as e:
        print(f"异常恢复任务失败: {str(e)}")
        raise self.retry(exc=e, countdown=10, max_retries=3) # 重试3次,每次间隔10秒

@app.task(
    bind=True, 
    time_limit=10,         # 硬超时:10秒后强制终止
    soft_time_limit=8,     # 软超时:8秒后抛出异常
    name='timeout_task'
)
def timeout_task(self, duration):
    try:
        print(f"任务开始,预计运行 {duration} 秒...")
        # 模拟耗时操作(如处理数据、网络请求等)
        time.sleep(duration)
        return f"任务成功完成,实际运行 {duration}"
    
    except SoftTimeLimitExceeded:
        # 捕获软超时异常,执行清理逻辑
        print("触发软超时!执行紧急清理...")
        # 例如:关闭数据库连接、保存中间结果等
        time.sleep(4)
        print("紧急清理完成")
        raise  # 重新抛出异常,标记任务为失败
    
    finally:
        print("任务结束(无论成功/失败)")

# 定义任务时设置默认速率:每秒最多 1 次
@app.task(rate_limit="1/s", name="rate_limit_task")
def rate_limit_task():
    """速率限制任务"""
    print(f"任务开始时间: {time.time()}")
    return f"速率限制任务执行成功: {time.time()}"

# 定时任务
@app.task(name="cron_task")
def cron_task():
    """定时任务"""
    print(f"定时任务执行时间: {time.time()}")
    return f"定时任务执行成功: {time.time()}"
    
if __name__ == '__main__':
    # 触发加法任务
    async_result = add.delay(2, 3)
    print("加法任务ID:", async_result.id)  # 输出任务ID
    print(async_result.get())

    # 触发乘法任务
    async_result = multiply.delay(4, 5)
    print("乘法任务ID:", async_result.id)  # 输出任务ID
    print(async_result.get())
    
    # 触发异常任务
    async_result = exception_demo.delay()
    print("异常任务ID:", async_result.id)  # 输出任务ID
    try:
        task_result = async_result.get() # 不可能执行成功
    except Exception as e:
        print(f"异常任务失败: {str(e)}")
        print(f"异常任务状态: {async_result.status}")
        print(f"异常任务信息: {async_result.info}")
        
    # 触发异常恢复任务
    async_result = exception_recover.delay()
    print("异常恢复任务ID:", async_result.id)  # 输出任务ID
    try:
        task_result = async_result.get() # retry次数小于2次时,会抛出异常,否则会返回成功
        print(f"异常恢复任务成功: {task_result}")
    except Exception as e:
        print(f"异常恢复任务失败: {str(e)}")

    # 触发超时任务
    async_result = timeout_task.delay(15)
    print("超时任务ID:", async_result.id)  # 输出任务ID
    try:
        task_result = async_result.get() # retry次数小于2次时,会抛出异常,否则会返回成功
        print(f"超时任务执行成功: {task_result}")
    except Exception as e:
        print(f"超时任务执行失败: {str(e)}")
        
    # 循环触发速率限制任务5次
    for i in range(3):
        async_result = rate_limit_task.delay()
        print(f"{i+1}次速率限制任务ID:", async_result.id)  # 输出任务ID
        task_result = async_result.get() # 每秒最多执行1次
        print(f"{i+1}次速率限制任务执行成功: {task_result}")
        
    # 定时任务不需要手动执行,会按照配置的定时任务执行,注意查看beat\worker日志
    
    # chain示例
    workflow = chain(
        add.s(10, 20) | multiply.s(2)
    )
    workflow_result = workflow.apply_async()
    final_result = workflow_result.get()
    print(f"链式任务最终结果: {final_result}")
    
    # group示例
    workflow = group(
        add.s(10, 20),
        multiply.s(10, 20)
    )
    workflow_result = workflow.apply_async()
    final_result = workflow_result.get()
    print(f"组式任务结果: {final_result}")
    
    # chord示例
    group_workflow = group(
        add.s(10, 20),
        multiply.s(10, 20)
    )
    chord_workflow = chord(
        group_workflow,
        sum_all.s()
    )
    workflow_result = chord_workflow.apply_async()
    final_result = workflow_result.get()
    print(f"弦式任务结果: {final_result}")
    
    # map示例
    map_workflow = sum_all.map([[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]])
    workflow_result = map_workflow.apply_async()
    final_result = workflow_result.get()
    print(f"map式任务结果: {final_result}")
    
    #chunk示例
    chunk_workflow = add.chunks(
        zip(range(6), range(6)),
        3
    )
    workflow_result = chunk_workflow.apply_async()
    final_result = workflow_result.get()
    print(f"chunk式任务结果: {final_result}")

DEVELOP · CELERY
develop python3 celery