首页 > 解决方案 > 为什么使用for循环时python spark慢

问题描述

我正在通过编写页面排名程序来学习 pyspark。
但是当我使用 for 循环进行计算时,每次迭代都会变慢。
我尝试使用缓存,但它似乎不起作用。
我不知道如何解决这个问题。
表现

这是我的循环代码

from time import time
for idx, i in tqdm(enumerate(range(10))):

    start_time = time() # <-- start timing

    new_values = stochastic_matrix.flatMap(lambda x: get_new_value(x, beta, N))
    new_values = new_values.reduceByKey(add).map(lambda x: [x[0], x[1] + ((1-beta)/N)] )
    S = new_values.values().reduce(add)
    new_stochastic_matrix = stochastic_matrix.fullOuterJoin(new_values)
    stochastic_matrix = new_stochastic_matrix.map(lambda x: sum_new_value(x, S, N))

    new_stochastic_matrix.cache()
    stochastic_matrix.cache() # <--- cache here


    end_time = time()
    print(idx, end_time - start_time)

sorted(stochastic_matrix.collect())[:10]

更新

在我评论这一行之后

stochastic_matrix = new_stochastic_matrix.map(lambda x: sum_new_value(x, S, N))

它工作正常!但我仍然不知道为什么以及如何解决它。

更新 2

我设置S为常数,速度正常。
但我仍然不知道为什么以及如何解决它。

所有流量

输入数据后

变量:stochastic_matrix - 数据结构看起来像这样。

[
(key,[value, this_node_connect_to_which_node]),
(1, [0.2, [2, 3]]),
(2, [0.2, [4]]),
(3, [0.2, [1, 4, 5]]),
(4, [0.2, []]),
(5, [0.2, [1, 4]])
]

地图

def get_new_value(item, beta, N):
    key, tmp = item
    value, dest = tmp
    N_dest = len(dest)

    new_values = []
    for i in dest:
        new_values.append([i, beta * (value/ N_dest)] )

    return new_values

new_values = stochastic_matrix.flatMap(lambda x: get_new_value(x, beta, N))
new_values.collect()

########### output
[node, each_node_new_value]
[[2, 0.08000000000000002],
 [3, 0.08000000000000002],
 [4, 0.16000000000000003],
 [1, 0.05333333333333334],
 [4, 0.05333333333333334],
 [5, 0.05333333333333334],
 [1, 0.08000000000000002],
 [4, 0.08000000000000002]]

按键减少

beta 和 N 只是一个浮点数

new_values = new_values.reduceByKey(add).map(lambda x: [x[0], x[1] + ((1-beta)/N)] )
new_values.collect()

###### Output
[[2, 0.12000000000000001],
 [3, 0.12000000000000001],
 [4, 0.33333333333333337],
 [1, 0.17333333333333334],
 [5, 0.09333333333333332]]

结合 new_values 和 stochastic_matrix

new_stochastic_matrix = stochastic_matrix.fullOuterJoin(new_values)
new_stochastic_matrix.collect()

#### Output
# (key, ([value, this_node_connect_to_which_node], new_value))

[(2, ([0.2, [4]], 0.12000000000000001)),
 (4, ([0.2, []], 0.33333333333333337)),
 (1, ([0.2, [2, 3]], 0.17333333333333334)),
 (3, ([0.2, [1, 4, 5]], 0.12000000000000001)),
 (5, ([0.2, [1, 4]], 0.09333333333333332))]

将 new_value 更新为 value

S 和 N 只是一个数字

def sum_new_value(item, S, N):
    key, value = item

    if value[1] == None:
        new_value = 0 + (1-S)/N
    else:
        new_value = value[1] + (1-S)/N


#     new_value = value[1]

    return [key, [new_value, value[0][1]]]

stochastic_matrix = new_stochastic_matrix.map(lambda x: sum_new_value(x, S, N))

sorted(stochastic_matrix.collect())[:10]

######## Output
[[1, [0.2053333333333333, [2, 3]]],
 [2, [0.152, [4]]],
 [3, [0.152, [1, 4, 5]]],
 [4, [0.36533333333333334, []]],
 [5, [0.1253333333333333, [1, 4]]]]

标签: apache-sparkpyspark

解决方案


推荐阅读