kafka中某个topic的分区消息大量积压,怎么处理

由于消费者组中一个分区只能被一个消费者消费。所以如果增加消费者组的形式来增加消费者的话,它们的offset也是不一样的,所以没法保证消费被有序的消费完了

回答

请允许我直接做一个假设,在消息中存放了一个user_id, 可以将消息重新发送一次,大致模型如下
image.png

  • 补上一段代码, 这里就不使用kafka进行说明,采用Queue 进行展示

package org.huifer.queue;

import java.util.LinkedList;

import java.util.Queue;

public class QueueBigData {

static Queue<Data> sourceData = new LinkedList<Data>();

static Queue<Data> u1 = new LinkedList<Data>();

static Queue<Data> u2 = new LinkedList<Data>();

static Queue<Data> u3 = new LinkedList<Data>();

public static void main(String[] args) {

sourceData.add(new Data(1, "1号用户的第 1 个行为"));

sourceData.add(new Data(2, "2号用户的第 1 个行为"));

sourceData.add(new Data(3, "3号用户的第 1 个行为"));

sourceData.add(new Data(1, "1号用户的第 2 个行为"));

sourceData.add(new Data(4, "4号用户的第 1 个行为"));

// 设置处理行为

IListener listener = event -> {

if (event instanceof SourceEvent) {

System.out.println(((SourceEvent) event).info + " : " + ((SourceEvent) event).msg);

}

};

for (Data sourceDatum : sourceData) {

Integer userId = sourceDatum.getUserId();

if (1 % userId == 0) {

u1.add(sourceDatum);

event(listener, sourceDatum, "处理1号特征的数据");

} else if (2 % userId == 0) {

u2.add(sourceDatum);

event(listener, sourceDatum, "处理2号特征的数据");

} else if (3 % userId == 0) {

u3.add(sourceDatum);

event(listener, sourceDatum, "处理3号特征的数据");

}

}

System.out.println();

}

private static void event(IListener listener, Data sourceDatum, String info) {

SourceEvent event = new SourceEvent(sourceDatum.getMsg(), info);

event.setListener(listener);

event.event();

}

public interface IListener {

void doEvent(IEvent event);

}

public interface IEvent {

void setListener(IListener listener);

}

private static class Data {

private Integer userId;

private String msg;

public Data() {

}

public Data(Integer userId, String msg) {

this.userId = userId;

this.msg = msg;

}

@Override

public String toString() {

return "{"Data":{"

+ ""userId":"

+ userId

+ ","msg":""

+ msg + '"'

+ "}}";

}

public Integer getUserId() {

return userId;

}

public void setUserId(Integer userId) {

this.userId = userId;

}

public String getMsg() {

return msg;

}

public void setMsg(String msg) {

this.msg = msg;

}

}

public static class SourceEvent implements IEvent {

private final String msg;

private final String info;

private IListener listener;

public SourceEvent(String msg, String info) {

this.msg = msg;

this.info = info;

}

@Override

public void setListener(IListener listener) {

this.listener = listener;

}

public void event() {

listener.doEvent(this);

}

}

}

以上是 kafka中某个topic的分区消息大量积压,怎么处理 的全部内容, 来源链接: utcz.com/a/19765.html

回到顶部