用python语言使用spark streaming 读取kafka数据?

用python语言使用spark streaming 读取kafka数据?

各位大神, 求解!

用python来实现spark streaming 读取kafka数据

本人首先查阅了大量的资料, 最开始使用spark-streaming-kafka,经过踩坑后返现spark-streaming-kafka-0.8版本的支持python语言,但是不支持kafka的身份认证。更高版本的只支持scala和java, 最后无奈放弃。

后来参考了官方文档:
https://spark.apache.org/docs/2.4.0/structured-streaming-kafk...

看了官方文档之后打算使用spark-sql-kafka-0-10_2.11,这个应该是支持kafka身份认证的。
但是写的代码报错,报错如下:

py4j.protocol.Py4JJavaError: An error occurred while calling o39.load.

: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer

at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:487)

at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<clinit>(KafkaSourceProvider.scala)

at org.apache.spark.sql.kafka010.KafkaSourceProvider.validateStreamOptions(KafkaSourceProvider.scala:414)

at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:66)

at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:209)

at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:95)

at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:95)

at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:33)

at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:171)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)

at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)

at py4j.Gateway.invoke(Gateway.java:282)

at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)

at py4j.commands.CallCommand.execute(CallCommand.java:79)

at py4j.GatewayConnection.run(GatewayConnection.java:238)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer

at java.net.URLClassLoader.findClass(URLClassLoader.java:382)

at java.lang.ClassLoader.loadClass(ClassLoader.java:418)

at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)

at java.lang.ClassLoader.loadClass(ClassLoader.java:351)

... 20 more

贴出我写的代码如下:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("KafkaExample")\

.getOrCreate()

kafkaConf = {

"kafka.bootstrap.servers": "xxxxxx:9092",

"subscribe": "topic",

"kafka.auto.offset.reset": "earliest",

"kafka.group.id": "default",

"kafka.security.protocol": "SASL_PLAINTEXT",

"kafka.sasl.mechanism": "SCRAM-SHA-256",

"kafka.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username='${username}' password='${password}';",

"kafka.partition.assignment.strategy": "org.apache.kafka.clients.consumer.RangeAssignor"

}

# 创建 Kafka 数据源

df = spark.readStream.format("kafka").options(**kafkaConf).load()

# .option("kafka.partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor") \

# 对从Kafka接收到的数据进行处理

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show# 查询数据

query = df.writeStream \

.outputMode("append") \

.format("console") \

.start()

query.awaitTermination()


回答:

根据错误信息,看起来是缺少 Kafka 的 ByteArrayDeserializer 类。可能是你的环境没有正确配置 Kafka 客户端的依赖项。请尝试确认是否已经正确安装并配置了 Kafka 客户端。另外,你可以尝试使用 pip 安装 Kafka 客户端的依赖,例如:

pip install kafka-python

此外,你还需要在 Spark 中将 Kafka 客户端依赖项添加到 classpath 中,你可以在代码中添加以下行来执行此操作:

spark.sparkContext.addPyFile('/path/to/kafka-clients.jar')

注意,这里的路径需要指向正确的 Kafka 客户端 jar 包。

如果问题仍然存在,请尝试使用其他 Python 的 Kafka 客户端库来实现读取 Kafka 数据,例如 kafka-python 或 confluent-kafka-python。同时,也可以考虑使用其他的流处理框架,例如 Streamlit 等,来读取 Kafka 数据。


在使用spark.sparkContext.addPyFile('/path/to/kafka-clients.jar')添加依赖包后,需要确保在 Spark 应用程序中正确引入 Kafka 相关的类库。

一种常见的方法是使用spark-submit命令提交 Spark 应用程序时,将 Kafka 相关的 JAR 包以 --jars 的参数形式添加到 Spark 的 Classpath 中。例如:

$ spark-submit --master yarn \

--deploy-mode client \

--jars /path/to/kafka-clients.jar \

my_spark_app.py

其中 my_spark_app.py 是你的 Spark 应用程序的入口文件。这样,Spark 应用程序就可以在运行时正确地找到并加载 Kafka 相关的类库。

另外,确保 Kafka 服务器正常运行,并在你的应用程序代码中正确配置了 Kafka 连接参数,以便正确连接和操作 Kafka。

以上是 用python语言使用spark streaming 读取kafka数据? 的全部内容, 来源链接: utcz.com/p/938878.html

回到顶部