首页 > 解决方案 > Hadoop 坚持减少 67%(仅适用于大数据)

问题描述

我是 Hadoop 和 Linux 的初学者。

问题

什么运行成功

映射器(Python)

#!/usr/bin/env python3

import sys
from itertools import islice
from operator import itemgetter

def read_input(file):
    # read file except first line
    for line in islice(file, 1, None):
        # split the line into words
        yield line.split(',')

def main(separator='\t'):
    
    # input comes from STDIN (standard input)
    data = read_input(sys.stdin)
    
    for words in data:
        
        # for each row we take only the needed columns
        data_row = list(itemgetter(*[1, 2, 4, 5, 6, 9, 10, 18])(words))
        data_row[7] = data_row[7].replace('\n', '')

        # taking year and month No.from first column to create the
        # key that will send to reducer
        date = data_row[0].split(' ')[0].split('-')
        key = str(date[0]) + '_' + str(date[1])
        
        # value that will send to reducer
        value = ','.join(data_row)
        
        # print here will send the output pair (key, value)
        print('%s%s%s' % (key, separator, value))
        
if __name__ == "__main__":
    main()

减速器(Python)

#!/usr/bin/env python3

from itertools import groupby
from operator import itemgetter
import sys
import pandas as pd
import numpy as np
import time

def read_mapper_output(file):
    for line in file:
        yield line

def main(separator='\t'):
    
    all_rows_2015 = []
    all_rows_2016 = []
    
    start_time = time.time()
    
    names = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'trip_distance', 
             'pickup_longitude',     'pickup_latitude',       'dropoff_longitude', 
             'dropoff_latitude',     'total_amount']
    df = pd.DataFrame(columns=names)
    
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin)
    for words in data:

        # get key & value from Mapper
        key, value = words.split(separator)
        row = value.split(',')

        # split data with belong to 2015 from data belong to 2016
        if key in '2015_01 2015_02 2015_03': 
            all_rows_2015.append(row)
            if len(all_rows_2015) >= 10:
                df=df.append(pd.DataFrame(all_rows_2015, columns=names))
                all_rows_2015 = []
        elif key in '2016_01 2016_02 2016_03':
            all_rows_2016.append(row)
            if len(all_rows_2016) >= 10:
                df=df.append(pd.DataFrame(all_rows_2016, columns=names))
                all_rows_2016 = []
    
    print(df.to_string())
    print("--- %s seconds ---" % (time.time() - start_time))

if __name__ == "__main__":
    main()

更多信息

我在VMware上安装的Linux上使用Hadoop v3.2.1Python中运行 MapReduce 作业。

减少工作数量:

输入数据大小 行数 减少工作时间
~98 KB 600 行 ~0.1 秒 好的
~953 KB 6,000 行 ~1 秒 好的
~9.5 Mb 60,000 行 ~52 秒 好的
~94 MB 600,000 行 约 5647 秒(约 94 分钟) 非常慢
~11 Gb 76,000,000 行 ?? 不可能的

目标是在约 7600 万行输入数据上运行,剩下的这个问题是不可能的。

标签: pythonlinuxhadoopmapreduce

解决方案


“当减少达到 67% 时,只有一个 CPU 以 100% 的速度继续运行,而其余的 CPU 正在休眠” - 你有偏差。一个键比任何其他键具有更多的值。


推荐阅读