运行期间添加到DAG中的任务未能安排

我的想法是有一个任务foo,它生成输入列表(用户,报告,日志文件等),并为输入列表中的每个元素启动任务。目标是利用Airflow的重试和其他逻辑,而不是重新实现它。运行期间添加到DAG中的任务未能安排

所以,理想情况下,我应该DAG看起来是这样的:

这里唯一的变量是生成的任务数。在完成所有这些任务之后,我想做更多的任务,因此为每项任务启动新的DAG似乎并不合适。

这是我的代码:

default_args = { 

'owner': 'airflow',

'depends_on_past': False,

'start_date': datetime(2015, 6, 1)

}

dag = DAG('dynamic_dag_generator', schedule_interval=None, default_args=default_args)

foo_operator = BashOperator(

task_id='foo',

bash_command="echo '%s'" % json.dumps(range(0, random.randint(40,60))),

xcom_push=True,

dag=dag)

def gen_nodes(**kwargs):

ti = kwargs['ti']

workers = json.loads(ti.xcom_pull(task_ids='foo'))

for wid in workers:

print("Iterating worker %s" % wid)

op = PythonOperator(

task_id='test_op_%s' % wid,

python_callable=lambda: print("Dynamic task!"),

dag=dag

)

op.set_downstream(bar_operator)

op.set_upstream(dummy_op)

gen_subdag_node_op = PythonOperator(

task_id='gen_subdag_nodes',

python_callable=gen_nodes,

provide_context=True,

dag=dag

)

gen_subdag_node_op.set_upstream(foo_operator)

dummy_op = DummyOperator(

task_id='dummy',

dag=dag

)

dummy_op.set_upstream(gen_subdag_node_op)

bar_operator = DummyOperator(

task_id='bar',

dag=dag)

bar_operator.set_upstream(dummy_op)

在日志,我可以看到,gen_nodes正确执行(即Iterating worker 5等)。但是,新任务没有安排,也没有证据表明它们已经被执行。

我在网上找到相关的代码示例such as this,但无法使其工作。我错过了什么吗?

另外,是否有更合适的方法来解决这个问题(隔离单位的工作)?

回答:

此时,气流不支持在dag运行时添加/删除任务。

工作流程顺序将是在DAG运行开始时评估的任何内容。

See the second paragraph here.

这意味着基于发生的事情在运行,你不能添加/删除任务。您可以基于与运行无关的某些内容在for循环中添加X任务,但在运行开始后,不会更改工作流程形状/顺序。

很多时候,您可以使用BranchPythonOperator来做出决定,并且这些决定可以基于您的xcom值,但它们必须决定是否已经存在于工作流中。

Dag运行,并且Dag定义在气流中以不完全直观的方式分开,但或多或​​少在dag运行(xcomdag_run.conf等)中创建/生成的任何内容不可用于定义达格本身。

以上是 运行期间添加到DAG中的任务未能安排 的全部内容, 来源链接: utcz.com/qa/261678.html

回到顶部