scala - 从 Scala 中的 Pubsub 读取 AVRO 消息 (readAvroGenericRecords)
问题描述
我想阅读来自 PubSub 主题的 AVRO 消息。我之前实现了 Scala Dataflow 管道,它从 PubSub 读取 JSON 消息并写入 BigQuery。但是我在处理 AVRO 消息时遇到了很大的问题。
val p = Pipeline.create(options)
val file = Source.fromFile("avro_schemas/" + "xxx.avsc")
val avroSchema = new Schema.Parser().parse(file)
p.apply("read-avro", PubsubIO
.readAvroGenericRecords(avroSchema)
.fromSubscription(subscriptionName))
.apply("process", ParDo.of(new MyDoFn())
p.run()
class MyDoFn() extends DoFn[GenericRecord, String] with LazyLogging {
@ProcessElement
def processElement(c: ProcessContext): Unit = {
val input = c.element()
logger.info(input.toString)
}
}
消息是从主题中消耗的(我可以通过在 Dataflow 中显示一个作业来看到这一点)但是问题出现在它进入的位置.apply ("process", ParDo.of (new MyDoFn ())
以前,在其他以 json 格式下载数据的项目中,我创建了我的MyDoFn
函数来进一步处理接收到的数据以便可以将其添加到 BigQuery。而现在我也想尽可能多地使用它,当然要根据需要调整它。到目前为止,我的 MyDoFn
函数几乎是空的,但这个片段无论如何都不起作用。我在数据流中收到这样的错误:
Error message from worker: java.lang.IndexOutOfBoundsException
java.io.ByteArrayInputStream.read(ByteArrayInputStream.java:180)
java.io.FilterInputStream.read(FilterInputStream.java:133)
org.apache.avro.io.DirectBinaryDecoder.doReadBytes(DirectBinaryDecoder.java:184)
org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
org.apache.beam.sdk.coders.AvroCoder.decode(AvroCoder.java:327)
org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
...
org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.lambda$parsePayloadUsingCoder$839baa85$1(PubsubIO.java:1137)
org.apache.beam.sdk.transforms.Contextful.lambda$fn$36334a93$1(Contextful.java:112)
org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:140)
我的问题是: * 是否可以从订阅中获取 AVRO 格式的数据,然后以某种方式将其转换为 json(字符串)格式,以便我可以使用我的 (MyDoFn) 函数处理这些数据或至少显示它?或者我可以用其他方式吗?如何将 GenericRecord 转换为字符串?
请帮助我,我是新手,我不知道我还能做什么:(
解决方案
推荐阅读
- r - nls() 错误 - 收敛失败:奇异收敛 (7)
- r - 如何在 R 数据框中生成一个新列,其中包含来自多列的有序项
- tensorflow - 试图将 lasagne.layers.SliceLayer(input_from_previous_layer, -1, 1) fom theano 转换为 keras
- authentication - OpenShift 中的身份验证日志在哪里?
- selenium - 即使页面加载完毕,Selenium 也无法点击 Element
- google-oauth - 如果我在 iframe 外部单击,一键登录隐藏
- vba - 在不同的 IDE 中处理 Word 宏
- python -
如何使用 Python获取标签前后的字符串 - java - 尝试在我的 Java Spring API 中实现 Swagger 时出现错误消息
- css - 引导导航居中链接