首页 > 解决方案 > 如何使用 spark 结构化流在 elasticsearch sink 中设置动态文档 ID

问题描述

在弹性搜索写入接收器中,我应该如何添加带有来自数据集字段的动态值的文档 ID。在我的情况下,我需要根据格式化数据集中的特定字段设置文档 ID。遇到“es.mapping.id”,但我将如何从我的数据集中获取值?

标签: spark-structured-streamingelasticsearch-hadoop

解决方案


发现只需将字段名称指定为“es.mapping.id”的值即可实现此目的

StreamingQuery query = finalData.writeStream()
                .outputMode(OutputMode.Append())
                .format("org.elasticsearch.spark.sql")
                .option("es.mapping.id", "input_key")        
                .option("checkpointLocation","/tmp/spark-checkpoint")
                .start("spark_index/doc");

推荐阅读