首页 > 解决方案 > Elasticsearch Spark 解析问题 - 无法解析字段 [Y] 的值 [X]

问题描述

我正在使用 Spark 2.3 (Pyspark) 从 Elasticsearch 6.6 索引中读取数据。
Spark 作业正在尝试创建一个df并因解析问题而失败:

火花代码:

df = spark.read.format("org.elasticsearch.spark.sql").option("es.resource.read", index_name).option("es.nodes", hosts).load()

错误信息:

org.elasticsearch.hadoop.rest.EsHadoopParsingException: Cannot parse value [2019/05/06 19:31:21] for field [GenerateTime]

我相信这部分是由于源日期格式不是公认的ISO 8601格式。

此外,在阅读时间/日期映射文档时,我知道这可以通过创建映射来解决,但这只会影响新索引并且不会更改历史索引的映射。

问题:

有没有办法解决这个问题,以便我可以通过 Spark 从历史索引中成功读取(例如,在可能需要进行任何映射更改之前)?我也试过,.option("es.mapping.date.rich", False)没有任何运气。

标签: apache-sparkelasticsearchpyspark

解决方案


我已经根据您在ES 6.4/Spark 2.1版本中的数据创建了一个示例文档,并使用了以下代码,以便在 spark 中读取GenerateTime字段text而不是日期类型。

ES中的映射

PUT somedateindex
{
  "mappings": {
    "mydocs":{
      "properties": {
        "GenerateTime": {
          "type": "date",
          "format": "yyyy/MM/dd HH:mm:ss"
        }
      }
    }
  }
}

请注意,该字段是dateES 中的类型。

Spark 代码将 ES 中的日期字段用作字符串

请注意,我使用了配置选项(“es.mapping.date.rich”false

    val spark = SparkSession
                .builder()
                .appName("Spark SQL basic example")
                .config("spark.master", "local")
                .getOrCreate()

    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._

    val df = spark.read.format("org.elasticsearch.spark.sql")
            .option("es.resource.read","somedateindex")
            .option("es.nodes", "some_host_name")
            .option("es.mapping.date.rich", false)
            .option("es.port","9200")
            .load()

   df.show()
   df.printSchema()

我的 Eclipse 控制台中的 Spark 代码结果:

19/05/13 03:10:53 INFO DAGScheduler: Job 1 finished: show at Elasticsearch.scala:134, took 9.424294 s
19/05/13 03:10:53 INFO CodeGenerator: Code generated in 21.256205 ms
+-------------------+
|       GenerateTime|
+-------------------+
|2019/05/06 19:31:21|
+-------------------+

root
 |-- GenerateTime: string (nullable = true)

19/05/13 03:10:53 INFO SparkUI: Stopped Spark web UI at....

请注意,这printSchema表明 table 有一个GenerateTime类型为 的列string

如果您不想继续更改映射,上面应该对您有所帮助。

我建议使用日期格式而不是文本格式的日期字段,并且也使用 ISO-8601 支持的格式,这样当类型推断开始时,您最终会在 Spark 中获得正确类型的数据,您可以简单地专注于业务逻辑,很多时候,正确的解决方案在于我们如何存储数据,而不是我们如何处理它。

将字符串转换为时间戳/日期的 Spark 代码

但是,如果由于某种原因您无法从源(即 elasticsearch)更改映射,您可以进一步添加以下代码,使用以下代码将字符串值转换为时间戳:

    import org.apache.spark.sql.functions._

    //String into Timestamp Transformation
    val df2_timestamp = df.withColumn("GenerateTime_timestamp",  from_unixtime(unix_timestamp($"GenerateTime", "yyyy/MM/dd HH:mm:ss")).cast(TimestampType))
    df2_timestamp.show(false)
    df2_timestamp.printSchema();

如果你运行上面的代码,你会看到如下输出:

19/05/14 11:33:10 INFO CodeGenerator: Code generated in 23.742359 ms
+-------------------+----------------------+
|GenerateTime       |GenerateTime_timestamp|
+-------------------+----------------------+
|2019/05/06 19:31:21|2019-05-06 19:31:21.0 |
+-------------------+----------------------+

root
 |-- GenerateTime: string (nullable = true)
 |-- GenerateTime_timestamp: timestamp (nullable = true)

19/05/14 11:33:10 INFO SparkContext: Invoking stop() from shutdown hook

另请注意,我的解决方案是在 Scala 中。让我知道它是否有帮助!


推荐阅读