python - Spark:加载多个文件,执行相同的操作并合并到一个数据帧中
问题描述
我有很多小的、单独的 .txt 文件。对于这些文件中的每一个,我将多行按空格分成 2 列,start_time 和 end_time(一个浮点数)。
我想:
- 加载所有 .txt 文件
- 为每一行计算一个包含 (end_time - start_time) 的新列
- 为每一行添加一个带有文件名的新列
- 最后,我想用这个模式得到一个数据帧:
+------------+--------------+------------+------------+
| file_name | start_time | end_time | duration |
+------------+--------------+------------+------------+
我知道我可以简单地为每个文件和每一行创建一个循环,并一次将一行添加到数据框中,但我想知道是否有更快的方法来做到这一点。
我对事情完成的顺序不感兴趣,但最终结果的速度。我看到SparkContext 中提供了textFile()和wholeTextFiles()等现有函数,但我不知道如何使用它们来做我想做的事。
非常感谢任何方向或建议!
(对不起我的英语不好)
更新:
感谢@Shu 的帮助,这是我用来解决问题的最终代码
from pyspark.sql.functions import split, reverse, input_file_name
original_schema = [StructField("Start", FloatType(), True),
StructField("End", FloatType(), True)]
data_structure = StructType(original_schema)
df = self.spark_session.read.\
csv(path=PATH_FILES+'\\*.txt', header=False, schema=data_structure, sep='\t').\
withColumn("Filename", reverse(split(input_file_name(), "/")).getItem(0) ).\
withColumn("duration", col("End") - col("Start"))
df.show(20, False)
解决方案
使用 和 如果您的列由use spark.read.csv()
分隔,则读取文件。space
.option("delimiter"," ")
- 使用
input_file_name
函数获取文件名。
例子:
from pyspark.sql.functions import *
spark.read.option("header",true).\
option("delimiter"," ").\
csv("<path>").\
withColumn("file_name",input_file_name).\
withColumn("duration",col("end_time") - col("start_time")).show()
如果行被分隔,space
则使用文件中不存在的分隔符读取数据。
然后拆分数据
\\s+
并爆炸,现在我们将数据放入数据框行。使用substring函数提取
start_time,end_time
并减去它们以获得持续时间。
spark.read.csv("<file_path>").\
withColumn("input",explode(split(col("_c0"),"\\s+"))).\
withColumn("filename",input_file_name()).\
drop("_c0").\
show()
UPDATE
Using array index:
spark.read.csv("<file_path>").\
withColumn("input",explode(split(col("_c0"),"\\s+"))).\
withColumn("filename",reverse(split(input_file_name(),'/'))[0]).\
drop("_c0").\
show()
#or
spark.read.csv("<file_path>").\
withColumn("input",explode(split(col("_c0"),"\\s+"))).\
withColumn("filename",reverse(split(input_file_name(),'/')).getItem(0)).\
drop("_c0").\
show()
From Spark-2.4+ Using element_at:
spark.read.csv("<file_path>").\
withColumn("input",explode(split(col("_c0"),"\\s+"))).\
withColumn("filename",element_at(split(input_file_name(),'/'),-1)).\
drop("_c0").\
show()
推荐阅读
- kubernetes - 如果没有额外的就绪门,我们是否可以假设“ContainersReady”为真时“Ready”必须为真?
- powerbi - PowerBI 中具有多个条件的平均值
- javascript - YouTube 聊天直播嵌入不再起作用
- python - 在转换为元组时创建嵌套字典
- html - 如何在 Bootstrap 5 中将区域居中
- c - 调整 malloc 数组的大小
- javascript - Selenium webdriver 自动化测试给出错误:连接到系统的设备不起作用。(0x1F)
- python - 如何将python脚本保存为txt?
- sql - 在 SQL 中添加检查约束
- python-3.x - 生成器中的 ListComp 代码太复杂