聊聊MaxwellKafkaProducer

编程

MaxwellKafkaProducer

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java

public class MaxwellKafkaProducer extends AbstractProducer {

private final ArrayBlockingQueue<RowMap> queue;

private final MaxwellKafkaProducerWorker worker;

public MaxwellKafkaProducer(MaxwellContext context, Properties kafkaProperties, String kafkaTopic) {

super(context);

this.queue = new ArrayBlockingQueue<>(100);

this.worker = new MaxwellKafkaProducerWorker(context, kafkaProperties, kafkaTopic, this.queue);

Thread thread = new Thread(this.worker, "maxwell-kafka-worker");

thread.setDaemon(true);

thread.start();

}

@Override

public void push(RowMap r) throws Exception {

this.queue.put(r);

}

@Override

public StoppableTask getStoppableTask() {

return this.worker;

}

@Override

public KafkaProducerDiagnostic getDiagnostic() {

return new KafkaProducerDiagnostic(worker, context.getConfig(), context.getPositionStoreThread());

}

}

  • MaxwellKafkaProducer继承了AbstractProducer,其构造器会创建ArrayBlockingQueue、ArrayBlockingQueue;其push方法则往queue中put数据

MaxwellKafkaProducerWorker

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java

class MaxwellKafkaProducerWorker extends AbstractAsyncProducer implements Runnable, StoppableTask {

static final Logger LOGGER = LoggerFactory.getLogger(MaxwellKafkaProducer.class);

private final Producer<String, String> kafka;

private final String topic;

private final String ddlTopic;

private final MaxwellKafkaPartitioner partitioner;

private final MaxwellKafkaPartitioner ddlPartitioner;

private final KeyFormat keyFormat;

private final boolean interpolateTopic;

private final ArrayBlockingQueue<RowMap> queue;

private Thread thread;

private StoppableTaskState taskState;

private String deadLetterTopic;

private final ConcurrentLinkedQueue<Pair<ProducerRecord<String,String>, KafkaCallback>> deadLetterQueue;

public static MaxwellKafkaPartitioner makeDDLPartitioner(String partitionHashFunc, String partitionKey) {

if ( partitionKey.equals("table") ) {

return new MaxwellKafkaPartitioner(partitionHashFunc, "table", null, "database");

} else {

return new MaxwellKafkaPartitioner(partitionHashFunc, "database", null, null);

}

}

public MaxwellKafkaProducerWorker(MaxwellContext context, String kafkaTopic, ArrayBlockingQueue<RowMap> queue,

Producer<String,String> producer)

{

super(context);

if ( kafkaTopic == null ) {

this.topic = "maxwell";

} else {

this.topic = kafkaTopic;

}

this.interpolateTopic = this.topic.contains("%{");

this.kafka = producer;

String hash = context.getConfig().kafkaPartitionHash;

String partitionKey = context.getConfig().producerPartitionKey;

String partitionColumns = context.getConfig().producerPartitionColumns;

String partitionFallback = context.getConfig().producerPartitionFallback;

this.partitioner = new MaxwellKafkaPartitioner(hash, partitionKey, partitionColumns, partitionFallback);

this.ddlPartitioner = makeDDLPartitioner(hash, partitionKey);

this.ddlTopic = context.getConfig().ddlKafkaTopic;

this.deadLetterTopic = context.getConfig().deadLetterTopic;

this.deadLetterQueue = new ConcurrentLinkedQueue<>();

if ( context.getConfig().kafkaKeyFormat.equals("hash") )

keyFormat = KeyFormat.HASH;

else

keyFormat = KeyFormat.ARRAY;

this.queue = queue;

this.taskState = new StoppableTaskState("MaxwellKafkaProducerWorker");

}

public MaxwellKafkaProducerWorker(MaxwellContext context, Properties kafkaProperties, String kafkaTopic,

ArrayBlockingQueue<RowMap> queue)

{

this(context, kafkaTopic, queue,

new KafkaProducer<String,String>(kafkaProperties, new StringSerializer(), new StringSerializer()));

}

@Override

public void run() {

this.thread = Thread.currentThread();

while ( true ) {

try {

drainDeadLetterQueue();

RowMap row = queue.take();

if (!taskState.isRunning()) {

taskState.stopped();

return;

}

this.push(row);

} catch ( Exception e ) {

taskState.stopped();

context.terminate(e);

return;

}

}

}

void drainDeadLetterQueue() {

Pair<ProducerRecord<String, String>, KafkaCallback> pair;

while ((pair = deadLetterQueue.poll()) != null) {

sendAsync(pair.getLeft(), pair.getRight());

}

}

//......

}

  • MaxwellKafkaProducerWorker继承了AbstractAsyncProducer,实现了Runnable及StoppableTask接口;其run方法使用while循环,不断执行drainDeadLetterQueue、queue.take()、this.push(row);drainDeadLetterQueue方法从deadLetterQueue拉取数据,然后通过sendAsync再次发送

AbstractAsyncProducer

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/AbstractAsyncProducer.java

public abstract class AbstractAsyncProducer extends AbstractProducer {

public class CallbackCompleter {

private InflightMessageList inflightMessages;

private final MaxwellContext context;

private final MaxwellConfig config;

private final Position position;

private final boolean isTXCommit;

private final long messageID;

public CallbackCompleter(InflightMessageList inflightMessages, Position position, boolean isTXCommit, MaxwellContext context, long messageID) {

this.inflightMessages = inflightMessages;

this.context = context;

this.config = context.getConfig();

this.position = position;

this.isTXCommit = isTXCommit;

this.messageID = messageID;

}

public void markCompleted() {

inflightMessages.freeSlot(messageID);

if(isTXCommit) {

InflightMessageList.InflightMessage message = inflightMessages.completeMessage(position);

if (message != null) {

context.setPosition(message.position);

long currentTime = System.currentTimeMillis();

long age = currentTime - message.sendTimeMS;

messagePublishTimer.update(age, TimeUnit.MILLISECONDS);

messageLatencyTimer.update(Math.max(0L, currentTime - message.eventTimeMS - 500L), TimeUnit.MILLISECONDS);

if (age > config.metricsAgeSlo) {

messageLatencySloViolationCount.inc();

}

}

}

}

}

private InflightMessageList inflightMessages;

public AbstractAsyncProducer(MaxwellContext context) {

super(context);

this.inflightMessages = new InflightMessageList(context);

Metrics metrics = context.getMetrics();

String gaugeName = metrics.metricName("inflightmessages", "count");

metrics.register(gaugeName, (Gauge<Long>) () -> (long) inflightMessages.size());

}

public abstract void sendAsync(RowMap r, CallbackCompleter cc) throws Exception;

@Override

public final void push(RowMap r) throws Exception {

Position position = r.getNextPosition();

// Rows that do not get sent to a target will be automatically marked as complete.

// We will attempt to commit a checkpoint up to the current row.

if(!r.shouldOutput(outputConfig)) {

if ( position != null ) {

inflightMessages.addMessage(position, r.getTimestampMillis(), 0L);

InflightMessageList.InflightMessage completed = inflightMessages.completeMessage(position);

if (completed != null) {

context.setPosition(completed.position);

}

}

return;

}

// back-pressure from slow producers

long messageID = inflightMessages.waitForSlot();

if(r.isTXCommit()) {

inflightMessages.addMessage(position, r.getTimestampMillis(), messageID);

}

CallbackCompleter cc = new CallbackCompleter(inflightMessages, position, r.isTXCommit(), context, messageID);

sendAsync(r, cc);

}

}

  • AbstractAsyncProducer继承了AbstractProducer,其push方法主要执行inflightMessages.addMessage及sendAsync

小结

MaxwellKafkaProducer继承了AbstractProducer,其构造器会创建ArrayBlockingQueue、ArrayBlockingQueue;其push方法则往queue中put数据;MaxwellKafkaProducerWorker继承了AbstractAsyncProducer,实现了Runnable及StoppableTask接口;其run方法使用while循环,不断执行drainDeadLetterQueue、queue.take()、this.push(row);drainDeadLetterQueue方法从deadLetterQueue拉取数据,然后通过sendAsync再次发送

doc

  • MaxwellKafkaProducer

以上是 聊聊MaxwellKafkaProducer 的全部内容, 来源链接: utcz.com/z/516109.html

回到顶部