pyspark - 分批拆分数据帧pyspark
问题描述
我的要求是将数据帧分成 2 个批次,每个批次仅包含 2 个项目,并且批次大小(输出中的 BATCH)逐渐增加。
col#1 col#2 DATE
A 1 202010
B 1.1 202010
C 1.2 202010
D 1.3 202001
E 1.4 202001
输出/输出
col#1 col#2 DATE BATCH
A 1 202010 1
B 1.1 202010 1
C 1.2 202010 2
D 1.3 202001 2
E 1.4 202001 3
解决方案
我能够通过以下方法实现这一目标:
def dfZipWithIndex (df, offset=1, colName='rowId'):
new_schema = StructType([StructField(colName,LongType(),True)]+
df.schema.fields)
zipped_rdd = df.rdd.zipWithIndex()
new_rdd =zipped_rdd.map(lambda args: ([args[1] + offset] + list(args[0])))
return spark.createDataFrame(new_rdd, new_schema)
chunk_size=2
final_new=dfZipWithIndex(input_df)
temp_final=input_df.withColumn('BATCH',F.floor(F.col('rowId')/chunk_size)+1)
推荐阅读
- maatwebsite-excel - maatwebsite/excel 表中的 Summey 单元格值没有没有前缀“$”符号
- java - ObjectMap Map 的 Avro Schema 定义
- azure - Microsoft SharePoint:是否有用于快速上传列表的 API?
- python - 如何在 Windows 上有效地“替换”`os.execvpe` - 如果“子”进程是交互式命令行应用程序?
- python - PyQt5 Qsqltablemodel 数据类型“在等于运算符中不兼容”,数据超过 128 个字符
- python - urllib.request.urlopen 如何处理文件中的 url 列表?
- python - numpy polynomial.Polynomial.fit() 给出的系数与 polynomial.polyfit() 不同
- django - Django图像未上传
- javascript - 从js中的文本文件中删除多行
- python - Python 未在命令行中显示