apache-spark - 关于使用 parquet 处理时间序列数据的问题
问题描述
我正在探索以可扩展且具有成本效益的方式存储来自传感器的大量数据(时间序列数据)的方法。
目前,我正在为每个传感器编写一个按日期分区的 CSV 文件,因此我的文件系统层次结构如下所示:
client_id/sensor_id/year/month/day.csv
我的目标是能够对此数据执行 SQL 查询,(通常为特定客户端/传感器获取时间范围,执行聚合等)我尝试将其加载到Postgres
and timescaledb
,但体积太大而且查询慢得不合理。
我现在正在尝试使用Spark
和Parquet
文件来执行这些查询,但是我有一些问题我无法从我对这个主题的研究中得到答案,即:
我正在将这些数据转换为镶木地板文件,所以我现在有这样的东西:
client_id/sensor_id/year/month/day.parquet
但我担心的是,当Spark
加载包含许多Parquet
文件的顶层文件夹时,行组信息的元数据没有像我使用一个包含所有数据的单个 parquet 文件那样优化,分区为client/sensor/year/month/day
. 这是真的?还是拥有多个 parquet 文件或单个分区 Parquet 文件是否相同?我知道镶木地板文件在内部存储在一个文件夹层次结构中,就像我正在使用的那样,但我不清楚这如何影响文件的元数据。
我无法做到这一点的原因是我不断接收新数据,据我了解,由于页脚元数据的工作性质,我无法附加到镶木地板文件。这个对吗?现在,我只需将前一天的数据转换为 parquet 并为每个客户端的每个传感器创建一个新文件。
谢谢你。
解决方案
您可以将结构化流与 kafka 一起使用(因为您已经在使用它)来实时处理您的数据并以 parquet 格式存储数据。而且,是的,您可以将数据附加到镶木地板文件中。使用 SaveMode.Append ,例如
df.write.mode('append').parquet(path)
您甚至可以按小时对数据进行分区。客户端/传感器/年/月/日/小时,这将进一步为您提供查询时的性能改进。您可以根据要对数据运行的查询类型基于系统时间或时间戳列创建小时分区。
如果您选择基于时间戳列进行分区,则可以使用watermaking来处理延迟记录。
希望这可以帮助!
推荐阅读
- javascript - 检查 TypeScript 中的全局或窗口对象上是否存在属性
- google-apps-script - 使用出现错误的 Google 表格 onOpen() 函数时,如何获得实时脚本错误通知?
- prolog - Prolog 作为第一种编程语言
- html - 展开一张卡片时展开所有卡片的高度
- python - 安装 djangocms-blog 后的 ImportError: cannot import name 'python_2_unicode_compatible' from 'django.utils .encoding'
- java - 如何从 pagedList 中删除项目
- javascript - 在 python 中运行 javascript 函数
- python - 我无法将 dtypes 对象的 2d numpy ndarray 转换为 dtypes float
- typescript - 如何使用 $out 聚合将多个数据从一个集合传输到另一个集合
- ios - 应用程序版本未出现在 App Store 连接中