groovy - 在 Groovy 中批处理请求?
问题描述
我是 Groovy 的新手,对如何批量处理请求有点迷茫,以便可以将它们作为批处理而不是单独提交到服务器,就像我目前所拥有的那样:
class Handler {
private String jobId
// [...]
void submit() {
// [...]
// client is a single instance of Client used by all Handlers
jobId = client.add(args)
}
}
class Client {
//...
String add(String args) {
response = postJson(args)
return parseIdFromJson(response)
}
}
就像现在一样,调用Client.add()
POST 到 REST API 并返回解析结果。
我遇到的问题是,该方法可能会连续快速调用数千次,收集所有传入的 toadd()
会更有效,等到调用停止进入的时刻,然后 POST 到该批次的 REST API 一次,一次发送所有参数。args
add()
add()
这可能吗?潜在地,add()
可以立即返回一个假 id,只要批处理发生,提交发生,客户端稍后可以知道假 id 和来自 REST API 的 ID 之间的查找(它将按照 args 对应的顺序返回 ID)发送给它)。
解决方案
正如评论中所提到的,这对于 gpars 来说可能是一个很好的案例,它在这些场景中表现出色。
这实际上不是关于 groovy,而是关于 java 和一般 jvm 中的异步编程。
如果您想坚持使用 java 并发习语,我将一个代码片段放在一起,您可以将其用作潜在的起点。这尚未经过测试,也未考虑边缘情况。我写这个是为了好玩,因为这是异步编程,我没有花适当的时间考虑它,我怀疑那里有足够大的洞可以驱动坦克通过。
话虽如此,这里有一些代码尝试对请求进行批处理:
import java.util.concurrent.*
import java.util.concurrent.locks.*
// test code
def client = new Client()
client.start()
def futureResponses = []
1000.times {
futureResponses << client.add(it as String)
}
client.stop()
futureResponses.each { futureResponse ->
// resolve future...will wait if the batch has not completed yet
def response = futureResponse.get()
println "received response with index ${response.responseIndex}"
}
// end of test code
class FutureResponse extends CompletableFuture<String> {
String args
}
class Client {
int minMillisLullToSubmitBatch = 100
int maxBatchSizeBeforeSubmit = 100
int millisBetweenChecks = 10
long lastAddTime = Long.MAX_VALUE
def batch = []
def lock = new ReentrantLock()
boolean running = true
def start() {
running = true
Thread.start {
while (running) {
checkForSubmission()
sleep millisBetweenChecks
}
}
}
def stop() {
running = false
checkForSubmission()
}
def withLock(Closure c) {
try {
lock.lock()
c.call()
} finally {
lock.unlock()
}
}
FutureResponse add(String args) {
def future = new FutureResponse(args: args)
withLock {
batch << future
lastAddTime = System.currentTimeMillis()
}
future
}
def checkForSubmission() {
withLock {
if (System.currentTimeMillis() - lastAddTime > minMillisLullToSubmitBatch ||
batch.size() > maxBatchSizeBeforeSubmit) {
submitBatch()
}
}
}
def submitBatch() {
// here you would need to put the combined args on a format
// suitable for the endpoint you are calling. In this
// example we are just creating a list containing the args
def combinedArgs = batch.collect { it.args }
// further there needs to be a way to map one specific set of
// args in the combined args to a specific response. If the
// endpoint responds with the same order as the args we submitted
// were in, then that can be used otherwise something else like
// an id in the response etc would need to be figured out. Here
// we just assume responses are returned in the order args were submitted
List<String> combinedResponses = postJson(combinedArgs)
combinedResponses.indexed().each { index, response ->
// here the FutureResponse gets a value, can be retrieved with
// futureResponse.get()
batch[index].complete(response)
}
// clear the batch
batch = []
}
// bogus method to fake post
def postJson(combinedArgs) {
println "posting json with batch size: ${combinedArgs.size()}"
combinedArgs.collect { [responseIndex: it] }
}
}
几点注意事项:
- 有些东西需要能够对暂时没有调用添加的事实做出反应。这意味着一个单独的监视线程,并且是 start 和 stop 方法管理的内容。
- 如果我们有无限的添加序列而没有暂停,您可能会耗尽资源。因此,代码有一个最大批处理大小,即使在添加调用中没有暂停,它也会提交批处理。
- 代码使用锁来确保(或尝试,如上所述,我在这里没有考虑所有潜在问题)我们在批量提交等期间保持线程安全
- 假设这里的总体思路是合理的,那么您就剩下实现
submitBatch
主要问题是处理将特定参数映射到特定响应的逻辑 CompletableFuture
是一个java 8类。这可以使用早期版本中的其他构造来解决,但我碰巧使用的是 java 8。- 我或多或少地在没有执行或测试的情况下写了这个,我敢肯定那里有一些错误。
- 从下面的打印输出中可以看出,“maxBatchSizeBeforeSubmit”设置更像是一个建议,而不是实际的最大值。由于监控线程休眠了一段时间,然后醒来检查我们的工作情况,调用 add 方法的线程可能已经在批处理中累积了任意数量的请求。我们所保证的是,每次
millisBetweenChecks
我们都会醒来并检查我们的工作情况,如果已达到提交批次的标准,则将提交该批次。
如果您不熟悉 java 期货和锁,我建议您阅读它们。
如果将上述代码保存在 groovy 脚本中code.groovy
并运行它:
~> groovy code.groovy
posting json with batch size: 153
posting json with batch size: 234
posting json with batch size: 243
posting json with batch size: 370
received response with index 0
received response with index 1
received response with index 2
...
received response with index 998
received response with index 999
~>
它应该可以工作并打印出从我们的虚假 json 提交中收到的“响应”。
推荐阅读
- php - 检查 JSON 是否有 Given Key
- python - Python-我的数据集包含日期时间列,它不允许我进行任何处理
- c# - 如何从存储过程中提取数据并动态创建 zip 文件夹并添加 csv 文件
- swift - Apple 应用内配置(将卡添加到钱包)
- flutter - 我可以创建 Get.find<>() 的单个实例吗?
- java - Flink 去重和 processWindowFunction
- datatable - 成功时数据表复制标题
- vue.js - Nuxt 手表不重定向
- javascript - 将 Loader 放入数据网格打字稿中
- python - 如何从 Scrapy Selector 中删除子元素?