首页 > 解决方案 > 从 Scala 中的 Pubsub 读取 AVRO 消息 (readAvroGenericRecords)

问题描述

我想阅读来自 PubSub 主题的 AVRO 消息。我之前实现了 Scala Dataflow 管道,它从 PubSub 读取 JSON 消息并写入 BigQuery。但是我在处理 AVRO 消息时遇到了很大的问题。

val p = Pipeline.create(options)
val file = Source.fromFile("avro_schemas/" + "xxx.avsc")
val avroSchema = new Schema.Parser().parse(file)

p.apply("read-avro", PubsubIO
 .readAvroGenericRecords(avroSchema)
 .fromSubscription(subscriptionName))
 .apply("process", ParDo.of(new MyDoFn())

p.run()
class MyDoFn() extends DoFn[GenericRecord, String] with LazyLogging {

    @ProcessElement
        def processElement(c: ProcessContext): Unit = {
            val input = c.element()
            logger.info(input.toString)
    }
  }

消息是从主题中消耗的(我可以通过在 Dataflow 中显示一个作业来看到这一点)但是问题出现在它进入的位置.apply ("process", ParDo.of (new MyDoFn ()) 以前,在其他以 json 格式下载数据的项目中,我创建了我的MyDoFn函数来进一步处理接收到的数据以便可以将其添加到 BigQuery。而现在我也想尽可能多地使用它,当然要根据需要调整它。到目前为止,我的 MyDoFn函数几乎是空的,但这个片段无论如何都不起作用。我在数据流中收到这样的错误:

Error message from worker: java.lang.IndexOutOfBoundsException
java.io.ByteArrayInputStream.read(ByteArrayInputStream.java:180)
java.io.FilterInputStream.read(FilterInputStream.java:133)
org.apache.avro.io.DirectBinaryDecoder.doReadBytes(DirectBinaryDecoder.java:184)
org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
org.apache.beam.sdk.coders.AvroCoder.decode(AvroCoder.java:327)
org.apache.beam.sdk.coders.Coder.decode(Coder.java:159) 
...
org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.lambda$parsePayloadUsingCoder$839baa85$1(PubsubIO.java:1137)
org.apache.beam.sdk.transforms.Contextful.lambda$fn$36334a93$1(Contextful.java:112)
org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:140)

我的问题是: * 是否可以从订阅中获取 AVRO 格式的数据,然后以某种方式将其转换为 json(字符串)格式,以便我可以使用我的 (MyDoFn) 函数处理这些数据或至少显示它?或者我可以用其他方式吗?如何将 GenericRecord 转换为字符串?

请帮助我,我是新手,我不知道我还能做什么:(

标签: scalaavropublish-subscribedataflow

解决方案


推荐阅读