初始化具有不同值的不同Celery Workers
我正在使用celery在Hadoop上运行长时间运行的任务。每个任务都会在Hadoop上执行Pig脚本,该脚本运行大约30分钟-2小时。
我当前的Hadoop设置有4个队列a,b,c和默认队列。当前,所有任务都由一个工人执行,该工人将作业提交到单个队列中。
我想再添加3个将作业提交到其他队列的工作程序,每个队列一个工作程序。
问题是队列当前是硬编码的,我希望为每个工作人员设置此变量。
我进行了很多搜索,但无法找到一种方法来传递每个Celery Workers不同的队列值并在任务中访问它。
我像这样开始我的Celery Workers。
celery -A app.celery worker
我希望在命令行本身中传递一些其他参数,并在我的任务中访问它,但是celery抱怨它不理解我的自定义参数。
我计划通过设置–concurrency=3
参数在同一主机上运行所有工作线程。有什么解决办法吗?
当前场景是这样的。我每次尝试执行任务print_something
时都说tasks.print_something.delay()
只打印队列C。
@celery.task()def print_something():
print "C"
我需要让工人根据我在启动时传递给他们的值来打印可变字母。
@celery.task()def print_something():
print "<Variable Value Per Worker Here>"
回答:
第一步涉及在celery中添加对自定义参数的支持。如果不这样做,celery将抱怨它不理解该参数。
由于我用Flask运行celery,所以我像这样初始化celery。
def configure_celery(): app.config.update(
CELERY_BROKER_URL='amqp://:@localhost:5672',
RESULT_BACKEND='db+mysql://root:@localhost:3306/<database_name>'
)
celery = Celery(app.import_name, backend=app.config['RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
我调用此函数来初始化celery并将其存储在一个名为celery的变量中。
celery = configure_celery()
要添加自定义参数,你需要执行以下操作。
def add_hadoop_queue_argument_to_worker(parser): parser.add_argument(
'--hadoop-queue', help='Hadoop queue to be used by the worker'
)
下面使用的celery是我们从上述步骤中获得的celery。
celery.user_options['worker'].add(add_hadoop_queue_argument_to_worker)
下一步将是使该参数在工作程序中可访问。为此,请按照下列步骤操作。
class HadoopCustomWorkerStep(bootsteps.StartStopStep): def __init__(self, worker, **kwargs):
worker.app.hadoop_queue = kwargs['hadoop_queue']
通知celery使用此类来创建工人。
celery.steps['worker'].add(HadoopCustomWorkerStep)
现在,任务应该能够访问变量了。
@app.task(bind=True)def print_hadoop_queue_from_config(self):
print self.app.hadoop_queue
通过在命令行上运行worker进行验证。
celery -A app.celery worker --concurrency=1 --hadoop-queue=A -n aworker@%hcelery -A app.celery worker --concurrency=1 --hadoop-queue=B -n bworker@%h
celery -A app.celery worker --concurrency=1 --hadoop-queue=C -n cworker@%h
celery -A app.celery worker --concurrency=1 --hadoop-queue=default -n defaultworker@%h
以上是 初始化具有不同值的不同Celery Workers 的全部内容, 来源链接: utcz.com/qa/427543.html