如何在Airflow中运行Spark代码?

地球人你好!我正在使用Airflow计划和运行Spark任务。我这次发现的所有内容都是Airflow可以管理的python DAG。

DAG示例:

spark_count_lines.py

import logging

from airflow import DAG

from airflow.operators import PythonOperator

from datetime import datetime

args = {

'owner': 'airflow'

, 'start_date': datetime(2016, 4, 17)

, 'provide_context': True

}

dag = DAG(

'spark_count_lines'

, start_date = datetime(2016, 4, 17)

, schedule_interval = '@hourly'

, default_args = args

)

def run_spark(**kwargs):

import pyspark

sc = pyspark.SparkContext()

df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt')

logging.info('Number of lines in people.txt = {0}'.format(df.count()))

sc.stop()

t_main = PythonOperator(

task_id = 'call_spark'

, dag = dag

, python_callable = run_spark

)

问题是我的Python代码不好,并且有一些用Java编写的任务。我的问题是如何在python DAG中运行Spark Java

jar?或者,也许还有其他方法吗?我发现了spark提交:http : //spark.apache.org/docs/latest/submitting-

applications.html

但我不知道如何将所有内容连接在一起。也许有人以前使用过它并有可行的例子。感谢您的时间!

回答:

您应该可以使用BashOperator。保持其余代码不变,导入所需的类和系统软件包:

from airflow.operators.bash_operator import BashOperator

import os

import sys

设置所需的路径:

os.environ['SPARK_HOME'] = '/path/to/spark/root'

sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))

并添加运算符:

spark_task = BashOperator(

task_id='spark_java',

bash_command='spark-submit --class {{ params.class }} {{ params.jar }}',

params={'class': 'MainClassName', 'jar': '/path/to/your.jar'},

dag=dag

)

您可以使用Jinja模板轻松扩展它以提供其他参数。

您当然可以通过替换bash_command适合您情况的模板来针对非火花场景进行调整,例如:

bash_command = 'java -jar {{ params.jar }}'

和调整params

以上是 如何在Airflow中运行Spark代码? 的全部内容, 来源链接: utcz.com/qa/428506.html

回到顶部