Pulsar2.5.0之Javaclient

编程

Pulsar 2.5.0 之Java client

官网原文标题《Pulsar Java client》

翻译时间:2020-02-14

官网原文地址:http://pulsar.apache.org/docs/en/client-libraries-java/

译者:本文介绍如何使用javaClient创建生产者、消费者以及通过管理后台接口读取消息。

Pulsar Java client

通过Java client 可以创建生产者、消费者以及读取消息,当前API版本为2.5.0,包括两大块内容

描述

Maven Artifact

org.apache.pulsar.client.api

创建生产和创建消息者 API

org.apache.pulsar:pulsar-client:2.5.0

org.apache.pulsar.client.admin

admin API

org.apache.pulsar:pulsar-client-admin:2.5.0

本章重点是创建生产和创建消息者 API如何使用,关于 admin client API 仔细阅读文档 Pulsar admin interface

Java Client 导入包方式

Maven

在pom.xml 文件添加如下

<!-- in your <properties> block -->

<pulsar.version>2.5.0</pulsar.version>

<!-- in your <dependencies> block -->

<dependency>

<groupId>org.apache.pulsar</groupId>

<artifactId>pulsar-client</artifactId>

<version>${pulsar.version}</version>

</dependency>

Gradle

build.gradle 文件添加如下信息

def pulsarVersion = "2.5.0"

dependencies {

compile group: "org.apache.pulsar", name: "pulsar-client", version: pulsarVersion

}

Java Client URLS

客户端client 通过pulsar协议来进行通讯,类型列表如下

  • 集群模式:pulsar://localhost:6650
  • Brokers :pulsar://localhost:6550,localhost:6651,localhost:6652
  • 生产集群:pulsar://pulsar.us-west.example.com:6650
  • TLS 认证:pulsar+ssl://pulsar.us-west.example.com:6651

如何创建 PulsarClient 对象

默认示例:

PulsarClient client =PulsarClient.builder()

.serviceUrl("pulsar://localhost:6650")

.build();

多brokers示例

PulsarClient client =PulsarClient.builder()

.serviceUrl("pulsar://localhost:6650,localhost:6651,localhost:6652")

.build();

本地集群模式 默认 broker URL pulsar://localhost:6650

通过loadConf 可以进行自定义 PulsarClient 参数

参数配置

参数类型

参数名称

描述

默认值

String

serviceUrl

 

String

authPluginClassName

 

String

authParams

 

long

operationTimeoutMs

 

30000

long

statsIntervalSeconds

 

60

int

numIoThreads

 

1

int

numListenerThreads

 

1

boolean

useTcpNoDelay

 

true

boolean

useTls

 

false

string

tlsTrustCertsFilePath

 

boolean

tlsAllowInsecureConnection

 

false

boolean

tlsHostnameVerificationEnable

 

false

int

concurrentLookupRequest

 

5000

int

maxLookupRequest

 

50000

int

maxNumberOfRejectedRequestPerConnection

 

50

int

keepAliveIntervalSeconds

 

30

int

connectionTimeoutMs

 

10000

int

requestTimeoutMs

 

60000

int

defaultBackoffIntervalNanos

 

TimeUnit.MILLISECONDS.toNanos(100);

long

maxBackoffIntervalNanos

 

TimeUnit.SECONDS.toNanos(30)

Producer 对象创建与使用

发送消息,需要创建Producer对象对指定的topic发送消息

PulsarClient client =PulsarClient.builder()

.serviceUrl("pulsar://localhost:6650")

.build();

Producer<byte[]> producer = client.newProducer()

.topic("my-topic")

.create();

*// You can then send messages to the broker and topic you specified:*

producer.send("My message".getBytes());

默认情况下 messages schema. 是字节数组,schema是可以根据自己的业务场景进行选择的。例如我们可以使用string 类型schema。

Producer<String> stringProducer = client.newProducer(Schema.STRING)

.topic("my-topic")

.create();

stringProducer.send("My message");

对象在使用完之后需要进行释放

producer.close();

consumer.close();

client.close();

关闭释放操作也有异步方法

producer.closeAsync()

.thenRun(()->System.out.println("Producer closed"))

.exceptionally((ex)->{

System.err.println("Failed to close producer: "+ ex);

returnnull;

});

Producer 对象可以使用默认配置,也可以自定义配置参数,自定义配置是通过loadConf 来进行配置。

参数类型

参数名称

描述

默认值

String

topicName

 

null

String

producerName

 

null

long

sendTimeoutMs

 

30000

boolean

blockIfQueueFull

 

false

int

maxPendingMessages

 

1000

int

maxPendingMessagesAcrossPartitions

 

50000

MessageRoutingMode

messageRoutingMode

 

pulsar.RoundRobinDistribution

HashingScheme

hashingScheme

 

HashingScheme.JavaStringHash

ProducerCryptoFailureAction

cryptoFailureAction

 

ProducerCryptoFailureAction.FAIL

long

batchingMaxPublishDelayMicros

 

TimeUnit.MILLISECONDS.toMicros(1)

int

batchingMaxMessages

 

1000

boolean

batchingEnabled

 

true

CompressionType

compressionType

 

No compression

Producer自定义配置示例:

Producer<byte[]> producer = client.newProducer()

.topic("my-topic")

.batchingMaxPublishDelay(10,TimeUnit.MILLISECONDS)

.sendTimeout(10,TimeUnit.SECONDS)

.blockIfQueueFull(true)

.create();

如果Producer 创建了分片topic,发送消息时需要消息路由模式发送,消息路由了解更多,请阅读 Partitioned Topics cookbook.

异步发送消息,返回MessageId包装对象

producer.sendAsync("my-async-message".getBytes()).thenAccept(msgId ->{

System.out.printf("Message with ID %s successfully sent", msgId);

});

Message 对象自定义配置示例:

producer.newMessage()

.key("my-message-key")

.value("my-async-message".getBytes())

.property("my-key","my-value")

.property("my-other-key","my-other-value")

.send();

还可以使用sendAsync()方法,这个方法有返回值

Consumer

通过PulsarClient 对象来创建Consumer对象,Consumer对象在创建的时候需要指定主题与订阅名称。对象创建完成,就可以来对订阅的主题进行消费

示例:

Consumer consumer = client.newConsumer()

.topic("my-topic")

.subscriptionName("my-subscription")

.subscribe();

通过while 循环来监听与接收内容,如果接收失败,可以通过negativeAcknowledge方法,重新投递消息

while(true){

// Wait for a message

Message msg = consumer.receive();

try{

// Do something with the message

System.out.printf("Message received: %s",newString(msg.getData()));

// Acknowledge the message so that it can be deleted by the message broker

consumer.acknowledge(msg);

}catch(Exception e){

// Message failed to process, redeliver later

consumer.negativeAcknowledge(msg);

}

}

可以使用默认配置,也可以通过loadConf 进行自定义配置

参数类型

参数名称

描述

默认值

Set<String>

topicNames

 

Sets.newTreeSet()

Pattern

topicsPattern

 

None

String

subscriptionName

 

None

SubscriptionType

subscriptionType

 

SubscriptionType.Exclusive

int

receiverQueueSize

 

1000

long

acknowledgementsGroupTimeMicros

 

TimeUnit.MILLISECONDS.toMicros(100)

long

negativeAckRedeliveryDelayMicros

 

TimeUnit.MINUTES.toMicros(1)

int

maxTotalReceiverQueueSizeAcrossPartitions

 

50000

String

consumerName

 

null

long

ackTimeoutMillis

 

0

long

tickDurationMillis

 

1000

int

priorityLevel

 

0

ConsumerCryptoFailureAction

cryptoFailureAction

 

ConsumerCryptoFailureAction.FAIL

SortedMap<String, String>

properties

 

new TreeMap<>()

boolean

readCompacted

 

false

SubscriptionInitialPosition

subscriptionInitialPosition

 

SubscriptionInitialPosition.Latest

int

patternAutoDiscoveryPeriod

 

1

RegexSubscriptionMode

regexSubscriptionMode

 

RegexSubscriptionMode.PersistentOnly

DeadLetterPolicy

deadLetterPolicy

 

None

boolean

autoUpdatePartitions

 

true

boolean

replicateSubscriptionState

 

false

示例

Consumer consumer = client.newConsumer()

.topic("my-topic")

.subscriptionName("my-subscription")

.ackTimeout(10,TimeUnit.SECONDS)

.subscriptionType(SubscriptionType.Exclusive)

.subscribe();

Consumer接收方式

  • 异步接收

    CompletableFuture<Message> asyncMessage = consumer.receiveAsync();

  • 批量接收

    Messages messages = consumer.batchReceive();

    for(message in messages){

    // do something

    }

    consumer.acknowledge(messages)

自定义批量接收策略

Consumer consumer = client.newConsumer()

.topic("my-topic")

.subscriptionName("my-subscription")

.batchReceivePolicy(BatchReceivePolicy.builder()

.maxNumMessages(100)

.maxNumBytes(1024*1024)

.timeout(200,TimeUnit.MILLISECONDS)

.build())

.subscribe();

默认批量接收策略

BatchReceivePolicy.builder()

.maxNumMessage(-1)

.maxNumBytes(10*1024*1024)

.timeout(100,TimeUnit.MILLISECONDS)

.build();

多主题订阅

当consumer订阅pulsar的主题,默认情况下,它订阅了一个指定的主题,例如:persistent://public/default/my-topic。从Pulsar的1.23.0-incubating的版本,Pulsar消费者可以同时订阅多个topic。你可以用以下两种方式定义topic的列表:

  • 通过基础的正则表达式(regex),例如 persistent://public/default/finance-.*
  • 通过明确定义的topic列表

通过正则订阅多主题时,所有的主题必须在同一个命名空间(namespace)

当订阅多主题时,pulsar客户端会自动调用Pulsar的API来发现匹配表达式的所有topic,然后全部订阅。如果此时有暂不存在的topic,那么一旦这些topic被创建,conusmer会自动订阅。

不能保证顺序性

当消费者订阅多主题时,pulsar所提供对单一主题订阅的顺序保证,就hold不住了。如果你在使用pulsar的时候,遇到必须保证顺序的需求,我们强烈建议不要使用此特性

正则表达式订阅

importorg.apache.pulsar.client.api.Consumer;

importorg.apache.pulsar.client.api.PulsarClient;

importjava.util.Arrays;

importjava.util.List;

importjava.util.regex.Pattern;

ConsumerBuilder consumerBuilder = pulsarClient.newConsumer()

.subscriptionName(subscription);

// Subscribe to all topics in a namespace

Pattern allTopicsInNamespace =Pattern.compile("persistent://public/default/.*");

Consumer allTopicsConsumer = consumerBuilder

.topicsPattern(allTopicsInNamespace)

.subscribe();

// Subscribe to a subsets of topics in a namespace, based on regex

Pattern someTopicsInNamespace =Pattern.compile("persistent://public/default/foo.*");

Consumer allTopicsConsumer = consumerBuilder

.topicsPattern(someTopicsInNamespace)

.subscribe();

列表订阅

List<String> topics =Arrays.asList(

"topic-1",

"topic-2",

"topic-3"

);

Consumer multiTopicConsumer = consumerBuilder

.topics(topics)

.subscribe();

*// Alternatively:*

Consumer multiTopicConsumer = consumerBuilder

.topics(

"topic-1",

"topic-2",

"topic-3"

)

.subscribe();

异步订阅

Pattern allTopicsInNamespace =Pattern.compile("persistent://public/default.*");

consumerBuilder

.topics(topics)

.subscribeAsync()

.thenAccept(this::receiveMessageFromConsumer);

privatevoidreceiveMessageFromConsumer(Consumer consumer){

consumer.receiveAsync().thenAccept(message ->{

// Do something with the received message

receiveMessageFromConsumer(consumer);

});

}

订阅模式

Pulsar 提供了多样化的订阅模式来满足实际中的业务场景。

为了能更好的理解订阅多样化的模式,我们通过创建一个生产者,往指定主题名称为my-topic的主题发送10条消息,来进行分析对比。

发送示例:

Producer<String> producer = client.newProducer(Schema.STRING)

.topic("my-topic")

.enableBatching(false)

.create();

// 3 messages with "key-1", 3 messages with "key-2", 2 messages with "key-3" and 2 messages with "key-4"

producer.newMessage().key("key-1").value("message-1-1").send();

producer.newMessage().key("key-1").value("message-1-2").send();

producer.newMessage().key("key-1").value("message-1-3").send();

producer.newMessage().key("key-2").value("message-2-1").send();

producer.newMessage().key("key-2").value("message-2-2").send();

producer.newMessage().key("key-2").value("message-2-3").send();

producer.newMessage().key("key-3").value("message-3-1").send();

producer.newMessage().key("key-3").value("message-3-2").send();

producer.newMessage().key("key-4").value("message-4-1").send();

producer.newMessage().key("key-4").value("message-4-2").send();

Exclusive订阅模式

独占模式,只能有一个消费者绑定到订阅(subscription)上。如果多于一个消费者尝试以同样方式去订阅主题,消费者将会收到错误。

Consumer consumer = client.newConsumer()

.topic("my-topic")

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Exclusive)

.subscribe()

Failover订阅模式

灾备模式中,多个consumer可以绑定到同一个subscription。consumer将会按字典顺序排序,第一个consumer被初始化为唯一接受消息的消费者。这个consumer被称为master consumer。当master consumer断开时,所有的消息(未被确认和后续进入的)将会被分发给队列中的下一个consumer。

Consumer consumer1 = client.newConsumer()

.topic("my-topic")

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Failover)

.subscribe()

Consumer consumer2 = client.newConsumer()

.topic("my-topic")

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Failover)

.subscribe()

运行结果

//conumser1 is the active consumer, consumer2 is the standby consumer.

//consumer1 receives 5 messages and then crashes, consumer2 takes over as an active consumer.

Shared 订阅模式

shared或者round robin模式中,多个消费者可以绑定到同一个订阅上。消息通过round robin轮询机制分发给不同的消费者,并且每个消息仅会被分发给一个消费者。当消费者断开连接,所有被发送给他,但没有被确认的消息将被重新安排,分发给其它存活的消费者。

Consumer consumer1 = client.newConsumer()

.topic("my-topic")

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Shared)

.subscribe()

Consumer consumer2 = client.newConsumer()

.topic("my-topic")

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Shared)

.subscribe()

//Both consumer1 and consumer 2 is active consumers.

运行结果

消费者1

("key-1", "message-1-1")

("key-1", "message-1-3")

("key-2", "message-2-2")

("key-3", "message-3-1")

("key-4", "message-4-1")

消费者2

("key-1", "message-1-2")

("key-2", "message-2-1")

("key-2", "message-2-3")

("key-3", "message-3-2")

("key-4", "message-4-2")

Key_Shared订阅模式

2.4.0 版本之后新扩展的订阅模式

Consumer consumer1 = client.newConsumer()

.topic("my-topic")

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Key_Shared)

.subscribe()

Consumer consumer2 = client.newConsumer()

.topic("my-topic")

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Key_Shared)

.subscribe()

//Both consumer1 and consumer2 are active consumers.

运行结果

消费者1

("key-1", "message-1-1")

("key-1", "message-1-2")

("key-1", "message-1-3")

("key-3", "message-3-1")

("key-3", "message-3-2")

消费者2

("key-2", "message-2-1")

("key-2", "message-2-2")

("key-2", "message-2-3")

("key-4", "message-4-1")

("key-4", "message-4-2")

Reader

使用Pulsar的 读取器接口, 应用程序可以手动管理游标。 当使用读取器连接到一个主题而非消费者时,在读取器连接到主题的时候就需要指定读取器从哪个位置开始读消息。当连接到某个主题时, 读取器从以下位置开始读消息:

  • ​ 主题中最早的可用消息
  • ​ 主题中最新可用消息
  • ​ 指定的消息ID

示例:

ReaderConfiguration conf =newReaderConfiguration();

byte[] msgIdBytes =*// Some message ID byte array*

MessageId id =MessageId.fromByteArray(msgIdBytes);

Reader reader = pulsarClient.newReader()

.topic(topic)

.startMessageId(id)

.create();

while(true){

Message message = reader.readNext();

*// Process message*

}

Reader 配置

Reader 配置

参数类型

参数名称

描述

默认值

String

topicName

 

None

int

receiverQueueSize

 

1000

ReaderListener<T>

readerListener

 

None

String

readerName

 

null

String

subscriptionRolePrefix

 

null

CryptoKeyReader

cryptoKeyReader

 

null

ConsumerCryptoFailureAction

cryptoFailureAction

 

ConsumerCryptoFailureAction.FAIL

boolean

readCompacted

 

false

boolean

resetIncludeHead

 

false

reader范围读取策略

范围值区间0~65535,最大值不能超过65535

示例

pulsarClient.newReader()

.topic(topic)

.startMessageId(MessageId.earliest)

.keyHashRange(Range.of(0,10000),Range.of(20001,30000))

.create();

Schema

Pulsar应用中,如果开发者没有为topic指定schema,producer和consumer将会处理原始字节。但实际情况我们的期望不是这样的,我们期望能按自己的数据格式进行发送,我们需要支持不同的数据类型。 Message schemas 就能很好的支持这种业务场景。

Schema示例

publicclassSensorReading{

publicfloat temperature;

publicSensorReading(float temperature){

this.temperature = temperature;

}

// A no-arg constructor is required

publicSensorReading(){

}

publicfloatgetTemperature(){

return temperature;

}

publicvoidsetTemperature(float temperature){

this.temperature = temperature;

}

}

Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))

.topic("sensor-readings")

.create();

目前java client支持shcema 如下

  • Schema.BYTES

Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES)

.topic("some-raw-bytes-topic")

.create();

//or

Producer<byte[]> bytesProducer = client.newProducer()

.topic("some-raw-bytes-topic")

.create();

  • Schema.STRING

Producer<String> stringProducer = client.newProducer(Schema.STRING)

.topic("some-string-topic")

.create();

  • Schema.JSON

Producer<MyPojo> pojoProducer = client.newProducer(Schema.JSON(MyPojo.class))

.topic("some-pojo-topic")

.create();

  • Schema.PROTOBUF

Producer<MyProtobuf> protobufProducer = client.newProducer(Schema.PROTOBUF(MyProtobuf.class))

.topic("some-protobuf-topic")

.create();

  • Schema.AVRO

Producer<MyAvro> avroProducer = client.newProducer(Schema.AVRO(MyAvro.class))

.topic("some-avro-topic")

.create();

身份验证与认证

Pulsar支持两种方式

  • TLS
  • Athenz

TLS 身份认证使用需要将setUseTls设置为true ,同时需要设置TLS cert 路径

示例

Map<String,String> authParams =newHashMap<>();

authParams.put("tlsCertFile","/path/to/client-cert.pem");

authParams.put("tlsKeyFile","/path/to/client-key.pem");

Authentication tlsAuth =AuthenticationFactory

.create(AuthenticationTls.class.getName(), authParams);

PulsarClient client =PulsarClient.builder()

.serviceUrl("pulsar+ssl://my-broker.com:6651")

.enableTls(true)

.tlsTrustCertsFilePath("/path/to/cacert.pem")

.authentication(tlsAuth)

.build();

Athenz 需要设置TLS,同时需要初始化如下参数

  • tenantDomain
  • tenantService
  • providerDomain
  • privateKey

privateKey支持三种格式

  • file:///path/to/file
  • file:/path/to/file
  • data:application/x-pem-file;base64,<base64-encoded value>

示例

Map<String,String> authParams =newHashMap<>();

authParams.put("tenantDomain","shopping");// Tenant domain name

authParams.put("tenantService","some_app");// Tenant service name

authParams.put("providerDomain","pulsar");// Provider domain name

authParams.put("privateKey","file:///path/to/private.pem");// Tenant private key path

authParams.put("keyId","v1");// Key id for the tenant private key (optional, default: "0")

Authentication athenzAuth =AuthenticationFactory

.create(AuthenticationAthenz.class.getName(), authParams);

PulsarClient client =PulsarClient.builder()

.serviceUrl("pulsar+ssl://my-broker.com:6651")

.enableTls(true)

.tlsTrustCertsFilePath("/path/to/cacert.pem")

.authentication(athenzAuth)

.build();


 

以上是 Pulsar2.5.0之Javaclient 的全部内容, 来源链接: utcz.com/z/516822.html

回到顶部