scala - flink 增加异步操作的并行度
问题描述
我们有 AsyncFunction 异步操作是使用akka http 客户端完成的
class Foo[A,B] extends AsyncFunction[A, B] with {
val akkaConfig = ConfigFactory.load()
implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())
implicit lazy val system = ActorSystem("MyActorSystem", akkaConfig)
implicit lazy val materializer = ActorMaterializer()
def postReq(uriStr: String, str: String): Future[HttpResponse] = {
Http().singleRequest(HttpRequest(
method = HttpMethods.POST,
uri = uriStr,
entity = HttpEntity(ContentTypes.`application/json`, str))
)
}
override def asyncInvoke(input: A, resultFuture: ResultFuture[B]) : Unit = {
val resultFutureRequested: Future[HttpResponse] = postReq(...)
//the rest of the class ...
问题 :
- 如果我想增加 http 请求的并行性 - 我应该使用 akka 配置还是有办法通过 flink.yamel 配置它
- 由于 Flink 也使用 akka,这是创建
ActorSystem
和的正确方法ExecutionContext
吗?
解决方案
至于第一个问题,您有三个不同的设置会影响性能和执行的实际请求数:
- 并行性,这将导致 Flink 创建多个 Your 实例,
AsyncFunction
包括多个 Your 实例HttpClient
。 - 函数本身的并发请求数。当你调用
orderedWait
或者unorderedWait
你应该capacity
在函数中提供,这将限制并发请求的数量。 - Your Http 客户端的实际设置。
如您所见,第 2. 点和第 3. 点是相连的,因为 Flink 可以限制可能的并发请求数,所以有时更改 Your Http Client 设置可能没有效果,因为请求数受 Flink 限制自己。
增加 Your 的吞吐量AsyncFunction
取决于具体情况。您需要记住这AsyncFunction
是单线程中的调用。这基本上意味着如果您调用的服务的响应时间很大,您将简单地阻止等待响应的请求数量,因此唯一的方法是增加parallelism'
. 但是,通常,更改函数的HttpClient
和 的设置capacity
应该可以让您获得更好的吞吐量。
至于第二个问题,我认为创建多个ActorSystems
. 您可以在 [此处] 看到类似问题的回答。1
推荐阅读
- python - 如何按频率和字母顺序对列表进行排序?
- sql - Laravel TNTsearch 大型 SQL 数据库表的自定义索引创建和使用
- toad - 在 Toad Scheme 浏览器中看不到表/视图
- ruby-on-rails - 在 ec2 上部署 2 个 rails 应用程序
- linux - 如何在 Tmux 中打开每个新窗口时显示消息?
- primeng - p-triStateCheckbox 在 Angular6 中不起作用
- c# - c# 7.x 使用元组类型的短名称
- python - 重命名项目及其目录后,我需要做什么才能让 Django 单元测试在 PyCharm 中再次运行?
- java - Android 驻留菜单项禁用或隐藏?
- python - python []中的元素是什么