apache-spark - Elasticsearch Spark 解析问题 - 无法解析字段 [Y] 的值 [X]
问题描述
我正在使用 Spark 2.3 (Pyspark) 从 Elasticsearch 6.6 索引中读取数据。
Spark 作业正在尝试创建一个df
并因解析问题而失败:
火花代码:
df = spark.read.format("org.elasticsearch.spark.sql").option("es.resource.read", index_name).option("es.nodes", hosts).load()
错误信息:
org.elasticsearch.hadoop.rest.EsHadoopParsingException: Cannot parse value [2019/05/06 19:31:21] for field [GenerateTime]
我相信这部分是由于源日期格式不是公认的ISO 8601格式。
此外,在阅读时间/日期映射文档时,我知道这可以通过创建映射来解决,但这只会影响新索引并且不会更改历史索引的映射。
问题:
有没有办法解决这个问题,以便我可以通过 Spark 从历史索引中成功读取(例如,在可能需要进行任何映射更改之前)?我也试过,.option("es.mapping.date.rich", False)
没有任何运气。
解决方案
我已经根据您在ES 6.4/Spark 2.1版本中的数据创建了一个示例文档,并使用了以下代码,以便在 spark 中读取GenerateTime
字段text
而不是日期类型。
ES中的映射
PUT somedateindex
{
"mappings": {
"mydocs":{
"properties": {
"GenerateTime": {
"type": "date",
"format": "yyyy/MM/dd HH:mm:ss"
}
}
}
}
}
请注意,该字段是date
ES 中的类型。
Spark 代码将 ES 中的日期字段用作字符串
请注意,我使用了配置选项(“es.mapping.date.rich”,false)
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.master", "local")
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
val df = spark.read.format("org.elasticsearch.spark.sql")
.option("es.resource.read","somedateindex")
.option("es.nodes", "some_host_name")
.option("es.mapping.date.rich", false)
.option("es.port","9200")
.load()
df.show()
df.printSchema()
我的 Eclipse 控制台中的 Spark 代码结果:
19/05/13 03:10:53 INFO DAGScheduler: Job 1 finished: show at Elasticsearch.scala:134, took 9.424294 s
19/05/13 03:10:53 INFO CodeGenerator: Code generated in 21.256205 ms
+-------------------+
| GenerateTime|
+-------------------+
|2019/05/06 19:31:21|
+-------------------+
root
|-- GenerateTime: string (nullable = true)
19/05/13 03:10:53 INFO SparkUI: Stopped Spark web UI at....
请注意,这printSchema
表明 table 有一个GenerateTime
类型为 的列string
。
如果您不想继续更改映射,上面应该对您有所帮助。
我建议使用日期格式而不是文本格式的日期字段,并且也使用 ISO-8601 支持的格式,这样当类型推断开始时,您最终会在 Spark 中获得正确类型的数据,您可以简单地专注于业务逻辑,很多时候,正确的解决方案在于我们如何存储数据,而不是我们如何处理它。
将字符串转换为时间戳/日期的 Spark 代码
但是,如果由于某种原因您无法从源(即 elasticsearch)更改映射,您可以进一步添加以下代码,使用以下代码将字符串值转换为时间戳:
import org.apache.spark.sql.functions._
//String into Timestamp Transformation
val df2_timestamp = df.withColumn("GenerateTime_timestamp", from_unixtime(unix_timestamp($"GenerateTime", "yyyy/MM/dd HH:mm:ss")).cast(TimestampType))
df2_timestamp.show(false)
df2_timestamp.printSchema();
如果你运行上面的代码,你会看到如下输出:
19/05/14 11:33:10 INFO CodeGenerator: Code generated in 23.742359 ms
+-------------------+----------------------+
|GenerateTime |GenerateTime_timestamp|
+-------------------+----------------------+
|2019/05/06 19:31:21|2019-05-06 19:31:21.0 |
+-------------------+----------------------+
root
|-- GenerateTime: string (nullable = true)
|-- GenerateTime_timestamp: timestamp (nullable = true)
19/05/14 11:33:10 INFO SparkContext: Invoking stop() from shutdown hook
另请注意,我的解决方案是在 Scala 中。让我知道它是否有帮助!
推荐阅读
- javascript - 如何使用 Rails 路由从 React 前端重定向?
- c - 使用 fscanf 后什么都没有读取
- c++ - 我正在修改一些代码,并将一些函数更改为系统调用,但现在我遇到了一些让我感到困惑的错误
- session-state - 是否可以使用 GTM 跨多个数据中心进行粘性会话?
- c++ - 构建 CppBenchmark 时出错 (https://github.com/chronoxor/CppBenchmark)
- .net-core - OpenIdConnect 的点网核心设置 redirect_uri 与点网框架不同?
- c++ - C++ 可重用模块类设计
- sql - 如何从两个表中获取特定记录
- javascript - 使用 .map 转换嵌套的 For 循环
- laravel - 图像在数据库中保存为 C:\wamp64\tmp\phpAE1C.tmp 而不是保存在 public/images