首页 > 解决方案 > mrjob add_file_arg() csv 文件

问题描述

我无法理解如何将 add_file_arg() 用于 mrjob。我试图通过一个人的属性将 csv 传递给我的映射器,并在我的映射器中找到每个人的属性。到目前为止,这是我的代码:

class MRPeopleScores(MRJob):
    def configure_args(self):
        super(MRPeopleScores, self).configure_args()
        self.add_file_arg('--database')

    def mapper(self, _, line):
        print(self.options.database)

当我跑

python3 calculate_people_scores.py --jobconf mapreduce.job.reduces=1 data/people_ids.csv database=data/people_attributes.csv

我收到以下错误消息:

Traceback (most recent call last):
File "calculate_people_scores.py", line 88, in <module>
MRPeopleScores.run()
File "/usr/local/lib/python3.6/site-packages/mrjob/job.py", line 439, in run
mr_job.execute()
File "/usr/local/lib/python3.6/site-packages/mrjob/job.py", line 460, in execute
super(MRJob, self).execute()
File "/usr/local/lib/python3.6/site-packages/mrjob/launch.py", line 161, in execute
self.run_job()
File "/usr/local/lib/python3.6/site-packages/mrjob/launch.py", line 231, in run_job
runner.run()
File "/usr/local/lib/python3.6/site-packages/mrjob/runner.py", line 476, in run
self._run()
File "/usr/local/lib/python3.6/site-packages/mrjob/sim.py", line 185, in _run
self._invoke_step(step_num, 'mapper')
File "/usr/local/lib/python3.6/site-packages/mrjob/sim.py", line 272, in _invoke_step
working_dir, env)
File "/usr/local/lib/python3.6/site-packages/mrjob/inline.py", line 154, in _run_step
child_instance.execute()
File "/usr/local/lib/python3.6/site-packages/mrjob/job.py", line 448, in execute
self.run_mapper(self.options.step_num)
File "/usr/local/lib/python3.6/site-packages/mrjob/job.py", line 526, in run_mapper
for out_key, out_value in mapper(key, value) or ():
File "calculate_people_scores.py", line 47, in mapper
print(self.options.database)
AttributeError: 'Values' object has no attribute 'database'

我敢肯定我严重误解了如何使用这个论点,任何帮助将不胜感激。

标签: pythonpython-3.xmrjob

解决方案


我认为你应该运行 --database data/people_attributes.csv而不是database=data/people_attributes.csv先运行。

python3 calculate_people_scores.py --jobconf mapreduce.job.reduces=1 data/people_ids.csv --database data/people_attributes.csv

通过传递,--database您只需传递文件的路径,因此您需要先打开它,然后再对其进行任何操作。您可以通过以下方式覆盖reducer_init()mapper_init()函数来做到这一点:

def mapper_init(self):
    self.db=open(self.options.database)

或者

def reducer_init(self):
    self.db=open(self.options.database)

现在你可以self.db在你的 mapper 或 reducer 中使用了。不建议使用printmapper 或 reducer 输出,您应该使用它yield。另一方面,不建议在 mapper 或 reducer 中打印(或产生)您的第二个文件(self.db),因为它会运行与 mapper 或 reducer 一样多的数量。但你可以这样做:

def mapper(self, _, line):
     print(self.db.readlines())
     # OR
     yield(None,self.db.readlines())

最后,您可以像这样访问您的 csv 内容:

def mapper(self,_,line):
        for li in self.db:
            fields=li.split(',')
            yield(fields[0],fields[1])

推荐阅读