python - 根据 Spark 中的移动总和将批号添加到 DataFrame
问题描述
我有一个需要批量处理的数据集(由于 API 限制)。
一个批次的 text_lenth 列之和不能超过 1000。并且批处理中的最大行数不能大于 5。
为此,我想将批号添加到单个批次中,以便稍后根据 batch_numbers 处理数据。
如何在 pyspark(在 Databricks 中)中实现这一点。我对这一切都很陌生,我什至不知道在网上寻找什么。
我真的很感谢你的帮助。
下表说明了我想要实现的目标:
原表
ID | 文本长度 |
---|---|
1 | 500 |
2 | 400 |
3 | 200 |
4 | 300 |
5 | 100 |
6 | 100 |
7 | 100 |
8 | 100 |
9 | 100 |
10 | 300 |
结果表
ID | 文本长度 | 批号 |
---|---|---|
1 | 500 | 1 |
2 | 400 | 1 |
3 | 200 | 2 |
4 | 300 | 2 |
5 | 100 | 2 |
6 | 100 | 2 |
7 | 100 | 2 |
8 | 100 | 3 |
9 | 100 | 3 |
10 | 300 | 3 |
解决方案
如果您不是在寻找最佳解决方案,而是在寻找一种在 Spark 中解决问题而又不太复杂的方法,我们可以将问题分为两个步骤:
- 将数据分成块,每行 5 行,忽略文本长度
- 如果一个块中的文本长度之和太大,则将该块拆分为更多块
该解决方案不是最优的,因为它产生了太多批次。
步骤 1 可以使用zipWithIndex来实现。在创建批次 id 时,我们留出足够的“空间”来稍后划分批次。在此步骤结束时,一个块中的所有行都被分组到一个列表中,作为步骤 2 的输入:
df = ...
r = df.rdd.zipWithIndex().toDF() \
.select("_1.id", "_1.text_length", "_2") \
.withColumn("batch", F.expr("cast(_2 / 5 as long)*5")) \
.withColumn("data", F.struct("id", "text_length", "batch")) \
.groupBy("batch") \
.agg(F.collect_list("data").alias("data"))
第 2 部分主要由一个udf组成,该 udf检查是否在一个批次中超过了最大文本长度。如果是这样,则以下元素的批次 id 加一。由于我们在第 1 部分中跳过了足够多的批次 ID,因此我们没有遇到任何冲突。
def splitBatchIfNecessary(data):
text_length = 0
batch = -1
for d in data:
text_length = text_length + d.text_length
if text_length > 1000:
if batch == -1:
text_length = 0
batch = d.batch + 1
yield (d.id, d.text_length, d.batch)
else:
text_length = d.text_length
batch = batch + 1
yield (d.id, d.text_length, batch)
else:
if batch == -1:
batch = d.batch
yield (d.id, d.text_length, batch)
schema=r.schema["data"].dataType
split_udf = F.udf(splitBatchIfNecessary, schema)
r = r.withColumn("data",split_udf(F.col("data")) ) \
.selectExpr("explode(data)") \
.select("col.*")
输出:
+---+-----------+-----+
| id|text_length|batch|
+---+-----------+-----+
| 1| 500| 0|
| 2| 400| 0|
| 3| 200| 1|
| 4| 300| 1|
| 5| 100| 1|
| 6| 100| 5|
| 7| 100| 5|
| 8| 100| 5|
| 9| 100| 5|
| 10| 300| 5|
+---+-----------+-----+
可能的优化是zipWithIndex
用zipWithUniqueIds替换(但获得稍微“不完整”的批次)或使用矢量化 udf。
推荐阅读
- javascript - Socket.io 在另一条使用 express 的路线上
- python - Python 抓取某个个人资料中每个帖子的点赞数
- python - 仅使用常量表达式的 f 字符串格式?
- python - 多次迭代后,Elasticsearch Python 不再保存索引
- java - 通过 SSH 隧道从 Java 程序连接到 AWS DocumentDB
- postgresql - Postgres db 规则根据列值的组合将记录限制为一条记录
- protocol-buffers - Protobuf oneof JSON 语法问题
- ios - VS 2019 存档管理器永远不会完成存档(iOS)应用程序包
- arm - ATSAMC21E ADC 读数随着 ADC 通道之间的明显耦合而不规律地跳跃 - 有没有人遇到过类似的行为?
- websphere - CWMBD0147E: 在事件 IBM DSI 810 的事件处理期间检测到致命错误