Kafka之拦截器Interceptor

编程

    Kafka client版本0.10

    ProducerInterceptor

    List-1

public interface ProducerInterceptor<K, V> extends Configurable {

public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

public void onAcknowledgement(RecordMetadata metadata, Exception exception);

public void close();

}

  •     onSend方法,在消息发送到Broker之前会调用
  •     onAcknowledgement,是Broker端返回确认消息后调用

     ConsumerInterceptor

        List-2

public interface ConsumerInterceptor<K, V> extends Configurable {

public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);

public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

public void close();

}

  •     onConsume方法是从Broker端取到消息,但是poll方法返回前调用
  •     onCommit是提交offset后调用

    使用场景:我们可以在Producer端统一拦截,加上处理时间,再在consumer端统一拦截统计端到端的处理时间,这也是一种监控方式。

 

以上是 Kafka之拦截器Interceptor 的全部内容, 来源链接: utcz.com/z/518288.html

回到顶部