java - Spring Cloud Stream 不使用 Kafka 通道绑定器发送消息
问题描述
我正在努力实现以下目标:
将 Spring Cloud Stream 2.1.3.RELEASE 与 Kafka binder 一起使用,将消息发送到输入通道并实现发布订阅行为,其中每个消费者都将收到通知并能够处理发送到 Kafka 主题的消息。
我知道在 Kafka 中,如果每个消费者都属于自己的消费者组,那么将能够读取来自主题的每条消息。在我的例子中,spring 为我运行的 spring boot 应用程序的每个实例创建了一个匿名的唯一消费者组。Spring Boot 应用程序只有一个流侦听器配置为侦听输入通道。
测试示例案例:
使用绑定到 Kafka 主题的输入通道配置了示例 Spring Cloud Stream 应用程序。使用 spring rest 控制器向输入通道发送消息,期望该消息将被传递到每个运行的 spring boot 应用程序实例。在启动时的两个应用程序中,我可以看到 kafka 分区已正确分配。
问题:
但是,当我使用 output().send() 发送消息时,spring 甚至不会向配置的 Kafka 主题发送消息,而是在同一线程中触发同一应用程序实例的 @StreamListener 方法。
在调试期间,我看到 spring 代码有两个消息处理程序。StreamListenerMessageHandler 和 KafkaProducerMessageHandler。Spring 只是将它们链接起来,如果第一个处理程序以成功结束,那么它甚至不会更进一步。StreamListenerMessageHandler 只是在同一个线程中调用我的 @StreamListener 方法,消息永远不会到达 Kafka。
问题:
这是设计使然,在这种情况下,为什么会这样?我怎样才能实现帖子开头提到的行为?
PS。如果我使用 KafkaTemplate 和 @KafkaListener 方法,那么它可以按我的意愿工作。消息被发送到 Kafka 主题,两个应用程序实例都接收消息并在 Kafka 侦听器注释方法中处理它。
代码:
流监听方法配置如下:
@SpringBootApplication
@EnableBinding(Processor.class)
@EnableTransactionManagement
public class ProcessorApplication {
private Logger logger =
LoggerFactory.getLogger(this.getClass().getName());
private PersonRepository repository;
public ProcessorApplication(PersonRepository repository) {
this.repository = repository;
}
public static void main(String[] args) {
SpringApplication.run(ProcessorApplication.class, args);
}
@Transactional
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public PersonEvent process(PersonEvent data) {
logger.info("Received event={}", data);
Person person = new Person();
person.setName(data.getName());
logger.info("Saving person={}", person);
Person savedPerson = repository.save(person);
PersonEvent event = new PersonEvent();
event.setName(savedPerson.getName());
event.setType("PersonSaved");
logger.info("Sent event={}", event);
return event;
}
}
向输入通道发送消息:
@RestController()
@RequestMapping("/persons")
public class PersonController {
@Autowired
private Sink sink;
@PostMapping("/events")
public void createPerson(@RequestBody PersonEvent event) {
sink.input().send(MessageBuilder.withPayload(event).build());
}
}
Spring Cloud Stream 配置:
spring:
cloud.stream:
bindings:
output.destination: person-event-output
input.destination: person-event-input
解决方案
sink.input().send
您完全绕过活页夹并将其直接发送到流侦听器。
您需要将消息发送到 kafka(发送到person-event-input
主题),然后每个流侦听器都会收到来自 kafka 的消息。
您需要配置另一个output
绑定并将其发送到那里,而不是直接发送到输入通道。
推荐阅读
- sql - TSQL 填充数据
- javascript - Javascript 函数不会在 IE11 上构建 SVG
- javascript - 在 JavaScript 中将多维数组转换为 CSV 文件的列并使用浏览器书签保存
- ssh - 使用 gcloud 上的 vm 作为中介从任何地方访问本地服务器
- ansible - 使用动态库存文件(JSON)运行剧本
- angular - 未捕获(承诺中):TypeError:无法读取 null 的属性“eso”
- php - Google Search Console API 域权限
- c++ - 我想在 C++ 代码中比较这三个术语
- python-3.x - 子列表的频率总和
- javascript - Javascript如何点击元素有preventDefault