KafkaGolang客户端介绍

目前常用的Golang Kafka客户端有以下几种,每种都有相应的优缺点
客户端名称 优缺点 
sarama
使用人数比较多,但相对较为难用,性能较好
confluent-kafka-go
对C语言版本的Kafka进行的封装,性能强,但依赖于librdkafka
kafka-go
使用操作简单,但性能较差,生产环境上普通的双核CPU机器,一秒钟只处理300条左右
healer
操作极为简单,性能和sarama类似,此产品为携程一位大牛的作品,目前社区使用人数较少,缺少相应的支持维护
以上几种客户端在测试环境中简单地测试过,经过对比,最终还是采用sarama作为生产环境的Kafka客户端。
目前看到网上一些消费Kafka topic指定group的例子都是使用sarama-cluster来完成的,目前sarama也是支持指定consumer group的。
Consumer Group是逻辑上的概念,是Kafka实现单播和广播两种消息模型的手段。同一个topic的数据,会广播给不同的group;同一个group中的worker,只有一个worker能拿到这个数据。换句话说,对于同一个topic,每个group都可以拿到同样的所有数据,但是数据进入group后只能被其中的一个worker消费。group内的worker可以使用多线程或多进程来实现,也可以将进程分散在多台机器上,worker的数量通常不超过partition的数量,且二者最好保持整数倍关系,因为Kafka在设计时假定了一个partition只能被一个worker消费(同一group内)
sarama 生产者例子
package main

import (
    "fmt"
    "github.com/Shopify/sarama"
)
/*
	初始化NewConfig配置 sarama.NewConfig 
	创建生产者sarama.NewSyncProducer 
	创建消息sarama.ProducerMessage 
	发送消息client.SendMessage
*/
func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Return.Successes = true
    msg := &sarama.ProducerMessage{}
    msg.Topic = "TestTopic"
    msg.Value = sarama.StringEncoder("this is a test")
    client, err := sarama.NewSyncProducer([]string{"172.0.0.1:9092"}, config)
    if err != nil {
        fmt.Println("producer close, err:", err)
        return
    }
    defer client.Close()
    pid, offset, err := client.SendMessage(msg)
    if err != nil {
        fmt.Println("send message failed,", err)
        return
    }
    fmt.Printf("pid:%v offset:%v
", pid, offset)
}
运行结果:
pid:0 offset:0
再次运行:
pid:0 offset:1
再次运行:
pid:0 offset:2
sarama 消费者例子
例子一:不指定group进行消费
package main

import (
    "fmt"
    "strings"
    "sync"
    "github.com/Shopify/sarama"
)
// wg counts the per-partition consumer goroutines started by main so that
// main can block until all of them have returned.
var wg sync.WaitGroup
func main() {
    //创建消费者
    consumer, err := sarama.NewConsumer(strings.Split("192.168.1.125:9092", ","), nil)
    if err != nil {
        fmt.Println("Failed to start consumer: %s", err)
        return
    }
    //设置分区
    partitionList, err := consumer.Partitions("nginx_log")
    if err != nil {
        fmt.Println("Failed to get the list of partitions: ", err)
        return
    }
    fmt.Println(partitionList)
    //循环分区
    for partition := range partitionList {
        pc, err := consumer.ConsumePartition("nginx_log", int32(partition), sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("Failed to start consumer for partition %d: %s
", partition, err)
            return
        }
        defer pc.AsyncClose()
        wg.Add(1)
        go func(pc sarama.PartitionConsumer) {
            defer wg.Done()
            for msg := range pc.Messages() {
                fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
                fmt.Println()
            }
        }(pc)
    }
    //time.Sleep(time.Hour)
    wg.Wait()
    consumer.Close()
}
例子二:指定group进行消费
package main

import (
	"context"
	"flag"
	"log"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"
	"github.com/Shopify/sarama"
)
// Command-line options of the consumer-group example. They are bound to
// flags and validated in init.
var (
	// String options; all start empty and receive their flag values (or
	// flag defaults) when the command line is parsed.
	brokers, version, group, topics, assignor string

	// oldest selects sarama.OffsetOldest as the initial offset in main.
	oldest = true

	// verbose turns on sarama's internal logging in main.
	verbose bool
)
// init registers the command-line flags, parses them, and panics when one
// of the mandatory options (-brokers, -topics, -group) is missing.
func init() {
	flag.StringVar(&brokers, "brokers", "", "Kafka bootstrap brokers to connect to, as a comma separated list")
	flag.StringVar(&group, "group", "", "Kafka consumer group definition")
	flag.StringVar(&version, "version", "2.1.1", "Kafka cluster version")
	flag.StringVar(&topics, "topics", "", "Kafka topics to be consumed, as a comma seperated list")
	flag.StringVar(&assignor, "assignor", "range", "Consumer group partition assignment strategy (range, roundrobin, sticky)")
	flag.BoolVar(&oldest, "oldest", true, "Kafka consumer consume initial offset from oldest")
	flag.BoolVar(&verbose, "verbose", false, "Sarama logging")
	flag.Parse()

	// Checked in this order; the first missing option aborts start-up.
	switch {
	case brokers == "":
		panic("no Kafka bootstrap brokers defined, please set the -brokers flag")
	case topics == "":
		panic("no topics given to be consumed, please set the -topics flag")
	case group == "":
		panic("no Kafka consumer group defined, please set the -group flag")
	}
}
// main runs the consumer-group example: it builds the sarama configuration
// from the parsed flags, joins the consumer group, consumes in a background
// goroutine until SIGINT/SIGTERM arrives or the context is cancelled, and
// then shuts the client down.
func main() {
	log.Println("Starting a new Sarama consumer")

	if verbose {
		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	}

	kafkaVersion, err := sarama.ParseKafkaVersion(version)
	if err != nil {
		log.Panicf("Error parsing Kafka version: %v", err)
	}

	// Construct a new Sarama configuration. The Kafka cluster version has
	// to be defined before the consumer/producer is initialized.
	cfg := sarama.NewConfig()
	cfg.Version = kafkaVersion

	switch assignor {
	case "range":
		cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	case "roundrobin":
		cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	case "sticky":
		cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
	default:
		log.Panicf("Unrecognized consumer group partition assignor: %s", assignor)
	}

	if oldest {
		cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
	}

	// Set up a new Sarama consumer group.
	handler := Consumer{ready: make(chan bool)}

	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, cfg)
	if err != nil {
		log.Panicf("Error creating consumer group client: %v", err)
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// Consume is called again after each return until the context
			// is cancelled.
			consumeErr := client.Consume(ctx, strings.Split(topics, ","), &handler)
			if consumeErr != nil {
				log.Panicf("Error from consumer: %v", consumeErr)
			}
			// A cancelled context signals that the consumer should stop.
			if ctx.Err() != nil {
				return
			}
			// Setup closes the channel, so the next session gets a fresh one.
			handler.ready = make(chan bool)
		}
	}()

	// Wait until the consumer has been set up.
	<-handler.ready
	log.Println("Sarama consumer up and running!...")

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-sigterm:
		log.Println("terminating: via signal")
	case <-ctx.Done():
		log.Println("terminating: context cancelled")
	}

	cancel()
	wg.Wait()
	if closeErr := client.Close(); closeErr != nil {
		log.Panicf("Error closing client: %v", closeErr)
	}
}
// Consumer represents a Sarama consumer group consumer. main passes a
// pointer to it as the handler argument of client.Consume.
type Consumer struct {
	// ready is closed by Setup; main blocks on it before logging that the
	// consumer is up and running.
	ready chan bool
}
// Setup is run at the beginning of a new session, before ConsumeClaim.
// It closes the ready channel to mark the consumer as ready and always
// returns nil; the session argument is not used.
func (c *Consumer) Setup(_ sarama.ConsumerGroupSession) error {
	close(c.ready)
	return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines
// have exited. This example has nothing to release, so it does nothing with
// the session and reports success.
func (c *Consumer) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// It logs every message of the claim and marks it as consumed on the
// session. It returns nil when the message channel is closed or when the
// session's context is done.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				// The message channel was closed; this claim is finished.
				return nil
			}
			log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
			session.MarkMessage(message, "")
		case <-session.Context().Done():
			// Return as soon as the session ends. The upstream sarama
			// example does the same to avoid rebalance-in-progress and
			// i/o timeout errors during a Kafka rebalance.
			return nil
		}
	}
}
$ go run main.go -brokers="127.0.0.1:9092" -topics="sarama" -group="example"

以上是 KafkaGolang客户端介绍 的全部内容, 来源链接: utcz.com/z/512092.html






