mongodb - MongoDB 和 Spark:无法将 STRING 转换为 TimestampType
问题描述
我正在使用官方 MongoDB Spark 连接器从 MongoDB 集合中读取 Spark 中的数据,代码如下:
val spark = SparkSession.
builder().
appName("MongoDB to SQL").
getOrCreate()
val df = MongoSpark.load(spark, readConfig)
df.count()
readConfig 是 MongoDB 的标准读取配置,它工作正常。我遇到的问题是我从 MongoDB 作为字符串获取的一些日期/时间,它无法将其转换为 Spark 类型 TimestampValue:
INFO DAGScheduler: Job 1 failed: count at transfer.scala:159, took 3,138191 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver):
com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast STRING into a TimestampType (value: BsonString{value='2999.12.31 14:09:34'})
at com.mongodb.spark.sql.MapFunctions$.com$mongodb$spark$sql$MapFunctions$$convertToDataType(MapFunctions.scala:200)
at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:39)
at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:37)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
从我在有问题的属性被列为df.printSchema()
之前调用时看到的.count()
| | | |-- endDate: string (nullable = true)
在 MongoDB 中,endDate 也存储为字符串。Spark 是否在此处执行了额外的步骤来检测模式?然后它无法施放它......?通过查看https://github.com/mongodb/mongo-spark/blob/master/src/main/scala/com/mongodb/spark/sql/MapFunctions.scala#L181的源代码,它只在那里进行简单的映射,不复杂的演员表。
使用的版本:Mongo-Scala-Driver 2.4.0、Mongo-Spark-Connector 2.3.0、Spark 2.3.1
解决方案
也许您的 Schema 的另一个字段会导致此错误,但不会导致“endDate”。您显示的没有错误信息说“endDate”会导致此错误。
MongoDB Connector for Spark 默认使用每个字段的 1000 个样本来构建其架构,因此如果一个字段包含不同的数据类型,例如 string datatype 和 datetime datatype,并且 MongoDB Connector for Spark 可能不会对字符串数据进行采样并将该字段作为日期时间数据类型。至少,当您使用 count 方法时,连接器会尝试从 mongodb 加载数据以在 spark 数据帧中指定数据类型,并导致此错误:“Cannot cast STRING into a TimestampType”
解决方案:
添加 MongoDB Connector for Spark 的示例数据以构建正确的模式。例如,在 pyspark 中:
df = session.read.format("com.mongodb.spark.sql.DefaultSource").option('sampleSize', 50000).load()
推荐阅读
- sql - 如何在Oracle中找到外键的引用列?
- security - 如何在 Steam、PlayFab、GameSparks 或 HeroicLabs 中使用会话票证实现身份验证?
- spring-boot - 如何在 SpringBoot2 OAuth2-JWT 中实现注销功能
- reactjs - Firebase 有时不会更新值
- assembly - A-20 线如何克服 Wrapping 问题
- azure - 如何使用 Azure CLI 创建 SQL 表?
- python - python - 如何计算一个序列在python的给定字符串中出现的次数?
- ios - 类型“UITextField”没有成员“textDidBeginEditingNotification”
- windows - Windows 中的打印屏幕
- python - 如何解压多个相同的元组