apache-kafka-streams - Kafka Streams 是否可以使用一种格式的消息并生成另一种格式,例如 AVRO 消息
问题描述
我正在使用 kafka 流来使用来自一个主题的 JSON 字符串,处理并生成要存储在另一个主题中的响应。但是,需要生成到响应主题的消息需要采用 avro 格式。
我尝试使用 key 作为 string serde 和 value 作为SpecificAvroSerde
以下是我创建拓扑的代码:
StreamsBuilder builder = new StreamsBuilder();
KStream<Object, Object> consumerStream =builder.stream(kafkaConfiguration.getConsumerTopic());
consumerStream = consumerStream.map(getKeyValueMapper(keyValueMapperClassName));
consumerStream.to(kafkaConfiguration.getProducerTopic());
以下是我的配置
if (schemaRegistry != null && schemaRegistry.length > 0) {
streamsConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, String.join(",", schemaRegistry));
}
streamsConfig.put(this.keySerializerKeyName, StringSerde.class);
streamsConfig.put(this.valueSerialzerKeyName, SpecificAvroSerde.class);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
streamsConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, batchSize);
streamsConfig.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FailOnInvalidTimestamp.class);
streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.parseInt(commitIntervalMs));
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numberOfThreads);
streamsConfig.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor);
streamsConfig.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
streamsConfig.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class);
streamsConfig.put(StreamsConfig.TOPOLOGY_OPTIMIZATION,StreamsConfig.OPTIMIZE);
streamsConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionMode);
streamsConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
当我尝试使用该示例时,我看到以下错误:
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
解决方案
问题出在 Key Value Serdes 上。您应该在使用流时使用正确的 serdes,在发布流时也应该使用正确的 serdes。
如果您的输入是 JSON 并且您想以 Avro 的形式发布,您可以按以下方式进行:
Properties streamsConfig= new Properties();
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
StreamsBuilder builder = new StreamsBuilder();
KStream<Object, Object> consumerStream =builder.stream(kafkaConfiguration.getConsumerTopic(),Consumed.with(Serdes.String(), Serdes.String()));
// Replace AvroObjectClass with your avro object type
KStream<String,AvroObjectClass> consumerAvroStream = consumerStream.map(getKeyValueMapper(keyValueMapperClassName));
consumerAvroStream.to(kafkaConfiguration.getProducerTopic());
推荐阅读
- javascript - 如何在 map 函数中重用对象?
- c# - Group By 除了实例化一个类
- javascript - 如何限制我的基于 JS 浏览器的 API 应用程序?
- r - 当其中一个选项产生 NA 时,在 R 中使用 ifelse?
- php - 将base64字符串解密为jpeg图像时损坏的图像
- mysql - 导入大型 SQL 转储时面临的问题
- c - intel编译器下-mkl和-lm的区别
- node.js - 如何在 node.js 搜索栏中添加自动完成功能?
- sql-server - 使用 Case When 将 nvarchar 转换为 Int
- python - 如何对 4 列进行分组并根据另一列进行排名?