首页 > 解决方案 > 如何自动调用 kafka-stream-processor 中的 process() 方法?

问题描述

我正在学习 kafka 流并编写了一个简单的应用程序,代码如下:

主应用程序:

        Topology topology = new Topology();

        topology.addSource("SOURCE", "source-topic");
        topology.addProcessor("Processor1", () -> new Processor1(), "SOURCE");
        topology.addProcessor("Processor2", () -> new Processor2(), "Processor1");
        topology.addProcessor("Processor3", () -> new Processor3(), "Processor2");
        topology.addSink("SINK", "sink-topic", "Processor3");

        KafkaStreams streams = new KafkaStreams(topology, config);
        streams.start();

单个流处理器的片段:

public class Processor1 implements Processor<String, String> {

   // Rest of code

    @Override
    public void process(String key, String value) {
        System.out.println("Inside Processor1#process() method");
        context.forward(key, value);
    }

我知道我们需要创建Topology然后启动它,我们调用streams.start();

我无法理解如何process()自动调用方法以及谁调用它?

标签: apache-kafkaapache-kafka-streams

解决方案


Processor process()ProcessorContextImpl类对特定拓扑节点的每个传入消息自动调用的方法。对于您构建的拓扑,当消息到达传入主题时,SOURCE节点会使用它并通过内部调用forward方法将消息转发(传播)到子节点(您可以调试/查看 class 中的代码ProcessorContextImpl)。在您的情况下,SOURCEnode 将 key 和 value 转发给 child node Processor1。之后,触发process()了类中的方法。Processor1当代码到达context.forward()时,消息转发到下一个子节点,Processor2。在该消息传播到Processor3和之后SINK节点以类似的方式,最后,消息产生到出站主题。特定消息的此类管道在单个线程上执行(如果您有 config 的默认值num.stream.threads = 1,则所有消息将在每个应用程序实例的单个线程上处理)。


推荐阅读