java - Kafka Stream forward方法抛出NullPointerException,因为ProcessorNode currentNode对象为null
问题描述
我们已经编写了从源主题读取的 Kafka 流应用程序,它在 2 个处理器中执行一些业务逻辑,然后将输出写入接收主题。
下面是创建拓扑、添加源、处理器(请注意我们添加 2 个处理器)和接收器的代码
Topology topology = new Topology();
topology.addSource("sourceProcessor", "source-topic")
.addProcessor("Process", ()->fileExtractProcessorObject , "sourceProcessor")
.addProcessor("XMLJSON", () -> xmlJson , "Process")
.addSink("sinkProcessor", "sink-topic", "XMLJSON");
KafkaStreams streams = new KafkaStreams(topology, getProperties(appConfig,sourceConfig));
streams.start()
下面是第一个处理器代码
public class FileExtractProcessor implements Processor<String, byte[]> {
private ProcessorContext context;
public FileExtractProcessor() {
}
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(String key, byte[] bytearray) {
/* business logic */
//This is pojo class which will be forwarded to next processor as value
ProcessData pData = new ProcessData();
// Setting pojo object and forwarding to next processor
pData.setValue(value);
pData.setXmlData(files);
pData.setAppconfig(ac);
context.forward(value.getRunKeyId(), pData, To.child("XMLJSON"));
context.commit();
}
}
在上面的代码中,当我们调用 forward 方法时,我们在下面的代码中得到空指针异常。
org.apache.kafka.streams.processor.internals.ProcessorContextImpl#forward(K, V, org.apache.kafka.streams.processor.To)
ProcessorNode child = this.currentNode().getChild(sendTo);
java.lang.NullPointerException
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
at com.myapp.FileExtractProcessor.process(FileExtractProcessor.java:78)
at com.myapp.FileExtractProcessor.process(FileExtractProcessor.java:22)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
我们正在使用 kafka-stream-2.4.0.jar 能否请您帮助我们在这里缺少的地方。无法弄清楚为什么 currentNode 对象为 Null。非常感谢早期帮助。
解决方案
基于代码:https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L183错误意味着currentNode()
返回null
。
这表明您违反了在;上ProcessorSupplier
返回新Processor
实例的规则。当您传递到每次返回相同的对象引用时get()
,这似乎成立——相反,您每次都需要通过.() -> fileExtractProcessorObject
addProcessor
() -> new FileExtractProcessor()
我创建了一张票来改进错误消息:https ://issues.apache.org/jira/browse/KAFKA-10036
推荐阅读
- sql - 如何在 SQL Server 中选择满足某些条件的特定上一行
- css - 将html转换为pdf时如何将背景和边框应用于生成的pdf文件的每一页
- laravel - Laravel 从工匠到 XAMPP (apache)
- regex - 如果数据包含弯括号,则拒绝 elem
- python - pyODBC 模块可以在 teradata 中删除表吗
- android - Xamarin 应用程序不以等距方式接收传感器测量值
- arrays - 如何从 jsonb 对象数组中提取值?
- regex - 正则表达式匹配特殊字符的一个实例
- c - 如何使客户端服务器套接字程序异步?
- c++ - 无法使用 mysql_real_connect() 连接到 mysql 服务器