首页 > 解决方案 > 在 Apache Pulsar 中使用注册模式发布到主题

问题描述

如Pulsar Schema Registry Docs中的示例所示

Producer<User> producer = client.newProducer(JSONSchema.of(User.class))
    .topic(topic)
    .create();
User user = new User(“Tom”, 28);
producer.send(User);

您可以使用 Java 客户端为生产者和消费者注册模式。还提到其他语言的客户端不支持模式注册表。

现在是否可以从 Python API 生产者发送关于 Pulsar 主题的消息,该消息将由具有注册模式的消费者使用?例如

processor = PulsarClient.builder()
            .serviceUrl("pulsar://pulsarhost:6650")
            .build()
            .newConsumer(JSONSchema.of(User.class))
            .topic("sometopic")
            .subscriptionName("somesubscription")
            .subscribe();

Python:导入脉冲星

client = pulsar.Client('pulsar://pulsarhost:6650')

producer = client.create_producer('sometopic')
client.close()

标签: apache-pulsarpython-pulsar

解决方案


从 Pulsar 2.4 版本开始,您也可以在发布和使用时在 Python 中声明模式。

鉴于 Python 对象的动态特性,我们定义了一个Record可以用来显式声明模式格式的类。例如:

import pulsar
from pulsar.schema import *

class Example(Record):
    a = String()
    b = Integer()
    c = Boolean()


client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer(
                    topic='my-topic',
                    schema=AvroSchema(Example) )

producer.send(Example(a='Hello', b=1))

Python 客户端文档的更多示例:https ://pulsar.apache.org/docs/en/client-libraries-python/#declaring-and-validating-schema


推荐阅读