python - PySpark:导出到 SQL Server 时更改时间戳
问题描述
我是 PySpark 的新手。我在 SQL Server 中有一个表,df
如下所示:
DeviceID TimeStamp A B C
00234 11-03-2014 05:55 5.6 2.3 3.3
00235 11-03-2014 05:33 2.8 0.9 4.2
00236 11-03-2014 06:15 3.5 0.1 1.3
00234 11-03-2014 07:23 2.5 0.2 3.9
00236 11-03-2014 07:33 2.5 4.5 2.9
目标/我想要的:找到max
每个值DeviceID
及其对应的TimeStamp
. 此外,我还需要拥有当前时间戳,以便每天我应该知道max
每个DeviceID
.
所以结果df_final
应该是
DeviceID Max_Value TimeStamp Curr_TimeStamp
00234 5.6 11-03-2014 05:55 11-03-2014 23:54
00236 4.5 11-03-2014 07:33 11-03-2014 23:54
为了实现上述目的df_final
,我使用了Window
函数。下面是我的代码片段。
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql import SparkSession
##Initialize Spark Session##
spark = SparkSession.builder.appName("test").config("spark.driver.extraClassPath", "/path/to/sqljdbc-6.4.jar").getOrCreate()
##Fetch data from SQL Server table df ##
df = spark.read.format("jdbc").options(url="SQL Server details",properties =
{ "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
},dbtable="df").load()
##Create a Temp View for further processing##
df.createOrReplaceTempView("df_temp")
##Get only a days data##
df_view = spark.sql("select * from df_temp where TimeStamp between date_add(current_date(),-1) and current_date()")
#Finally creating the dataframe df_final as required##
w = Window.partitionBy('DeviceImei')
df_final = df_view.select('DeviceImei','DeviceTimeStamp',F.greatest('IL1','IL2','IL3').alias('max_value'))
df_final = df_final.withColumn('Max-TimeStamp',F.max('max_value').over(w)).where(F.col('max_value') == F.col('Max-TimeStamp')).drop('Max-TimeStamp').withColumn('TimeStamp',F.current_timestamp())
到目前为止,一切都很好!!但是,当我将其导出到另一个 SQL Server 表时,会发生奇怪的事情。
df_final.write.jdbc(url="SQL Server details", table="MaxLoad", mode="append", properties={ "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver" })
我得到如下:
DeviceID Max_Value TimeStamp Curr_TimeStamp
00234 5.6 10-03-2014 10:55 11-03-2014 23:54
00236 4.5 10-03-2014 12:33 11-03-2014 23:54
如您所见,TimeStamp
值已更改!
为什么会发生这样的事情?我在代码中遗漏了什么吗?我已经检查了 Spark 和 SQL 服务器机器上的系统时间戳,它们是完美的。
任何帮助,将不胜感激。
PS:Spark 2.4.1 在 CentOS 7 上运行,我在 Windows Server 2008 R2 机器上使用 SQL Server 2014
解决方案
所以我敢打赌你的 Spark Cluster 和 MS SQL Server 位于不同的时区。我经历过这一点,解决方案是通过设置 conf 来使用 UTC TZ spark.conf.set("spark.sql.session.timeZone", "Etc/UTC")
。通过设置此配置,当您现在坚持到 MS SQL Server 时,您的时间戳应该会为您提供您所期望的。
免责声明:我认为将 Spark TZ conf 设置为 UTC 将解决您的问题,但是您的时间戳格式也可能会导致问题......推荐的 java 格式是 yyyy-MM-dd HH:mm:ss
这是您的一个时间戳在不同时区表现不同的示例
spark.version
'2.4.3'
from pyspark.sql.functions import *
# you can check Spark Cluster TZ like this
spark.conf.get("spark.sql.session.timeZone")
"will list your server tz here"
# change to UTC to fix problem / preserve event time source data timestamp
spark.conf.set("spark.sql.session.timeZone", "Etc/UTC")
# let's take one of your timestamp and convert to unix for testing
ut = spark.createDataFrame([('11-03-2014 05:55',)], ['ut'])
ut.select(unix_timestamp('ut', 'MM-dd-yyyy HH:mm').alias('ut')).show()
+----------+
| ut|
+----------+
|1414994100|
+----------+
# let's test the output with a system set at LA TZ to see the timestamp changes
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
la_time = spark.createDataFrame([(1414994100,)], ['la_tz'])
la_time.select(from_unixtime('la_tz').alias('la_tz')).show() # different ts as source
+-------------------+
| la_tz|
+-------------------+
|2014-11-02 21:55:00|
+-------------------+
# set TZ back to UTC to confirm timestamp has preserved source data event time
spark.conf.set("spark.sql.session.timeZone", "Etc/UTC")
utc = spark.createDataFrame([(1414994100,)], ['utc_tz'])
utc.select(from_unixtime('utc_tz').alias('utc_tz')).show() # same ts as source
+-------------------+
| utc_tz|
+-------------------+
|2014-11-03 05:55:00|
+-------------------+
# reset TZ conf if you want
spark.conf.unset("spark.sql.session.timeZone")
# if you want to change your timestamp format
ts = spark.createDataFrame([('11-03-2014 05:55',)], ['ts'])
ts.select(to_timestamp('ts', 'MM-dd-yyyy HH:mm').alias('ts')).show()
+-------------------+
| ts|
+-------------------+
|2014-11-03 05:55:00|
+-------------------+
推荐阅读
- node.js - 从另一个函数 (Firebase) 中安排 Cloud Function 触发器
- scala - 如何使用 scala 作为后端?
- r - 降低功能并合并两个数据框
- ruby-on-rails - Ruby on Rails 查询等于两个连接列
- elixir - 根据用户角色渲染部分模板
- java - 使用反射创建具有参数化构造函数的类的动态对象
- python-3.x - 使用 NetworkX 进行图排列和旋转
- google-app-engine - GAE 推送队列任务不在测试中运行,产生孤立的 dev_appserver 进程
- c++ - Gtkmm 盒子没有被移除
- javascript - 如何制作(复制到剪贴板)与 br 断线