首页 > 解决方案 > 如何使用 Reactor 执行无限 Kafka 轮询?

问题描述

使用 Reactor 实现无限轮询循环的反应方式是什么?理想情况下,我想从 Producer 应用程序发送消息,而 Consumer 应用程序应该无限监听并在每次收到消息时以相同的方式处理消息的通量,并将结果的通量发回。我是否需要一个阻塞循环,或者有没有办法让发布者保持活动状态,可以接收通量处理并将其发回?

标签: apache-kafkaproject-reactor

解决方案


从kafka接收数据时尝试使用处理器将其发送到处理器

      DirectProcessor <Strings> directProcessor = DirectProcessor.create();
    directProcessor.subscribe();

    public void itemEmitt(String string){
        directProcessor.onNext(string);
    }

现在 directprocessor 是无限监听事件并发出事件的通量


推荐阅读