首页 > 解决方案 > Spark RDD:从其他 RDD 中查找

问题描述

作为一些练习滚动我自己的关联规则模块的一部分,我正在尝试在 Spark 中执行最快的查找。请注意,我知道 PySpark 支持以下指标,即置信度。这只是一个示例 - 不支持另一个指标提升,但我打算使用此讨论的结果来开发它。

作为计算规则置信度的一部分,我需要查看前件和后件一起出现的频率,以及前件在整个事务集(在本例中为rdd)中出现的频率。

from itertools import combinations, chain

def powerset(iterable, no_empty=True):
    ''' Produce the powerset for a given iterable '''
    s = list(iterable)
    combos = (combinations(s, r) for r in range(len(s)+1))
    powerset = chain.from_iterable(combos)
    return (el for el in powerset if el) if no_empty else powerset

# Set-up transaction set
rdd = sc.parallelize(
    [
        ('a',),
        ('a', 'b'),
        ('a', 'b'),
        ('b', 'c'),
        ('a', 'c'),
        ('a', 'b'),
        ('b', 'c'),
        ('c',),
        ('b'),
    ]
)

# Create an RDD with the counts of each
# possible itemset
counts = (
    rdd
    .flatMap(lambda x: powerset(x))
    .map(lambda x: (x, 1))
    .reduceByKey(lambda x, y: x + y)
    .map(lambda x: (frozenset(x[0]), x[1]))
)

# Function to calculate confidence of a rule
confidence = lambda x: counts.lookup(frozenset(x)) / counts.lookup((frozenset(x[1]),))

confidence_result = (
    rdd
    # Must be applied to length-two and greater itemsets
    .filter(lambda x: len(x) > 1)
    .map(confidence)
)

对于熟悉此类查找问题的人,您会知道引发了此类异常:

Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

解决此异常的一种方法是转换counts为字典:

counts = dict(counts.collect())

confidence = lambda x: (x, counts[frozenset(x)] / counts[frozenset(x[1])])

confidence_result = (
    rdd
    # Must be applied to length-two and greater itemsets
    .filter(lambda x: len(x) > 1)
    .map(confidence)
)

这给了我我的结果。但是运行counts.collect过程非常昂贵,因为实际上我有一个包含 50m+ 记录的数据集。执行此类查找是否有更好的选择?

标签: apache-sparkpysparkrdd

解决方案


如果你的目标指标可以在每个 RDD 分区上独立计算,然后组合起来达到目标​​结果,你可以在计算你的指标时使用mapPartitions代替。map

通用流程应该是这样的:

metric_result = (
    rdd
    # apply your metric calculation independently on each partition       
    .mapPartitions(confidence_partial) 
    # collect results from the partitions into a single list of results
    .collect()
    # reduce the list to combine the metrics calculated on each partition
    .reduce(confidence_combine)
)

两者confidence_partialconfidence_combine都是接受迭代器/列表输入的常规python函数。

顺便说一句,通过使用数据框 API 和原生表达式函数来计算指标,您可能会获得巨大的性能提升。


推荐阅读