Sorting in MapReduce - Hadoop

Problem Description

How do I implement a sorting algorithm with Hadoop MapReduce?

I've run into a problem trying to sort in Hadoop: I can't get sorted output. The map and reduce phases both show 100%, but the job then fails with an ERROR...

Below are my code samples, mapper.py and reducer.py.

For better understanding I've included the entire code; I hope it helps clarify the problem. Please let me know if anything is missing.

**mapper.py** 

```
#!/usr/bin/env python
#coding: utf-8

import sys
import re

words = []
doc_id = 0
line_number = 1
wordcount = 1
wordcount_per_doc = 0
df_t = 1

# Load the stop-word list once, before streaming stdin
# (opening the file inside the loop below would reread it for every input
#  line and shadow the loop variable `line`)
stopwords = set()
for sw_line in open('stopwords_en.txt'):
    stopwords.add(sw_line.strip())

for line in sys.stdin:
    '''
    0. ***INPUT DATA*** | reads the contents of each text file from stdin
    1. Assign an ID to the document being read
    2. Tokenize the line into words
    3. Format each extracted word
    4. Append the line's words to the full word list
    5. Count the words in the document being analysed
    6. ***OUTPUT DATA*** | each word is emitted on stdout in the following form:
        - <key>\t<value>
        - <key>: 'word',docid
        - <value>: wordcount,wordcount_per_doc,df_t
    '''
    # Strip surrounding whitespace
    line = line.strip()

    # 1. Assign an ID to the document being read
    if line_number==1:
        if line=='1903':
            doc_id=2
            line_number=0
        else:
            doc_id=1
            line_number=0

    # 2. Tokenize the line into words
    words_in_line = line.split()

    # 3. Format each extracted word
    # lower case
    words_in_line = [word.lower() for word in words_in_line]
    # Strip punctuation and other non-word characters (with a regex)
    words_in_line = [re.sub(r'[^\w]', '', word) for word in words_in_line]
    # Filter out stop words
    words_in_line = [word for word in words_in_line if word not in stopwords]
    # Drop words shorter than 3 characters
    words_in_line = [word for word in words_in_line if len(word) > 2]

    # 4. Append the line's words to the full word list
    words += words_in_line

    # 5. Accumulate the word count for the analysed document
    wordcount_per_doc += len(words_in_line)

# 6. ***OUTPUT DATA*** | emit each word on stdout
for word in words:
    print("%s,%i\t%i\t%i\t%i" % (word,doc_id,wordcount,wordcount_per_doc, df_t))
```
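
A quick check of what the cleanup regex in step 3 keeps and strips. Note that `\w` matches letters, digits and underscore, so punctuation is removed but numeric tokens survive:

```
import re

# [^\w] removes everything that is not a letter, digit or underscore
print(re.sub(r'[^\w]', '', "crusoe's"))  # -> crusoes
print(re.sub(r'[^\w]', '', "1903."))     # -> 1903
```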


**reducer.py**

```
#!/usr/bin/env python
#coding: utf-8

import sys
import os
import csv
from math import log10
from collections import defaultdict

words = []                                   # list of [(word,docid), wordcount, wordcount_per_doc, df_t] records
last_word_docid_pair = None                  # for the wordcount computation
df_t_dict = defaultdict(set)                 # for the df_t computation (word -> set of docids)
docid_list = set()                           # set of docids, i.e. the documents in the collection


'''
***INPUT DATA*** | sorted output of the MAP step on stdin: word,docid\twordcount\tword_per_doc\tdf_t
    KEY: (word,docid) pair
    VALUE: wordcount\tword_per_doc\tdf_t

>>>> LOOP #1 - process each input line
    # 1. Compute the "WordCount" by summing the occurrences of each (word,docid) key, and append it to the "words" list
    # 2. Compute df_t by building a dictionary df_t_dict (key=word : value=set(docid))
    # 3. Compute the number of documents N in the collection by building a set of all docids

>>>> LOOP #2 - process the final "words" list
    # 4. Assign each word its final df_t value
    # 5. Compute TF-IDF
    # 6. Print one line per word on stdout: <word,docid______TFIDF>

***OUTPUT DATA*** | one line per word on stdout: <word,docid______TFIDF>
    KEY: (word,docid) pair
    VALUE: TFIDF

***OUTPUT DATA*** | file words_top20_tfidf_docid<docid>.csv
    the 20 highest-weighted words for each document
'''
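
# Example of the sorted stream this reducer expects on stdin (illustrative
# values for two hypothetical documents; the mapper always emits wordcount=1
# and df_t=1, and only the reducer aggregates them):
#   crusoe,1	1	4521	1
#   crusoe,1	1	4521	1
#   crusoe,2	1	3987	1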

for line in sys.stdin:
    # get key/values
    line = line.strip()
    key,wordcount,wordcount_per_doc,df_t = line.split("\t")
    wordcount_per_doc=int(wordcount_per_doc)
    wordcount = int(wordcount)
    df_t = int(df_t)
    word,docid = key.split(",")
    docid = int(docid)
    word_docid_pair = (word,docid)
    # 1. Compute the "WordCount"
    if last_word_docid_pair is None:                        # first word seen
        last_word_docid_pair = word_docid_pair
        last_wordcount = 0
        last_wordcount_per_doc = wordcount_per_doc
        last_df_t = df_t
    if word_docid_pair == last_word_docid_pair:
        last_wordcount += wordcount
    else:
        words.append([last_word_docid_pair,last_wordcount,last_wordcount_per_doc,last_df_t])
        # set new values
        last_word_docid_pair = word_docid_pair
        last_wordcount = wordcount
        last_wordcount_per_doc = wordcount_per_doc
        last_df_t = df_t
    # 2. Compute df_t: record that this word occurs in this docid
    df_t_dict[word].add(docid)
    # 3. Count the documents N in the collection
    docid_list.add(docid)

# flush the last word, which loop #1 never appends
words.append([last_word_docid_pair,last_wordcount,last_wordcount_per_doc,last_df_t])
# 3. The number of documents N in the collection
N = len(docid_list)

for word_block in words:
    word, docid, wordcount, wordcount_per_doc, df_t = word_block[0][0], int(word_block[0][1]), int(word_block[1]), int(word_block[2]), int(word_block[3])
    # 4. Assign each word its final df_t value
    df_t = len(df_t_dict[word])
    # 5. Compute TF-IDF = wordcount * wordcount_per_doc * log10(N/df_t)
    word_block.append(wordcount * wordcount_per_doc * log10(N / df_t))
    TFIDF = word_block[4]
    # 6. ***OUTPUT DATA*** | one ((word,docid), TF-IDF) pair per stdout line
    key_formated = '{:_<30}'.format("%s,%i" % (word, docid))
    print("%s\t%i\t%i\t%i\t%.5f" % (key_formated, wordcount, wordcount_per_doc, df_t, TFIDF))


# TEST RESULTS - top 20 tf-idf words for each document, written to words_top20_tfidf_docid<docid>.csv
for docid in docid_list:
    words_top20_tfidf = sorted([word_block for word_block in words if word_block[0][1] == docid], key=lambda x: x[4], reverse=True)[:20]
    document_name = 'words_top20_tfidf_docid%s' % docid
    with open('%s.csv' % document_name, 'w') as f:
        csv.writer(f).writerows(words_top20_tfidf)   # one row per word block
```
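
As a sanity check of the weighting computed in step 5 (note that it multiplies by wordcount_per_doc, whereas the usual TF normalisation divides the raw count by the document length), here is a small worked example with made-up values:

```
from math import log10

# Hypothetical values: a word occurring 3 times in a 1000-word document,
# in a collection of N=2 documents where only this one contains the word.
wordcount, wordcount_per_doc, N, df_t = 3, 1000, 2, 1
print(wordcount * wordcount_per_doc * log10(N / df_t))  # 3 * 1000 * 0.30103 ≈ 903.09
```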




COMMAND & OUTPUT:

```
hadoop jar /home/hduser/Desktop/sample/hadoop-streaming-3.1.2.jar -D mapred.reduce.task=2 -input /user/defoe-robinson-103.txt -output /home/hduser/data -mapper "python3 mapper.py" -reducer "python3 reducer.py" -file /home/hduser/Desktop/mr_tfidf/tf-idf_mapper.py -file /home/hduser/Desktop/mr_tfidf/tf-idf_reducer.py
2019-10-08 10:38:25,816 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
2019-10-08 10:38:26,387 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [/home/hduser/Desktop/mr_tfidf/tf-idf_mapper.py, /home/hduser/Desktop/mr_tfidf/tf-idf_reducer.py, /tmp/hadoop-unjar2406562609442989570/] [] /tmp/streamjob5471183357876092640.jar tmpDir=null
2019-10-08 10:38:29,760 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
2019-10-08 10:38:30,814 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
2019-10-08 10:38:31,741 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/hduser/.staging/job_1570272874011_0010
2019-10-08 10:38:32,382 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2019-10-08 10:38:32,958 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2019-10-08 10:38:32,993 WARN hdfs.DataStreamer: Caught exception
java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Thread.join(Thread.java:1252)
    at java.lang.Thread.join(Thread.java:1326)
    at org.apache.hadoop.hdfs.DataStreamer.closeResponder(DataStreamer.java:986)
    at org.apache.hadoop.hdfs.DataStreamer.endBlock(DataStreamer.java:640)
    at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:810)
2019-10-08 10:38:33,115 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2019-10-08 10:38:33,646 INFO mapred.FileInputFormat: Total input files to process : 1
2019-10-08 10:38:33,917 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2019-10-08 10:38:34,064 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2019-10-08 10:38:34,099 WARN hdfs.DataStreamer: Caught exception
java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Thread.join(Thread.java:1252)
    at java.lang.Thread.join(Thread.java:1326)
    at org.apache.hadoop.hdfs.DataStreamer.closeResponder(DataStreamer.java:986)
    at org.apache.hadoop.hdfs.DataStreamer.endBlock(DataStreamer.java:640)
    at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:810)
2019-10-08 10:38:34,105 INFO mapreduce.JobSubmitter: number of splits:2
2019-10-08 10:38:34,905 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2019-10-08 10:38:35,016 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1570272874011_0010
2019-10-08 10:38:35,018 INFO mapreduce.JobSubmitter: Executing with tokens: []
2019-10-08 10:38:36,228 INFO conf.Configuration: resource-types.xml not found
2019-10-08 10:38:36,232 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2019-10-08 10:38:36,585 INFO impl.YarnClientImpl: Submitted application application_1570272874011_0010
2019-10-08 10:38:36,938 INFO mapreduce.Job: The url to track the job: http://hadoop:8088/proxy/application_1570272874011_0010/
2019-10-08 10:38:36,999 INFO mapreduce.Job: Running job: job_1570272874011_0010
2019-10-08 10:39:00,083 INFO mapreduce.Job: Job job_1570272874011_0010 running in uber mode : false
2019-10-08 10:39:00,089 INFO mapreduce.Job:  map 0% reduce 0%
2019-10-08 10:39:20,930 INFO mapreduce.Job: Task Id : attempt_1570272874011_0010_m_000000_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)

2019-10-08 10:39:21,021 INFO mapreduce.Job: Task Id : attempt_1570272874011_0010_m_000001_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)

2019-10-08 10:39:43,799 INFO mapreduce.Job: Task Id : attempt_1570272874011_0010_m_000001_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)

2019-10-08 10:39:43,807 INFO mapreduce.Job: Task Id : attempt_1570272874011_0010_m_000000_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)

2019-10-08 10:40:11,764 INFO mapreduce.Job: Task Id : attempt_1570272874011_0010_m_000001_2, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)

2019-10-08 10:40:11,779 INFO mapreduce.Job: Task Id : attempt_1570272874011_0010_m_000000_2, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)

2019-10-08 10:40:32,304 INFO mapreduce.Job:  map 50% reduce 100%
2019-10-08 10:40:33,346 INFO mapreduce.Job:  map 100% reduce 100%
2019-10-08 10:40:33,408 INFO mapreduce.Job: Job job_1570272874011_0010 failed with state FAILED due to: Task failed task_1570272874011_0010_m_000001
Job failed as tasks failed. failedMaps:1 failedReduces:0 killedMaps:0 killedReduces: 0

2019-10-08 10:40:33,877 INFO mapreduce.Job: Counters: 14
    Job Counters 
        Failed map tasks=7
        Killed map tasks=1
        Killed reduce tasks=1
        Launched map tasks=8
        Other local map tasks=6
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=164125
        Total time spent by all reduces in occupied slots (ms)=0
        Total time spent by all map tasks (ms)=164125
        Total vcore-milliseconds taken by all map tasks=164125
        Total megabyte-milliseconds taken by all map tasks=168064000
    Map-Reduce Framework
        CPU time spent (ms)=0
        Physical memory (bytes) snapshot=0
        Virtual memory (bytes) snapshot=0
2019-10-08 10:40:33,880 ERROR streaming.StreamJob: Job not successful!
Streaming Command Failed!
```
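
Before resubmitting to YARN, it can help to reproduce the pipeline locally, since "subprocess failed with code 2" hides the Python traceback. A minimal sketch, assuming mapper.py, reducer.py, stopwords_en.txt and the input text all sit in the current directory:

```
import subprocess

# Simulate the streaming pipeline: mapper -> sort (the shuffle) -> reducer.
with open('defoe-robinson-103.txt', 'rb') as f:
    mapped = subprocess.run(['python3', 'mapper.py'], stdin=f,
                            capture_output=True, check=True).stdout

# Hadoop sorts map output by key before the reduce phase; emulate that here
shuffled = b'\n'.join(sorted(mapped.splitlines())) + b'\n'

reduced = subprocess.run(['python3', 'reducer.py'], input=shuffled,
                         capture_output=True, check=True).stdout
print(reduced.decode())
```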

Tags: python-3.x, hadoop
