python - Hadoop 坚持减少 67%(仅适用于大数据)
问题描述
我是 Hadoop 和 Linux 的初学者。
问题
- 当输入数据很大(例如 600k 行或 6M 行)时,即使 Map 和 Reduce 函数非常简单,Hadoop reduce 也会卡住(或移动得非常慢)
2021-08-08 22:53:12,350 INFO mapreduce.Job: map 100% reduce 67%
。 - 在 Linux系统监视器中,我可以看到当减少达到 67% 时,只有一个 CPU 在 100% 时保持运行,其余的都在休眠 :) 看这张图片
什么运行成功
- 我使用少量输入数据(600 行)快速成功地运行了 MapReduce 作业,没有任何问题 map 100% reduce 100%,
2021-08-08 19:44:13,350 INFO mapreduce.Job: map 100% reduce 100%
.
映射器(Python)
#!/usr/bin/env python3
import sys
from itertools import islice
from operator import itemgetter
def read_input(file):
# read file except first line
for line in islice(file, 1, None):
# split the line into words
yield line.split(',')
def main(separator='\t'):
# input comes from STDIN (standard input)
data = read_input(sys.stdin)
for words in data:
# for each row we take only the needed columns
data_row = list(itemgetter(*[1, 2, 4, 5, 6, 9, 10, 18])(words))
data_row[7] = data_row[7].replace('\n', '')
# taking year and month No.from first column to create the
# key that will send to reducer
date = data_row[0].split(' ')[0].split('-')
key = str(date[0]) + '_' + str(date[1])
# value that will send to reducer
value = ','.join(data_row)
# print here will send the output pair (key, value)
print('%s%s%s' % (key, separator, value))
if __name__ == "__main__":
main()
减速器(Python)
#!/usr/bin/env python3
from itertools import groupby
from operator import itemgetter
import sys
import pandas as pd
import numpy as np
import time
def read_mapper_output(file):
for line in file:
yield line
def main(separator='\t'):
all_rows_2015 = []
all_rows_2016 = []
start_time = time.time()
names = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'trip_distance',
'pickup_longitude', 'pickup_latitude', 'dropoff_longitude',
'dropoff_latitude', 'total_amount']
df = pd.DataFrame(columns=names)
# input comes from STDIN (standard input)
data = read_mapper_output(sys.stdin)
for words in data:
# get key & value from Mapper
key, value = words.split(separator)
row = value.split(',')
# split data with belong to 2015 from data belong to 2016
if key in '2015_01 2015_02 2015_03':
all_rows_2015.append(row)
if len(all_rows_2015) >= 10:
df=df.append(pd.DataFrame(all_rows_2015, columns=names))
all_rows_2015 = []
elif key in '2016_01 2016_02 2016_03':
all_rows_2016.append(row)
if len(all_rows_2016) >= 10:
df=df.append(pd.DataFrame(all_rows_2016, columns=names))
all_rows_2016 = []
print(df.to_string())
print("--- %s seconds ---" % (time.time() - start_time))
if __name__ == "__main__":
main()
更多信息
我在VMware上安装的Linux上使用Hadoop v3.2.1在Python中运行 MapReduce 作业。
减少工作数量:
输入数据大小 | 行数 | 减少工作时间 | |
---|---|---|---|
~98 KB | 600 行 | ~0.1 秒 | 好的 |
~953 KB | 6,000 行 | ~1 秒 | 好的 |
~9.5 Mb | 60,000 行 | ~52 秒 | 好的 |
~94 MB | 600,000 行 | 约 5647 秒(约 94 分钟) | 非常慢 |
~11 Gb | 76,000,000 行 | ?? | 不可能的 |
目标是在约 7600 万行输入数据上运行,剩下的这个问题是不可能的。
解决方案
“当减少达到 67% 时,只有一个 CPU 以 100% 的速度继续运行,而其余的 CPU 正在休眠” - 你有偏差。一个键比任何其他键具有更多的值。