python - Spark:加载多个文件,单独分析,合并结果,保存
问题描述
我是 Spark 的新手,不太了解如何问这个问题(使用哪些术语等),所以这是我在概念上试图完成的图片:
我有很多小的、单独的 .txt “分类帐”文件(例如,当时带有时间戳和属性值的行分隔文件)。
我想:
将每个“分类帐”文件读入单独的数据框(阅读:不合并为一个大数据框);
对每个单独的数据框执行一些基本计算,从而产生一行新的数据值;接着
将所有单独的结果行合并到最终对象中,并将其以行分隔文件的形式保存到磁盘。
似乎我找到的几乎每个答案(在谷歌搜索相关术语时)都是关于将多个文件加载到单个 RDD 或 DataFrame 中,但我确实找到了这个 Scala 代码:
val data = sc.wholeTextFiles("HDFS_PATH")
val files = data.map { case (filename, content) => filename}
def doSomething(file: String) = {
println (file);
// your logic of processing a single file comes here
val logData = sc.textFile(file);
val numAs = logData.filter(line => line.contains("a")).count();
println("Lines with a: %s".format(numAs));
// save rdd of single file processed data to hdfs comes here
}
files.collect.foreach( filename => {
doSomething(filename)
})
... 但:
A. 我不知道这是否使读取/分析操作并行化,并且
B. 我不认为它可以将结果合并到一个对象中。
非常感谢任何方向或建议!
更新
似乎我正在尝试做的事情(在多个文件上并行运行脚本,然后合并结果)可能需要线程池(?)之类的东西。
为清楚起见,这是我想对通过读取“分类帐”文件创建的 DataFrame 执行的计算示例:
from dateutil.relativedelta import relativedelta
from datetime import datetime
from pyspark.sql.functions import to_timestamp
# Read "ledger file"
df = spark.read.json("/path/to/ledger-filename.txt")
# Convert string ==> timestamp & sort
df = (df.withColumn("timestamp", to_timestamp(df.timestamp, 'yyyy-MM-dd HH:mm:ss'))).sort('timestamp')
columns_with_age = ("location", "status")
columns_without_age = ("wh_id")
# Get the most-recent values (from the last row of the df)
row_count = df.count()
last_row = df.collect()[row_count-1]
# Create an empty "final row" dictionary
final_row = {}
# For each column for which we want to calculate an age value ...
for c in columns_with_age:
# Initialize loop values
target_value = last_row.__getitem__(c)
final_row[c] = target_value
timestamp_at_lookback = last_row.__getitem__("timestamp")
look_back = 1
different = False
while not different:
previous_row = df.collect()[row_count - 1 - look_back]
if previous_row.__getitem__(c) == target_value:
timestamp_at_lookback = previous_row.__getitem__("timestamp")
look_back += 1
else:
different = True
# At this point, a difference has been found, so calculate the age
final_row["days_in_{}".format(c)] = relativedelta(datetime.now(), timestamp_at_lookback).days
因此,这样的分类帐:
+---------+------+-------------------+-----+
| location|status| timestamp|wh_id|
+---------+------+-------------------+-----+
| PUTAWAY| I|2019-04-01 03:14:00| 20|
|PICKABLE1| X|2019-04-01 04:24:00| 20|
|PICKABLE2| X|2019-04-01 05:33:00| 20|
|PICKABLE2| A|2019-04-01 06:42:00| 20|
| HOTPICK| A|2019-04-10 05:51:00| 20|
| ICEXCEPT| A|2019-04-10 07:04:00| 20|
| ICEXCEPT| X|2019-04-11 09:28:00| 20|
+---------+------+-------------------+-----+
将减少到(假设计算在 2019-04-14 运行):
{ '_id': 'ledger-filename', 'location': 'ICEXCEPT', 'days_in_location': 4, 'status': 'X', 'days_in_status': 3, 'wh_id': 20 }
解决方案
wholeTextFiles
不推荐使用,因为它会将整个文件一次加载到内存中。如果您真的想为每个文件创建一个单独的数据框,您可以简单地使用完整路径而不是目录。但是,不建议这样做,并且很可能会导致资源利用率低下。相反,请考虑使用input_file_path
https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/sql/functions.html#input_file_name--
例如:
spark
.read
.textFile("path/to/files")
.withColumn("file", input_file_name())
.filter($"value" like "%a%")
.groupBy($"file")
.agg(count($"value"))
.show(10, false)
+----------------------------+------------+
|file |count(value)|
+----------------------------+------------+
|path/to/files/1.txt |2 |
|path/to/files/2.txt |4 |
+----------------------------+------------+
因此可以单独处理文件,然后再合并。
推荐阅读
- mysql - 当“where in”子句是带有替换的 JSON 括号/引号的动态字段时,在子查询中选择行不起作用
- python - Django Navbar 类别链接问题
- nativescript - 尝试在 nativescript sidekick 上发布时出错
- javascript - 图像和可编辑标题的 Tiptap 扩展
- python - 为什么 Django base.css 不保存我的更改?
- arrays - 无法将类型“[dataModel]”的值转换为预期的参数类型“(dataModel) throws -> Bool”
- amazon-web-services - Amazon Transcribe 中的 LENCE 是什么
- botframework - TypeError:azure.DocumentDbClient 不是构造函数
- javascript - 如何将此延迟模式弹出窗口上的电子邮件表单 (mailchimp) 更改为此 (getresponse) 代码?
- manual-testing - 测试策略和测试计划的区别?