python - Pyspark读取csv并结合日期和时间列并基于它进行过滤
问题描述
我有大约 10,000 个 csv 文件,每个文件包含 14 列。它们包含有关金融机构、交易价值、日期和时间的数据。
一些 csv 文件只是标题,其中没有数据。我设法在我的本地 hadoop 文件系统上加载了所有 csv 文件。我想要实现的是过滤数据以包含仅在上午 9 点到下午 6 点之间发生的记录。
我将如何实现这一目标?我对 lambda 和 filter 很困惑,所有东西都存在于 spark-python 中。
你能告诉我如何过滤这个并使用过滤后的数据进行其他分析吗?
PS,冬季时间和夏季时间也需要考虑,我想我应该有一些功能可以将时间更改为UTC格式吗?
由于我关心的是基于 csv 文件中的时间列过滤数据,因此我简化了 csvs。让我们说:
CSV 1:(过滤器.csv)
- ISIN、货币、日期、时间
- "1","欧元",2018-05-08,07:00
- "2","欧元",2018-05-08,17:00
- "3","欧元",2018-05-08,06:59
- "4","欧元",2018-05-08,17:01
CSV 2:(NoFilter.csv)
- ISIN、货币、日期、时间
- "1","欧元",2018-05-08,07:01
- "2","欧元",2018-05-08,16:59
- "3","欧元",2018-05-08,10:59
- "4","欧元",2018-05-08,15:01
我的代码是:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
sqlc = SQLContext(sc)
ehsanLocationFiltered = 'hdfs://localhost:54310/user/oxclo/ehsanDbs/Filter.csv'
ehsanLocationNonFiltered = 'hdfs://localhost:54310/user/oxclo/ehsanDbs/NoFilter.csv'
df = sqlContext.read.format('com.databricks.spark.csv')\
.options(header='true', inferschema='true')\
.load(ehsanLocationNonFiltered)
dfFilter = sqlContext.read.format('com.databricks.spark.csv')\
.options(header='true', inferschema='true')\
.load(ehsanLocationFiltered)
data = df.rdd
dataFilter = dfFilter.rdd
data.filter(lambda row: row.Time > '07:00' and row.Time < '17:00')
dataFilter.filter(lambda row: row.Time > '07:00' and row.Time < '17:00')
print data.count()
print dataFilter.count()
我希望看到 data.count 返回 4,因为所有时间都适合范围,而 dataFilter.count 返回 0,因为没有匹配时间。
谢谢!
解决方案
在您的代码中,您只能使用“csv”作为格式
from pyspark import SparkContext, SparkConf
ehsanLocationFiltered = '/FileStore/tables/stackoverflow.csv'
df = sqlContext.read.format('csv')\
.options(header='true', inferschema='true')\
.load(ehsanLocationFiltered).rdd
result=data.map(lambda row: row.Time > '07:00' and row.Time < '17:00')
result.count()
推荐阅读
- php - @property 注释和 PHP 中的受保护变量有什么区别?
- postgresql - PostreSQL - “运算符不存在:字符变化 = 字符变化 []:将数组传递给 ANY 时?
- angular - 是否可以在 Angular 的 kendo-editor 中以某种方式使用管道?
- reactjs - 反应原生的调试器问题?
- c++ - C ++为什么new关键字有效而malloc无效?
- flutter - 是否可以在 Flutter 中使用没有 MaterialApp 的 GridView?
- python - PyQt5通过拖动标签移动QDockWidget
- python - Numpy.nanstd 没有正确跳过通过 excel 读取的 DataFrame 的 nan 值
- javascript - 调用超链接 (tel:),通过 javascript 调用时数字为空白
- javascript - 有条件地分配对象值 JavaScript