Kafka连接自定义timestamp.extractor
我试图从卡夫卡读取消息到S3,有问题添加jar到Kafka连接类路径。Kafka连接自定义timestamp.extractor
目标是根据时间戳在分区中写入消息,该时间戳是卡夫卡消息中密钥的一部分。
为了使故事简短,我必须提供自定义时间戳提取器。在文档here之后创建了一个类,该类实现TimestampExtractor
接口并将一个JAR位置添加到plugin.path
属性。
问题是,当我开始连接时,找不到类。不知怎的,罐子是不是在classpath中,我越来越
org.apache.kafka.common.config.ConfigException: Invalid timestamp extractor: partitioner.SpotadDateTimeExtractor
其他数据:
版本:铺满4.0.0
连接:连接独立
起动指令:
sudo /home/ubuntu/confluent-4.0.0/bin/connect-standalone \ /home/ubuntu/confluent-4.0.0/etc/kafka/connect-standalone.properties \ /home/ubuntu/confluent-4.0.0/etc/kafka-connect-s3/quickstart-s3.properties
Aprreaciate any help。
回答:
为了可以定制时间戳提取类,你的S3接口,您将需要以下内容:
与您的自定义类添加的jar与其他连接器的依赖一起。例如:
保存
./share/java/kafka-connect-s3
下,如果你想这是 仅在S3连接器,或在./share/java/kafka-connect-storage-common
将其提供给 所有存储片连接器(S3目前和HDFS连接器)。- 确保您的自定义类实现了
io.confluent.connect.storage.partitioner.TimestampExtractor
界面。 当您在连接器的配置中设置
timestamp.extractor
属性时,请使用完全限定的类名,当然还要确保它与您在jar中定义和打包的包相匹配。例如:timestamp.extractor=me.connectors.MyTimestampExtractor
最后,你会遵循类似的过程,以使自定义分区提供给您的连接器。
以上是 Kafka连接自定义timestamp.extractor 的全部内容, 来源链接: utcz.com/qa/262688.html