kafka中某个topic的分区消息大量积压,怎么处理
由于消费者组中一个分区只能被一个消费者消费。所以如果增加消费者组的形式来增加消费者的话,它们的offset也是不一样的,所以没法保证消费被有序的消费完了
回答
请允许我直接做一个假设,在消息中存放了一个user_id, 可以将消息重新发送一次,大致模型如下
- 补上一段代码, 这里就不使用kafka进行说明,采用Queue 进行展示
 
package org.huifer.queue;import java.util.LinkedList;
import java.util.Queue;
public class QueueBigData {
  static Queue<Data> sourceData = new LinkedList<Data>();
  static Queue<Data> u1 = new LinkedList<Data>();
  static Queue<Data> u2 = new LinkedList<Data>();
  static Queue<Data> u3 = new LinkedList<Data>();
  public static void main(String[] args) {
    sourceData.add(new Data(1, "1号用户的第 1 个行为"));
    sourceData.add(new Data(2, "2号用户的第 1 个行为"));
    sourceData.add(new Data(3, "3号用户的第 1 个行为"));
    sourceData.add(new Data(1, "1号用户的第 2 个行为"));
    sourceData.add(new Data(4, "4号用户的第 1 个行为"));
    // 设置处理行为
    IListener listener = event -> {
      if (event instanceof SourceEvent) {
        System.out.println(((SourceEvent) event).info + " : " + ((SourceEvent) event).msg);
      }
    };
    for (Data sourceDatum : sourceData) {
      Integer userId = sourceDatum.getUserId();
      if (1 % userId == 0) {
        u1.add(sourceDatum);
        event(listener, sourceDatum, "处理1号特征的数据");
      } else if (2 % userId == 0) {
        u2.add(sourceDatum);
        event(listener, sourceDatum, "处理2号特征的数据");
      } else if (3 % userId == 0) {
        u3.add(sourceDatum);
        event(listener, sourceDatum, "处理3号特征的数据");
      }
    }
    System.out.println();
  }
  private static void event(IListener listener, Data sourceDatum, String info) {
    SourceEvent event = new SourceEvent(sourceDatum.getMsg(), info);
    event.setListener(listener);
    event.event();
  }
  public interface IListener {
    void doEvent(IEvent event);
  }
  public interface IEvent {
    void setListener(IListener listener);
  }
  private static class Data {
    private Integer userId;
    private String msg;
    public Data() {
    }
    public Data(Integer userId, String msg) {
      this.userId = userId;
      this.msg = msg;
    }
    @Override
    public String toString() {
      return "{"Data":{"
          + ""userId":"
          + userId
          + ","msg":""
          + msg + '"'
          + "}}";
    }
    public Integer getUserId() {
      return userId;
    }
    public void setUserId(Integer userId) {
      this.userId = userId;
    }
    public String getMsg() {
      return msg;
    }
    public void setMsg(String msg) {
      this.msg = msg;
    }
  }
  public static class SourceEvent implements IEvent {
    private final String msg;
    private final String info;
    private IListener listener;
    public SourceEvent(String msg, String info) {
      this.msg = msg;
      this.info = info;
    }
    @Override
    public void setListener(IListener listener) {
      this.listener = listener;
    }
    public void event() {
      listener.doEvent(this);
    }
  }
}
以上是 kafka中某个topic的分区消息大量积压,怎么处理 的全部内容, 来源链接: utcz.com/a/19765.html




