scala - 如何在spark中同步执行器之间的函数以避免写入Elastic时并发
问题描述
我有一个函数将被调用以使用 spark 和 scala 将 DataFrame 写入弹性搜索。(DataFrame 在函数调用之前创建)
def writeToES(dfForES: DataFrame, indexName: String, spark: SparkSession, conf: JSONObject) = {
import org.apache.spark.sql.functions.col
val doc_id_cols = Array("zip_id", "pattern_name", "row_index")
if (indexName == conf.getString("elkParserIndex")) {
println("Parser Index")
.withColumn("row_index", col("line_number").cast(IntegerType))
.write.format("org.elasticsearch.spark.sql")
.mode("append")
.save(conf.getString("elkParserIndex"))
}
我有 5 个执行器,每个执行器有 3 个核心。他们并行调用此函数,而弹性搜索会给出异常,因为它无法处理批量并行负载。
org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException:连接错误(检查网络和/或代理设置)- 所有节点
由于我是弹性新手,因此无法在弹性中处理此异常,并希望通过避免并发来处理火花。有没有办法处理这个?
解决方案
在您的语句中适当地使用 .coalese(1) 或 .repartition(1) 将导致所有数据被洗牌到 Worker 上的单个 Executor。
这意味着 1 个进程并且没有并发问题。这也意味着较低的吞吐量。
推荐阅读
- javascript - exit().remove() 不在视野范围内时不会删除节点
- python - 如何更改 pandas df 中的日期时间值?
- c# - 如何在列表中存储、排序整数(来自用户的输入)?
- android - 如何以编程方式相对于 ConstraintLayout 中的另一个视图设置视图的宽度/边距
- javascript - 我需要将图像放置在背景图像的底部,并在调整窗口大小时让它们保持原位
- java - 无法获取下载路径
- typescript - Jsdom 如何“轮询特定元素的存在”?
- json - 如何为 PowerShell 类属性指定 JSON 变量名称?
- annotations - %hint 注释是否导入/Dec 和自动注释?
- javascript - 如何根据用户输入检索和设置月份的最后一天?