PyFlink job runs fine locally but fails when submitted to the cluster: how should it be submitted?

Main error:
[ClassNotFoundException: org.apache.kafka.clients.consumer.OffsetResetStrategy]

Flink version: 1.17.1
Python version: 3.10

Demo code:

import json

from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer, FlinkKafkaProducer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.common.typeinfo import Types

if __name__ == '__main__':
    brokers = '172.18.98.96:9092'
    source_topic = "test1"  # source data
    sink_topic = "test3"  # results

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.AUTOMATIC)
    env.add_jars("file:///home/demo/jar/flink-sql-connector-kafka-1.17.1.jar")
    # env.add_jars("file:///usr/local/lib/python3.10/dist-packages/lib/flink-sql-connector-kafka-1.17.1.jar")

    source = KafkaSource.builder() \
        .set_bootstrap_servers(brokers) \
        .set_topics(source_topic) \
        .set_group_id("demo") \
        .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .build()

    ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")

    def str_to_dict(data):
        # Parse one JSON message into (name, is_click), where is_click is 1 for a click action.
        json_data = json.loads(data)
        action = json_data.get('action')
        is_click = 1 if action == 'click' else 0
        return json_data.get('name'), is_click

    def format_json(data):
        # Serialize a (name, click_num) tuple back to a JSON string.
        return json.dumps({'name': data[0], 'click_num': data[1]}, ensure_ascii=False)

    ds = ds.map(str_to_dict, output_type=Types.TUPLE([Types.STRING(), Types.INT()]))
    # Keep a running click count per name.
    ds = ds.key_by(lambda x: x[0]).sum(1).map(format_json, output_type=Types.STRING())

    serialization_schema = SimpleStringSchema()
    kafka_producer = FlinkKafkaProducer(
        topic=sink_topic,
        serialization_schema=serialization_schema,
        producer_config={'bootstrap.servers': brokers, 'group.id': 'my-group'})

    ds.add_sink(kafka_producer)
    env.execute('demo')

Running python demo.py locally works fine.
To submit the job to Flink, I run:

flink run -m 172.19.98.96:8081 -py demo.py --jarfile /home/demo/jar/flink-sql-connector-kafka-1.17.1.jar

It fails with:

root@a68045bb7b7a:/home/demo# flink run -m 172.19.98.96:8081 -py demo.py --jarfile /home/demo/jar/flink-sql-connector-kafka-1.17.1.jar
Traceback (most recent call last):
  File "/home/demo/demo.py", line 22, in <module>
    source = KafkaSource.builder() \
  File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/connectors/kafka.py", line 387, in builder
  File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/connectors/kafka.py", line 430, in __init__
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.connector.kafka.source.KafkaSource.builder.
: java.lang.NoClassDefFoundError: org/apache/kafka/clients/consumer/OffsetResetStrategy
    at org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer.earliest(OffsetsInitializer.java:147)
    at org.apache.flink.connector.kafka.source.KafkaSourceBuilder.<init>(KafkaSourceBuilder.java:106)
    at org.apache.flink.connector.kafka.source.KafkaSource.builder(KafkaSource.java:123)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.OffsetResetStrategy
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 14 more
org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
    at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
    ... 14 more

Addendum: libraries on the jobmanager


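Answer:

From my own test runs, the likely cause is this:
the class org.apache.kafka.clients.consumer.OffsetResetStrategy cannot be found, probably because your Flink cluster is missing the corresponding dependency. The trace is consistent with that: the lookup fails in sun.misc.Launcher$AppClassLoader, which searches the JVM's own classpath rather than a jar attached to the job.
My suggestion is to put flink-sql-connector-kafka-1.17.1.jar into the cluster's lib directory, using this command:

cp /home/demo/jar/flink-sql-connector-kafka-1.17.1.jar /opt/flink/lib/

Once the jar is in Flink's lib directory, restart Flink; that should resolve this environment problem.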
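
To confirm that the copy actually put the missing class on the classpath, it may help to look inside the jars in the lib directory. The following is only a minimal sketch, not part of Flink or PyFlink: the helper name jars_containing_class is made up for illustration, and it relies solely on the fact that a jar is a zip archive in which class a.b.C is stored as the entry a/b/C.class. The directory /opt/flink/lib is taken from the cp command above.

```python
import zipfile
from pathlib import Path


def jars_containing_class(lib_dir, class_name):
    """Return the names of the jars in lib_dir that bundle the given Java class.

    Hypothetical helper for illustration; it is not a Flink or PyFlink API.
    """
    # A jar is a zip archive; class a.b.C is stored as the entry a/b/C.class.
    entry = class_name.replace(".", "/") + ".class"
    matches = []
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as archive:
                if entry in archive.namelist():
                    matches.append(jar.name)
        except zipfile.BadZipFile:
            # Skip files that are not valid archives, e.g. a truncated copy.
            continue
    return matches


if __name__ == "__main__":
    # Prints an empty list if the directory is missing or no jar has the class.
    print(jars_containing_class(
        "/opt/flink/lib",
        "org.apache.kafka.clients.consumer.OffsetResetStrategy",
    ))
```

If the list is empty after the copy, the jar did not land where Flink looks for it. Running the same check on the machine that executes flink run and on the jobmanager and taskmanager hosts shows whether every process sees the same set of jars.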
