Is it possible to pass arguments to mrJob?

Problem description

Given the basic word-count example from the mrjob website:

from mrjob.job import MRJob


class MRWordFrequencyCount(MRJob):

    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    MRWordFrequencyCount.run()

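From the command line, this example can be run as python mrJobFilename.py mrJobFilename.py, which runs the program on its own source file and counts the characters, words and lines in it.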

Given this example, suppose I want to pass in a parameter, say minCount = 3, so that the reducer only returns the counts that exceed minCount:

from mrjob.job import MRJob


class MRWordFrequencyCount(MRJob):

    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
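        X = sum(values)
        # minCount is not defined anywhere yet: supplying it is what this question asks
        if X > minCount:
            yield key, X  # reuse X; values is an iterator already consumed by sum()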


if __name__ == '__main__':
    MRWordFrequencyCount.run()
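A subtlety worth checking in any reducer of this shape: mrjob passes values to reducer() as an iterator, so calling sum(values) a second time after the threshold check sees an exhausted iterator and returns 0. The plain-Python sketch below reproduces this without mrjob; the function names are illustrative, and min_count is passed explicitly only to keep the example self-contained.

```python
def reducer(key, values, min_count):
    # `values` is a one-shot iterator, as in an mrjob reducer, so sum it once
    total = sum(values)
    if total > min_count:
        yield key, total


def buggy_reducer(key, values, min_count):
    total = sum(values)
    if total > min_count:
        # the iterator is already exhausted here, so this sum is always 0
        yield key, sum(values)


print(list(reducer("words", iter([2, 3, 4]), 3)))        # [('words', 9)]
print(list(buggy_reducer("words", iter([2, 3, 4]), 3)))  # [('words', 0)]
```

Storing the sum in a variable and yielding that variable avoids the problem.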

I tried passing minWord as an extra argument, python mrJobFilename.py mrJobFilename.py 3, but got the error OSError: Input path 3 does not exist!
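The error suggests that mrjob treats every positional command-line argument as an input path. The sketch below uses a deliberately simplified argparse parser as a stand-in for mrjob's own parser (an assumption: the real parser defines many more options, and the dest name input_paths is hypothetical) to show why "3" ends up in the list of input files.

```python
import argparse

# Simplified stand-in for mrjob's command-line parser: positional
# arguments are collected as input paths.
parser = argparse.ArgumentParser()
parser.add_argument("input_paths", nargs="*", help="input files")

opts = parser.parse_args(["mrJobFilename.py", "3"])
print(opts.input_paths)  # ['mrJobFilename.py', '3']: "3" looks like a second input file
```

A value that is not a file therefore has to be given as a named option rather than as a bare positional argument.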

I also tried setting the variable with sys.argv:

import sys

if __name__ == '__main__':
    # sys.argv[1] is the first command-line argument, i.e. the input file here
    minWord = sys.argv[1]
    MRWordFrequencyCount.run()

When running python mrJobFilename.py mrJobFilename.py < 3, I get the error bash: 3: No such file or directory. If I leave out the <, I get the same input-file-not-found error as before.

Finally, I tried supplying a second CSV file. The CSV file has 2 lines, as follows:

minWord
3

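The idea was to pass the parameter to mrJob this way, since it kept complaining that the second argument is not an input file. I tried loading it with mapper_raw, but got a strange error: UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8f in position 22: invalid start byte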

import csv

from mrjob.job import MRJob
from mrjob.step import MRStep


class MRWordFrequencyCount(MRJob):

    def mapper_raw(self, input_path, input_uri):
        # mapper_raw receives a local path and the original URI of each input file
        with open(input_path) as f:
            reader = csv.reader(f)
            next(reader)  # skip header
            yield "minWord", next(reader)[0]  # mappers must yield key/value pairs

    def steps(self):
        return [
            MRStep(mapper_raw=self.mapper_raw)
        ]


if __name__ == '__main__':
    MRWordFrequencyCount.run()

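How can I pass arguments to an mrjob job? Ultimately I need this to pass the parameters of a system of differential equations that I want to solve in parallel.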

Tags: python, parallel-processing, mrjob

Solution


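You can add command-line arguments as described in the mrjob documentation; add_passthru_arg() accepts the same arguments as argparse's add_argument().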
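Because the options follow argparse semantics, a value arrives as a string unless a type is given, and in Python 3 comparing an int total with a string raises TypeError. This plain argparse sketch (no mrjob required; the option name rawCount is hypothetical) shows the difference that type=int makes.

```python
import argparse

parser = argparse.ArgumentParser()
# Without type=..., argparse stores the option's value as a string
parser.add_argument("--rawCount")
# With type=int, argparse converts the value (and rejects non-integers)
parser.add_argument("--minCount", type=int, default=0)

opts = parser.parse_args(["--rawCount", "4", "--minCount", "4"])
print(repr(opts.rawCount), repr(opts.minCount))  # '4' 4

try:
    _ = 9 > opts.rawCount  # int compared with str
    comparison_failed = False
except TypeError:
    comparison_failed = True
print(comparison_failed)  # True
```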

So your code should look like this:

from mrjob.job import MRJob

class MRWordFrequencyCount(MRJob):

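    def configure_args(self):
        super(MRWordFrequencyCount, self).configure_args()
        # type=int makes self.options.minCount an int instead of a string
        self.add_passthru_arg("-m", "--minCount", type=int, default=0,
                              help="only output totals greater than this value")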

    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        X = sum(values)  # values is an iterator, so sum it only once
        if X > self.options.minCount:
            yield key, X


if __name__ == '__main__':
    MRWordFrequencyCount.run()

Access your argument through self.options.minCount.
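To check the thresholding logic before running on a cluster, the map, group-by-key and reduce phases can be simulated in plain Python. The helper below is hypothetical and only mimics what the job does; it is not part of mrjob.

```python
from collections import defaultdict


def run_word_frequency(lines, min_count):
    """Local simulation of MRWordFrequencyCount with a minCount threshold."""
    grouped = defaultdict(list)
    # Map phase: emit the same key/value pairs as the mapper, grouped by key
    for line in lines:
        for key, value in (("chars", len(line)),
                           ("words", len(line.split())),
                           ("lines", 1)):
            grouped[key].append(value)
    # Reduce phase: sum once per key, keep only totals above min_count
    result = {}
    for key, values in grouped.items():
        total = sum(values)
        if total > min_count:
            result[key] = total
    return result


sample = ["hello world", "foo bar baz"]
print(run_word_frequency(sample, min_count=4))  # {'chars': 22, 'words': 5}
```

With min_count=4, the "lines" total of 2 is filtered out, which is the behavior --minCount 4 should produce for the same input.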

Run it with:

python code.py input.txt --minCount 4
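For the differential-equation use case in the question, several numeric parameters can be packed into a single option. This sketch assumes a hypothetical --params option holding a comma-separated list and a hypothetical float_list converter; it uses plain argparse, and since add_passthru_arg() takes add_argument()'s keyword arguments, the same type= callable should carry over, but verify this against your mrjob version.

```python
import argparse


def float_list(text):
    """Parse a comma-separated string such as "0.5,1.2,3" into floats."""
    return [float(part) for part in text.split(",")]


parser = argparse.ArgumentParser()
# Hypothetical option; in an MRJob subclass this would be
# self.add_passthru_arg("--params", type=float_list, default=[]) instead.
parser.add_argument("--params", type=float_list, default=[])

opts = parser.parse_args(["--params", "0.5,1.2,3"])
print(opts.params)  # [0.5, 1.2, 3.0]
```

Keeping the whole list in one string means the option is a single token on the command line, which avoids depending on how multi-value options are forwarded to the tasks.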
