首页 > 解决方案 > 根据 Spark 中的移动总和将批号添加到 DataFrame

问题描述

我有一个需要批量处理的数据集(由于 API 限制)。

一个批次的 text_lenth 列之和不能超过 1000。并且批处理中的最大行数不能大于 5

为此,我想将批号添加到单个批次中,以便稍后根据 batch_numbers 处理数据。

如何在 pyspark(在 Databricks 中)中实现这一点。我对这一切都很陌生,我什至不知道在网上寻找什么。

我真的很感谢你的帮助。

下表说明了我想要实现的目标:

原表

ID 文本长度
1 500
2 400
3 200
4 300
5 100
6 100
7 100
8 100
9 100
10 300

结果表

ID 文本长度 批号
1 500 1
2 400 1
3 200 2
4 300 2
5 100 2
6 100 2
7 100 2
8 100 3
9 100 3
10 300 3

标签: pythondataframeapache-sparkpyspark

解决方案


如果您不是在寻找最佳解决方案,而是在寻找一种在 Spark 中解决问题而又不太复杂的方法,我们可以将问题分为两个步骤:

  1. 将数据分成块,每行 5 行,忽略文本长度
  2. 如果一个块中的文本长度之和太大,则将该块拆分为更多块

该解决方案不是最优的,因为它产生了太多批次。

步骤 1 可以使用zipWithIndex来实现。在创建批次 id 时,我们留出足够的“空间”来稍后划分批次。在此步骤结束时,一个块中的所有行都被分组到一个列表中,作为步骤 2 的输入:

df = ...

r = df.rdd.zipWithIndex().toDF() \
    .select("_1.id", "_1.text_length", "_2") \
    .withColumn("batch", F.expr("cast(_2 / 5 as long)*5")) \
    .withColumn("data", F.struct("id", "text_length", "batch")) \
    .groupBy("batch") \
    .agg(F.collect_list("data").alias("data"))

第 2 部分主要由一个udf组成,该 udf检查是否在一个批次中超过了最大文本长度。如果是这样,则以下元素的批次 id 加一。由于我们在第 1 部分中跳过了足够多的批次 ID,因此我们没有遇到任何冲突。

def splitBatchIfNecessary(data):
    text_length = 0
    batch = -1
    for d in data:
        text_length = text_length + d.text_length
        if text_length > 1000:
          if batch == -1:
            text_length = 0
            batch = d.batch + 1
            yield (d.id, d.text_length, d.batch)
          else:
            text_length = d.text_length
            batch = batch + 1
            yield (d.id, d.text_length, batch)          
        else:
          if batch == -1:
            batch = d.batch
          yield (d.id, d.text_length, batch)

schema=r.schema["data"].dataType
split_udf = F.udf(splitBatchIfNecessary, schema)

r = r.withColumn("data",split_udf(F.col("data")) ) \
      .selectExpr("explode(data)") \
      .select("col.*") 

输出:

+---+-----------+-----+                                                         
| id|text_length|batch|
+---+-----------+-----+
|  1|        500|    0|
|  2|        400|    0|
|  3|        200|    1|
|  4|        300|    1|
|  5|        100|    1|
|  6|        100|    5|
|  7|        100|    5|
|  8|        100|    5|
|  9|        100|    5|
| 10|        300|    5|
+---+-----------+-----+

可能的优化是zipWithIndexzipWithUniqueIds替换(但获得稍微“不完整”的批次)或使用矢量化 udf


推荐阅读