首页 > 解决方案 > 在 Groovy 中批处理请求?

问题描述

我是 Groovy 的新手,对如何批量处理请求有点迷茫,以便可以将它们作为批处理而不是单独提交到服务器,就像我目前所拥有的那样:

class Handler {
    private String jobId
    // [...]
    void submit() {
        // [...]
        // client is a single instance of Client used by all Handlers
        jobId = client.add(args)
    }
}

class Client {
    //...
    String add(String args) {
        response = postJson(args)
        return parseIdFromJson(response)
    }
}

就像现在一样,调用Client.add()POST 到 REST API 并返回解析结果。

我遇到的问题是,该方法可能会连续快速调用数千次,收集所有传入的 toadd()会更有效,等到调用停止进入的时刻,然后 POST 到该批次的 REST API 一次,一次发送所有参数。argsadd()add()

这可能吗?潜在地,add()可以立即返回一个假 id,只要批处理发生,提交发生,客户端稍后可以知道假 id 和来自 REST API 的 ID 之间的查找(它将按照 args 对应的顺序返回 ID)发送给它)。

标签: groovyconcurrency

解决方案


正如评论中所提到的,这对于 gpars 来说可能是一个很好的案例,它在这些场景中表现出色。

这实际上不是关于 groovy,而是关于 java 和一般 jvm 中的异步编程。

如果您想坚持使用 java 并发习语,我将一个代码片段放在一起,您可以将其用作潜在的起点。这尚未经过测试,也未考虑边缘情况。我写这个是为了好玩,因为这是异步编程,我没有花适当的时间考虑它,我怀疑那里有足够大的洞可以驱动坦克通过。

话虽如此,这里有一些代码尝试对请求进行批处理:

import java.util.concurrent.* 
import java.util.concurrent.locks.* 

// test code 
def client = new Client()

client.start()
def futureResponses = []
1000.times { 
  futureResponses << client.add(it as String)
}
client.stop()

futureResponses.each { futureResponse ->
  // resolve future...will wait if the batch has not completed yet
  def response = futureResponse.get()
  println "received response with index ${response.responseIndex}"
}
// end of test code 

class FutureResponse extends CompletableFuture<String> {
  String args
}

class Client {
  int minMillisLullToSubmitBatch = 100
  int maxBatchSizeBeforeSubmit   = 100
  int millisBetweenChecks        = 10
  long lastAddTime               = Long.MAX_VALUE

  def batch = []
  def lock = new ReentrantLock()
  boolean running = true

  def start() {
    running = true
    Thread.start { 
      while (running) {
        checkForSubmission()
        sleep millisBetweenChecks
      }
    }
  }

  def stop() {
    running = false
    checkForSubmission()
  }

  def withLock(Closure c) {
    try { 
      lock.lock()
      c.call()
    } finally { 
      lock.unlock()
    }    
  }

  FutureResponse add(String args) {
    def future = new FutureResponse(args: args)

    withLock { 
      batch << future
      lastAddTime = System.currentTimeMillis()
    }

    future
  }

  def checkForSubmission() {
    withLock { 
      if (System.currentTimeMillis() - lastAddTime > minMillisLullToSubmitBatch ||
          batch.size() > maxBatchSizeBeforeSubmit) { 
        submitBatch()
      }
    }
  }

  def submitBatch() {
    // here you would need to put the combined args on a format 
    // suitable for the endpoint you are calling. In this 
    // example we are just creating a list containing the args
    def combinedArgs = batch.collect { it.args }

    // further there needs to be a way to map one specific set of 
    // args in the combined args to a specific response. If the 
    // endpoint responds with the same order as the args we submitted
    // were in, then that can be used otherwise something else like 
    // an id in the response etc would need to be figured out. Here 
    // we just assume responses are returned in the order args were submitted
    List<String> combinedResponses = postJson(combinedArgs)
    combinedResponses.indexed().each { index, response -> 
      // here the FutureResponse gets a value, can be retrieved with 
      // futureResponse.get()
      batch[index].complete(response)
    }

    // clear the batch
    batch = []
  }

  // bogus method to fake post
  def postJson(combinedArgs) {
    println "posting json with batch size: ${combinedArgs.size()}"
    combinedArgs.collect { [responseIndex: it] }
  }
}

几点注意事项:

  • 有些东西需要能够对暂时没有调用添加的事实做出反应。这意味着一个单独的监视线程,并且是 start 和 stop 方法管理的内容。
  • 如果我们有无限的添加序列而没有暂停,您可能会耗尽资源。因此,代码有一个最大批处理大小,即使在添加调用中没有暂停,它也会提交批处理。
  • 代码使用锁来确保(或尝试,如上所述,我在这里没有考虑所有潜在问题)我们在批量提交等期间保持线程安全
  • 假设这里的总体思路是合理的,那么您就剩下实现submitBatch主要问题是处理将特定参数映射到特定响应的逻辑
  • CompletableFuture是一个java 8类。这可以使用早期版本中的其他构造来解决,但我碰巧使用的是 java 8。
  • 我或多或少地在没有执行或测试的情况下写了这个,我敢肯定那里有一些错误。
  • 从下面的打印输出中可以看出,“maxBatchSizeBeforeSubmit”设置更像是一个建议,而不是实际的最大值。由于监控线程休眠了一段时间,然后醒来检查我们的工作情况,调用 add 方法的线程可能已经在批处理中累积了任意数量的请求。我们所保证的是,每次millisBetweenChecks我们都会醒来并检查我们的工作情况,如果已达到提交批次的标准,则将提交该批次。

如果您不熟悉 java 期货和锁,我建议您阅读它们。

如果将上述代码保存在 groovy 脚本中code.groovy并运行它:

~> groovy code.groovy
posting json with batch size: 153
posting json with batch size: 234
posting json with batch size: 243
posting json with batch size: 370
received response with index 0
received response with index 1
received response with index 2
...
received response with index 998
received response with index 999

~> 

它应该可以工作并打印出从我们的虚假 json 提交中收到的“响应”。


推荐阅读