scala - 如何使用 SparkContext.submitJob 调用 REST API
问题描述
有人可以提供 submitJob 方法调用的示例吗
在此处找到参考:如何从 map/filter/etc 执行异步操作(即返回 Future)?
我相信我可以为我的用例实现它
在我当前的实现中,我使用分区来调用并行调用,但它们在调用下一个调用之前正在等待响应
Dataframe.rdd.reparition(TPS allowed on API)
.map(row => {
val response = callApi(row)
parse(response)
})
但由于 API 端存在延迟,我在解析之前等待 10 秒的响应,然后进行下一次调用。我有 100 TPS 但目前的逻辑我只看到 4-7 TPS
如果有人使用SparkContext.submitJob进行异步调用,请提供一个示例,因为我是新的 spark 和 scala
我想在不等待响应的情况下调用调用,确保 100 TPS,然后一旦我收到响应,我想解析并在其上创建 Dataframe。
我之前曾尝试从主节点收集行并调用 API 调用,似乎受限于创建大型线程池的硬件
submitJob[T, U, R](rdd: RDD[T], processPartition: (Iterator[T]) ⇒ U, partitions: Seq[Int], resultHandler: (Int, U)⇒ Unit, resultFunc: ⇒ R): SimpleFutureAction[R]
Rdd - rdd 从我的数据框中
分区 - 我的 rdd 已经分区,我是否在我的 rdd 中提供范围 0 到 No.of.partitions ?
processPartition - 是我的 callApi() 吗?
resultHandler - 不确定这里要做什么
resultFunc - 我相信这会解析我的回复
如何在 SimpleFutureAction 之后创建数据框
有人可以帮忙吗
解决方案
submitJob
不会使您的 API 调用自动更快。它是 Spark 并行处理的低级实现的一部分——Spark 将操作拆分为作业,然后将它们提交给任何现有的集群调度程序。调用submitJob
就像启动一个 Java 线程 - 作业将异步运行,但不会比您简单地在数据帧/RDD 上调用操作快。
恕我直言,您最好的选择是使用mapPartitions
它允许您在每个分区的上下文中运行一个函数。您已经对数据进行了分区,因此为了确保最大的并发性,只需确保您有足够的 Spark 执行程序来实际让这些分区同时运行:
df.rdd.repartition(#concurrent API calls)
.mapPartitions(partition => {
partition.map(row => {
val response = callApi(row)
parse(response)
})
})
.toDF("col1", "col2", ...)
mapPartitions
Iterator[T]
需要一个将(单个分区中的所有数据)映射到Iterator[U]
(转换后的分区)并返回的函数RDD[U]
。转换回数据框是将调用链接到toDF()
适当的列名的问题。
您可能希望实现某种每线程速率限制,callApi
以确保没有单个执行程序每秒触发大量请求。请记住,执行程序可能在单独的线程和/或单独的 JVM 中运行。
当然,只是调用mapPartitions
没有任何作用。您需要在生成的数据帧上触发一个操作,才能真正触发 API 调用。
推荐阅读
- google-dfp - Google DFP ad slot sizes and stats mismatch
- python - 从数据中提取最多提及的内容
- javascript - 从 JavaScript 函数返回的 NaN 结果,其中预期为“未定义”
- jenkins - 为什么 Jenkins 多分支管道中的工作会被禁用
- sql-server - GROUP BY 除了一列有优先权
- java - Java中OutputStream中字符串中的引号问题
- wix - 如何从 Wix 包中卸载 inno setup 应用程序
- python - 如何在 Python 中生成相关随机数?
- database - 是否有将 API 密钥和秘密保存到数据库中的最佳实践?
- java - 如何在androidstudio中居中菜单项