How to use threads to iterate over 400,000 records in Groovy

Problem description

// this query returns 0.45 million records, which are stored in the list
List<Employee> empList=result.getQuery(query);

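I iterate over the employee list, set a property on each employee, and finally call a service method to save the employee object. Because of the large number of records, processing them sequentially takes a lot of time, so I would like to use threads. I am new to Groovy and have only implemented simple examples.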

How can I use threads in Groovy for the following logic?

for (Employee employee : empList) {
    employee.setQuantity(8);
    employeeService.save(employee);
}

Tags: java, groovy

Solution


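There are frameworks for this (GPars comes to mind), and the Java executors framework is a better abstraction than raw threads, but if we want to keep things primitive, you can split your list into batches and run each batch in a separate thread using something like: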

def employeeService = new EmployeeService()

// stand-in data: 400,000 employees in batches of 10,000 gives 40 batches
def empList   = (1..400000).collect { new Employee() }
def batchSize = 10000

// start one named thread per batch; Thread.start returns the started Thread
def workerThreads = empList.collate(batchSize).withIndex().collect { List<Employee> batch, int index ->
  Thread.start("worker-thread-${index}") {
    println "worker ${index} starting"
    batch.each { Employee e ->
      e.quantity = 8
      employeeService.save(e)
    }
    println "worker ${index} completed"
  }
}

println "main thread waiting for workers to finish"
// block the main thread until every worker thread has finished
workerThreads*.join()
println "workers finished, exiting..."

class Employee { 
  int quantity
}

class EmployeeService { 
  def save(Employee e) {
    Thread.sleep(1)
  }
}

When run, it prints:

─➤ groovy solution.groovy
worker 7 starting
worker 11 starting
worker 5 starting
worker 13 starting
worker 17 starting
worker 16 starting
worker 2 starting
worker 18 starting
worker 6 starting
worker 15 starting
worker 12 starting
worker 14 starting
worker 1 starting
worker 4 starting
worker 10 starting
worker 8 starting
worker 9 starting
worker 3 starting
worker 0 starting
worker 20 starting
worker 21 starting
worker 19 starting
worker 22 starting
worker 24 starting
worker 23 starting
worker 25 starting
worker 26 starting
worker 27 starting
worker 28 starting
worker 29 starting
worker 30 starting
worker 31 starting
worker 32 starting
worker 33 starting
worker 34 starting
worker 35 starting
worker 36 starting
worker 37 starting
worker 38 starting
worker 39 starting
main thread waiting for workers to finish
worker 0 completed
worker 16 completed
worker 20 completed
worker 1 completed
worker 3 completed
worker 14 completed
worker 7 completed
worker 12 completed
worker 24 completed
worker 10 completed
worker 6 completed
worker 19 completed
worker 33 completed
worker 27 completed
worker 28 completed
worker 35 completed
worker 17 completed
worker 25 completed
worker 38 completed
worker 4 completed
worker 8 completed
worker 13 completed
worker 9 completed
worker 39 completed
worker 15 completed
worker 36 completed
worker 37 completed
worker 18 completed
worker 30 completed
worker 23 completed
worker 11 completed
worker 32 completed
worker 2 completed
worker 29 completed
worker 26 completed
worker 5 completed
worker 22 completed
worker 31 completed
worker 21 completed
worker 34 completed
workers finished, exiting...

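List.collate splits the employee list into chunks (each a List<Employee>) of size batchSize. withIndex is there so that each batch also gets an index (i.e. just a number 0, 1, 2, 3...) for debugging and tracing.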
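
To see what collate and withIndex produce, here is a minimal sketch on a ten-element list. The names items, batches and labelled are made up for illustration and are not part of the code above:

```groovy
// Small stand-in list so the result is easy to inspect
def items = (1..10).toList()

// collate(3) cuts the list into sublists of at most 3 elements;
// the last sublist holds whatever is left over
def batches = items.collate(3)
assert batches == [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]

// withIndex() pairs each batch with its position, and a two-parameter
// closure receives the batch and the index as separate arguments
def labelled = batches.withIndex().collect { List<Integer> batch, int index ->
    "batch ${index}: ${batch.size()} items".toString()
}
assert labelled == ['batch 0: 3 items', 'batch 1: 3 items',
                    'batch 2: 3 items', 'batch 3: 1 items']
```

With 400,000 employees and a batchSize of 10,000 the same call yields 40 batches, which matches the 40 workers (0 to 39) in the output above.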

Having started the threads, we need to wait for them to complete. workerThreads*.join() does essentially the same thing as:

workerThreads.each { t -> t.join() }

but with a more concise syntax. Thread.join() is the Java construct for waiting for a thread to finish.
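
The answer mentions the Java executors framework as a better abstraction than raw threads. As a rough sketch of that alternative (not the answer's own code), the same batches can be handed to a fixed-size thread pool so that only a bounded number of threads run at once. The poolSize value, the savedCount variable and the smaller list and batch sizes are assumptions chosen for the demo, and the sketch assumes employeeService.save is safe to call from several threads at the same time:

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Stand-ins for the real domain classes, same shape as in the answer
class Employee {
    int quantity
}

class EmployeeService {
    def save(Employee e) {
        Thread.sleep(1) // simulate a slow save
    }
}

def employeeService = new EmployeeService()
def empList   = (1..2000).collect { new Employee() } // smaller list for the demo
def batchSize = 100
def poolSize  = 8 // assumed value; tune for the machine and the backing store

int savedCount = 0
def pool = Executors.newFixedThreadPool(poolSize)
try {
    // One task per batch; each task returns how many employees it saved
    def tasks = empList.collate(batchSize).collect { List<Employee> batch ->
        Callable<Integer> task = {
            batch.each { Employee e ->
                e.quantity = 8
                employeeService.save(e)
            }
            batch.size()
        }
        task
    }

    // invokeAll blocks until every task has finished
    def futures = pool.invokeAll(tasks)

    // Future.get() rethrows (wrapped) any exception thrown inside a task
    savedCount = futures.sum { it.get() }
    println "saved ${savedCount} employees"
} finally {
    pool.shutdown() // let the pool threads exit
}
```

Compared with one thread per batch, the pool decouples the batch size from the number of threads, and a failure inside a task surfaces on the calling thread through Future.get() instead of being lost in a worker.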

