kafka中某个topic的分区消息大量积压,怎么处理
由于消费者组中一个分区只能被一个消费者消费。所以如果增加消费者组的形式来增加消费者的话,它们的offset也是不一样的,所以没法保证消费被有序的消费完了
回答
请允许我直接做一个假设,在消息中存放了一个user_id
, 可以将消息重新发送一次,大致模型如下
- 补上一段代码, 这里就不使用kafka进行说明,采用Queue 进行展示
package org.huifer.queue;import java.util.LinkedList;
import java.util.Queue;
public class QueueBigData {
static Queue<Data> sourceData = new LinkedList<Data>();
static Queue<Data> u1 = new LinkedList<Data>();
static Queue<Data> u2 = new LinkedList<Data>();
static Queue<Data> u3 = new LinkedList<Data>();
public static void main(String[] args) {
sourceData.add(new Data(1, "1号用户的第 1 个行为"));
sourceData.add(new Data(2, "2号用户的第 1 个行为"));
sourceData.add(new Data(3, "3号用户的第 1 个行为"));
sourceData.add(new Data(1, "1号用户的第 2 个行为"));
sourceData.add(new Data(4, "4号用户的第 1 个行为"));
// 设置处理行为
IListener listener = event -> {
if (event instanceof SourceEvent) {
System.out.println(((SourceEvent) event).info + " : " + ((SourceEvent) event).msg);
}
};
for (Data sourceDatum : sourceData) {
Integer userId = sourceDatum.getUserId();
if (1 % userId == 0) {
u1.add(sourceDatum);
event(listener, sourceDatum, "处理1号特征的数据");
} else if (2 % userId == 0) {
u2.add(sourceDatum);
event(listener, sourceDatum, "处理2号特征的数据");
} else if (3 % userId == 0) {
u3.add(sourceDatum);
event(listener, sourceDatum, "处理3号特征的数据");
}
}
System.out.println();
}
private static void event(IListener listener, Data sourceDatum, String info) {
SourceEvent event = new SourceEvent(sourceDatum.getMsg(), info);
event.setListener(listener);
event.event();
}
public interface IListener {
void doEvent(IEvent event);
}
public interface IEvent {
void setListener(IListener listener);
}
private static class Data {
private Integer userId;
private String msg;
public Data() {
}
public Data(Integer userId, String msg) {
this.userId = userId;
this.msg = msg;
}
@Override
public String toString() {
return "{"Data":{"
+ ""userId":"
+ userId
+ ","msg":""
+ msg + '"'
+ "}}";
}
public Integer getUserId() {
return userId;
}
public void setUserId(Integer userId) {
this.userId = userId;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
}
public static class SourceEvent implements IEvent {
private final String msg;
private final String info;
private IListener listener;
public SourceEvent(String msg, String info) {
this.msg = msg;
this.info = info;
}
@Override
public void setListener(IListener listener) {
this.listener = listener;
}
public void event() {
listener.doEvent(this);
}
}
}
以上是 kafka中某个topic的分区消息大量积压,怎么处理 的全部内容, 来源链接: utcz.com/a/19765.html