java - Avro 编码器 NullPointerException
问题描述
我正在尝试将 PubSub 中的消息转换为 JSON 对象,稍后我可以使用该对象过滤并插入 BigQuery。目前,将消息转换为 JSON 对象时出现错误。
来自 PubSubMessage 的输入:
[
{
"name": "test-name",
"details": {
"email": "test-email",
"location": "test-location"
}
},
{
"name": "test-name2"
}
]
在上述消息中,`details` 字段是可选的:它可以缺失,也可以为 null。
管道代码:
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import javax.annotation.Nullable;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
@DefaultCoder(AvroCoder.class)
class Raw {
public String name;
@Nullable
Details details;
}
class Details {
public String email;
public String location;
}
/**
 * Pipeline entry point: reads messages from a Pub/Sub topic and decodes each JSON payload
 * (a JSON array) into a {@code List<Raw>}.
 */
public class DataFlowPipeline {
  /**
   * JSON mapper shared by all elements. Built and configured once instead of per element;
   * an ObjectMapper is safe to share once it is no longer reconfigured. Unknown JSON
   * properties are ignored rather than failing the parse.
   */
  private static final ObjectMapper MAPPER =
      new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

  /**
   * Builds and runs the pipeline, blocking until it finishes.
   *
   * @param args command-line pipeline options, parsed and validated as {@link PubsubOptions}
   */
  public static void main(String[] args) {
    PubsubOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(PubsubOptions.class);
    // NOTE(review): presumably a local Pub/Sub emulator endpoint - confirm before deploying.
    options.setPubsubRootUrl("http://localhost:8085");

    Pipeline pipeline = Pipeline.create(options);

    // Handle for the decoded records; later filter / BigQuery transforms would attach here.
    PCollection<List<Raw>> records = pipeline
        .apply("ReadPubSubTopic",
            PubsubIO.readMessagesWithAttributes()
                .fromTopic("projects/test-dataflow/topics/demo-dataflow"))
        .apply("DeserializePubSubMessage", ParDo.of(new DoFn<PubsubMessage, List<Raw>>() {
          /** Decodes one message payload as UTF-8 JSON and emits the parsed list. */
          @ProcessElement
          public void processElement(ProcessContext c) {
            System.out.println("Inside processor..");
            PubsubMessage pubsubMessage = c.element();
            String payload = new String(pubsubMessage.getPayload(), StandardCharsets.UTF_8);
            TypeFactory typeFactory = MAPPER.getTypeFactory();
            try {
              List<Raw> data = MAPPER.readValue(
                  payload, typeFactory.constructCollectionType(List.class, Raw.class));
              System.out.println("Message = " + data);
              c.output(data);
            } catch (IOException e) {
              // Best effort: a payload that cannot be parsed is reported and dropped so a
              // single malformed message does not fail the whole bundle.
              e.printStackTrace();
            }
          }
        }));

    pipeline.run().waitUntilFinish();
  }
}
执行管道时,Avro 的 ReflectDatumWriter 在编码数据时抛出了 NullPointerException。但根据 AvroCoder 的文档,带有 @Nullable 注解的字段应当允许为 null。
错误:
Caused by: java.lang.NullPointerException: in com.company.dataflow.Raw in com.company.dataflow.Raw in string null of string in field name of com.company.dataflow.Details in field details of com.company.dataflow.Message
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:161)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:312)
at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:114)
at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:300)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
at com.company.Dataflow.DataFlowPipeline$1.processElement(Collector.java:100)
Caused by: java.lang.NullPointerException
at org.apache.avro.specific.SpecificDatumWriter.writeString(SpecificDatumWriter.java:67)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:128)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:191)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:191)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:312)
at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:114)
at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:300)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
at com.company.dataflow.DataFlowPipeline$1.processElement(Collector.java:100)
at com.company.dataflow.DataFlowPipeline$1$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:78)
at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:240)
at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160)
at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)