apache-kafka - 如何自动调用 kafka-stream-processor 中的 process() 方法?
问题描述
我正在学习 kafka 流并编写了一个简单的应用程序,代码如下:
主应用程序:
Topology topology = new Topology();
topology.addSource("SOURCE", "source-topic");
topology.addProcessor("Processor1", () -> new Processor1(), "SOURCE");
topology.addProcessor("Processor2", () -> new Processor2(), "Processor1");
topology.addProcessor("Processor3", () -> new Processor3(), "Processor2");
topology.addSink("SINK", "sink-topic", "Processor3");
KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();
单个流处理器的片段:
public class Processor1 implements Processor<String, String> {
// Rest of code
@Override
public void process(String key, String value) {
System.out.println("Inside Processor1#process() method");
context.forward(key, value);
}
我知道我们需要创建Topology
然后启动它,我们调用streams.start();
我无法理解如何process()
自动调用方法以及谁调用它?
解决方案
Processor process()
ProcessorContextImpl
类对特定拓扑节点的每个传入消息自动调用的方法。对于您构建的拓扑,当消息到达传入主题时,SOURCE
节点会使用它并通过内部调用forward
方法将消息转发(传播)到子节点(您可以调试/查看 class 中的代码ProcessorContextImpl
)。在您的情况下,SOURCE
node 将 key 和 value 转发给 child node Processor1
。之后,触发process()
了类中的方法。Processor1
当代码到达context.forward()
时,消息转发到下一个子节点,Processor2
。在该消息传播到Processor3
和之后SINK
节点以类似的方式,最后,消息产生到出站主题。特定消息的此类管道在单个线程上执行(如果您有 config 的默认值num.stream.threads = 1
,则所有消息将在每个应用程序实例的单个线程上处理)。
推荐阅读
- php - WordPress 电子邮件共享按钮未在 Outlook 中打开
- android - 当应用程序在前台运行时,我没有收到推送通知
- redirect - 在不丢失反向链接的情况下进行 301 NGINX 重定向的正确方法
- tomcat - 在 Tomcat Websocket / Serverendpoint 中使用 CDI/注入
- python - Matplotlib 无法绘制只有 NaN 值的 DateTime 系列
- symfony - 选择线下支付后如何将订单状态更改为已支付?
- c# - C#循环布尔值
- python - 从mysql到reportlab的多个值
- c - 我可以使用 C 中的什么函数将日期与时间分开?
- winforms - mingw g++ Windows子系统WinMain没有得到hInstance值