java - Spring Cloud 上的 KafkaHeaders.RECEIVED_MESSAGE_KEY 与 KafkaHeaders.MESSAGE_KEY 标头
问题描述
我正在使用弹簧云流。KafkaHeaders.RECEIVED_MESSAGE_KEY
我想知道和之间有什么区别KafkaHeaders.MESSAGE_KEY
我有 2 个项目,第一个生成消息KafkaHeaders.MESSAGE_KEY
用作标题:
public void sendResponse(ThirdPartyResponse thirdPartyResponse) {
log.info("Sending response of type 'completed' [{}].", thirdPartyResponse);
integrations.send(
withPayload(ApplicationSubmissionSuccessPayload.success(thirdPartyResponse))
.setHeader(KafkaHeaders.MESSAGE_KEY, thirdPartyResponse.getData().getApplicationId())
.build());
}
第二个使用KafkaHeaders.RECEIVED_MESSAGE_KEY
@StreamListener(target = "ofaOut")
public void receive(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String applicationId, @Payload String payload) throws JsonProcessingException {
...
}
但是我收到了这个错误
2020-03-23 16:13:27.924 ERROR 1 --- [container-0-C-1] o.s.integration.handler.LoggingHandler :
org.springframework.messaging.MessageHandlingException: Missing header 'kafka_receivedMessageKey' for method parameter type [class java.lang.String], failedMessage=GenericMessage [payload=byte[739],
headers={kafka_offset=285, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@67c19b7c, deliveryAttempt=3, kafka_timestampType=CREATE_TIME,
kafka_receivedMessageKey=null, kafka_receivedPartitionId=0,
contentType=application/json, kafka_receivedTopic=com.product.foo.ofa.out, kafka_receivedTimestamp=1584715870225, kafka_groupId=aop-foo-kyc}]
它缺少标题
Missing header 'kafka_receivedMessageKey'
我该如何解决?
解决方案
RECEIVED...
设置在入站消息上;另一种是为应用程序指定出站消息的键值。
它们是不同的,以避免在应用程序接收到消息并执行某些工作并将消息重新发布到另一个主题时意外传播。
使用 Spring Integration 时,标头会在消息遍历流时自动复制。
出站消息映射器不映射RECEIVED...
标头,因此它们不会出现在ProducerRecord
.
... kafka_receivedMessageKey=null ...
表示入站记录中的键为空。
要接收空键,请使用
@Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false)
推荐阅读
- azure-devops - 您将如何访问与发布管道中的构建相关的工作项(功能、用户故事、错误或任务)的属性?
- python-3.x - 用http.server如何访问被调用的IP地址?
- spring-boot - 如何根据 Application.yaml 中的另一个属性设置属性名称
- sql - 如何在 SQLite 中对类似于 Postgresql 中的 ROW_NUMBER 窗口函数的一些值进行排名?
- vbscript - 读取 AR System Database 并通过 VBScript 导出为 TXT 文件
- visual-studio-code - 为什么 vscode 在其构建中不包含 ffmpeg?
- schema - 从 sch 文件中获取失败的断言
- python-3.x - 从跨度标题中抓取信息
- python - python 多处理似乎不适用于类
- amazon-web-services - Kibana 运行状况为 RED