protocol-buffers - Flink消耗s3 parquet文件kyro序列化错误
问题描述
我们想从 s3 使用 parquet 文件
我的代码片段是这样的。我的输入文件是 protobuf 编码的镶木地板文件。protobuf 类是 Pageview.class。
import com.twitter.chill.protobuf.ProtobufSerializer;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.parquet.proto.ProtoParquetInputFormat;
import org.apache.hadoop.fs.Path;
import scala.Tuple2;
public class ParquetReadJob {
public static void main(String... args) throws Exception {
ExecutionEnvironment ee = ExecutionEnvironment.getExecutionEnvironment();
ee.getConfig().registerTypeWithKryoSerializer(StandardLog.Pageview.class, ProtobufSerializer.class);
String path = args[0];
Job job = Job.getInstance();
job.setInputFormatClass(ProtoParquetInputFormat.class);
HadoopInputFormat<Void, StandardLog.Pageview> hadoopIF =
new HadoopInputFormat<> (new ProtoParquetInputFormat<>(), Void.class, StandardLog.Pageview.class, job);
ProtoParquetInputFormat.addInputPath(job, new Path(path));
DataSource<Tuple2<Void, StandardLog.Pageview>> dataSet = ee.createInput(hadoopIF).setParallelism(10);
dataSet.print();
}
}
总是有错误:
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
supportCrtSize_ (access.Access$AdPositionInfo)
adPositionInfo_ (access.Access$AccessRequest)
accessRequest_ (com.adshonor.proto.StandardLog$Pageview$Builder)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:216)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 23 more
谁能告诉我如何编写可以使用这种文件的批处理程序?
解决方案
我也遇到了这个问题。我在 flink-protobuf 的待处理 PR 中找到了这个和这个,它解决了这个问题。
您需要将NonLazyProtobufSerializer
和ProtobufKryoSerializer
类添加到您的项目中,并注册NonLazyProtobufSerializer
为 Message 类型的默认 Kryo 序列化程序:
env.getConfig().addDefaultKryoSerializer(Message.class, NonLazyProtobufSerializer.class);
来自作者 JavaDocs:
这是在 Flink TableEnvironment 中使用来自 Kafka 的 DataSource 时出现的问题的解决方法。对于在 .proto 中声明为“string”类型的字段,Java 类上的相应字段已声明为“Object”类型。Message.parseFrom(byte[]) 返回的对象上这些字段的实际类型是“ByteArray”。但是这些字段的 getter 方法返回“String”,必要时用字符串懒惰地替换底层 ByteArray 字段。
希望这可以帮助。
推荐阅读
- r - 在R中将3个数据帧中具有相同行名的行组合起来
- json - 如何在 bash 中获取 JSON 对象的元素?
- c# - 如何使用 EF Core C# 将 List<> 数据添加到 List<> 变量
- python - 如何在文件上一遍又一遍地写入相同的数据?
- graphql - Graphql - 查询返回 null
- ios - 有没有办法从我的应用程序中拨打“*#06#”来让用户在我的应用程序中使用 IMEI 屏幕?
- javascript - 在数据库中创建新位置后,直接重定向到新创建的视图
- php - 比较 PHP 中的两个 md5 加密密钥
- python - 创建一个python算法来训练一个keras模型来预测一个大的整数序列
- r - 如何计算一年中365天的一天的指数?