首页 > 解决方案 > Spring Cloud 上的 KafkaHeaders.RECEIVED_MESSAGE_KEY 与 KafkaHeaders.MESSAGE_KEY 标头

问题描述

我正在使用弹簧云流。KafkaHeaders.RECEIVED_MESSAGE_KEY我想知道和之间有什么区别KafkaHeaders.MESSAGE_KEY

我有 2 个项目,第一个生成消息KafkaHeaders.MESSAGE_KEY用作标题:

    public void sendResponse(ThirdPartyResponse thirdPartyResponse) {

        log.info("Sending response of type 'completed' [{}].", thirdPartyResponse);
        integrations.send(
                withPayload(ApplicationSubmissionSuccessPayload.success(thirdPartyResponse))
                        .setHeader(KafkaHeaders.MESSAGE_KEY, thirdPartyResponse.getData().getApplicationId())
                        .build());

    }

第二个使用KafkaHeaders.RECEIVED_MESSAGE_KEY

@StreamListener(target = "ofaOut")
public void receive(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String applicationId, @Payload String payload) throws JsonProcessingException {

...
}

但是我收到了这个错误

    2020-03-23 16:13:27.924 ERROR 1 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : 
org.springframework.messaging.MessageHandlingException: Missing header 'kafka_receivedMessageKey' for method parameter type [class java.lang.String], failedMessage=GenericMessage [payload=byte[739],
 headers={kafka_offset=285, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@67c19b7c, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, 
kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, 
contentType=application/json, kafka_receivedTopic=com.product.foo.ofa.out, kafka_receivedTimestamp=1584715870225, kafka_groupId=aop-foo-kyc}]

它缺少标题

Missing header 'kafka_receivedMessageKey'

我该如何解决?

标签: javaspring-cloudspring-cloud-stream

解决方案


RECEIVED...设置在入站消息上;另一种是为应用程序指定出站消息的键值。

它们是不同的,以避免在应用程序接收到消息并执行某些工作并将消息重新发布到另一个主题时意外传播。

使用 Spring Integration 时,标头会在消息遍历流时自动复制。

出站消息映射器不映射RECEIVED...标头,因此它们不会出现在ProducerRecord.

... kafka_receivedMessageKey=null ...

表示入站记录中的键为空。

要接收空键,请使用

@Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false)

推荐阅读