首页 > 解决方案 > 如何在spark中同步执行器之间的函数以避免写入Elastic时并发

问题描述

我有一个函数将被调用以使用 spark 和 scala 将 DataFrame 写入弹性搜索。(DataFrame 在函数调用之前创建)

def writeToES(dfForES: DataFrame, indexName: String, spark: SparkSession, conf: JSONObject) = {
    import org.apache.spark.sql.functions.col
    val doc_id_cols = Array("zip_id", "pattern_name", "row_index")
    if (indexName == conf.getString("elkParserIndex")) {
      println("Parser Index")
        .withColumn("row_index", col("line_number").cast(IntegerType))
        .write.format("org.elasticsearch.spark.sql")
        .mode("append")
        .save(conf.getString("elkParserIndex"))
}

我有 5 个执行器,每个执行器有 3 个核心。他们并行调用此函数,而弹性搜索会给出异常,因为它无法处理批量并行负载。

org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException:连接错误(检查网络和/或代理设置)- 所有节点

由于我是弹性新手,因此无法在弹性中处理此异常,并希望通过避免并发来处理火花。有没有办法处理这个?

标签: scalaapache-sparkelasticsearchapache-spark-sql

解决方案


在您的语句中适当地使用 .coalese(1) 或 .repartition(1) 将导致所有数据被洗牌到 Worker 上的单个 Executor。

这意味着 1 个进程并且没有并发问题。这也意味着较低的吞吐量。


推荐阅读