develop
Celery代码整合
Table of Contents
代码
整个Celery系列涉及到的概念通过代码总结,致力于将每一个概念用最简单的代码来说明其功能,辅助读者更好的理解Celery本身的概念。
最后,恭喜你看完了本系列的所有文章,希望你能对Celery有一个更深入的理解。
最后的最后作者强调:此类文章只是快速入门,作为一个合格的开发者,能够一点点啃下官方文档才是真正的核心能力,不要被英文吓到,日拱一卒方可至。
Celery系列完结撒花~
import time
from celery import Celery, chain, chord, group
from celery.exceptions import SoftTimeLimitExceeded
# 创建 Celery 应用实例
app = Celery('basic_tasks')
# 配置消息代理和结果后端
app.conf.broker_url = 'redis://localhost:26379/4'
app.conf.result_backend = 'redis://localhost:26379/5'
# 任务序列化配置
app.conf.task_serializer = 'json'
app.conf.accept_content = ['json']
app.conf.result_serializer = 'json'
# 时区配置
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = True
app.conf.beat_schedule = {
'cron_task': {
'task': 'cron_task', # 任务函数路径
'schedule': 10.0, # 每 10 秒执行一次
'args': (), # 任务参数
},
}
@app.task(name='add')
def add(x, y):
"""
简单的加法任务
演示最基本的任务定义
"""
print(f"执行加法任务: {x} + {y}")
return x + y
@app.task(name='multiply')
def multiply(x, y):
"""
乘法任务
演示带参数的任务
"""
print(f"执行乘法任务: {x} * {y}")
return x * y
@app.task(name='sum_all')
def sum_all(data_list):
"""
求和任务
演示求和任务,传入一个列表,返回列表中所有元素的和
"""
print(f"执行求和任务: {data_list}")
return sum(data_list)
@app.task(name='exception_demo')
def exception_demo():
"""
异常任务
演示异常任务
"""
print(f"执行抛出异常任务")
raise Exception(f"执行抛出异常任务")
@app.task(bind=True, name='exception_recover')
def exception_recover(self):
"""
异常恢复任务
演示异常恢复机制
"""
try:
if self.request.retries < 2:
raise Exception(f"执行第{self.request.retries+1}次抛出异常任务")
else:
return "异常恢复任务成功"
except Exception as e:
print(f"异常恢复任务失败: {str(e)}")
raise self.retry(exc=e, countdown=10, max_retries=3) # 重试3次,每次间隔10秒
@app.task(
bind=True,
time_limit=10, # 硬超时:10秒后强制终止
soft_time_limit=8, # 软超时:8秒后抛出异常
name='timeout_task'
)
def timeout_task(self, duration):
try:
print(f"任务开始,预计运行 {duration} 秒...")
# 模拟耗时操作(如处理数据、网络请求等)
time.sleep(duration)
return f"任务成功完成,实际运行 {duration} 秒"
except SoftTimeLimitExceeded:
# 捕获软超时异常,执行清理逻辑
print("触发软超时!执行紧急清理...")
# 例如:关闭数据库连接、保存中间结果等
time.sleep(4)
print("紧急清理完成")
raise # 重新抛出异常,标记任务为失败
finally:
print("任务结束(无论成功/失败)")
# 定义任务时设置默认速率:每秒最多 1 次
@app.task(rate_limit="1/s", name="rate_limit_task")
def rate_limit_task():
"""速率限制任务"""
print(f"任务开始时间: {time.time()}")
return f"速率限制任务执行成功: {time.time()}"
# 定时任务
@app.task(name="cron_task")
def cron_task():
"""定时任务"""
print(f"定时任务执行时间: {time.time()}")
return f"定时任务执行成功: {time.time()}"
if __name__ == '__main__':
# 触发加法任务
async_result = add.delay(2, 3)
print("加法任务ID:", async_result.id) # 输出任务ID
print(async_result.get())
# 触发乘法任务
async_result = multiply.delay(4, 5)
print("乘法任务ID:", async_result.id) # 输出任务ID
print(async_result.get())
# 触发异常任务
async_result = exception_demo.delay()
print("异常任务ID:", async_result.id) # 输出任务ID
try:
task_result = async_result.get() # 不可能执行成功
except Exception as e:
print(f"异常任务失败: {str(e)}")
print(f"异常任务状态: {async_result.status}")
print(f"异常任务信息: {async_result.info}")
# 触发异常恢复任务
async_result = exception_recover.delay()
print("异常恢复任务ID:", async_result.id) # 输出任务ID
try:
task_result = async_result.get() # retry次数小于2次时,会抛出异常,否则会返回成功
print(f"异常恢复任务成功: {task_result}")
except Exception as e:
print(f"异常恢复任务失败: {str(e)}")
# 触发超时任务
async_result = timeout_task.delay(15)
print("超时任务ID:", async_result.id) # 输出任务ID
try:
task_result = async_result.get() # retry次数小于2次时,会抛出异常,否则会返回成功
print(f"超时任务执行成功: {task_result}")
except Exception as e:
print(f"超时任务执行失败: {str(e)}")
# 循环触发速率限制任务5次
for i in range(3):
async_result = rate_limit_task.delay()
print(f"第{i+1}次速率限制任务ID:", async_result.id) # 输出任务ID
task_result = async_result.get() # 每秒最多执行1次
print(f"第{i+1}次速率限制任务执行成功: {task_result}")
# 定时任务不需要手动执行,会按照配置的定时任务执行,注意查看beat\worker日志
# chain示例
workflow = chain(
add.s(10, 20) | multiply.s(2)
)
workflow_result = workflow.apply_async()
final_result = workflow_result.get()
print(f"链式任务最终结果: {final_result}")
# group示例
workflow = group(
add.s(10, 20),
multiply.s(10, 20)
)
workflow_result = workflow.apply_async()
final_result = workflow_result.get()
print(f"组式任务结果: {final_result}")
# chord示例
group_workflow = group(
add.s(10, 20),
multiply.s(10, 20)
)
chord_workflow = chord(
group_workflow,
sum_all.s()
)
workflow_result = chord_workflow.apply_async()
final_result = workflow_result.get()
print(f"弦式任务结果: {final_result}")
# map示例
map_workflow = sum_all.map([[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]])
workflow_result = map_workflow.apply_async()
final_result = workflow_result.get()
print(f"map式任务结果: {final_result}")
#chunk示例
chunk_workflow = add.chunks(
zip(range(6), range(6)),
3
)
workflow_result = chunk_workflow.apply_async()
final_result = workflow_result.get()
print(f"chunk式任务结果: {final_result}")
DEVELOP · CELERY
develop python3 celery
