celery任务和自定义装饰器

我正在使用django和celery(django-celery)进行项目。我们的团队决定将所有数据访问代码(app-name)/manager.py包装在其中(不要像django这样包装到Manager中),而将代码放入(应用程序名称)/task.py中,仅处理用celery组装和执行任务(因此我们没有django在这一层的ORM依赖性)。

在我的manager.py,我有这样的事情:

def get_tag(tag_name):

ctype = ContentType.objects.get_for_model(Photo)

try:

tag = Tag.objects.get(name=tag_name)

except ObjectDoesNotExist:

return Tag.objects.none()

return tag

def get_tagged_photos(tag):

ctype = ContentType.objects.get_for_model(Photo)

return TaggedItem.objects.filter(content_type__pk=ctype.pk, tag__pk=tag.pk)

def get_tagged_photos_count(tag):

return get_tagged_photos(tag).count()

在我的task.py中,我喜欢将它们包装成任务(然后可以使用这些任务来完成更复杂的任务),因此我编写了这个装饰器:

import manager #the module within same app containing data access functions

class mfunc_to_task(object):

def __init__(mfunc_type='get'):

self.mfunc_type = mfunc_type

def __call__(self, f):

def wrapper_f(*args, **kwargs):

callback = kwargs.pop('callback', None)

mfunc = getattr(manager, f.__name__)

result = mfunc(*args, **kwargs)

if callback:

if self.mfunc_type == 'get':

subtask(callback).delay(result)

elif self.mfunc_type == 'get_or_create':

subtask(callback).delay(result[0])

else:

subtask(callback).delay()

return result

return wrapper_f

然后(仍在task.py):

#@task

@mfunc_to_task()

def get_tag():

pass

#@task

@mfunc_to_task()

def get_tagged_photos():

pass

#@task

@mfunc_to_task()

def get_tagged_photos_count():

pass

一切正常@task。但是,在应用了该@task装饰器之后(按照celery文档的指示放到顶部),事情就开始崩溃了。显然,每次mfunc_to_task.__call__调用时,task.get_tag都会传递与相同的函数f。所以我wrapper_f每次都得到相同的结果,现在我唯一要做的就是得到一个标签。

回答:

不太确定为什么传递参数无效?

如果使用此示例:

@task()

def add(x, y):

return x + y

让我们向MyCoolTask​​添加一些日志记录:

from celery import task

from celery.registry import tasks

import logging

import celery

logger = logging.getLogger(__name__)

class MyCoolTask(celery.Task):

def __call__(self, *args, **kwargs):

"""In celery task this function call the run method, here you can

set some environment variable before the run of the task"""

logger.info("Starting to run")

return self.run(*args, **kwargs)

def after_return(self, status, retval, task_id, args, kwargs, einfo):

#exit point of the task whatever is the state

logger.info("Ending run")

pass

并创建一个扩展类(扩展MyCoolTask​​,但现在带有参数):

class AddTask(MyCoolTask):

def run(self,x,y):

if x and y:

result=add(x,y)

logger.info('result = %d' % result)

return result

else:

logger.error('No x or y in arguments')

tasks.register(AddTask)

并确保将kwargs作为json数据传递:

{"x":8,"y":9}

我得到结果:

[2019-03-05 17:30:25,853: INFO/MainProcess] Starting to run

[2019-03-05 17:30:25,855: INFO/MainProcess] result = 17

[2019-03-05 17:30:26,739: INFO/MainProcess] Ending run

[2019-03-05 17:30:26,741: INFO/MainProcess] Task iamscheduler.tasks.AddTask[6a62641d-16a6-44b6-a1cf-7d4bdc8ea9e0] succeeded in 0.888684988022s: 17

以上是 celery任务和自定义装饰器 的全部内容, 来源链接: utcz.com/qa/434326.html

回到顶部