首页 > 解决方案 > 将 Spark Dataframe 中的多个列发送到外部 API 并将结果存储在单独的列中

问题描述

我有一个包含 40 多列的 spark 数据框。和数百万行。我想创建另一列,它从上述数据框中接收 5 列,将 5 列中的每一行传递给单独的 Api(它采用这 5 个值并返回一些数据)并将结果存储在列中。

为简单起见,我使用以下示例:假设我有以下数据框。我想将每一行“食物”和“价格”发送到一个 API,该 API 返回一个结果,并将其存储在一个名为“组合”的单独列中

输入:

+----+------+-----+
|name|food  |price|
+----+------+-----+
|john|tomato|1.99 |
|john|carrot|0.45 |
|bill|apple |0.99 |
|john|banana|1.29 |
|bill|taco  |2.59 |
+----+------+-----+

输出:

+----+------+-----+----------+
|name|food  |price|combined  |
+----+------+-----+----------+
|john|tomato|1.99 |abcd      |
|john|carrot|0.45 |fdg       |
|bill|apple |0.99 |123fgfg   |
|john|banana|1.29 |fgfg4wf   |
|bill|taco  |2.59 |gfg45gn   |
+----+------+-----+----------+

我创建了一个 UDF 来查看每一行:

val zip = udf {
(food: String, price: Double) =>
    val nvIn = new NameValue
    nvIn.put("Query.ID", 1234)
    nvIn.put("Food", food)
    nvIn.put("Price", price)
    val nvOut = new NameValue

    val code: Code = getTunnelsClient().execute("CombineData", nvIn, nvOut) // this is calling the external API
    nvOut.get("CombineData")     //this is stored the result column
  }

  def test(sc: SparkContext, sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._
    val df = Seq(
      ("john", "tomato", 1.99),
      ("john", "carrot", 0.45),
      ("bill", "apple", 0.99),
      ("john", "banana", 1.29),
      ("bill", "taco", 2.59)
    ).toDF("name", "food", "price")


    val result = df.withColumn("combined", zip($"food", $"price"))
    result.show(false)

  }

这种方法有效,但是我很担心,因为我正在查看数据帧的每一行,并且我有数百万这样的行,它在集群上的性能不会那么好

有没有其他方法可以做到(比如使用 spark-sql),可能不使用 udf ?

标签: scalaapache-sparkapache-spark-sqluser-defined-functions

解决方案


我强烈建议使用类型安全的spark Datasetapi 将您的数据行发送到 api。

这涉及使用该函数将您的Dataframe行解析为一个,然后在您上执行该函数以将其发送到 api 并返回另一个代表您的.scala case classasmapDataset\Dataframecase classOutput

尽管严格不spark sql使用Datasetapi 仍然可以让您从大多数可用的优化中受益spark sql

case class Input(name: String, food: String, price: Double)
case class Output(name: String, food: String, price: Double, combined: String)

val df = Seq(
  ("john", "tomato", 1.99),
  ("john", "carrot", 0.45),
  ("bill", "apple", 0.99),
  ("john", "banana", 1.29),
  ("bill", "taco", 2.59)
).toDF("name", "food", "price")

df.as[Input].map(input => {
    val nvIn = new NameValue
    nvIn.put("Query.ID", 1234)
    nvIn.put("Food", input.food)
    nvIn.put("Price", input.price)
    val nvOut = new NameValue
    getTunnelsClient().execute("CombineData", nvIn, nvOut)
    Output(input.name, input.food, input.price, nvOut.get("CombineData"))
}).show(false)

推荐阅读