scala - 将 Spark Dataframe 中的多个列发送到外部 API 并将结果存储在单独的列中
问题描述
我有一个包含 40 多列的 spark 数据框。和数百万行。我想创建另一列,它从上述数据框中接收 5 列,将 5 列中的每一行传递给单独的 Api(它采用这 5 个值并返回一些数据)并将结果存储在列中。
为简单起见,我使用以下示例:假设我有以下数据框。我想将每一行“食物”和“价格”发送到一个 API,该 API 返回一个结果,并将其存储在一个名为“组合”的单独列中
输入:
+----+------+-----+
|name|food |price|
+----+------+-----+
|john|tomato|1.99 |
|john|carrot|0.45 |
|bill|apple |0.99 |
|john|banana|1.29 |
|bill|taco |2.59 |
+----+------+-----+
输出:
+----+------+-----+----------+
|name|food |price|combined |
+----+------+-----+----------+
|john|tomato|1.99 |abcd |
|john|carrot|0.45 |fdg |
|bill|apple |0.99 |123fgfg |
|john|banana|1.29 |fgfg4wf |
|bill|taco |2.59 |gfg45gn |
+----+------+-----+----------+
我创建了一个 UDF 来查看每一行:
val zip = udf {
(food: String, price: Double) =>
val nvIn = new NameValue
nvIn.put("Query.ID", 1234)
nvIn.put("Food", food)
nvIn.put("Price", price)
val nvOut = new NameValue
val code: Code = getTunnelsClient().execute("CombineData", nvIn, nvOut) // this is calling the external API
nvOut.get("CombineData") //this is stored the result column
}
def test(sc: SparkContext, sqlContext: SQLContext): Unit = {
import sqlContext.implicits._
val df = Seq(
("john", "tomato", 1.99),
("john", "carrot", 0.45),
("bill", "apple", 0.99),
("john", "banana", 1.29),
("bill", "taco", 2.59)
).toDF("name", "food", "price")
val result = df.withColumn("combined", zip($"food", $"price"))
result.show(false)
}
这种方法有效,但是我很担心,因为我正在查看数据帧的每一行,并且我有数百万这样的行,它在集群上的性能不会那么好
有没有其他方法可以做到(比如使用 spark-sql),可能不使用 udf ?
解决方案
我强烈建议使用类型安全的spark
Dataset
api 将您的数据行发送到 api。
这涉及使用该函数将您的Dataframe
行解析为一个,然后在您上执行该函数以将其发送到 api 并返回另一个代表您的.scala
case
class
as
map
Dataset\Dataframe
case class
Output
尽管严格不spark sql
使用Dataset
api 仍然可以让您从大多数可用的优化中受益spark sql
case class Input(name: String, food: String, price: Double)
case class Output(name: String, food: String, price: Double, combined: String)
val df = Seq(
("john", "tomato", 1.99),
("john", "carrot", 0.45),
("bill", "apple", 0.99),
("john", "banana", 1.29),
("bill", "taco", 2.59)
).toDF("name", "food", "price")
df.as[Input].map(input => {
val nvIn = new NameValue
nvIn.put("Query.ID", 1234)
nvIn.put("Food", input.food)
nvIn.put("Price", input.price)
val nvOut = new NameValue
getTunnelsClient().execute("CombineData", nvIn, nvOut)
Output(input.name, input.food, input.price, nvOut.get("CombineData"))
}).show(false)
推荐阅读
- c++ - C++:在头文件中生成常量数组实例
- gpu - 如何使用 Vulkan 有效地计算汉明距离
- swift - macOS 上的 Swift:使用 URL 作为参数打开 Chrome
- javascript - 如何使用 _.set 或等效方法从数组中删除元素
- python - 如何使用 selenium 获取 youtube 视频的标签?
- python - 调用函数时,它告诉我对象(列表)的位置而不是列表本身?
- javascript - 使用 findIndex 检查对象数组中存在的元素数组 - 打字稿
- php - 查找最接近值的数字不起作用(SQL)
- r - 使用 if 语句在 R 中定义函数
- java - [已解决]Android Studio - ListView 的显示顺序不正确并出现重复