初始化具有不同值的不同Celery Workers

我正在使用celery在Hadoop上运行长时间运行的任务。每个任务都会在Hadoop上执行Pig脚本,该脚本运行大约30分钟-2小时。

我当前的Hadoop设置有4个队列a,b,c和默认队列。当前,所有任务都由一个工人执行,该工人将作业提交到单个队列中。

我想再添加3个将作业提交到其他队列的工作程序,每个队列一个工作程序。

问题是队列当前是硬编码的,我希望为每个工作人员设置此变量。

我进行了很多搜索,但无法找到一种方法来传递每个Celery Workers不同的队列值并在任务中访问它。

我像这样开始我的Celery Workers。

celery -A app.celery worker

我希望在命令行本身中传递一些其他参数,并在我的任务中访问它,但是celery抱怨它不理解我的自定义参数。

我计划通过设置–concurrency=3参数在同一主机上运行所有工作线程。有什么解决办法吗?

当前场景是这样的。我每次尝试执行任务print_something时都说tasks.print_something.delay()只打印队列C。

@celery.task()

def print_something():

print "C"

我需要让工人根据我在启动时传递给他们的值来打印可变字母。

@celery.task()

def print_something():

print "<Variable Value Per Worker Here>"

回答:

第一步涉及在celery中添加对自定义参数的支持。如果不这样做,celery将抱怨它不理解该参数。

由于我用Flask运行celery,所以我像这样初始化celery。

def configure_celery():

app.config.update(

CELERY_BROKER_URL='amqp://:@localhost:5672',

RESULT_BACKEND='db+mysql://root:@localhost:3306/<database_name>'

)

celery = Celery(app.import_name, backend=app.config['RESULT_BACKEND'],

broker=app.config['CELERY_BROKER_URL'])

celery.conf.update(app.config)

TaskBase = celery.Task

class ContextTask(TaskBase):

abstract = True

def __call__(self, *args, **kwargs):

with app.app_context():

return TaskBase.__call__(self, *args, **kwargs)

celery.Task = ContextTask

return celery

我调用此函数来初始化celery并将其存储在一个名为celery的变量中。

celery = configure_celery()

要添加自定义参数,你需要执行以下操作。

def add_hadoop_queue_argument_to_worker(parser):

parser.add_argument(

'--hadoop-queue', help='Hadoop queue to be used by the worker'

)

下面使用的celery是我们从上述步骤中获得的celery。

celery.user_options['worker'].add(add_hadoop_queue_argument_to_worker)

下一步将是使该参数在工作程序中可访问。为此,请按照下列步骤操作。

class HadoopCustomWorkerStep(bootsteps.StartStopStep):

def __init__(self, worker, **kwargs):

worker.app.hadoop_queue = kwargs['hadoop_queue']

通知celery使用此类来创建工人。

celery.steps['worker'].add(HadoopCustomWorkerStep)

现在,任务应该能够访问变量了。

@app.task(bind=True)

def print_hadoop_queue_from_config(self):

print self.app.hadoop_queue

通过在命令行上运行worker进行验证。

celery -A app.celery worker --concurrency=1 --hadoop-queue=A -n aworker@%h

celery -A app.celery worker --concurrency=1 --hadoop-queue=B -n bworker@%h

celery -A app.celery worker --concurrency=1 --hadoop-queue=C -n cworker@%h

celery -A app.celery worker --concurrency=1 --hadoop-queue=default -n defaultworker@%h

以上是 初始化具有不同值的不同Celery Workers 的全部内容, 来源链接: utcz.com/qa/427543.html

回到顶部