Pyspark 2.4.0,使用读取流从kafka读取avro-Python

我正在尝试使用PySpark 2.4.0从Kafka读取avro消息。

spark-avro外部模块可以为读取avro文件提供以下解决方案:

df = spark.read.format("avro").load("examples/src/main/resources/users.avro") 

df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")

但是,我需要阅读流式Avro消息。库文档建议使用 from_avro() 函数,该函数仅适用于Scala和Java。

是否有其他模块支持读取从Kafka流式传输的Avro消息?

回答:

您可以包括spark-avro软件包,例如使用--packages(调整版本以匹配spark安装):

bin/pyspark --packages org.apache.spark:spark-avro_2.11:2.4.0

并提供您自己的包装器:

from pyspark.sql.column import Column, _to_java_column

def from_avro(col, jsonFormatSchema):

sc = SparkContext._active_spark_context

avro = sc._jvm.org.apache.spark.sql.avro

f = getattr(getattr(avro, "package$"), "MODULE$").from_avro

return Column(f(_to_java_column(col), jsonFormatSchema))

def to_avro(col):

sc = SparkContext._active_spark_context

avro = sc._jvm.org.apache.spark.sql.avro

f = getattr(getattr(avro, "package$"), "MODULE$").to_avro

return Column(f(_to_java_column(col)))

用法示例(从官方测试套件中采用):

from pyspark.sql.functions import col, struct

avro_type_struct = """

{

"type": "record",

"name": "struct",

"fields": [

{"name": "col1", "type": "long"},

{"name": "col2", "type": "string"}

]

}"""

df = spark.range(10).select(struct(

col("id"),

col("id").cast("string").alias("id2")

).alias("struct"))

avro_struct_df = df.select(to_avro(col("struct")).alias("avro"))

avro_struct_df.show(3)

+----------+

| avro|

+----------+

|[00 02 30]|

|[02 02 31]|

|[04 02 32]|

+----------+

only showing top 3 rows

avro_struct_df.select(from_avro("avro", avro_type_struct)).show(3)

+------------------------------------------------+

|from_avro(avro, struct<col1:bigint,col2:string>)|

+------------------------------------------------+

| [0, 0]|

| [1, 1]|

| [2, 2]|

+------------------------------------------------+

only showing top 3 rows

以上是 Pyspark 2.4.0,使用读取流从kafka读取avro-Python 的全部内容, 来源链接: utcz.com/qa/411765.html

回到顶部