首页 > 解决方案 > Avro 编码器 NullPointerException

问题描述

我正在尝试将 Pub/Sub 中的 JSON 消息反序列化为 Java 对象,以便稍后对其进行过滤并插入 BigQuery。目前,在转换消息时出现错误。

来自 PubSubMessage 的输入:

[
  {
    "name": "test-name",
    "details": {
      "email": "test-email",
      "location": "test-location"
    }
  },
  {
    "name": "test-name2"
  }
]

在上述消息中,`details` 字段可能缺失(即为 null)。

管道代码:

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;


@DefaultCoder(AvroCoder.class)
class Raw {
  public String name;
  @Nullable
  Details details;
}

class Details {
  public String email;
  public String location;
}




/**
 * Pipeline that reads JSON-array messages from a Pub/Sub topic (via a local emulator) and
 * deserializes each message into a list of {@link Raw} records, intended for later filtering and
 * loading into BigQuery.
 */
public class DataFlowPipeline {

  private static final Logger LOG = Logger.getLogger(DataFlowPipeline.class.getName());

  /**
   * Parses a Pub/Sub message whose payload is a UTF-8 JSON array of {@link Raw} objects and emits
   * the resulting list.
   *
   * <p>Messages that cannot be parsed are logged and dropped (best effort) instead of failing the
   * bundle. Package-private rather than private so Beam's generated DoFn invoker can access it.
   */
  static class ParseRawListFn extends DoFn<PubsubMessage, List<Raw>> {
    // ObjectMapper is thread-safe once configured, so one shared instance replaces the
    // per-element construction. Static fields are not serialized with the DoFn; they are
    // initialized when the class is loaded on each worker.
    private static final ObjectMapper MAPPER =
        new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    // Target type List<Raw>, resolved once instead of for every element.
    private static final JavaType RAW_LIST_TYPE =
        MAPPER.getTypeFactory().constructCollectionType(List.class, Raw.class);

    @ProcessElement
    public void processElement(ProcessContext c) {
      PubsubMessage pubsubMessage = c.element();
      String payload = new String(pubsubMessage.getPayload(), StandardCharsets.UTF_8);

      final List<Raw> records;
      try {
        records = MAPPER.readValue(payload, RAW_LIST_TYPE);
      } catch (IOException e) {
        LOG.log(Level.WARNING, "Dropping Pub/Sub message with unparseable JSON payload", e);
        return;
      }

      // A payload of literal JSON `null` deserializes to null, which the output coder cannot
      // encode; drop it instead of failing the bundle.
      if (records == null) {
        LOG.warning("Dropping Pub/Sub message whose JSON payload is null");
        return;
      }

      LOG.fine(() -> "Parsed " + records.size() + " record(s) from Pub/Sub message");
      c.output(records);
    }
  }

  public static void main(String[] args) {
    PubsubOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubsubOptions.class);
    // Point Pub/Sub I/O at the local emulator instead of the production endpoint.
    options.setPubsubRootUrl("http://localhost:8085");
    Pipeline pipeline = Pipeline.create(options);

    // Elements of each list are encoded with AvroCoder (via @DefaultCoder on Raw), so every field
    // of Raw/Details that may be null must be Avro-nullable.
    // Not yet consumed: downstream filter / BigQuery write steps attach here.
    PCollection<List<Raw>> rawRecords =
        pipeline
            .apply(
                "ReadPubSubTopic",
                PubsubIO.readMessagesWithAttributes()
                    .fromTopic("projects/test-dataflow/topics/demo-dataflow"))
            .apply("DeserializePubSubMessage", ParDo.of(new ParseRawListFn()));

    pipeline.run().waitUntilFinish();
  }
}

执行管道时,Avro 反射编码(`ReflectDatumWriter`)抛出了 NullPointerException。根据 AvroCoder 文档,带有 `@Nullable` 注解的字段应当允许为 null。

错误:

Caused by: java.lang.NullPointerException: in com.company.dataflow.Raw in com.company.dataflow.Raw in string null of string in field name of com.company.dataflow.Details in field details of com.company.dataflow.Message
        at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:161)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
        at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:312)
        at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:114)
        at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
        at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
        at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
        at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
        at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
        at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
        at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
        at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
        at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
        at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:300)
        at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
        at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
        at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
        at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
        at com.company.Dataflow.DataFlowPipeline$1.processElement(Collector.java:100)
Caused by: java.lang.NullPointerException
        at org.apache.avro.specific.SpecificDatumWriter.writeString(SpecificDatumWriter.java:67)
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:128)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
        at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
        at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
        at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
        at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:191)
        at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
        at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
        at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
        at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
        at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:191)
        at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
        at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
        at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:312)
        at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:114)
        at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
        at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
        at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
        at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
        at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
        at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
        at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
        at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
        at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
        at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:300)
        at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
        at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
        at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
        at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
        at com.company.dataflow.DataFlowPipeline$1.processElement(Collector.java:100)
        at com.company.dataflow.DataFlowPipeline$1$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
        at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
        at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:78)
        at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:240)
        at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
        at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160)
        at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

标签: java google-cloud-dataflow apache-beam google-cloud-pubsub

解决方案


推荐阅读