首页 > 解决方案 > 如何使用 Apache Spark 仅流式传输文件的一部分

问题描述

我正在尝试将 Spark Streaming 和 Spark SQL 与 Python API 一起使用。

我有一个不断编辑的文件,每随机 N 秒添加一些行。

该文件可以是 JSON、XML、CSV 或 TXT,甚至是 SQL 表:我完全可以根据自己的情况自由选择最佳解决方案。

我有一定数量的字段,大约 4-5 个。以此表为例:

+-------+------+-------+--------------------+ 
| event |  id  | alert |      datetime      |
+-------+------+-------+--------------------+
| reg   |  1   | def1  | 06.06.17-17.24.30  |
+-------+------+-------+--------------------+
| alt   |  2   | def2  | 06.06.17-17.25.11  |
+-------+------+-------+--------------------+
| mot   |  3   | def5  | 06.06.17-17.26.01  |
+-------+------+-------+--------------------+
| mot   |  4   | def5  | 06.06.17-17.26.01  |
+-------+------+-------+--------------------+

我只想用 Spark Streaming 流式传输新行。所以,如果我添加了 2 个新行,下次我只想流式传输这两行而不是整个文件(已经流式传输)

此外,我想在每次找到新行时过滤或计算整个同一个文件的 Spark SQL 查询。例如,我想选择"mot"仅在 10 分钟内出现两次的事件,并且每次文件更改和新数据到达时都必须重做此查询。

Spark Streaming 和 Spark SQL 可以处理这些情况吗?如何?

标签: pythonapache-sparkpysparkapache-spark-sqlspark-streaming

解决方案


文件源 Spark不支持它

将写入目录中的文件作为数据流读取。支持的文件格式为 text、csv、json、orc、parquet。请参阅 DataStreamReader 接口的文档以获取最新列表以及每种文件格式支持的选项。请注意,文件必须以原子方式放置在给定目录中,在大多数文件系统中,这可以通过文件移动操作来实现

对于传统流式传输也是如此(请注意this 2.2 documentation,但实现没有改变)

必须通过将文件原子地移动或重命名到数据目录中来在 dataDirectory 中创建文件。


推荐阅读