python - mrjob add_file_arg() csv 文件
问题描述
我无法理解如何将 add_file_arg() 用于 mrjob。我试图通过一个人的属性将 csv 传递给我的映射器,并在我的映射器中找到每个人的属性。到目前为止,这是我的代码:
class MRPeopleScores(MRJob):
def configure_args(self):
super(MRPeopleScores, self).configure_args()
self.add_file_arg('--database')
def mapper(self, _, line):
print(self.options.database)
当我跑
python3 calculate_people_scores.py --jobconf mapreduce.job.reduces=1 data/people_ids.csv database=data/people_attributes.csv
我收到以下错误消息:
Traceback (most recent call last):
File "calculate_people_scores.py", line 88, in <module>
MRPeopleScores.run()
File "/usr/local/lib/python3.6/site-packages/mrjob/job.py", line 439, in run
mr_job.execute()
File "/usr/local/lib/python3.6/site-packages/mrjob/job.py", line 460, in execute
super(MRJob, self).execute()
File "/usr/local/lib/python3.6/site-packages/mrjob/launch.py", line 161, in execute
self.run_job()
File "/usr/local/lib/python3.6/site-packages/mrjob/launch.py", line 231, in run_job
runner.run()
File "/usr/local/lib/python3.6/site-packages/mrjob/runner.py", line 476, in run
self._run()
File "/usr/local/lib/python3.6/site-packages/mrjob/sim.py", line 185, in _run
self._invoke_step(step_num, 'mapper')
File "/usr/local/lib/python3.6/site-packages/mrjob/sim.py", line 272, in _invoke_step
working_dir, env)
File "/usr/local/lib/python3.6/site-packages/mrjob/inline.py", line 154, in _run_step
child_instance.execute()
File "/usr/local/lib/python3.6/site-packages/mrjob/job.py", line 448, in execute
self.run_mapper(self.options.step_num)
File "/usr/local/lib/python3.6/site-packages/mrjob/job.py", line 526, in run_mapper
for out_key, out_value in mapper(key, value) or ():
File "calculate_people_scores.py", line 47, in mapper
print(self.options.database)
AttributeError: 'Values' object has no attribute 'database'
我敢肯定我严重误解了如何使用这个论点,任何帮助将不胜感激。
解决方案
我认为你应该运行 --database data/people_attributes.csv
而不是database=data/people_attributes.csv
先运行。
python3 calculate_people_scores.py --jobconf mapreduce.job.reduces=1 data/people_ids.csv --database data/people_attributes.csv
通过传递,--database
您只需传递文件的路径,因此您需要先打开它,然后再对其进行任何操作。您可以通过以下方式覆盖reducer_init()
或mapper_init()
函数来做到这一点:
def mapper_init(self):
self.db=open(self.options.database)
或者
def reducer_init(self):
self.db=open(self.options.database)
现在你可以self.db
在你的 mapper 或 reducer 中使用了。不建议使用print
mapper 或 reducer 输出,您应该使用它yield
。另一方面,不建议在 mapper 或 reducer 中打印(或产生)您的第二个文件(self.db
),因为它会运行与 mapper 或 reducer 一样多的数量。但你可以这样做:
def mapper(self, _, line):
print(self.db.readlines())
# OR
yield(None,self.db.readlines())
最后,您可以像这样访问您的 csv 内容:
def mapper(self,_,line):
for li in self.db:
fields=li.split(',')
yield(fields[0],fields[1])
推荐阅读
- javascript - 在反应 useEffect 生命周期中,我得到了无限的结果
- python - 无法正确启动 Scrapy shell
- spring - 我如何解决错误邮递员401未经授权
- r - Dplyr 中的 Vlookup() 等效项
- python - 运行 django-admin 后出现“bash: django-admin: command not found”
- ruby-on-rails - 如何在 Rails 中创建 has_many 关系记录?
- google-developers-console - 我可以从谷歌开发者控制台获取我的项目源代码吗
- php - php用大写单词替换描述中的特定单词并循环遍历它
- php - 即使浏览器在codeigniter中关闭,如何保持用户登录
- apache-nifi - NiFi 中是否有任何处理器可以帮助我为每条记录添加唯一标识符?