首页 > 解决方案 > 如何使用 SparkContext.submitJob 调用 REST API

问题描述

有人可以提供 submitJob 方法调用的示例吗

在此处找到参考:如何从 map/filter/etc 执行异步操作(即返回 Future)?

我相信我可以为我的用例实现它

在我当前的实现中,我使用分区来调用并行调用,但它们在调用下一个调用之前正在等待响应

Dataframe.rdd.reparition(TPS allowed on API)
.map(row => {
            val response = callApi(row)
            parse(response)
    })

但由于 API 端存在延迟,我在解析之前等待 10 秒的响应,然后进行下一次调用。我有 100 TPS 但目前的逻辑我只看到 4-7 TPS

如果有人使用SparkContext.submitJob进行异步调用,请提供一个示例,因为我是新的 spark 和 scala

我想在不等待响应的情况下调用调用,确保 100 TPS,然后一旦我收到响应,我想解析并在其上创建 Dataframe。

我之前曾尝试从主节点收集行并调用 API 调用,似乎受限于创建大型线程池的硬件

submitJob[T, U, R](rdd: RDD[T], processPartition: (Iterator[T]) ⇒ U, partitions: Seq[Int], resultHandler: (Int, U)⇒ Unit, resultFunc: ⇒ R): SimpleFutureAction[R]

Rdd - rdd 从我的数据框中

分区 - 我的 rdd 已经分区,我是否在我的 rdd 中提供范围 0 到 No.of.partitions ?

processPartition - 是我的 callApi() 吗?

resultHandler - 不确定这里要做什么

resultFunc - 我相信这会解析我的回复

如何在 SimpleFutureAction 之后创建数据框

有人可以帮忙吗

标签: scalarestapiapache-sparkasynchronous

解决方案


submitJob不会使您的 API 调用自动更快。它是 Spark 并行处理的低级实现的一部分——Spark 将操作拆分为作业,然后将它们提交给任何现有的集群调度程序。调用submitJob就像启动一个 Java 线程 - 作业将异步运行,但不会比您简单地在数据帧/RDD 上调用操作快。

恕我直言,您最好的选择是使用mapPartitions它允许您在每个分区的上下文中运行一个函数。您已经对数据进行了分区,因此为了确保最大的并发性,只需确保您有足够的 Spark 执行程序来实际让这些分区同时运行:

df.rdd.repartition(#concurrent API calls)
  .mapPartitions(partition => {
    partition.map(row => {
      val response = callApi(row)
      parse(response)
    })
  })
  .toDF("col1", "col2", ...)

mapPartitionsIterator[T]需要一个将(单个分区中的所有数据)映射到Iterator[U](转换后的分区)并返回的函数RDD[U]。转换回数据框是将调用链接到toDF()适当的列名的问题。

您可能希望实现某种每线程速率限制,callApi以确保没有单个执行程序每秒触发大量请求。请记住,执行程序可能在单独的线程和/或单独的 JVM 中运行。

当然,只是调用mapPartitions没有任何作用。您需要在生成的数据帧上触发一个操作,才能真正触发 API 调用。


推荐阅读