首页 > 解决方案 > 如何从消息头中获取我的主题名称并发布

问题描述

我是弹簧集成的新手。我正在尝试构建一个 spring rest 服务,它将获取带有一些 json 消息的任何 HTTP 请求并发布到 kafka 主题。

我的 json 消息将通过 RequestBody 发布,其中将在消息头中包含主题名称。

我可以将消息从我的控制器发布到 kafka 频道,但是我很难从我的 json 消息头中获取主题名称。

任何人都可以建议一种从我的消息头中获取主题名称的方法(通常一个 HTTP 请求包含一个带有主题名称的 json 消息)并使用该主题来发布消息。

我的 json :

{"resourceType": "MessageHeader",
"topicName": "testToptic",
"messagePayload":{
    "location": "chennai",
    "messageDetail": {
        "department-id": 123,
        "department-name": "SSS",
        "pincode": 600009
    }
}
}}

这是我的 bean 和处理程序

@Bean
public IntegrationFlow hanldeGenericKafka() {
    return IntegrationFlows.from(sendToKafkaChannel)

            .handle(
                    kafkaGenericMessageHandler(producerFactory),
                    e -> e.id("kafkaProducer2"))
            .get();
}

public KafkaProducerMessageHandlerTemplateSpec<String, String> kafkaGenericMessageHandler(
        ProducerFactory<String, String> producer) {

    return Kafka
            .outboundChannelAdapter(producer)
            .sync(true)
            .headerMapper(kafkaDefaultHeaderMapper())
            .messageKey(m -> m.getHeaders()
                    .get("topicname"))
            .configureKafkaTemplate(t -> t.id("kafkaTemplate"));
}

标签: jsonapache-kafkahttp-headersspring-integrationkafka-producer-api

解决方案


您可以使用带有内置JsonPath SpEL 函数的表达式从 JSON 有效负载中提取字段值。

使用适配器中的表达式.topicExpression()


推荐阅读