python - 如何为 mrJobs 中的多行输入编写自定义协议
问题描述
我正在尝试将 mrJobs 与 csv 文件一起使用。问题是 csv 文件的输入跨越了多行。
搜索 mrJob 文档,我想我需要编写一个自定义协议来处理输入。
我试图在下面编写自己的协议multiLineCsvInputProtocol
,但我已经收到错误: TypeError: a bytes-like object is required, not 'str'
不会说谎,我认为我在这里过头了。
基本上,多行 csv 文件中的每一行新数据都以日期字符串开头。我想逐行读取输入,在逗号上吐出每一行,将值存储在列表中,每当新行以日期字符串开头时,我想yield
将整个列表放到第一个映射器中。
(或者找到其他更好的方法来读取多行 csv 输入)
谁能帮我解决这个错误?
import csv
import mapreduce as mr
from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob import protocol
class multiLineCsvInputProtocol(object):
def read(self, line):
key, val = enumerate(line.split(',', 1))
return key, val
class someTask(MRJob):
INPUT_PROTOCOL = multiLineCsvInputProtocol
def mapper1(self,_, row):
yield (row, 1 )
if __name__ == '__main__':
MRFindReciprocal.run()
解决方案
根据文档,readmrjob
函数的line参数的类型为bytestring,您很可能会收到该错误,因为您将其拆分为str:','
编写自定义协议
协议是具有 read(self, line) 和 write(self, key, value) 方法的对象。read() 方法接受一个字节串并返回一个解码对象的 2 元组,而 write() 接受键和值并返回要传递回 Hadoop Streaming 或作为输出的字节。
可能的解决方案:
- 您可以尝试按 拆分
b','
,这是一个字节串 - 您可以在拆分之前对行进行解码,如下所示:(
line.decode().split(',', 1)
指定编码可能是个好主意)
推荐阅读
- linux - Eclipse 工作区作为项目的系统根引用
- ios - swift iOS:在应用程序文档文件夹中创建 UIDocument
- python - 在 Python 中的 SQLAlchemy 中执行真正的原始 SQL
- html - 有没有办法逃避剪辑路径:从子元素?相对于剪裁背景定位的 IE 图像也会被剪裁
- asp.net-mvc - BundleConfig 返回 302 / 404
- android - DateFormat.getTimeInstance 没有格式化正确的语言环境
- sql - 处理 SAS 中的舍入误差
- java - 算法将无法正确加密/解密
- r - 如何通过引用替换 R 列中的值?
- php - 如何使用 Jade/Pug 在样式表的链接内编码 base_url?