apache-spark - 如何在pyspark中读取二进制数据
问题描述
我正在 使用 pyspark读取二进制文件http://snap.stanford.edu/data/amazon/productGraph/image_features/image_features.b 。
import array
from io import StringIO
img_embedding_file = sc.binaryRecords("s3://bucket/image_features.b", 4106)
def mapper(features):
a = array.array('f')
a.frombytes(features)
return a.tolist()
def byte_mapper(bytes):
return str(bytes)
decoded_embeddings = img_embedding_file.map(lambda x: [byte_mapper(x[:10]), mapper(x[10:])])
当 justproduct_id
从 rdd 中选择时使用
decoded_embeddings = img_embedding_file.map(lambda x: [byte_mapper(x[:10]), mapper(x[10:])])
的输出product_id
是
["b'1582480311'", "b'\\x00\\x00\\x00\\x00\\x88c-?\\xeb\\xe2'", "b'7@\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'", "b'\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'", "b'\\xec/\\x0b?\\x00\\x00\\x00\\x00K\\xea'", "b'\\x00\\x00c\\x7f\\xd9?\\x00\\x00\\x00\\x00'", "b'L\\xa6\\n>\\x00\\x00\\x00\\x00\\xfe\\xd4'", "b'\\x00\\x00\\x00\\x00\\x00\\x00\\xe5\\xd0\\xa2='", "b'\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'", "b'\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'"]
该文件托管在 s3 上。每行中的文件在product_id
接下来的 4096 字节中具有前 10 个字节,因为image_features
我能够提取所有 4096 图像特征,但在读取前 10 个字节并将其转换为正确的可读格式时遇到问题。
解决方案
编辑:
最后,问题来自于recordLength
. 不是4096 + 10
但是4096*4 + 10
。更改为:
img_embedding_file = sc.binaryRecords("s3://bucket/image_features.b", 16394)
应该管用。实际上,您可以在下载二进制文件的网站提供的代码中找到它:
for i in range(4096):
feature.append(struct.unpack('f', f.read(4))) # <-- so 4096 * 4
旧答案:
我认为问题出在你的byte_mapper
功能上。这不是将字节转换为字符串的正确方法。你应该使用decode
:
bytes = b'1582480311'
print(str(bytes))
# output: "b'1582480311'"
print(bytes.decode("utf-8"))
# output: '1582480311'
如果您收到错误:
UnicodeDecodeError:“utf-8”编解码器无法解码位置 4 中的字节 0x88:无效的起始字节
这意味着product_id
字符串包含非 utf8 字符。如果您不知道输入编码,则很难转换为字符串。
但是,您可能希望通过向函数添加选项ignore
来忽略这些字符:decode
bytes.decode("utf-8", "ignore")
推荐阅读
- xml - 使用 xsl:number 递减变量计数器
- python - FastAPI 忽略 CORS 中间件
- gdb - gdbserver 和远程 gdb 的区别
- android - 取消 Android 中所有以前的 Api 调用 - 改造 - 使用模块
- amazon-web-services - DynamoDB 一对一
- optimization - 如何在最小化目标参数的同时重新评估 Gekko 目标
- flutter - 等到firestore中的数据成功更新
- vue.js - vue中如何根据获取的数据增加图标的数量
- oracle - 有没有办法在oracle sql developer中对列的每个字段执行md5散列,并将结果散列存储在相应的列中(md5)
- flutter - 同时在单个小部件上使用多个模拟