PySpark: Timestamps change when exporting to SQL Server

Problem Description

I'm new to PySpark. I have a table in SQL Server that I read into a dataframe df, as shown below:

DeviceID       TimeStamp            A      B     C
 00234       11-03-2014 05:55      5.6    2.3   3.3
 00235       11-03-2014 05:33      2.8    0.9   4.2
 00236       11-03-2014 06:15      3.5    0.1   1.3
 00234       11-03-2014 07:23      2.5    0.2   3.9
 00236       11-03-2014 07:33      2.5    4.5   2.9

Goal / what I want: find the max value for each DeviceID along with its corresponding TimeStamp. In addition, I also need the current timestamp, so that each day I know the max for each DeviceID.

So the resulting df_final should be:

DeviceID    Max_Value       TimeStamp           Curr_TimeStamp
00234          5.6        11-03-2014 05:55     11-03-2014 23:54
00236          4.5        11-03-2014 07:33     11-03-2014 23:54

To produce the above df_final, I used a Window function. Below is my code snippet.

import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql import SparkSession

##Initialize Spark Session##
spark = SparkSession.builder.appName("test").config("spark.driver.extraClassPath", "/path/to/sqljdbc-6.4.jar").getOrCreate()
##Fetch data from SQL Server table df ##
df = spark.read.format("jdbc") \
    .option("url", "SQL Server details") \
    .option("dbtable", "df") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()

##Create a Temp View for further processing##
df.createOrReplaceTempView("df_temp")

##Get only a day's data##
df_view = spark.sql("select * from df_temp where TimeStamp between date_add(current_date(),-1) and current_date()")

##Finally create the dataframe df_final as required##
w = Window.partitionBy('DeviceID')
df_final = df_view.select('DeviceID', 'TimeStamp', F.greatest('A', 'B', 'C').alias('max_value'))
df_final = df_final.withColumn('window_max', F.max('max_value').over(w)) \
    .where(F.col('max_value') == F.col('window_max')) \
    .drop('window_max') \
    .withColumn('Curr_TimeStamp', F.current_timestamp())

So far, so good!! However, when I export this to another SQL Server table, something strange happens.

df_final.write.jdbc(url="SQL Server details", table="MaxLoad", mode="append", properties={ "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver" })

I get the following:

DeviceID    Max_Value         TimeStamp         Curr_TimeStamp
00234          5.6        10-03-2014 10:55     11-03-2014 23:54
00236          4.5        10-03-2014 12:33     11-03-2014 23:54

As you can see, the TimeStamp values have changed!

Why is this happening? Am I missing something in my code? I have checked the system clocks on both the Spark and SQL Server machines, and they are spot on.

Any help would be much appreciated.

PS: Spark 2.4.1 is running on CentOS 7, and I'm using SQL Server 2014 on a Windows Server 2008 R2 machine.

Tags: python, sql-server, pyspark

Solution


I'd bet your Spark cluster and MS SQL Server are in different time zones. I've run into this myself, and the solution was to use the UTC TZ by setting the conf spark.conf.set("spark.sql.session.timeZone", "Etc/UTC"). With this config set, your timestamps should now come out as you expect when you persist to MS SQL Server.
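In your case that boils down to setting the conf before you compute and persist df_final. A minimal sketch, reusing the placeholder URL, table and driver from your own snippet:

# set the session time zone to UTC before computing/persisting df_final
spark.conf.set("spark.sql.session.timeZone", "Etc/UTC")

# ...recompute df_final with the UTC session TZ in effect, then write as before
df_final.write.jdbc(url="SQL Server details", table="MaxLoad", mode="append",
                    properties={ "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver" })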

Disclaimer: I think setting the Spark TZ conf to UTC will solve your problem, but your timestamp format could also cause issues... the recommended Java format is yyyy-MM-dd HH:mm:ss.

Here is an example of one of your timestamps behaving differently in different time zones.

spark.version
'2.4.3'

from pyspark.sql.functions import *

# you can check Spark Cluster TZ like this
spark.conf.get("spark.sql.session.timeZone")
"will list your server tz here"

# change to UTC to fix problem / preserve event time source data timestamp
spark.conf.set("spark.sql.session.timeZone", "Etc/UTC")

# let's take one of your timestamp and convert to unix for testing
ut = spark.createDataFrame([('11-03-2014 05:55',)], ['ut'])
ut.select(unix_timestamp('ut', 'MM-dd-yyyy HH:mm').alias('ut')).show()

+----------+
|        ut|
+----------+
|1414994100|
+----------+

# let's test the output with a system set at LA TZ to see the timestamp changes
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")

la_time = spark.createDataFrame([(1414994100,)], ['la_tz'])
la_time.select(from_unixtime('la_tz').alias('la_tz')).show() # different ts as source

+-------------------+
|              la_tz|
+-------------------+
|2014-11-02 21:55:00|
+-------------------+

# set TZ back to UTC to confirm timestamp has preserved source data event time
spark.conf.set("spark.sql.session.timeZone", "Etc/UTC")

utc = spark.createDataFrame([(1414994100,)], ['utc_tz'])
utc.select(from_unixtime('utc_tz').alias('utc_tz')).show() # same ts as source

+-------------------+
|             utc_tz|
+-------------------+
|2014-11-03 05:55:00|
+-------------------+

# reset TZ conf if you want
spark.conf.unset("spark.sql.session.timeZone")

# if you want to change your timestamp format
ts = spark.createDataFrame([('11-03-2014 05:55',)], ['ts'])
ts.select(to_timestamp('ts', 'MM-dd-yyyy HH:mm').alias('ts')).show()

+-------------------+
|                 ts|
+-------------------+
|2014-11-03 05:55:00|
+-------------------+
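
And if you also want the result rendered as a string in the recommended yyyy-MM-dd HH:mm:ss format, a date_format call on top of the parsed timestamp covers that. A small sketch using the same sample value as above:

from pyspark.sql.functions import to_timestamp, date_format

# parse the source string, then render it in the recommended format
fmt = spark.createDataFrame([('11-03-2014 05:55',)], ['ts'])
fmt.select(date_format(to_timestamp('ts', 'MM-dd-yyyy HH:mm'), 'yyyy-MM-dd HH:mm:ss').alias('ts_str')).show()

+-------------------+
|             ts_str|
+-------------------+
|2014-11-03 05:55:00|
+-------------------+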

