A Look at MaxwellKafkaPartitioner


This article takes a look at MaxwellKafkaPartitioner.

MaxwellKafkaPartitioner

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/partitioners/MaxwellKafkaPartitioner.java

public class MaxwellKafkaPartitioner extends AbstractMaxwellPartitioner {
    HashFunction hashFunc;

    public MaxwellKafkaPartitioner(String hashFunction, String partitionKey, String csvPartitionColumns, String partitionKeyFallback) {
        super(partitionKey, csvPartitionColumns, partitionKeyFallback);
        int MURMUR_HASH_SEED = 25342;
        switch (hashFunction) {
            case "murmur3":
                this.hashFunc = new HashFunctionMurmur3(MURMUR_HASH_SEED);
                break;
            case "default":
            default:
                this.hashFunc = new HashFunctionDefault();
                break;
        }
    }

    public int kafkaPartition(RowMap r, int numPartitions) {
        return Math.abs(hashFunc.hashCode(this.getHashString(r)) % numPartitions);
    }
}

  • MaxwellKafkaPartitioner extends AbstractMaxwellPartitioner; its constructor creates either a HashFunctionMurmur3 or a HashFunctionDefault depending on the hashFunction argument, and its kafkaPartition method computes the partition as Math.abs(hashFunc.hashCode(this.getHashString(r)) % numPartitions); see the instantiation sketch below
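As a rough illustration, here is a minimal sketch of constructing the partitioner directly. The hash function name and partition settings come from the constructor signature above; the "user_id" column and the call shown in the trailing comment are hypothetical:

import com.zendesk.maxwell.producer.partitioners.MaxwellKafkaPartitioner;

public class PartitionerSketch {
    public static void main(String[] args) {
        // partition by a "user_id" column (hypothetical), falling back to the database name,
        // hashing with murmur3 -- all values accepted by the constructor shown above
        MaxwellKafkaPartitioner partitioner = new MaxwellKafkaPartitioner(
                "murmur3",   // hashFunction: "murmur3" or "default"
                "column",    // partitionKey
                "user_id",   // csvPartitionColumns
                "database"); // partitionKeyFallback

        // for a RowMap r and a topic with numPartitions partitions, the partition would be:
        // partitioner.kafkaPartition(r, numPartitions)
    }
}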

AbstractMaxwellPartitioner

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/partitioners/AbstractMaxwellPartitioner.java

public abstract class AbstractMaxwellPartitioner {
    List<String> partitionColumns = new ArrayList<String>();
    private final PartitionBy partitionBy, partitionByFallback;

    private PartitionBy partitionByForString(String key) {
        if ( key == null )
            return PartitionBy.DATABASE;
        switch(key) {
            case "table":
                return PartitionBy.TABLE;
            case "database":
                return PartitionBy.DATABASE;
            case "primary_key":
                return PartitionBy.PRIMARY_KEY;
            case "transaction_id":
                return PartitionBy.TRANSACTION_ID;
            case "column":
                return PartitionBy.COLUMN;
            case "random":
                return PartitionBy.RANDOM;
            default:
                throw new RuntimeException("Unknown partitionBy string: " + key);
        }
    }

    public AbstractMaxwellPartitioner(String partitionKey, String csvPartitionColumns, String partitionKeyFallback) {
        this.partitionBy = partitionByForString(partitionKey);
        this.partitionByFallback = partitionByForString(partitionKeyFallback);
        if ( csvPartitionColumns != null )
            this.partitionColumns = Arrays.asList(csvPartitionColumns.split("\\s*,\\s*"));
    }

    static protected String getDatabase(RowMap r) {
        return r.getDatabase();
    }

    static protected String getTable(RowMap r) {
        return r.getTable();
    }

    public String getHashString(RowMap r, PartitionBy by) {
        switch ( by ) {
            case TABLE:
                String t = r.getTable();
                if ( t == null && partitionByFallback == PartitionBy.DATABASE )
                    return r.getDatabase();
                else
                    return t;
            case DATABASE:
                return r.getDatabase();
            case PRIMARY_KEY:
                return r.getRowIdentity().toConcatString();
            case TRANSACTION_ID:
                return String.valueOf(r.getXid());
            case COLUMN:
                String s = r.buildPartitionKey(partitionColumns);
                if ( s.length() > 0 )
                    return s;
                else
                    return getHashString(r, partitionByFallback);
            case RANDOM:
                return RandomStringUtils.random(10, true, true);
        }
        return null; // thx java
    }

    public String getHashString(RowMap r) {
        if ( r.getPartitionString() != null )
            return r.getPartitionString();
        else
            return getHashString(r, partitionBy);
    }
}

  • AbstractMaxwellPartitioner's constructor resolves the PartitionBy values via partitionByForString; its getHashString method returns the string to be hashed for the configured PartitionBy (table, database, primary key, transaction id, column, or random), falling back to partitionByFallback when the primary choice yields nothing; the column-list parsing is sketched below
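For instance, the csvPartitionColumns handling above splits a comma-separated list on commas with optional surrounding whitespace. A minimal sketch of that split, with hypothetical column names:

import java.util.Arrays;
import java.util.List;

public class PartitionColumnsSketch {
    public static void main(String[] args) {
        // hypothetical comma-separated partition column list
        String csvPartitionColumns = "user_id, account_id";
        // same split used in the constructor above
        List<String> partitionColumns = Arrays.asList(csvPartitionColumns.split("\\s*,\\s*"));
        System.out.println(partitionColumns); // [user_id, account_id]
    }
}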

HashFunction

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/partitioners/HashFunction.java

public interface HashFunction {
    int hashCode(String s);
}

  • The HashFunction interface defines a single hashCode method

HashFunctionDefault

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/partitioners/HashFunctionDefault.java

public class HashFunctionDefault implements HashFunction {
    public int hashCode(String s) {
        return s.hashCode();
    }
}

  • HashFunctionDefault implements the HashFunction interface; its hashCode simply returns the String's own hashCode, which can be negative, hence the Math.abs in kafkaPartition; see the sketch below
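A small sketch of the hash-and-mod step with the default hash function; the key and partition count are hypothetical:

public class DefaultHashSketch {
    public static void main(String[] args) {
        String hashString = "mydb.users";       // hypothetical hash string
        int hash = hashString.hashCode();       // String.hashCode may be negative
        int numPartitions = 6;                  // hypothetical partition count
        int partition = Math.abs(hash % numPartitions);
        System.out.println(hash + " -> partition " + partition);
    }
}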

HashFunctionMurmur3

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/partitioners/HashFunctionMurmur3.java

public class HashFunctionMurmur3 implements HashFunction {
    private int seed;

    public HashFunctionMurmur3(int seed){
        this.seed = seed;
    }

    public int hashCode(String s) {
        return MurmurHash3.murmurhash3_x86_32(s, 0, s.length(), seed);
    }
}

  • HashFunctionMurmur3 implements the HashFunction interface; its hashCode method returns MurmurHash3.murmurhash3_x86_32(s, 0, s.length(), seed); a usage sketch follows
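As a usage sketch, the class can be exercised directly with the seed the partitioner uses (25342, from MaxwellKafkaPartitioner); the key and partition count here are hypothetical:

import com.zendesk.maxwell.producer.partitioners.HashFunction;
import com.zendesk.maxwell.producer.partitioners.HashFunctionMurmur3;

public class Murmur3Sketch {
    public static void main(String[] args) {
        HashFunction hashFunc = new HashFunctionMurmur3(25342); // MURMUR_HASH_SEED
        int hash = hashFunc.hashCode("mydb.users");             // hypothetical hash string
        int partition = Math.abs(hash % 6);                     // hypothetical 6-partition topic
        System.out.println(partition);
    }
}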

MaxwellKafkaProducerWorker

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java

class MaxwellKafkaProducerWorker extends AbstractAsyncProducer implements Runnable, StoppableTask {
    static final Logger LOGGER = LoggerFactory.getLogger(MaxwellKafkaProducer.class);

    private final Producer<String, String> kafka;
    private final String topic;
    private final String ddlTopic;
    private final MaxwellKafkaPartitioner partitioner;
    private final MaxwellKafkaPartitioner ddlPartitioner;

    //......

    ProducerRecord<String, String> makeProducerRecord(final RowMap r) throws Exception {
        RowIdentity pk = r.getRowIdentity();
        String key = r.pkToJson(keyFormat);
        String value = r.toJSON(outputConfig);
        ProducerRecord<String, String> record;
        if (r instanceof DDLMap) {
            record = new ProducerRecord<>(this.ddlTopic, this.ddlPartitioner.kafkaPartition(r, getNumPartitions(this.ddlTopic)), key, value);
        } else {
            String topic;

            // javascript topic override
            topic = r.getKafkaTopic();
            if ( topic == null ) {
                topic = generateTopic(this.topic, pk);
            }

            LOGGER.debug("context.getConfig().producerPartitionKey = " + context.getConfig().producerPartitionKey);
            record = new ProducerRecord<>(topic, this.partitioner.kafkaPartition(r, getNumPartitions(topic)), key, value);
        }
        return record;
    }

    //......
}

  • MaxwellKafkaProducerWorker's makeProducerRecord method uses ddlPartitioner.kafkaPartition(r, getNumPartitions(this.ddlTopic)) to pick the partition for a DDLMap, and partitioner.kafkaPartition(r, getNumPartitions(topic)) for everything else; the computed partition is passed explicitly to the ProducerRecord, as sketched below
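Because the partition index is computed up front and handed to the ProducerRecord constructor, Kafka's own partitioner is not consulted for these records. A minimal sketch with hypothetical topic, partition, key, and value:

import org.apache.kafka.clients.producer.ProducerRecord;

public class RecordSketch {
    public static void main(String[] args) {
        String topic = "maxwell";  // hypothetical topic
        int partition = 3;         // e.g. what partitioner.kafkaPartition(r, numPartitions) might return
        String key = "{\"database\":\"mydb\",\"table\":\"users\",\"pk.id\":1}"; // hypothetical row key JSON
        String value = "{ ... }";  // row JSON from r.toJSON(outputConfig)
        // the explicit partition argument is what routes the record
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, partition, key, value);
        System.out.println(record);
    }
}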

Summary

MaxwellKafkaPartitioner extends AbstractMaxwellPartitioner. Its constructor creates either a HashFunctionMurmur3 or a HashFunctionDefault depending on the hashFunction argument, and its kafkaPartition method computes the partition as Math.abs(hashFunc.hashCode(this.getHashString(r)) % numPartitions).


