pyspark - PySpark writes an invalid date successfully but throws an exception when reading it back
Problem Description
I am using PySpark to process some data and write the processed files to S3 in parquet format. The batch job runs on EC2 inside a Docker container (Linux). The data contains some datetime fields, which I store as TimestampType in the parquet files because I need to query them in Athena. If one of these fields has the value '0001-01-01', the batch job writes it to the parquet file successfully, and the exception is thrown only when the data is read back. This is the behaviour on the Linux machine. Here is sample code that reproduces it:
from pyspark.sql.types import StructType, StructField, TimestampType
from dateutil.parser import parse

# sc and spark come from an active pyspark shell / SparkSession
d = parse('0001-01-01 00:00:00')
data = [{'createdon': d}]
distdata = sc.parallelize(data)
schema = StructType([StructField('createdon', TimestampType())])
df = spark.createDataFrame(distdata, schema)
df.write.parquet("/test-1")
This code writes the data to the file without any error. When I try to read the same data back, I get the following error:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.6/site-packages/pyspark/sql/dataframe.py", line 572, in take
    return self.limit(num).collect()
  File "/usr/local/lib/python3.6/site-packages/pyspark/sql/dataframe.py", line 535, in collect
    return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
  File "/usr/local/lib/python3.6/site-packages/pyspark/serializers.py", line 147, in load_stream
    yield self._read_with_length(stream)
  File "/usr/local/lib/python3.6/site-packages/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/usr/local/lib/python3.6/site-packages/pyspark/serializers.py", line 580, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/usr/local/lib/python3.6/site-packages/pyspark/sql/types.py", line 1396, in <lambda>
    return lambda *a: dataType.fromInternal(a)
  File "/usr/local/lib/python3.6/site-packages/pyspark/sql/types.py", line 633, in fromInternal
    for f, v, c in zip(self.fields, obj, self._needConversion)]
  File "/usr/local/lib/python3.6/site-packages/pyspark/sql/types.py", line 633, in <listcomp>
    for f, v, c in zip(self.fields, obj, self._needConversion)]
  File "/usr/local/lib/python3.6/site-packages/pyspark/sql/types.py", line 445, in fromInternal
    return self.dataType.fromInternal(obj)
  File "/usr/local/lib/python3.6/site-packages/pyspark/sql/types.py", line 199, in fromInternal
    return datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000)
ValueError: year 0 is out of range
Ideally the write should have failed, since the createdon (datetime) field holds an invalid value, but that is not the behaviour. Am I doing something wrong here?
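For context on where the read fails: the traceback ends in PySpark's TimestampType.fromInternal, which converts the stored microsecond value back to a Python datetime. A minimal sketch of that conversion outside Spark, assuming the stored internal value is roughly the epoch microseconds for 0001-01-01 00:00:00 UTC (the exact value is an assumption for illustration):

import datetime

# Approximate internal value for 0001-01-01 00:00:00 UTC:
# microseconds relative to the Unix epoch (assumption for illustration).
ts = -62135596800 * 1000000

# This mirrors TimestampType.fromInternal in pyspark/sql/types.py.
# fromtimestamp converts to *local* time, so for timezones west of UTC
# the result falls before year 1 and the conversion fails.
try:
    datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000)
except ValueError as e:
    print(e)  # e.g. "year 0 is out of range"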
Solution
I could not reproduce the problem with the given timestamp, but there are a couple of ways to validate a timestamp string before writing it:
from pyspark.sql import functions as F
from pyspark.sql import types as T
from dateutil.parser import parse
import datetime

def isvaliddatetime(date_text):
    # First check: validate the raw string in plain Python
    # before it ever reaches Spark.
    try:
        datetime.datetime.strptime(date_text, '%Y-%m-%d %H:%M:%S')
        return True
    except ValueError:
        return False

raw_timestamp = "0001-01-01 00:00:00"
assert isvaliddatetime(raw_timestamp)

data = [{'createdon': parse(raw_timestamp)}]
distdata = sc.parallelize(data)
schema = T.StructType([T.StructField('createdon', T.TimestampType())])
df = spark.createDataFrame(distdata, schema)
df.show()

# Second check inside Spark: flag rows whose timestamp does not parse.
df = df.withColumn(
    "badTimestamp",
    F.when(F.to_date(F.col("createdon"), "yyyy-MM-dd HH:mm:ss").isNotNull(), False).otherwise(True)
)
assert df.where(F.col("badTimestamp") == True).count() == 0, \
    f"Corrupt timestamps found: {[r['createdon'] for r in df.where(F.col('badTimestamp') == True).collect()]}"
df = df.drop("badTimestamp")

# hdfs_root points at a writable location and is defined elsewhere
df.write.mode('overwrite').parquet(hdfs_root + "/test-so-1")
df_read = spark.read.parquet(hdfs_root + "/test-so-1")
df_read.show()
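The format checks above only confirm that the string parses; they cannot catch values that are valid datetimes but fall outside the range the parquet round trip handles safely. A complementary guard, a sketch of my own rather than part of the checks above, is to reject timestamps below a chosen floor (MIN_TS here is a hypothetical cutoff to tune per use case):

from pyspark.sql import functions as F

# Hypothetical floor: reject anything earlier than this (assumption).
MIN_TS = "1900-01-01 00:00:00"

too_old = df.where(F.col("createdon") < F.lit(MIN_TS).cast("timestamp"))
assert too_old.count() == 0, \
    "timestamps below the supported floor may not round-trip safely"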
Perhaps the problem is caused by the version of datetime, dateutil, or pyspark (I am using 2.4.0). Interestingly, the code above prints the following value both before writing to parquet and after reading it back:
| createdon |
| --- |
| 0001-01-03 00:06:32 |
This differs from the original string, so the value is being coerced to something else. That looks like a bug to me.
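If the write must go ahead anyway, a round-trip check can catch this kind of silent coercion before the data is trusted. A minimal sketch, where the scratch path is a hypothetical location and an active SparkSession is assumed (exceptAll requires Spark 2.4+):

# Write, read back, and diff entirely on the JVM side: count() never
# converts rows to Python datetimes, so it avoids the fromInternal error.
check_path = "/tmp/roundtrip-check"  # hypothetical scratch location
df.write.mode("overwrite").parquet(check_path)
df_back = spark.read.parquet(check_path)

changed = df.exceptAll(df_back)
assert changed.count() == 0, "values were altered during the parquet round trip"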