java - 我正在尝试使用 kafka 流放置 avro 消息,但它作为二进制数据类型-java
问题描述
我正在尝试将 jsonSerde 作为主题的输入,应该处理记录并需要使用 kafka 流将其作为 Avro 消息放在不同的主题中。输出看起来是二进制的,数据不是实际的 JSON 格式。看起来,喜欢它正在使用默认的 bytearrayserde 作为值和键。我不知道为什么,但我将序列化程序作为 SpecificAvroSerde 提供。
private final static JsonSerde<JsonNode> jsonSerde = new JsonSerde<JsonNode>(JsonNode.class);
private static Map<String, Object> props;
//Serde of specific record
private static SpecificAvroSerde<SpecificRecord> productValueSerde;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public StreamsConfig kafkaStreamsConfig()
throws UnknownHostException {
props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,
"*****processor-3");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092,localhost:19092,localhost:39092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
"http://localhost:18081");
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients"
+ ".consumer.RoundRobinAssignor");
productValueSerde = new SpecificAvroSerde<SpecificRecord>();
productValueSerde.configure((Collections.singletonMap(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
"http://localhost:18081")),false);
return new StreamsConfig(props);
}
@Bean
public KStream<JsonNode,JsonNode> KStream(StreamsBuilder kStreamBuilder){
KStream<JsonNode,JsonNode> stream = kStreamBuilder.stream("localtest",Consumed.with(jsonSerde, jsonSerde));
try {
KStream<JsonNode,SpecificRecord> avroStream = stream.flatMap((K,V)->actNationalPaperHelper.mapToCoreAvro(K, V));
//avroStream.flatMap((K,V)->System.out.println(V); return avroStream));
avroStream.through("serdetest16",Produced.with(jsonSerde, productValueSerde));
}
catch(Exception e) {
System.out.println(e);
}
return stream;
}
T
解决方案
推荐阅读
- regex - RegEx用于匹配bash中方括号中的字符
- mongodb - Mongodb查找第二列匹配的重复项
- java - 带名字的preparestatement
- c - 为什么在不同作用域中声明的同名变量会被分配相同的内存地址?
- android - Xamarin.Forms 列表视图未在 Android 中更新
- python - 双击批处理文件并在 cronjob 中执行的不同行为
- sql - 如何根据出现多次的月份获取日期?
- r - 如何将函数应用于四分位数子集?
- jenkins - 脚本化和声明性管道之间的 Jenkins 错误处理
- ios - 为什么定时器/倒计时总是用 UITableViewCell 中的初始值重置