PySpark successfully writes an invalid date but throws an exception when reading it back

Problem description

I am using PySpark to process some data and write the processed files to S3 in Parquet format. The batch code runs on EC2 inside a Docker container (Linux). The data also contains some datetime fields, which I save as TimestampType (in the Parquet files) because I need to query that field in Athena. If this field has the value '0001-01-01', the batch job writes it to the Parquet file successfully; the exception is only thrown when the data is read back. This is the behavior on the Linux machine. Here is sample code that reproduces it:

from pyspark.sql.types import StructType,StructField,DateType,TimestampType
from dateutil.parser import parse
d=parse('0001-01-01 00:00:00')
data=[{'createdon':d}]
distdata = sc.parallelize(data)
schema = StructType([StructField('createdon',TimestampType())])
df=spark.createDataFrame(distdata,schema)
df.write.parquet("/test-1")

After executing this code, the data is written to the file without any error. When I try to read the same data back, I get the following error:

Traceback (most recent call last):                                                                    
  File "<stdin>", line 1, in <module>                                                                 
  File "/usr/local/lib/python3.6/site-packages/pyspark/sql/dataframe.py", line 572, in take           
    return self.limit(num).collect()                                                                  
  File "/usr/local/lib/python3.6/site-packages/pyspark/sql/dataframe.py", line 535, in collect        
    return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))                  
  File "/usr/local/lib/python3.6/site-packages/pyspark/serializers.py", line 147, in load_stream      
    yield self._read_with_length(stream)                                                              
  File "/usr/local/lib/python3.6/site-packages/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)                                                                            
  File "/usr/local/lib/python3.6/site-packages/pyspark/serializers.py", line 580, in loads            
    return pickle.loads(obj, encoding=encoding)                                                       
  File "/usr/local/lib/python3.6/site-packages/pyspark/sql/types.py", line 1396, in <lambda>          
    return lambda *a: dataType.fromInternal(a)                                                        
  File "/usr/local/lib/python3.6/site-packages/pyspark/sql/types.py", line 633, in fromInternal       
    for f, v, c in zip(self.fields, obj, self._needConversion)]                                       
  File "/usr/local/lib/python3.6/site-packages/pyspark/sql/types.py", line 633, in <listcomp>         
    for f, v, c in zip(self.fields, obj, self._needConversion)]                                       
  File "/usr/local/lib/python3.6/site-packages/pyspark/sql/types.py", line 445, in fromInternal       
    return self.dataType.fromInternal(obj)                                                            
  File "/usr/local/lib/python3.6/site-packages/pyspark/sql/types.py", line 199, in fromInternal       
    return datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000)           
ValueError: year 0 is out of range  
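
The last frame of the traceback can be reproduced with plain datetime, outside Spark. Below is a minimal sketch of what appears to be happening, assuming the internal value Spark hands to fromInternal is microseconds since the Unix epoch and that the machine's local timezone is behind UTC; on other platforms or timezones the same call may raise OSError instead, or succeed:

import datetime

# Assumed internal value for '0001-01-01 00:00:00' UTC: microseconds since the
# Unix epoch (719162 days * 86400 s = 62135596800 s before 1970-01-01).
ts = -62135596800 * 1000 * 1000

# fromtimestamp() converts to *local* time. On a host whose UTC offset is
# negative, the local representation of this instant falls before year 1, so
# the datetime constructor raises "ValueError: year 0 is out of range".
datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000)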

Ideally it should not be written at all, since the createdon (datetime) field has an invalid value, but that is not the behavior. Am I doing something wrong here?

Tags: pyspark

Solution


I could not reproduce the problem with the given timestamp, but here are a couple of ways to validate the timestamp string before writing it:

from pyspark.sql import functions as F
from pyspark.sql import types as T
from dateutil.parser import parse
import datetime


def isvaliddatetime(date_text):
    try:
        datetime.datetime.strptime(date_text, '%Y-%m-%d %H:%M:%S')
        return True
    except ValueError:
        return False


raw_timestamp = "0001-01-01 00:00:00"

assert isvaliddatetime(raw_timestamp)

data=[{'createdon': parse(raw_timestamp)}]
distdata = sc.parallelize(data)
schema = T.StructType([T.StructField('createdon', T.TimestampType())])
df = spark.createDataFrame(distdata, schema)
df.show()

df = df.withColumn("badTimestamp", F.when(F.to_date(F.col("createdon"), "yyyy-MM-dd HH:mm:ss").isNotNull(), False).otherwise(True))
bad_rows = df.where(F.col("badTimestamp") == True)
assert bad_rows.count() == 0, f"Corrupt timestamps found: {[r['createdon'] for r in bad_rows.collect()]}"
df = df.drop("badTimestamp")

# hdfs_root is assumed to be defined elsewhere (a base path on HDFS/S3)
df.write.mode('overwrite').parquet(hdfs_root + "/test-so-1")


df_read = spark.read.parquet(hdfs_root + "/test-so-1")
df_read.show()

Perhaps the problem is caused by the version of datetime, dateutil, or pyspark (I am using 2.4.0). Interestingly, the code above outputs the following value both before writing to Parquet and after reading it back:

createdon
0001-01-03 00:06:32

This differs from the original string, so the value is being coerced to something else. That looks like a bug to me.
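
If the goal is simply to keep rows like this from breaking later reads, another option is to null out timestamps that fall outside a sane range before writing, instead of (or in addition to) the assert above. A minimal sketch, assuming a hypothetical cutoff of 1900-01-01; the comparison is evaluated on the JVM side, so it does not itself trigger the Python datetime conversion:

from pyspark.sql import functions as F

# Hypothetical cutoff: anything before 1900-01-01 is treated as invalid here;
# adjust it to whatever range your data is actually allowed to contain.
cutoff = F.to_timestamp(F.lit("1900-01-01 00:00:00"))

df_clean = df.withColumn(
    "createdon",
    F.when(F.col("createdon") < cutoff, F.lit(None).cast("timestamp"))
     .otherwise(F.col("createdon")),
)

# Same write target as the snippet above (hdfs_root defined elsewhere).
df_clean.write.mode("overwrite").parquet(hdfs_root + "/test-so-1")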

