Defining the topic in code for stream messaging

Problem description

I have a MessageSender class that sends messages to Kafka.

package com.pax.order.workflow.messages;

import org.json.simple.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.ObjectMapper;

@Component
@EnableBinding(Source.class)
public class MessageSender {
  
  @Autowired
  private MessageChannel output; 
  
  @Autowired
  private ObjectMapper objectMapper;
  
  public void send(JSONObject m) {
    try {
      // avoid too much magic and transform ourselves
      String jsonMessage = objectMapper.writeValueAsString(m);
      // wrap into a proper message for the transport (Kafka/Rabbit) and send it      
      output.send(
          MessageBuilder.withPayload(jsonMessage).build());
    } catch (Exception e) {
      throw new RuntimeException("Could not transform and send message due to: " + e.getMessage(), e);
    }
  }
}

Its destination is defined in a properties file like this:

spring.cloud.stream.bindings.output.destination=eventtopic

How can I override it and send messages to other topics when needed?

Tags: java, spring-boot, apache-kafka, spring-cloud-stream

Solution
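
One possible approach, sketched under assumptions rather than taken from the question: instead of injecting the single output channel, resolve the channel from a topic name on each send and fall back to the configured default (eventtopic). With the annotation-based model used in the question (@EnableBinding), BinderAwareChannelResolver.resolveDestination(String) can play the resolver role; newer Spring Cloud Stream releases provide StreamBridge.send(String, Object) for the same purpose. The sketch below leaves Spring out so that it runs on its own; DynamicTopicSender and TopicChannel are hypothetical names, not library types.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical stand-in for a message channel bound to exactly one topic. */
interface TopicChannel {
  void send(String payload);
}

/**
 * Hypothetical sender that chooses the destination topic per call
 * instead of relying on one fixed binding.
 */
class DynamicTopicSender {

  private final String defaultTopic;
  // Turns a topic name into a channel. In a Spring Cloud Stream application
  // this role could be filled by BinderAwareChannelResolver (an assumption of
  // this sketch, not something shown in the question).
  private final Function<String, TopicChannel> resolver;
  // Channels are resolved once per topic and then reused.
  private final Map<String, TopicChannel> channels = new ConcurrentHashMap<>();

  DynamicTopicSender(String defaultTopic, Function<String, TopicChannel> resolver) {
    this.defaultTopic = defaultTopic;
    this.resolver = resolver;
  }

  /** Sends to the default topic, mirroring the original send(JSONObject). */
  void send(String jsonMessage) {
    send(defaultTopic, jsonMessage);
  }

  /** Sends to an explicitly named topic, resolving its channel on first use. */
  void send(String topic, String jsonMessage) {
    channels.computeIfAbsent(topic, resolver).send(jsonMessage);
  }
}
```

In a Spring application, the resolver passed to the constructor would wrap resolveDestination(topic) and build the message with MessageBuilder.withPayload(jsonMessage).build(), as the original send method already does. Callers that do not care about the topic keep using send(jsonMessage); only the callers that need another topic pass one explicitly.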

