kafka消费者偏移量提交

编程

同步提交当前偏移量

把auto.commit.offset设为false,方应用程序决定何时提交偏移量,使用commitSync()方法提交偏移量最简单可靠,这个api会提交

poll()方法返回的最新偏移量,提交成功后马上返回,如果提交失败,则跑出异常

注意: commitSync()方法提交的偏移量是最新的偏移量,需要在处理完所有消息之后再调用

代码示例

    @Test

void test() {

Properties properties = new Properties()

properties.put(KafkaConstants.BOOTSTRAP_SERVERS_KEY, KafkaConstants.BOOTSTRAP_SERVERS)

properties.put(KafkaConstants.KEY_DESERIALIZER_KEY, KafkaConstants.STRING_DESERIALIZER)

properties.put(KafkaConstants.VALUE_DESERIALIZER_KEY, KafkaConstants.STRING_DESERIALIZER)

properties.put(KafkaConstants.KEY_GROUP_ID, KafkaConstants.DEFAULT_GROUP_ID)

properties.put("auto.commit.offset", false)

Consumer consumer = null

try{

consumer = new KafkaConsumer<String, String>(properties)

consumer.subscribe(["test"])

while (true){

ConsumerRecords<String, String> records = consumer.poll(100)

records.each {

println "key:${it.key()}, val:${it.value()}"

}

// 同步提交当前偏移量

consumer.commitSync()

}

}finally {

consumer.close()

}

}

异步提交当前偏移量

同步提交的不足之处在于,在Broker对请求做出相应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量.可以通过异步提交来解决这个

问题,我们只管发送提交请求,不需要等待Broker相应

commitAsync不支持重试,但是可以在其回调中实现此功能,但是需要注意提交的顺序,否则可能会导致消息的重复

代码示例

    @Test

void test() {

Properties properties = new Properties()

properties.put(KafkaConstants.BOOTSTRAP_SERVERS_KEY, KafkaConstants.BOOTSTRAP_SERVERS)

properties.put(KafkaConstants.KEY_DESERIALIZER_KEY, KafkaConstants.STRING_DESERIALIZER)

properties.put(KafkaConstants.VALUE_DESERIALIZER_KEY, KafkaConstants.STRING_DESERIALIZER)

properties.put(KafkaConstants.KEY_GROUP_ID, KafkaConstants.DEFAULT_GROUP_ID)

properties.put("auto.commit.offset", false)

Consumer consumer = null

try{

consumer = new KafkaConsumer<String, String>(properties)

consumer.subscribe(["test"])

while (true){

ConsumerRecords<String, String> records = consumer.poll(100)

records.each {

println "key:${it.key()}, val:${it.value()}"

}

// 异步提交当前偏移量

consumer.commitAsync({Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception ->

})

}

}finally {

consumer.close()

}

}

提交特定的偏移量

class Demo_4_6_2_ConsumerSpecialOffsetCommit {

private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>()

@Test

void test() {

Properties properties = new Properties()

properties.put(KafkaConstants.BOOTSTRAP_SERVERS_KEY, KafkaConstants.BOOTSTRAP_SERVERS)

properties.put(KafkaConstants.KEY_DESERIALIZER_KEY, KafkaConstants.STRING_DESERIALIZER)

properties.put(KafkaConstants.VALUE_DESERIALIZER_KEY, KafkaConstants.STRING_DESERIALIZER)

properties.put(KafkaConstants.KEY_GROUP_ID, KafkaConstants.DEFAULT_GROUP_ID)

properties.put("auto.commit.offset", false)

Consumer consumer = null

try{

consumer = new KafkaConsumer<String, String>(properties)

consumer.subscribe(["test"])

while (true){

ConsumerRecords<String, String> records = consumer.poll(100)

records.each {

println "key:${it.key()}, val:${it.value()}"

// 记录当前偏移量

currentOffsets.put(

new TopicPartition(it.topic(), it.partition()), new OffsetAndMetadata(it.offset() + 1, "no metadata"))

// 提交偏移量

consumer.commitAsync(currentOffsets, null)

}

}

}finally {

consumer.close()

}

}

}

以上是 kafka消费者偏移量提交 的全部内容, 来源链接: utcz.com/z/513548.html

回到顶部