首页 > 解决方案 > 如何在ProducerRecord中设置kafka message key

问题描述

目前我正在使用 org.springframework.kafka.core.KafkaTemplate 发布带有标题的主题的 avro 消息。

@Override
public ListenableFuture<SendResult<K, V>> send(Message<?> message) {
    ProducerRecord<?, ?> producerRecord = this.messageConverter.fromMessage(message, this.defaultTopic);
    if (!producerRecord.headers().iterator().hasNext()) { // possibly no Jackson
        byte[] correlationId = message.getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);
        if (correlationId != null) {
            producerRecord.headers().add(KafkaHeaders.CORRELATION_ID, correlationId);
        }
    }
    return doSend((ProducerRecord<K, V>) producerRecord);
}

在 Message<?> 中,我们可以设置 value 和 headers 但不能设置 key。有没有办法在标题中输入密钥?如果是这样,请让我知道密钥的标题名称吗?有没有办法使用 KafkaTemplate 发送键、值和标头

标签: spring-bootapache-kafkaspring-kafka

解决方案


您必须在 上设置标题ProducerRecord,例如:

RecordHeader yourHeader = new RecordHeader("yourHeaderName", "yourValue".toByteArray())
record.headers().add(recordHeaderKafkaMessageKey)

编辑:误读要求,对不起。消息密钥有一个已定义的 kafka 标头,即KafkaHeaders.MESSAGE_KEY.


推荐阅读