apache-spark - 如何在apache spark中使用sql提取工作日的特定时间间隔?
问题描述
我在使用 apache spark的sql 表数据块中加载了 csv 文件。我需要提取具有内容的 sql 表列:
01.01.2018,15:25
01.01.2018,00:10
01.01.2018,13:20
...
...
仅代表工作日和上午 8.30 到 9.30 之间的时间的数据 我应该怎么做?我应该先在两列上提取列吗?我发现如何处理我输入数据块的数据的某些部分,但这些数据是 sql 表的一部分。
来自经典 sql 的一些命令也不适用于 apache spark ,这意味着数据块。
这是读取数据的查询:
# File location and type
file_location = "/FileStore/tables/NEZ_OPENDATA_2018_20190125-1.csv"
file_type = "csv"
# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","
# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
.option("inferSchema", infer_schema) \
.option("header", first_row_is_header) \
.option("sep", delimiter) \
.load(file_location)
display(df)
# Create a view or table
temp_table_name = "NEZ_OPENDATA_2018_20190125"
df.createOrReplaceTempView(temp_table_name)
%sql
/* Query the created temp table in a SQL cell */
select * from `NEZ_OPENDATA_2018_20190125`
permanent_table_name = "NEZ_OPENDATA_2018_20190125"
df.write.format("parquet").saveAsTable(permanent_table_name)
解决方案
作为文本文件读取可能更合适,因为时间戳包含日期和时间。然后,您可以使用相关的 Pyspark 函数过滤星期几和时间。请注意,星期几是 1 代表星期日,2 代表星期一,......等等。
import pyspark.sql.functions as F
file_location = "/FileStore/tables/NEZ_OPENDATA_2018_20190125-1.csv"
df = spark.read.text(file_location).toDF('timestamp')
result = df.select(
F.to_timestamp('timestamp', 'dd.MM.yyyy,HH:mm').alias('timestamp')
).filter(
F.dayofweek('timestamp').isin([2,3,4,5,6]) & (
( (F.hour('timestamp') == 8) & (F.minute('timestamp').between(30,59)) ) |
( (F.hour('timestamp') == 9) & (F.minute('timestamp').between(0,30)) )
)
)
如果要显示输出,可以执行result.show()
或display(result)
。
推荐阅读
- javascript - 在表格中切换各个切换开关
- python - 如何离线安装kivy?
- node.js - 您可以在 Cloudinary 上编辑用户上传图像的质量吗?
- python-3.x - 编写 SpaCy NER 提取器的优雅方式?
- jquery - 当图标仅处于活动状态时如何更改它?
- javascript - Lodash过滤器不适用于多种条件
- python-3.x - 为什么“测试过程”在深度学习的时代循环中?
- mysql - Mysql 8.0_新建db_“Schema目录已存在”
- firebase - 访问嵌入在静态 pod 中的 xib
- zend-form - 使用来自控制器的参数来操作 zend 表单是不是一个坏主意?