Problem using SORT_VALUES in a MapReduce job with mrjob (values not sorted in reducer input)

Problem description

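I want to write a MapReduce program whose reducer receives key-value pairs sorted by value. I'm using mrjob, and its SORT_VALUES option looks ideal for the task. However, with this option set to True the reducer input is not sorted. For example, I get the following (A should come before X):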

"ES"    ["X", 3]
"ES"    ["A", "Spain"]

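To make the expected behavior concrete, here is a small pure-Python simulation of value sorting (often called secondary sort): map output is ordered by key and then by the encoded value, and consecutive records with the same key are grouped into one reducer call. The function name `shuffle_with_sorted_values` is hypothetical, and `json.dumps` is assumed as the encoding; this only mimics the idea on one machine and is not how mrjob or Hadoop actually implement it.

```python
import json
from itertools import groupby


def shuffle_with_sorted_values(pairs):
    """Hypothetical simulation of a shuffle with value sorting.

    Orders (key, value) pairs by the JSON encoding of the key, then of the
    value, and groups consecutive pairs that share a key, so each "reducer
    call" sees that key's values in encoded (lexicographic) order.
    """
    def encoded_key(kv):
        return json.dumps(kv[0])

    ordered = sorted(pairs, key=lambda kv: (json.dumps(kv[0]), json.dumps(kv[1])))
    for _, group in groupby(ordered, key=encoded_key):
        group = list(group)
        yield group[0][0], [value for _, value in group]


# Sample map output modeled on the records shown in the question.
map_output = [
    ("ES", ["X", 1]),
    ("GN", ["A", "Guinea"]),
    ("ES", ["A", "Spain"]),
    ("ES", ["X", 2]),
]
for key, values in shuffle_with_sorted_values(map_output):
    print(key, values)
# ES [['A', 'Spain'], ['X', 1], ['X', 2]]
# GN [['A', 'Guinea']]
```
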
I'm using Python 2.7.5, mrjob==0.6.1 and Hadoop. Running the program locally gives me:

"ES"    ["A", "Spain"]
"ES"    ["X", 1]
"ES"    ["X", 2]

which is correct. But the Hadoop run gives:

"ES"    ["X", 3]
"ES"    ["A", "Spain"]

My code is:


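from mrjob.job import MRJob
from mrjob.step import MRStep


class MRJoin(MRJob):

    # Ask mrjob to deliver each key's values to the reducer sorted by
    # their encoded value (secondary sort).
    SORT_VALUES = True

    def mapper(self, _, line):
        splits = line.rstrip("\n").split(",")
        if len(splits) == 2:  # countries.csv row: <name>,<2-letter code>
            symbol = 'A'  # 'A' sorts before 'X': countries before clients
            country2digit = splits[1]
            yield country2digit, (symbol, splits[0])
        else:  # clients.csv row: field 1 is the status, field 2 the country code
            symbol = 'X'
            country2digit = splits[2]
            if splits[1] == 'bueno':
                yield country2digit, (symbol, 1)

    def combiner(self, key, values):
        # Pass country records through; collapse client records into one count.
        bueno = 0
        for value in values:
            if value[0] == 'A':
                yield key, ('A', value[1])
            else:
                # Sum the counts (rather than counting records) so the result
                # stays correct if Hadoop applies the combiner more than once.
                bueno += value[1]

        if bueno > 0:
            yield key, ('X', bueno)

    def reducerSimple(self, key, values):
        # Identity reducer: emit the values in the order they arrive.
        for value in values:
            yield key, value

    def steps(self):
        return [
            MRStep(mapper=self.mapper,
                   combiner=self.combiner,
                   reducer=self.reducerSimple)
        ]


if __name__ == '__main__':
    MRJoin.run()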

I run the code above like this:

python mrjob-p2.py /media/notebooks/clients.csv /media/notebooks/countries.csv -r hadoop

which gives:

"ES"    ["X", 3]
"ES"    ["A", "Spain"]
...
"GN"    ["A", "Guinea"]
"GN"    ["X", 1]
...

The values for the ES key (and a few other keys) are not sorted, but for the remaining keys they are.
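One mechanism that would produce exactly this pattern is worth ruling out; it is only a hypothesis, not a confirmed diagnosis. If the pairs were partitioned across reducers by the whole key plus value instead of by the key alone, the A and X records for one key could land in different reducers. Each reducer's output would still be sorted, but reading the output files one after another would put X before A for the keys whose records were split, while keys whose records happened to land together would look sorted. The sketch simulates this with two reducers and a toy hash; `toy_partition`, `run_job` and `NUM_REDUCERS` are hypothetical names, and the hash is not the one Hadoop uses.

```python
import json

NUM_REDUCERS = 2  # assumption: two reduce tasks


def toy_partition(text, num_reducers=NUM_REDUCERS):
    """Toy stand-in for a partitioner: sum of character codes, mod N.

    Hadoop uses a different hash; this one is chosen only because it is
    deterministic and easy to follow by hand.
    """
    return sum(ord(ch) for ch in text) % num_reducers


def run_job(pairs, by_key_only):
    """Simulate partition -> per-reducer sort -> concatenated part-* files."""
    reducers = [[] for _ in range(NUM_REDUCERS)]
    for key, value in pairs:
        k, v = json.dumps(key), json.dumps(value)
        # Partition on the key alone, or (the hypothesized fault) on key + value.
        target = toy_partition(k if by_key_only else k + "\t" + v)
        reducers[target].append((k, v))
    lines = []
    for records in reducers:  # reading part-00000, part-00001, ... in order
        lines.extend(k + "\t" + v for k, v in sorted(records))
    return lines


pairs = [("ES", ["X", 3]), ("ES", ["A", "Spain"])]
print(run_job(pairs, by_key_only=True))
print(run_job(pairs, by_key_only=False))
```

With this toy hash, partitioning on the key alone keeps both ES records in one reducer, so A comes before X; partitioning on key plus value sends them to different reducers and the concatenated output shows X first, as in the Hadoop run above. On a real cluster, checking whether the two ES lines come from different part-* files in the job's output directory would confirm or rule this out.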

What I expected (if the values were sorted, A would come before X):

"ES"    ["A", "Spain"]
"ES"    ["X", 3]

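This expectation relies on values being compared in their serialized form, character by character. A quick check, assuming a JSON encoding like the one in the output shown here (mrjob may use a more compact encoding without spaces, so both forms are tested):

```python
import json

spaced_a = json.dumps(["A", "Spain"])  # '["A", "Spain"]'
spaced_x = json.dumps(["X", 3])        # '["X", 3]'
compact_a = json.dumps(["A", "Spain"], separators=(",", ":"))  # '["A","Spain"]'
compact_x = json.dumps(["X", 3], separators=(",", ":"))        # '["X",3]'

# Both encodings start with '["', so the first difference is 'A' (0x41)
# versus 'X' (0x58): A sorts first in either spacing style.
print(spaced_a < spaced_x, compact_a < compact_x)  # True True

# Comparing the UTF-8 bytes gives the same answer for this ASCII data.
print(spaced_a.encode("utf-8") < spaced_x.encode("utf-8"))  # True

# Caveat: the order is lexicographic, not numeric, so 10 sorts before 9.
print(json.dumps(["X", 10]) < json.dumps(["X", 9]))  # True
```
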
If I run it locally:

python mrjob-p2.py /media/notebooks/clients.csv /media/notebooks/countries.csv -r local

then I get:

"ES"    ["A", "Spain"]
"ES"    ["X", 1]
"ES"    ["X", 2]
...
"GN"    ["A", "Guinea"]
"GN"    ["X", 1]
...

which is correct.

Does anyone know how to get the values sorted?

Thanks :)

Tags: python, hadoop, mapreduce, mrjob
