首页 > 解决方案 > 如何进行安静的服务调用以快速(er)更新数据框的列?

问题描述

我们需要调用一个外部的 restful 服务来更新 a 中的列值Dataset。我们正在使用 UDF 函数来进行非常慢的安静服务调用。

dataset.withColumn("upper", upperUDF('call restful service'))

这是一个同步呼叫,25,000 个帐户花费了大约 1 小时 10 分钟(每个帐户发出一个呼叫)。

如何让它更快?

标签: apache-sparkapache-spark-sql

解决方案


我建议将 转换DatasetRDDusingDataset.rdd然后RDD.foreachPartition

val names = Seq("hello", "world").toDF("name")

scala> names.show
+-----+
| name|
+-----+
|hello|
|world|
+-----+

scala> names.rdd.foreachPartition(p => p.map(n => "call restful service for " + n).foreach(println))
call restful service for [hello]
call restful service for [world]

然后,您可以为相同的条目考虑一个本地缓存,以避免耗时的 restful 服务调用。


从评论:

这如何提高性能?

  1. RDD.foreachPartition使您可以作为迭代器访问所有元素(惰性和内存友好),因此您可以通过使用本地缓存(每个分区或每个执行程序,因此在执行程序上执行的所有分区/任务都可以使用缓存)来避免外部调用。

  2. 可以更改分区数量以避免过多的并行外部调用 (DDOS)。使用RDD.repartitionRDD.coalesce运算符。此外,您可以通过用于从中读取数据集的数据源来控制分区数。

从 API 获取响应后如何更新相应的列

由于您离开了 Dataset API 并希望使用 RDD API 进行外部调用,所以问题是如何从 RDD 回到 Datasets。就这么简单RDD.toDF(comma-separated column names)。这些列必须与 RDD 表示匹配,并且取决于 RDD 的案例类。


推荐阅读