PyFlink 作业本地运行正常，提交到集群时报错，该如何提交到集群运行？
主要报错
【ClassNotFoundException: org.apache.kafka.clients.consumer.OffsetResetStrategy】
flink 版本 1.17.1
python 版本 3.10
demo 代码
"""PyFlink demo: count clicks per name from one Kafka topic and write the
running totals, as JSON strings, to another Kafka topic."""
import json

from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream import RuntimeExecutionMode, StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import (
    FlinkKafkaProducer,
    KafkaOffsetsInitializer,
    KafkaSource,
)

BROKERS = '172.18.98.96:9092'
SOURCE_TOPIC = "test1"  # topic the raw events are read from
SINK_TOPIC = "test3"  # topic the aggregated results are written to
# Local file URL of the Kafka connector jar registered via env.add_jars().
KAFKA_CONNECTOR_JAR = "file:///home/demo/jar/flink-sql-connector-kafka-1.17.1.jar"


def str_to_dict(data):
    """Parse one JSON event string into a ``(name, is_click)`` tuple.

    Despite its name this returns a tuple, matching the
    ``Types.TUPLE([Types.STRING(), Types.INT()])`` output type declared for
    the map step. ``is_click`` is 1 when the event's ``action`` is
    ``'click'``, otherwise 0; missing keys yield ``None`` / 0.
    """
    json_data = json.loads(data)
    is_click = 1 if json_data.get('action') == 'click' else 0
    return json_data.get('name'), is_click


def format_json(data):
    """Serialize a ``(name, click_num)`` tuple into a JSON object string.

    ``ensure_ascii=False`` keeps non-ASCII names readable instead of
    escaping them as ``\\uXXXX``.
    """
    return json.dumps({'name': data[0], 'click_num': data[1]}, ensure_ascii=False)


def main():
    """Build and execute the pipeline: Kafka source -> per-name click sum -> Kafka sink."""
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.AUTOMATIC)
    env.add_jars(KAFKA_CONNECTOR_JAR)

    source = (
        KafkaSource.builder()
        .set_bootstrap_servers(BROKERS)
        .set_topics(SOURCE_TOPIC)
        .set_group_id("demo")
        .set_starting_offsets(KafkaOffsetsInitializer.latest())
        .set_value_only_deserializer(SimpleStringSchema())
        .build()
    )
    ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")

    ds = ds.map(str_to_dict, output_type=Types.TUPLE([Types.STRING(), Types.INT()]))
    # Key by name and keep a running sum of the is_click field (index 1),
    # then re-encode each updated total as a JSON string for the sink.
    ds = ds.key_by(lambda x: x[0]).sum(1).map(format_json, output_type=Types.STRING())

    # 'group.id' is a consumer setting, so the producer is given only the brokers.
    # NOTE: FlinkKafkaProducer is deprecated in favour of KafkaSink; kept to
    # match the original job.
    kafka_producer = FlinkKafkaProducer(
        topic=SINK_TOPIC,
        serialization_schema=SimpleStringSchema(),
        producer_config={'bootstrap.servers': BROKERS},
    )
    ds.add_sink(kafka_producer)
    env.execute('demo')


if __name__ == '__main__':
    main()
本地执行 python demo.py 运行正常。
提交到 Flink 集群时执行：
flink run -m 172.19.98.96:8081 -py demo.py --jarfile /home/demo/jar/flink-sql-connector-kafka-1.17.1.jar
报错
root@a68045bb7b7a:/home/demo# flink run -m 172.19.98.96:8081 -py demo.py --jarfile /home/demo/jar/flink-sql-connector-kafka-1.17.1.jar
Traceback (most recent call last):
File "/home/demo/demo.py", line 22, in <module>
source = KafkaSource.builder() \
File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/connectors/kafka.py", line 387, in builder
File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/connectors/kafka.py", line 430, in __init__
File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.connector.kafka.source.KafkaSource.builder.
: java.lang.NoClassDefFoundError: org/apache/kafka/clients/consumer/OffsetResetStrategy
at org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer.earliest(OffsetsInitializer.java:147)
at org.apache.flink.connector.kafka.source.KafkaSourceBuilder.<init>(KafkaSourceBuilder.java:106)
at org.apache.flink.connector.kafka.source.KafkaSource.builder(KafkaSource.java:123)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.OffsetResetStrategy
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 14 more
org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
... 14 more
补充 jobmanager 类库
回答:
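先说一下我这边测试时定位到的可能原因：
报错是找不到 org.apache.kafka.clients.consumer.OffsetResetStrategy 类，多半是 Flink 集群（以及执行 flink run 的客户端）的 classpath 中缺少相关依赖。从堆栈看，异常发生在客户端（CliFrontend → PythonDriver）构建作业的阶段，而且该类是通过 AppClassLoader（即 Flink lib 目录等启动 classpath）查找的，所以仅靠 --jarfile 传入 jar 并不够。另外，如果 lib 目录里已经放了不含 kafka-clients 的 flink-connector-kafka-*.jar，也可能出现同样的错误；这时要么补上对应版本的 kafka-clients jar，要么改用已打包 kafka-clients 的 flink-sql-connector-kafka，并移除前者。
我的建议是把 flink-sql-connector-kafka-1.17.1.jar 放到 Flink 的 lib 目录中。执行 flink run 的客户端和各 JobManager/TaskManager 节点都需要放，可以使用下面的命令：
cp /home/demo/jar/flink-sql-connector-kafka-1.17.1.jar /opt/flink/lib/
复制到 Flink 的 lib 目录并重启集群后，应该就能解决这个环境问题。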
以上是 Pyflink 作业,本地运行正常,提交集群时错误,该如何提交集群运行? 的全部内容, 来源链接: utcz.com/p/938973.html