java - 使用低级 Kafka Stream API 执行 context.forward() 时的 NPE
问题描述
我使用 Low-level Kafka API 构建了一个普通的 Kafka 流 API。拓扑是线性的。
p1 -> p2 -> p3
在执行 context.forward() 时,我得到了 NPE,代码片段如下:
NAjava.lang.NullPointerException: null
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:178)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
...
我正在使用 Kafka Stream 2.3.0。
我看到一个类似的 SO 问题 [here][1],该问题基于非常旧的版本。所以,不确定这是否是同一个错误?
编辑
我正在提供更多信息,保持我正在做的事情的要点
public class SP1Processor implements StreamProcessor {
private StreamProcessingContext ctxt;
// In init(), create a single thread pool
// which does some processing and sends the
// data to next processor
@Override
void init(StreamProcessingContext ctxt) {
this.ctxt = ctxt;
// Create a thread pool, do some work
// and then do this.ctxt.forward(K,V)
// Not showing code of Thread pool
// Strangely, inside this thread pool,
// this.ctxt isn't same what I see in process()
// shouldn't it be same? ctxt is member variable
// and shouldn't it be same
// this.ctxt.forward(K,V) here in this thread pool is causing NPE
// why does it happen?
this.ctxt.forward(K,V);
}
@Override
void process(K,V) {
// Here do some processing and go to the next processor chain
// This works fine
this.ctxt.forward(K,V);
}
}
[1]: https://stackoverflow.com/questions/39067846/periodic-npe-in-kafka-streams-processor-context
解决方案
看起来它可能与链接的问题是同一个问题,尽管在您的情况下我们正在谈论一个更现代的版本。确保ProcessorSupplier.get()
每次调用它都会返回一个新实例。
推荐阅读
- java - 从 Java 8 迁移到 Java 9
- python - Caffe CNN 训练过程陷入循环
- python - l_dumpid = str('{:08x}'.format(l_dumpid)) ValueError: 格式中的零长度字段名称
- javascript - 如何在javascript中按月计算两个日期之间的天数
- java - OSGi 中的 Apache POI 3.17
- postgresql - Postgresql 查询的基本性能
- python - 在散景图上悬停时显示的工具提示也应在鼠标悬停在工具提示本身上时显示
- django - 缓存失效在使用或应用程序 Rest API 的浏览器中不起作用?
- java - 没有找到类“com.sun.xml.ws.spi.ProviderImpl”
- python - 在 python 中通过 matplotlib 绘制图形