首页 > 解决方案 > 将行格式化为向量,如何reduceByKey (list(n_1, m_1)....(n_k, m_k)) 到(n_1...n_k) (m_1...m_k))

问题描述

数据

RDD 从 textFile() 中读取,其中包含 (str-key, [int-id, int-value]) 对的列表。

[(u'ID1', (132, 1)),
 (u'ID2', (133, 3)),
 (u'ID3', (120, 5)),
 (u'ID4', (110, 0)),
 (u'ID5', (160, 2)),
 (u'ID6', (500, 9)),
 (u'ID7', (932, 8)),
 (u'ID8', (132, 1)),
 (u'ID1', (133, 6)),
 (u'ID8', (133, 1))]

输出 我想有效地创建(键,密集/稀疏向量)列表的RDD,尽可能少地洗牌

编辑:基于下面的评论。无论组/聚合如何,都无法在 Spark 中执行此操作

密集向量

正在读取的文件是按 int-id 排序的,所以如果我要丢弃 str-key 上的 int-id 和 reduceByKey,我可以形成一个 int-value 的 DenseVector

rdd.map(lambda x: (x[0], [x[1]]))\
    .reduceByKey(lambda a, b: a + b)\
    .map(lambda x: [x[0], DenseVector(x[1])])

会给我正确的 1 个分区的 int-value 排序,但速度很慢。如果分区和工作人员超过 1 个,这可能非常快,但顺序在 str-key 中是随机的。例如,对于 str-key ID1 和 ID8,所需的输出将是 [1, 6], [3, 1] 或 [6, 1], [1, 3] 但它不能是 [1, 6], [1, 3]。

1)有没有办法 reduceByKey 但保留文件/读取顺序(或根据 int-ID 重新排序结果)?

稀疏向量

对于 Sparsevector,我尝试直接输入 [int-d, int-value] 对的列表,但这需要跨 ID 聚合 afaik。groupByKey() 导致大量改组。

RDD.map(lambda x: (x[0], (int(x[1]), int(x[2]))))\
            .groupByKey()\
            .mapValues(list)\
            .mapValues(sorted)\
            .mapValues(lambda x: (SparseVector(N, x)))

该列表聚合每个 str-key 的数据 [(int-id, value), (int-id_2, value_2) .... (int-id_n, value_n)]。Sorted 在那里,因为 sparseVector 需要一个排序列表或字典。

2)有没有办法更有效地写这个?

标签: pythonpython-2.7apache-sparkpyspark

解决方案


如果数据是稀疏的(您可以计算确切的稀疏阈值,具体取决于密钥的预期大小),groupByKey则最佳解决方案是 - 对于必须洗牌的每一行:

  • 钥匙。
  • 价值。因为它是tuple原始值,所以不需要完全成熟__dict__,并且尺寸尽可能小。

由于您问题中的 (index, value) 对似乎是唯一的,因此值大小的 shuffle 并没有减少,但是任何复杂的对象(如向量)都可能比 a 具有更大的开销tuple

唯一可能的减少发生在关键方面。要实现一个,这超过了价值大小的增加,您需要一个合理密集的数据。

如果是这种情况,aggregateByKey可能会表现得更好,尽管合并的额外成本仍然会消耗 map-side combine 的可能好处。

def seq_func(acc, x):
    if x[1]:
        acc[x[0]] = acc.get(x[0], 0) + x[1]
    return acc

def comb_func(acc1, acc2):
    for k in acc2:
        acc1[k] = acc1.get(k, 0) + acc2[k]
    return acc1

rdd.aggregateByKey(dict(), seq_func, comb_func).mapValues(lambda d: SparseVector(N, d))

否则groupByKey,跳过排序并使用dict

rdd.groupByKey().mapValues(lambda x: SparseVector(N, dict(x)))

推荐阅读