
如何为Kafka设置Java选项?
我一直在试验Kafka,并从主站点的文档中看到,您可以为jvm设置不同的选项,例如堆大小和它使用的垃圾收集器:http://kafka.apache.org/documentation.html#java但是,没有说的是如何/在何处设置这些选项。该应用程序带有一个/ config目录,其中包含许多用于配置目的的文件,但没有用于Java的文件。它还带有一个...
2024-01-10
如何在Kafka中使用多个消费者?
我是一名学习Kafka的新学生,在了解多个消费者(到目前为止,文章,文档等对他们没有太大帮助)方面,我遇到了一些基本问题。我尝试做的一件事是编写我自己的高级Kafka生产者和消费者并同时运行它们,将100条简单消息发布到某个主题,然后让消费者检索它们。我已经成功地做到了这一点,但是...
2024-01-10
Kafka如何存储每个主题的偏移量?
在轮询Kafka时,我已经使用该subscribe()功能订阅了多个主题。现在,我想设置的偏离,我想从每个主题阅读,而无需每次重新订阅后seek(),并poll()从一个话题。seek(),是否可以迭代调用每个主题名称 达到结果?偏移量如何精确存储在Kafka中?我每个主题有一个分区,并且只有一个使用者可以读取所有主...
2024-01-10
Kafka-经纪人:小组协调员不可用
我有以下结构:zookeeper: 3.4.12kafka: kafka_2.11-1.1.0server1: zookeeper + kafkaserver2: zookeeper + kafkaserver3: zookeeper + kafka通过kafka-topics shell脚本创建了具有复制因子3和分区3的主题。./kafka-topics.sh --create --zookeeper localhost:2181 --topic test-flow --partitions 3 --re...
2024-01-10
如何通过Java在Kafka中创建主题
我想通过Java在Kafka(kafka_2.8.0-0.8.1.1)中创建一个主题。如果我在命令提示符下创建一个主题,并且如果我通过javaapi推送消息,它也可以正常工作。但是我想通过java api创建一个主题。经过长时间的搜索,我发现了以下代码,ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);AdminUtils.createTopic(zkClient, myTo...
2024-01-10
Kafka KStreams-处理超时
我试图用<KStream>.process()一个TimeWindows.of("name", 30000)批量一些 值,并送他们。似乎30秒钟超出了使用者超时间隔,在此间隔之后,Kafka认为该使用者已失效并释放了分区。我尝试提高 和 的频率来避免这种情况:config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000");config.put(StreamsConfig.POLL_MS_CONFIG, "5000");不幸的...
2024-01-10
将自定义Java对象发送到Kafka主题
我有我的自定义Java对象,希望利用JVM的内置序列化将其发送到Kafka主题,但是序列化失败并出现以下错误org.apache.kafka.common.errors.SerializationException:无法将com.spring.kafka.Payload类的值转换为value.serializer中指定的org.apache.kafka.common.serialization.ByteArraySerializer类。public class Payload implements Seri...
2024-01-10
Kafka 0.8.2.2-无法发布消息
我们已经编写了一个Java客户端,用于将消息发布到kafka。代码如下所示Properties props = new Properties();props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "202.xx.xx.xxx:9092");props.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG,Integer.toString(5 * 1000));props.put(P...
2024-01-10
Kafka-使用高级使用者的延迟队列实施
想要使用高级消费者API实现延迟的消费者大意:按键生成消息(每个消息包含创建时间戳记),以确保每个分区按生成时间对消息进行排序。auto.commit.enable = false(将在每个消息处理之后显式提交)消费一条消息检查消息时间戳,并检查是否经过了足够的时间处理消息(此操作将永不失败)提交1...
2024-01-10
在Kafka用户中重试用尽时如何设置确认
我有一个重试5次的Kafka使用者,并且我正在使用带有重试模板的SpringKafka。现在,如果所有重试都失败了,那么在这种情况下如何确认工作。另外,如果我将确认模式设置为手动,那么如何确认这些消息消费者@Bean("kafkaListenerContainerFactory")public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContain...
2024-01-10
Kafka“未在JAAS配置中指定登录模块”
与sasl使用控制台脚本保护的Kafka通信时出现问题。Kafka受保护sasl,监听器受保护SASL_PLAINTEXT,机制由PLAIN。我做了什么:我尝试使用kafka脚本之一列出一些数据:bin/kafka-consumer-groups.sh --bootstrap-server (address) --list但是我明白了WARN Bootstrap broker (address) disconnected (org.apache.kafka.clients.NetworkClient)命令失...
2024-01-10
Kafka:使用Java更改特定主题的分区数
我是Kafka的新手,正在使用新的KafkaProducer和KafkaConsumer,版本:0.9.0.1在创建特定主题之后,java中是否有任何方法可以更改/更新特定主题的分区数。我没有使用Zookeeper创建主题。当发布请求到达时,我的KafkaProducer会自动创建主题。如果还不够,我还可以提供更多详细信息回答:是的,有可能。您必须...
2024-01-10
春云流Kafka粘结剂:“尝试从状态IN_TRANSACTION到状态IN_TRANSACTION的无效转换”
我正在尝试使用Spring云流+ Kafka绑定对Apache Kafka进行“恰好一个交付”概念的一些PoC。春云流Kafka粘结剂:“尝试从状态IN_TRANSACTION到状态IN_TRANSACTION的无效转换”我安装了Apache Kafka“kafka_2.11-1.0.0”,并在生产者中定义了“transactionIdPrefix”,我知道这是我在Spring Kafka中启用事务所需要做的唯一事情,但...
2024-01-10
如何在kafka中创建自定义序列化程序?
序列化器很少,例如org.apache.kafka.common.serialization.StringSerializerorg.apache.kafka.common.serialization.StringSerializer我们如何创建自己的自定义序列化程序?回答:在这里,您有一个示例,将自己的序列化器/解串器用于Kafka消息值。对于Kafka消息密钥是同一回事。我们希望将MyMessage的序列化版本作为Kafka值发送...
2024-01-10
kafka获取主题的分区计数
如何从代码中获取任何kafka主题的分区数。我研究了许多链接,但似乎没有一个起作用。提及一些:http://grokbase.com/t/kafka/users/148132gdzk/find-topic-partition-count-through-simpleclient-apihttp://grokbase.com/t/kafka/users/151cv3htga/get-replication-and-partition-count-of-a-topichttp://qnalist.com/que...
2024-01-10
如何获取kafka主题分区的最后/结束偏移量?
我正在kafka使用Java编写使用者。我想保持消息的实时性,因此,如果有太多消息在等待使用,例如1000条或更多,我应该放弃未使用的消息,并从最后一个偏移量开始使用。对于此问题,我尝试比较主题的最后提交的偏移量和主题的结束偏移量(仅1个分区),如果这两个偏移量之间的差大于某个值,则...
2024-01-10
kafka Avro消息反序列化器,可用于多个主题
我正在尝试使用以下代码以avro格式反序列化kafka消息:[https](https://github.com/ivangfr/springboot-kafka-debezium-ksql/blob/master/kafka-research-consumer/src/main/java/com/mycompany/kafkaresearchconsumer/kafka/ReviewsConsumerConfig.java)//github.com/ivangfr/springboot-kafka-deb...
2024-01-10
Apache Kafka:生产者-Consume API没有在GCP上运行
我一直试图在Kafka集群上运行我的生产者和消费者api,但它不工作。Apache Kafka:生产者-Consume API没有在GCP上运行1)动物园管理员是在一个GCP VM实例运行2)卡夫卡正在另一个GCP VM实例运行步骤如下:步骤1)使用运行动物园管理员以下命令:bin/zookeeper-server-start.sh config/zookeeper.properties 个Zookeeper.propert...
2024-01-10
Apache Spark计数记录每个组的空值
当我尝试计算每个组的记录数时,我发现该组具有空值但没有记录,但这是不正确的。Apache Spark计数记录每个组的空值输入数据帧:+--------+ | Name| +--------+ | Andrei| | Andrei| | null| | null| |Grigorii| +--------+ 代码:Dataset<Row> df = inputDf.groupBy("Name") .agg(functions.count("Name").as("Name_count")); 实际数...
2024-01-10
在Apache Spark中读取多行JSON
我试图将JSON文件用作小型数据库。在DataFrame上创建模板表后,我使用SQL查询了该表并得到了异常。这是我的代码:val df = sqlCtx.read.json("/path/to/user.json")df.registerTempTable("user_tt")val info = sqlCtx.sql("SELECT name FROM user_tt")info.show()df.printSchema() 结果:root |-- _corrupt_record: string (nul...
2024-01-10
如何从Java中的Apache POI库调用宏?
我有 :名为“ process”的宏,它为我的工作表完成所有处理工作。我希望使用Apache POI的Java代码调用此宏,以便它可以为我处理工作表。如何在Java的Apache POI中调用宏?我import org.apache.poi.ss.usermodel在Apache POI中使用。请提供示例代码。(我是Apache POI和Java的新手。)回答:我认为这是不...
2024-01-10
Apache Spark中的分层数据处理
我在Spark(v2.1.1)中有一个包含分层数据的3列(如下所示)的数据集。Apache Spark中的分层数据处理我的目标的目标是增量编号分配给基础上,父子层次的每一行。从图形上可以说,分层数据是一个树的集合。根据下表,我已经有基于'Global_ID'分组的行。现在我想以 的增量顺序生成'Value'列,但是基于 ...
2024-01-10
Java,如何获取Apache Kafka中某个主题的消息数
我正在使用apache kafka进行消息传递。我已经用Java实现了生产者和消费者。我们如何获取主题中的消息数量?回答:从消费者的角度来看,想到此的唯一方法是实际消费消息并计数。Kafka代理公开了自启动以来收到的消息数量的JMX计数器,但是您不知道已经清除了其中的多少。在最常见的情况下,最好...
2024-01-10
使用Apache POI的Java程序让我感到奇怪例外
我有一些严重的困难让我的项目离开地面。我有以下代码:使用Apache POI的Java程序让我感到奇怪例外FileInputStream file = new FileInputStream(new File("src/retestchecker/test_sheet.xlsx")); //Get the workbook instance for XLS file XSSFWorkbook workbook = new XSSFWorkbook(file); //Get first sheet from ...
2024-01-10
如何为Kafka 2.2实现FlinkKafkaProducer序列化程序
我一直在努力更新从Kafka读取然后写入Kafka的Flink处理器(Flink 1.9版)。我们已经将此处理器编写为可以朝着Kafka0.10.2集群运行,现在我们已经部署了一个运行2.2版的新Kafka集群。因此,我着手更新处理器以使用最新的FlinkKafkaConsumer和FlinkKafkaProducer(由Flink文档建议)。但是我遇到了卡夫卡制片人的一些问...
2024-01-10
带有Apache POI的Java中的运行时错误
我得到错误:java.lang.NoSuchMethodError: org.apache.xmlbeans.XmlOptions.setSaveAggressiveNamespaces()Lorg/apache/xmlbeans/XmlOptions;at org.apache.poi.POIXMLDocumentPart.<clinit>(POIXMLDocumentPart.java:56)从第56行开始:public static Workbook wb = new XSSFWorkbook();我...
2024-01-10
