首页 > 解决方案 > 如何在pyspark中读取二进制数据

问题描述

我正在 使用 pyspark读取二进制文件http://snap.stanford.edu/data/amazon/productGraph/image_features/image_features.b 。

import array
from io import StringIO

img_embedding_file = sc.binaryRecords("s3://bucket/image_features.b", 4106)

def mapper(features):
    a = array.array('f')
    a.frombytes(features)
    return a.tolist()

def byte_mapper(bytes):
    return str(bytes)

decoded_embeddings = img_embedding_file.map(lambda x: [byte_mapper(x[:10]), mapper(x[10:])])

当 justproduct_id从 rdd 中选择时使用

decoded_embeddings = img_embedding_file.map(lambda x: [byte_mapper(x[:10]), mapper(x[10:])])

的输出product_id

["b'1582480311'", "b'\\x00\\x00\\x00\\x00\\x88c-?\\xeb\\xe2'", "b'7@\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'", "b'\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'", "b'\\xec/\\x0b?\\x00\\x00\\x00\\x00K\\xea'", "b'\\x00\\x00c\\x7f\\xd9?\\x00\\x00\\x00\\x00'", "b'L\\xa6\\n>\\x00\\x00\\x00\\x00\\xfe\\xd4'", "b'\\x00\\x00\\x00\\x00\\x00\\x00\\xe5\\xd0\\xa2='", "b'\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'", "b'\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'"]

该文件托管在 s3 上。每行中的文件在product_id接下来的 4096 字节中具有前 10 个字节,因为image_features 我能够提取所有 4096 图像特征,但在读取前 10 个字节并将其转换为正确的可读格式时遇到问题。

标签: apache-sparkpysparkapache-spark-sqlpyspark-sqlamazon-emr

解决方案


编辑

最后,问题来自于recordLength. 不是4096 + 10但是4096*4 + 10。更改为:

img_embedding_file = sc.binaryRecords("s3://bucket/image_features.b", 16394)

应该管用。实际上,您可以在下载二进制文件的网站提供的代码中找到它:

for i in range(4096):
     feature.append(struct.unpack('f', f.read(4))) # <-- so 4096 * 4

旧答案

我认为问题出在你的byte_mapper功能上。这不是将字节转换为字符串的正确方法。你应该使用decode

bytes = b'1582480311'
print(str(bytes))
# output: "b'1582480311'"

print(bytes.decode("utf-8"))
# output: '1582480311'

如果您收到错误:

UnicodeDecodeError:“utf-8”编解码器无法解码位置 4 中的字节 0x88:无效的起始字节

这意味着product_id字符串包含非 utf8 字符。如果您不知道输入编码,则很难转换为字符串。

但是,您可能希望通过向函数添加选项ignore来忽略这些字符:decode

bytes.decode("utf-8", "ignore") 

推荐阅读