Pulsar2.5.0之Schema管理

编程

Pulsar 2.5.0 之 Schema 管理
官网原文标题《Manage schema
翻译时间:2020-02-15
官网原文地址: http://pulsar.apache.org/docs/en/schema-manage/

摘要:本文主要学习如何管理模式以及管理模式的方法。

管理模式的方法:

  • 自动

    • Schema 自动更新
  • 手动

    • Schema 手动管理

    • 自定义 schema 存储

模式自动更新

如果schema通过了schema兼容性检查,Pulsar自动更新从主题上,这个过程是默认的。

生产者自动更新

生产者 自动更新 在以下情况发生:

  • 如果主题上模式不存在,Pulsar会自动注册模式

  • 如果主题上模式存在:

    • 生产者不携带模式:

      • 主题所属的命名空间参数isSchemaValidationEnforced or schemaValidationEnforced 禁用状态,生产者允许连接主题和发送数据

      • 主题所属的命名空间参数isSchemaValidationEnforced or schemaValidationEnforced 启用状态,生产者被拒绝连接主题。

    • 生产者携带模式:

      broker根据主题所属命名空间上已配置的兼容性检查策略执行兼容性检查。

      • 如果模式已注册,生产者可以直接连接broker

      • 如果模式未注册:

        • 参数isAllowAutoUpdateSchema 设置 false,生产者被拒绝连接 broker.

        • 参数isAllowAutoUpdateSchema 设置 true:

          • 如果模式通过兼容性检查, broker会自动注册模式,允许连接到该主题。
        • 如果模式未通过兼容性检查, broker不会自动注册模式,拒绝连接到该主题。

消费者自动更新

消费者 自动更新 在以下情形发生:

  • 如果消费者不使用模式连接主题 (意味着消费者接收字节数据),能成功连接主题且不会进行任何兼容性检查。

  • 如果消费者使用模式连接主题

    • 如果一个主题下没有:

      • 参数 isAllowAutoUpdateSchema 设置为 true, 消费者被允许连接到broker.

      • 参数 isAllowAutoUpdateSchema 设置为 false, 消费者被拒绝连接到 broker.

    • 如果一个主题下没有, 就会进行兼容性检查.

      • 容性检查通过, 消费者被允许连接到broker.

      • 容性检查不通过, 消费者被拒绝连接到 broker.

管理schema 自动更新策略

使用 pulsar-admin 命令管理 AutoUpdate 策略 ,方法如下:

  • 启用自动更新

  • 禁用自动更新

  • 调整兼容性

启用自动更新

在命名空间启用 AutoUpdate , 使用 pulsar-admin 命令.

bin/pulsar-admin namespaces set-is-allow-auto-update-schema --enable tenant/namespace

禁用自动更新

在命名空间禁用 AutoUpdate , 使用pulsar-admin 命令.

bin/pulsar-admin namespaces set-is-allow-auto-update-schema --disable tenant/namespace

如果 AutoUpdate 被禁用, pulsar-admin 命令只能注册一个新模式.

调整兼容性

在命名空间级别调整兼容性, 使用 pulsar-admin 命令.

bin/pulsar-admin namespaces set-schema-compatibility-strategy --compatibility <compatibility-level> tenant/namespace

模式检查

默认情况下,对生产者而言 schemaValidationEnforced 是禁用状态:

  • 这意味着没有模式的生产者可以向带有模式的主题生成任何类型的消息,这可能会导致向主题生成垃圾数据
  • 这使得不支持scheme的非java语言客户机可以生成带有scheme的主题的消息。

但是,如果您希望对模式主题有更有力的保证,则可以在整个集群或每个命名空间上启用schemaValidationEnforced

启用模式检查

在命名空间上使用 schemaValidationEnforced , 通过pulsar-admin 命令.

bin/pulsar-admin namespaces set-schema-validation-enforce --enable tenant/namespace

禁用模式检查

在命名空间上 schemaValidationEnforced , 通过 pulsar-admin 命令.

bin/pulsar-admin namespaces set-schema-validation-enforce --disable tenant/namespace

模式手动管理

管理模式方法如下.

方法

描述

Admin CLI

使用 pulsar-admin 工具管理 Pulsar的 schemas, brokers, clusters, sources, sinks, topics, tenants .了解 pulsar-admin 工具更多详情, 请参阅 这里.

REST API

Pulsar在Pulsar的管理RESTful API中公开了与模式相关的管理API。您可以直接访问admin RESTful端点来管理模式。使用 Pulsar REST API 更多详情, 请参阅 这里.

Java Admin API

Pulsar 提供Java admin 包.

更新模式

To upload (register) a new schema for a topic, you can use one of the following methods.

Admin CLI

使用 upload 命令.

$ pulsar-admin schemas upload --filename <schema-definition-file> <topic-name>

schema-definition-file 是 JSON 格式如下

{

"type":"<schema-type>",

"schema":"<an-utf8-encoded-string-of-schema-definition-data>",

"properties":{}// the properties associated with the schema

}

schema-definition-file 包含如下字段:

Field

Description

type

模式类型

schema

模式定义数据采用用UTF 8字符集编码。如果模式是原语模式,则此字段应为空。如果模式是结构模式,则此字段应为Avro模式定义的JSON字符串。

properties

模式扩展属性

这里有一个 schema-definition-file JSON 格式示例.

示例1

{

"type":"JSON",

"schema":"{"type":"record","name":"User","namespace":"com.foo","fields":[{"name":"file1","type":["null","string"],"default":null},{"name":"file2","type":"string","default":null},{"name":"file3","type":["null","string"],"default":"dfdf"}]}",

"properties":{}

}

示例 2

{

"type":"STRING",

"schema":"",

"properties":{

"key1":"value1"

}

}

REST API

发送一个 POST 请求: POST /admin/v2/schemas/:tenant/:namespace/:topic/schema

请求格式JSON

{

"type":"<schema-type>",

"schema":"<an-utf8-encoded-string-of-schema-definition-data>",

"properties":{}// the properties associated with the schema

}

请求包含如下字段:

Field

Description

type

模式类型

schema

模式定义数据采用用UTF 8字符集编码。如果模式是原语模式,则此字段应为空。如果模式是结构模式,则此字段应为Avro模式定义的JSON字符串。

properties

模式扩展属性

Java Admin API

voidcreateSchema(String topic,PostSchemaPayload schemaPayload)

PostSchemaPayload 包含如下字段:

Field

Description

type

模式类型

schema

模式定义数据采用用UTF 8字符集编码。如果模式是原语模式,则此字段应为空。如果模式是结构模式,则此字段应为Avro模式定义的JSON字符串。

properties

模式扩展属性

这里有一个 PostSchemaPayload 示例:

PulsarAdmin admin =;

PostSchemaPayload payload =newPostSchemaPayload();

payload.setType("INT8");

payload.setSchema("");

admin.createSchema("my-tenant/my-ns/my-topic", payload);

读取模式 (最新)

读取一个主题的最新模式,方法如下.

Admin CLI

使用get 子命令.

$ pulsar-admin schemas get <topic-name>

{

"version": 0,

"type": "String",

"timestamp": 0,

"data": "string",

"properties": {

"property1": "string",

"property2": "string"

}

}

REST API

发送一个 GET 请求: GET /admin/v2/schemas/:tenant/:namespace/:topic/schema

返回结果JSON格式如下.

{

"version":"<the-version-number-of-the-schema>",

"type":"<the-schema-type>",

"timestamp":"<the-creation-timestamp-of-the-version-of-the-schema>",

"data":"<an-utf8-encoded-string-of-schema-definition-data>",

"properties":{}// the properties associated with the schema

}

返回结果包含如下字段:

Field

Description

version

模式版本,long数值型

type

模式类型

`timestamp

创建模式版本的时间戳

`data

模式定义数据采用用UTF 8字符集编码。如果模式是原语模式,则此字段应为空。如果模式是结构模式,则此字段应为Avro模式定义的JSON字符串。

properties

模式扩展属性

Java Admin API

SchemaInfocreateSchema(String topic)

SchemaInfo 包含如下字段:

Field

Description

name

模式名称

type

模式类型

schema

模式定义字节数值数据采用用UTF 8字符集编码。如果模式是原语模式,则此字节数值应为空。如果模式是结构模式,则此字段应为Avro模式定义的JSON字符串通过自己数值转换而来。

properties

模式扩展属性

SchemaInfo示例:

PulsarAdmin admin =;

SchemaInfo si = admin.getSchema("my-tenant/my-ns/my-topic");

读取模式 (具体)

读取一个具体版本的模式,使用如下方法:

Admin CLI

使用 get 子命令.

$ pulsar-admin schemas get <topic-name> --version=<version> 

REST API

发送一个 GET 请求: GET /admin/v2/schemas/:tenant/:namespace/:topic/schema/:version

这里有一个返回结果示例, JSON 格式.

{

"version":"<the-version-number-of-the-schema>",

"type":"<the-schema-type>",

"timestamp":"<the-creation-timestamp-of-the-version-of-the-schema>",

"data":"<an-utf8-encoded-string-of-schema-definition-data>",

"properties":{}// the properties associated with the schema

}

返回结果包含如下字段:

Field

Description

version

模式版本号,long数值型

type

模式类型

timestamp

模式版本号创建的时间戳

data

模式定义数据采用用UTF 8字符集编码。如果模式是原语模式,则此字段应为空。如果模式是结构模式,则此字段应为Avro模式定义的JSON字符串。

properties

模式扩展属性

Java Admin API

SchemaInfocreateSchema(String topic,long version)

SchemaInfo 包含如下字段:

Field

Description

name

模式名称

type

模式类型

schema

模式定义字节数值数据采用用UTF 8字符集编码。如果模式是原语模式,则此字节数值应为空。如果模式是结构模式,则此字段应为Avro模式定义的JSON字符串通过自己数值转换而来。

properties

模式扩展属性

SchemaInfo 示例:

PulsarAdmin admin =;

SchemaInfo si = admin.getSchema("my-tenant/my-ns/my-topic",1L);

扩展模式

通过主题创建模式,可以使用以下方法

Admin CLI

使用 extract 子命令.

$ pulsar-admin schemas extract --classname <class-name> --jar <jar-path> --type <type-name>

删除schema

在一个主题上删除一个a schema,方法如下

温馨提示

事实上,删除行为会删除注册在主题上的所有版本

Admin CLI

使用 delete 子命令

$ pulsar-admin schemas delete <topic-name>

REST API

发送 DELETE 请求到schema 端点:

DELETE /admin/v2/schemas/:tenant/:namespace/:topic/schema

返回结果是JSON格式,示例如下:

{

"version":"<the-latest-version-number-of-the-schema>",

}

返回结果包含如下字段:

Field

Description

version

The schema version, which is a long number.

Java Admin API

voiddeleteSchema(String topic)

删除schema示例

PulsarAdmin admin =;

admin.deleteSchema("my-tenant/my-ns/my-topic");

自定义模式存储

默认情况下,Pulsar将各种数据类型的模式存储在与Pulsar一起部署的Apache BookKeeper。但是,如果需要,可以使用其他存储系统。

实现

模式使用非默认存储系统, 需要实现以下Java接口:

  • SchemaStorage interface

  • SchemaStorageFactory interface

SchemaStorage interface

SchemaStorage 接口具有以下方法:

publicinterfaceSchemaStorage{

// How schemas are updated

CompletableFuture<SchemaVersion>put(String key,byte[] value,byte[] hash);

// How schemas are fetched from storage

CompletableFuture<StoredSchema>get(String key,SchemaVersion version);

// How schemas are deleted

CompletableFuture<SchemaVersion>delete(String key);

// Utility method for converting a schema version byte array to a SchemaVersion object

SchemaVersionversionFromBytes(byte[] version);

// Startup behavior for the schema storage client

voidstart()throwsException;

// Shutdown behavior for the schema storage client

voidclose()throwsException;

}

温馨提示

模式存储实现的完整示例, 请参见 BookKeeperSchemaStorage class.

SchemaStorageFactory interface

SchemaStorageFactory 接口具有以下方法:

publicinterfaceSchemaStorageFactory{

@NotNull

SchemaStoragecreate(PulsarService pulsar)throwsException;

}

温馨提示

模式存储工厂实现的完整示例,请参见BookKeeperSchemaStorageFactory class.

部署

要使用自定义模式存储实现,请执行以下步骤

  1. 将实现打包到一个JAR 文件

  2. 将JAR文件添加到Pulsar二进制或源发行版的 lib 文件夹中

  3. broker.conf中的schemaRegistryStorageClassName配置更改为自定义工厂类。

    ---Schema storage ---

    # The schema storage implementation used by this broker

    schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory

  4. 启动 Pulsar



 

以上是 Pulsar2.5.0之Schema管理 的全部内容, 来源链接: utcz.com/z/516853.html

回到顶部