apache-spark - 为什么使用for循环时python spark慢
问题描述
我正在通过编写页面排名程序来学习 pyspark。
但是当我使用 for 循环进行计算时,每次迭代都会变慢。
我尝试使用缓存,但它似乎不起作用。
我不知道如何解决这个问题。
这是我的循环代码
from time import time
for idx, i in tqdm(enumerate(range(10))):
start_time = time() # <-- start timing
new_values = stochastic_matrix.flatMap(lambda x: get_new_value(x, beta, N))
new_values = new_values.reduceByKey(add).map(lambda x: [x[0], x[1] + ((1-beta)/N)] )
S = new_values.values().reduce(add)
new_stochastic_matrix = stochastic_matrix.fullOuterJoin(new_values)
stochastic_matrix = new_stochastic_matrix.map(lambda x: sum_new_value(x, S, N))
new_stochastic_matrix.cache()
stochastic_matrix.cache() # <--- cache here
end_time = time()
print(idx, end_time - start_time)
sorted(stochastic_matrix.collect())[:10]
更新
在我评论这一行之后
stochastic_matrix = new_stochastic_matrix.map(lambda x: sum_new_value(x, S, N))
它工作正常!但我仍然不知道为什么以及如何解决它。
更新 2
我设置S为常数,速度正常。
但我仍然不知道为什么以及如何解决它。
所有流量
输入数据后
变量:stochastic_matrix - 数据结构看起来像这样。
[
(key,[value, this_node_connect_to_which_node]),
(1, [0.2, [2, 3]]),
(2, [0.2, [4]]),
(3, [0.2, [1, 4, 5]]),
(4, [0.2, []]),
(5, [0.2, [1, 4]])
]
地图
def get_new_value(item, beta, N):
key, tmp = item
value, dest = tmp
N_dest = len(dest)
new_values = []
for i in dest:
new_values.append([i, beta * (value/ N_dest)] )
return new_values
new_values = stochastic_matrix.flatMap(lambda x: get_new_value(x, beta, N))
new_values.collect()
########### output
[node, each_node_new_value]
[[2, 0.08000000000000002],
[3, 0.08000000000000002],
[4, 0.16000000000000003],
[1, 0.05333333333333334],
[4, 0.05333333333333334],
[5, 0.05333333333333334],
[1, 0.08000000000000002],
[4, 0.08000000000000002]]
按键减少
beta 和 N 只是一个浮点数
new_values = new_values.reduceByKey(add).map(lambda x: [x[0], x[1] + ((1-beta)/N)] )
new_values.collect()
###### Output
[[2, 0.12000000000000001],
[3, 0.12000000000000001],
[4, 0.33333333333333337],
[1, 0.17333333333333334],
[5, 0.09333333333333332]]
结合 new_values 和 stochastic_matrix
new_stochastic_matrix = stochastic_matrix.fullOuterJoin(new_values)
new_stochastic_matrix.collect()
#### Output
# (key, ([value, this_node_connect_to_which_node], new_value))
[(2, ([0.2, [4]], 0.12000000000000001)),
(4, ([0.2, []], 0.33333333333333337)),
(1, ([0.2, [2, 3]], 0.17333333333333334)),
(3, ([0.2, [1, 4, 5]], 0.12000000000000001)),
(5, ([0.2, [1, 4]], 0.09333333333333332))]
将 new_value 更新为 value
S 和 N 只是一个数字
def sum_new_value(item, S, N):
key, value = item
if value[1] == None:
new_value = 0 + (1-S)/N
else:
new_value = value[1] + (1-S)/N
# new_value = value[1]
return [key, [new_value, value[0][1]]]
stochastic_matrix = new_stochastic_matrix.map(lambda x: sum_new_value(x, S, N))
sorted(stochastic_matrix.collect())[:10]
######## Output
[[1, [0.2053333333333333, [2, 3]]],
[2, [0.152, [4]]],
[3, [0.152, [1, 4, 5]]],
[4, [0.36533333333333334, []]],
[5, [0.1253333333333333, [1, 4]]]]
解决方案
推荐阅读
- python - 如何导出烧瓶restplus swagger json?
- python - 如何在另一个类中使用模块的方法
- active-directory - 获取特定生日的所有用户
- unit-testing - 在 Vue Test Utils 中模拟退格
- ruby - 有没有办法同时(并行)运行三个循环 ruby 方法?
- exception-handling - 处理层之间的异常
- r - 在 R 中复制 SAS 阵列
- continuous-integration - 持续交付本地可安装产品,支持跨不同客户安装的多个版本
- r - 将值格式更改为 R 中的标准 30 秒格式
- apache-spark - Spark 广播失败