spring - 如何配置 Spring Cloud StreamBridge 来生产 Avro?
问题描述
所以我正在尝试使用 StreamBridge 将消息动态发送到不同的主题。如果我的输出是Message< String>,但不是Message< GenericRecord> ,我就成功了
代码示例:
@StreamListener(Sink.INPUT)
public void process(@Payload GenericRecord messageValue,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) GenericRecord messageKey,
@Header("Type") String type) {
log.info("Processing Event --> " + messageValue);
// Code...
// convert to Message<GenericRecord>
Message<GenericRecord> message = ...
streamBridge.send(type, message);
log.info("Processed Event --> " + messageValue);
}
我得到的错误是Caused by: org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not a map:
我猜是因为 streamBridge acceptedOutputTypes = application/json
2020-06-28 04:42:55.670 INFO 54347 --- [container-0-C-1] o.s.c.f.c.c.SimpleFunctionRegistry : Looking up function 'streamBridge' with acceptedOutputTypes: [application/json]
我尝试通过在我的属性中设置以下内容来将接受的输出类型修改为 avro,但这不起作用。
spring.cloud.stream.function.definition=streamBridge
spring.kafka.producer.key-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.cloud.stream.bindings.streamBridge-out-0.content-type=application/*+avro
spring.cloud.stream.bindings.streamBridge-out-0.producer.use-native-encoding=true
关于如何将 StreamBridge 配置为 avro 的任何想法?
编辑:我也试过streamBridge.send(type, message, MimeType.valueOf("application/*+avro"))
,但也有转换错误。
解决方案
我无法让 StreamBridge 动态工作,所以我转而使用 Function:
@Bean
public Function<Message<GenericRecord>, Message<GenericRecord>> process() {
return message -> {
// Code...
String topic = message.getHeaders().get("type");
// convert to Message<GenericRecord>
Message<GenericRecord> message = MessageBuilder...
.setHeader("spring.cloud.stream.sendto.destination", topic)
.build();
return outgoingMessage;
};
}
属性文件是:
spring.cloud.function.definition=process
spring.cloud.stream.bindings.process-in-0.destination=${consumer_topic}
spring.cloud.stream.bindings.process-in-0.group=${spring.application.name}
spring.cloud.stream.bindings.process-out-0.content-type=application/*+avro
spring.cloud.stream.bindings.process-out-0.producer.use-native-encoding=true
编辑:Streambridge 得到修复以支持此:https ://github.com/spring-cloud/spring-cloud-stream/issues/2007
推荐阅读
- machine-learning - 多层感知器与 SVM / 随机森林
- python - 如何使用openCV将图像保存在文件夹中
- c - 将整数添加到指针有什么作用?
- java - How to check if excel column headers are in certain order in apache poi
- spring-mvc - 没有xml的基于Spring MVC java的配置不起作用
- reactjs - ReactNative 警告:列表中的每个孩子都应该有一个唯一的“关键”道具
- laravel - 我需要从淘汰赛中获取 id 并以表格形式传递
- c# - 调用注入类的事件
- django - 如何检查Django中是否已经存在类?
- php - 如何在codeigniter中使用mobiledetect.net将mydomain.com重定向到m.mydomain.com?