Kafka之Producer生产者分区

编程

    kafka-client的版本:0.10

    有个很重要的类Partitioner

    List-1

public interface Partitioner extends Configurable {

/**

* Compute the partition for the given record.

*

* @param topic The topic name

* @param key The key to partition on (or null if no key)

* @param keyBytes The serialized key to partition on( or null if no key)

* @param value The value to partition on or null

* @param valueBytes The serialized value to partition on or null

* @param cluster The current cluster metadata

*/

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

public void close();

}

    如List-1所示,发送消息时,传入到partition方法中,返回的int值就是分区号,即发送到哪个分区,默认的实现是DefaultPartitioner,如下List-2

    List-2

public class DefaultPartitioner implements Partitioner {

private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());

private static int toPositive(int number) {

return number & 0x7fffffff;

}

public void configure(Map<String, ?> configs) {}

/**

* Compute the partition for the given record.

*

* @param topic The topic name

* @param key The key to partition on (or null if no key)

* @param keyBytes serialized key to partition on (or null if no key)

* @param value The value to partition on or null

* @param valueBytes serialized value to partition on or null

* @param cluster The current cluster metadata

*/

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

//1获取所有的分区信息

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

int numPartitions = partitions.size();

if (keyBytes == null) {

       //2如果没有指定key  

int nextValue = counter.getAndIncrement();

List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);

if (availablePartitions.size() > 0) {

          //3    

int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();

return availablePartitions.get(part).partition();

} else {

// no partitions are available, give a non-available partition

//4

return DefaultPartitioner.toPositive(nextValue) % numPartitions;

}

} else {

// hash the keyBytes to choose a partition

//5 如果有key,那么根据key进行hash计算得到hash值,之后对分区数取模

return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

}

}

public void close() {}

}

  • 从cluster中获取集群的分区信息
  • 如果我们没有指定消息key,那个获取下一个递增int值
  • 3和4中,用递增值对分区数取模,这样达到轮询发送到各个分区
  • 如果指定了指定了key,那么对key取hash值,之后用hash值对分区数取模

    所以如果没有指定key,那么使用轮询发送到各个分区;如果指定了key,那么同key的发送到相同的分区

    如果我们想随机发送到分区,那个我们应该实现这个接口,而后指定使用这个分区算法。

 

    

以上是 Kafka之Producer生产者分区 的全部内容, 来源链接: utcz.com/z/518227.html

回到顶部