apache-spark - Spark RDD:从其他 RDD 中查找
问题描述
作为一些练习滚动我自己的关联规则模块的一部分,我正在尝试在 Spark 中执行最快的查找。请注意,我知道 PySpark 支持以下指标,即置信度。这只是一个示例 - 不支持另一个指标提升,但我打算使用此讨论的结果来开发它。
作为计算规则置信度的一部分,我需要查看前件和后件一起出现的频率,以及前件在整个事务集(在本例中为rdd
)中出现的频率。
from itertools import combinations, chain
def powerset(iterable, no_empty=True):
''' Produce the powerset for a given iterable '''
s = list(iterable)
combos = (combinations(s, r) for r in range(len(s)+1))
powerset = chain.from_iterable(combos)
return (el for el in powerset if el) if no_empty else powerset
# Set-up transaction set
rdd = sc.parallelize(
[
('a',),
('a', 'b'),
('a', 'b'),
('b', 'c'),
('a', 'c'),
('a', 'b'),
('b', 'c'),
('c',),
('b'),
]
)
# Create an RDD with the counts of each
# possible itemset
counts = (
rdd
.flatMap(lambda x: powerset(x))
.map(lambda x: (x, 1))
.reduceByKey(lambda x, y: x + y)
.map(lambda x: (frozenset(x[0]), x[1]))
)
# Function to calculate confidence of a rule
confidence = lambda x: counts.lookup(frozenset(x)) / counts.lookup((frozenset(x[1]),))
confidence_result = (
rdd
# Must be applied to length-two and greater itemsets
.filter(lambda x: len(x) > 1)
.map(confidence)
)
对于熟悉此类查找问题的人,您会知道引发了此类异常:
Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
解决此异常的一种方法是转换counts
为字典:
counts = dict(counts.collect())
confidence = lambda x: (x, counts[frozenset(x)] / counts[frozenset(x[1])])
confidence_result = (
rdd
# Must be applied to length-two and greater itemsets
.filter(lambda x: len(x) > 1)
.map(confidence)
)
这给了我我的结果。但是运行counts.collect
过程非常昂贵,因为实际上我有一个包含 50m+ 记录的数据集。执行此类查找是否有更好的选择?
解决方案
如果你的目标指标可以在每个 RDD 分区上独立计算,然后组合起来达到目标结果,你可以在计算你的指标时使用mapPartitions
代替。map
通用流程应该是这样的:
metric_result = (
rdd
# apply your metric calculation independently on each partition
.mapPartitions(confidence_partial)
# collect results from the partitions into a single list of results
.collect()
# reduce the list to combine the metrics calculated on each partition
.reduce(confidence_combine)
)
两者confidence_partial
和confidence_combine
都是接受迭代器/列表输入的常规python函数。
顺便说一句,通过使用数据框 API 和原生表达式函数来计算指标,您可能会获得巨大的性能提升。
推荐阅读
- python - 了解循环内的 if 语句
- laravel - laravel 在 Auth\Register Controller 的特定列上添加默认值
- android - Android OpenGL ES2.0,着色器在某些设备上编译为假
- tensorflow - 关于同步更新的分布式张量流
- mysql - AWS RDS 上的 Mysql 模式显示数据库元数据中不可用的表 (information_schema)
- android - Listview中的交替颜色
- php - 将持续时间格式“%h%m%s”隐藏为秒
- spring-security - 如何使用 RSA 为 OAuth2 客户端凭据加密 client_secret?
- javascript - 将视频绘制到画布 firefox mobile
- javascript - 表格排序时 HTML 标题行间歇性消失