首页 > 解决方案 > 我们如何为 nodejs Kafka 生产者中的模式配置 value.subject.name.strategy?

问题描述

我们如何配置 avro 模式的主题命名策略,我想配置key.subject.name.strategyvalue.subject.name.strategy。我知道这可以在Java中完成,但找不到在节点中完成的方法。到目前为止,我已经想出了下面的代码。

function getSchemaRegistryClient() {
        let saslUsername = ACCESS_KEY;
        let saslPassword = SECRET_KEY;
        let schemaRegistryUrl = SCHEMA_REGISTRY_URL;

        var registryClient = new SchemaRegistry({
            host: `https://${saslUsername}:${saslPassword}@${schemaRegistryUrl}`
        });

    return registryClient;
}

function getKafkaClient() {
  console.log('Fetching Kafka Client');

  var kafkaClient = new Kafka({
    brokers: ['url'],
    sasl: {
      mechanism: 'SCRAM-SHA-512',
      username: USERNAME,
      password: PASSWORD
    },
  });
  return kafkaClient;
}

function getKafkaProducerClient() {
      console.log('Trying to fetch kafka producer');

      var producer = getKafkaClient().producer();
      return producer;
}

async function publishToProton(eventToPublish) {

    console.log(`Event before publishing ${JSON.stringify(event)}`); 
    
    var registryClient = getSchemaRegistryClient();

    var encodedMessage = await registryClient.encode(registryId, event);
    let sendParams = {
        topic: topicName,
        messages: [
            {
                key: 'EventKey',
                value: encodedMessage
            }
        ]
    };

    let producer = getKafkaProducerClient();

    await producer.connect();
    await producer.send(sendParams);
    await producer.disconnect();
    console.log(`Proton event is published`);
};

try {
    publishToProton(event);
}catch(e){
    console.log(e);
}

标签: node.jsapache-kafkaconfluent-schema-registry

解决方案


推荐阅读