任务
Celery的任务装饰器 分两种,一种是基本任务app.task
, 一种是绑定任务app.task(bind=True)
绑定任务
基本任务仅仅提供异步执行的一个功能, 对于被装饰的函数本身想要获取任务信息, 或者遇到错误需要进行重试操作则需要装饰绑定任务
获取任务信息
绑定任务中被装饰函数的第一个参数为任务本身,也就是self
, self
中有个request
属性包含了任务本身的状态和其他信息
@app.task(bind=True)
def test(self, ):
print(self.request.__dict__)
return None
test.delay()
执行这段代码后,会在celery的worker终端打印出本次任务request的各项属性
发起http请求的例子
写个发起http请求的任务,try捕捉超时错误,最大重试次数为两次
import requests
@app.task(bind=True, max_retries=2)
def send_request(self, url):
try:
response = requests.get(url)
return response
except ConnectionError as e:
print('retry')
raise self.retry(exc=e)
执行send_request.delay("https://www.google.com/")
,得到结果
[INFO/MainProcess] Received task: tasks.send_request
[WARNING/ForkPoolWorker-1] retry
[INFO/MainProcess] Received task: tasks.send_request
可以看到try捕捉到异常后输出了一次retry然后又重新执行了一次send_request
函数
自定义task类
利用celery.Task
作为基类,可以对任务类进一步封装, Task
类已经提供了各种接口,重载这些接口,可以很方便的实现各种功能
举个记录日志的例子:
from celery import Task
import logging
import requests
class LogTask(Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
# 当任务结束时执行
logging.error('task [%s] running failed' % task_id)
def after_return(self, status, retval, task_id, args, kwargs, einfo):
# 任务return后的回调函数
logging.info('task [%s] return [%s]' % (task_id, retval))
def on_retry(self, exc, task_id, args, kwargs, einfo):
# 任务重试回调
logging.warning('task [%s] has been retry' % task_id)
def on_success(self, retval, task_id, args, kwargs):
# 任务成功回调
logging.info('task [%s] has been success.' % task_id)
@app.task(bind=True, base=LogTask, max_retries=2)
def send_request(self, url):
try:
response = requests.get(url)
return response
except ConnectionError as e:
raise self.retry(exc=e)
关于任务类的request属性具体见
app.Task.request
关于绑定任务的各种可传入参数具体见list options
关于任务类的各种接口具体见Handlers
感谢楼主的分析,赞赞。