java - Why does Avro deserialization fail only during Spark execution?
Problem description
I have an Avro schema like:
{
  "type": "record",
  "namespace": "quotes",
  "name": "Quotes",
  "fields": [
    { "name": "instrument", "type": "string" },
    { "name": "providerSentTime", "type": "long" },
    {
      "name": "bids",
      "type": {
        "type": "array",
        "items": {
          "name": "BidQuote",
          "type": "record",
          "fields": [
            { "name": "rate", "type": "double" },
            { "name": "liquidity", "type": "double" },
            { "name": "time", "type": "long" },
            { "name": "status", "type": "int" }
          ]
        }
      }
    },
    {
      "name": "asks",
      "type": {
        "type": "array",
        "items": {
          "name": "AskQuote",
          "type": "record",
          "fields": [
            { "name": "rate", "type": "double" },
            { "name": "liquidity", "type": "double" },
            { "name": "time", "type": "long" },
            { "name": "status", "type": "int" }
          ]
        }
      }
    }
  ]
}
To deserialize it I use a custom Java method like:
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;

public static List<Quotes> deserialize(byte[] avroBytes) throws IOException {
    ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(avroBytes);
    BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(byteArrayInputStream, null);
    SpecificDatumReader<Quotes> quoteDatumReader = new SpecificDatumReader<>(Quotes.class);
    List<Quotes> quotes = new ArrayList<>();
    // Keep reading datums until the decoder reports end of stream.
    while (!binaryDecoder.isEnd()) {
        quotes.add(quoteDatumReader.read(null, binaryDecoder));
    }
    return quotes;
}
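For reference, the write-side counterpart of this method is not shown in the question; a hedged sketch of what it would look like with Avro's SpecificDatumWriter and BinaryEncoder is below. It assumes the generated Quotes class is on the classpath, and it writes datums back-to-back with no container-file header, which is what the deserialize(byte[]) loop above expects to read:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;

// Sketch only: raw concatenated datums, no Avro container header.
public static byte[] serialize(List<Quotes> quotes) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    SpecificDatumWriter<Quotes> writer = new SpecificDatumWriter<>(Quotes.class);
    for (Quotes q : quotes) {
        writer.write(q, encoder);
    }
    encoder.flush(); // push any buffered bytes into the stream
    return out.toByteArray();
}
```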
This works fine.
The problem appears when I try to deserialize the file inside a Spark job, in the following code:
// I need the hadoopFile method; the context setup isn't shown in this snippet.
JavaPairRDD<Text, Text> textTextJavaPairRDD =
    javaSparkContext.hadoopFile(
        "Quotes.avro",
        KeyValueTextInputFormat.class, Text.class, Text.class);

JavaRDD<byte[]> avroByteRDD = textTextJavaPairRDD.map(new Function<Tuple2<Text, Text>, byte[]>() {
    @Override
    public byte[] call(Tuple2<Text, Text> hdfsTextTextRow) throws Exception {
        return hdfsTextTextRow._1.copyBytes();
    }
});

JavaRDD<Quotes> quotesRDD = avroByteRDD.flatMap(new FlatMapFunction<byte[], Quotes>() {
    @Override
    public Iterable<Quotes> call(byte[] keyAvroByte) throws Exception {
        return deserialize(keyAvroByte);
    }
});

quotesRDD.collect();
When I execute this, I get the exception:

WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.io.EOFException

thrown by the Avro read call inside my deserialize method.
Why does it work outside Spark, but fail when I try to do the same thing inside Spark?
Solution
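No answer text survived in this capture, so what follows is a hedged diagnosis rather than the accepted fix. KeyValueTextInputFormat is line-oriented: it splits the input on newline bytes and splits each line on the first tab, so the Text keys it hands back are not the raw file bytes. Binary Avro data routinely contains 0x0A and 0x09 bytes, so each task receives truncated fragments, and the decoder runs off the end of a fragment mid-record, which surfaces as java.io.EOFException. Outside Spark the same bytes reach deserialize intact, which is why the method works there. The self-contained demo below (a hand-rolled copy of Avro's zigzag-varint long encoding, for illustration only; the real one lives in org.apache.avro.io.BinaryEncoder) shows how even a tiny value encodes to a newline byte:

```java
import java.io.ByteArrayOutputStream;

public class ZigZagDemo {
    // Hand-rolled copy of Avro's long encoding: zigzag, then 7-bit
    // little-endian groups with a continuation bit on all but the last.
    static byte[] encodeLong(long n) {
        long z = (n << 1) ^ (n >> 63); // zigzag: small magnitudes -> small codes
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // 7 payload bits + continuation bit
            z >>>= 7;
        }
        out.write((int) z);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] five = encodeLong(5);
        // The long 5 encodes as the single byte 0x0A -- a newline.
        // A line-based reader such as KeyValueTextInputFormat would treat
        // this byte as a record separator and cut the binary stream here.
        System.out.println(five.length + " " + Integer.toHexString(five[0] & 0xFF)); // prints "1 a"
    }
}
```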
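One way around this, sketched under the assumption that each .avro input file is small enough to hold in memory per task: read whole files as binary with JavaSparkContext.binaryFiles (part of the Spark core API since 1.2), so no line-splitting ever touches the bytes, and reuse the deserialize(byte[]) method from the question:

```java
// Hedged sketch, not a confirmed answer from the original thread.
// binaryFiles returns one (path, stream) pair per file, bytes untouched.
JavaPairRDD<String, PortableDataStream> files =
    javaSparkContext.binaryFiles("Quotes.avro");

JavaRDD<Quotes> quotesRDD = files.flatMap(
    new FlatMapFunction<Tuple2<String, PortableDataStream>, Quotes>() {
        @Override
        public Iterable<Quotes> call(Tuple2<String, PortableDataStream> file) throws Exception {
            // toArray() loads the whole file into a single byte[]
            return deserialize(file._2.toArray());
        }
    });
```

For large files, a streaming-capable input format (e.g. Avro's own mapreduce input formats for container files) would be the more scalable choice.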