python - Word count sorting in MapReduce Python using yarn comparator
Problem description
I want to solve the word count problem and want to get the results in reverse sorted order according to the frequency of occurrence in the file.
Below are the four files I wrote for this purpose (two mappers and two reducers, since a single MapReduce job cannot solve this problem):
1) mapper1.py
import sys
import re

reload(sys)
sys.setdefaultencoding('utf-8')  # required to convert to unicode

for line in sys.stdin:
    try:
        article_id, text = unicode(line.strip()).split('\t', 1)
    except ValueError as e:
        continue
    words = re.split(r"\W*\s+\W*", text, flags=re.UNICODE)
    for word in words:
        print "%s\t%d" % (word.lower(), 1)
2) reducer1.py
import sys

current_key = None
word_sum = 0

# The shuffle phase delivers lines sorted by key, so all counts
# for the same word arrive adjacently.
for line in sys.stdin:
    try:
        key, count = line.strip().split('\t', 1)
        count = int(count)
    except ValueError as e:
        continue
    if current_key != key:
        if current_key:
            print "%s\t%d" % (current_key, word_sum)
        word_sum = 0
        current_key = key
    word_sum += count

if current_key:
    print "%s\t%d" % (current_key, word_sum)
3) mapper2.py
import sys

reload(sys)
sys.setdefaultencoding('utf-8')  # required to convert to unicode

# Identity mapper: pass word/count pairs through unchanged.
for line in sys.stdin:
    try:
        word, count = line.strip().split('\t', 1)
        count = int(count)
    except ValueError as e:
        continue
    print "%s\t%d" % (word, count)
4) reducer2.py
import sys

# Identity reducer: input arrives already ordered by the comparator.
for line in sys.stdin:
    try:
        word, count = line.strip().split('\t', 1)
        count = int(count)
    except ValueError as e:
        continue
    print "%s\t%d" % (word, count)
The following are the two yarn commands I run in a bash environment:
OUT_DIR="wordcount_result_1"
NUM_REDUCERS=8
hdfs dfs -rm -r -skipTrash ${OUT_DIR} > /dev/null
yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapred.job.name="Streaming wordCount" \
-D mapreduce.job.reduces=${NUM_REDUCERS} \
-files mapper1.py,reducer1.py \
-mapper "python mapper1.py" \
-combiner "python reducer1.py" \
-reducer "python reducer1.py" \
-input /test/articles-part-short \
-output ${OUT_DIR} > /dev/null
OUT_DIR_2="wordcount_result_2"
NUM_REDUCERS=1
hdfs dfs -rm -r -skipTrash ${OUT_DIR_2} > /dev/null
yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapred.job.name="Streaming wordCount Rating" \
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
-D map.output.key.field.separator=\t \
-D mapreduce.partition.keycomparator.options=-k2,2nr \
-D mapreduce.job.reduces=${NUM_REDUCERS} \
-files mapper2.py,reducer2.py \
-mapper "python mapper2.py" \
-reducer "python reducer2.py" \
-input ${OUT_DIR} \
-output ${OUT_DIR_2} > /dev/null
hdfs dfs -cat ${OUT_DIR_2}/part-00000 | head
This is not giving me the right answer. Can someone please explain where it went wrong?
On the other hand, if in mapper2.py I print in the following manner,
print "%d\t%s" % (count, word)
and in reducer2.py I read in the following manner,
count, word = line.strip().split('\t', 1)
and change the option in the second yarn command to
-D mapreduce.partition.keycomparator.options=-k1,1nr
then it gives me the right answer.
Why does it behave differently in these two cases?
Can someone please help me understand the Comparator options of Hadoop MapReduce?
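For reference, KeyFieldBasedComparator accepts the same -k/-n/-r field options as GNU sort, so the working variant can be reproduced locally. A minimal sketch, with made-up sample data in the "count<TAB>word" layout described above:

```shell
# Simulate the working variant: sort on field 1 (the count),
# numerically, in reverse order (-k1,1nr), tab-separated.
printf '3\tapple\n10\tbanana\n2\tcherry\n' | sort -t $'\t' -k1,1nr
# → banana (10), apple (3), cherry (2)
```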
Solution
This will work:
yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapred.job.name="Streaming wordCount rating" \
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
-D mapreduce.partition.keycomparator.options='-k2nr' \
-D stream.num.map.output.key.fields=2 \
-D mapred.map.tasks=1 \
-D mapreduce.job.reduces=1 \
-files mapper2.py,reducer2.py \
-mapper "python mapper2.py" \
-reducer "python reducer2.py" \
-input /user/jovyan/assignment0_1563877099149160 \
-output ${OUT_DIR} > /dev/null
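The crucial addition is stream.num.map.output.key.fields=2. By default, Hadoop Streaming treats only the text before the first tab as the key, and the comparator only ever sees the key, so in the original command -k2,2nr had no second field to compare. With two key fields, the whole "word<TAB>count" line becomes the key and -k2nr sorts on its count field. Since the comparator options follow GNU sort's -k syntax, the effect can be sketched locally with made-up sample data:

```shell
# With the entire "word<TAB>count" line as the key, -k2nr orders
# by field 2 (the count) numerically, largest first.
printf 'apple\t3\nbanana\t10\ncherry\t2\n' | sort -t $'\t' -k2nr
# → banana (10), apple (3), cherry (2)
```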