首页 > 解决方案 > 多个线程从 ConcurrentLinkedQueue 读取,并进入 DB。并非所有条目都在写入

问题描述

我有一个方法,可以将大约 700 万个条目写入队列。我正在多个线程中逐个读取条目,并将其写入数据库。这是执行此操作的类的代码片段。同样,这是在多线程环境中执行的。

private class WriteRunner<T> implements Callable<Long> {

// member variables

public WriteRunner(ConcurrentLinkedQueue<T> elementsQueue,...)
     //constructor
}

@Override
public Long call() throws Exception {

  while (!elementsQueue.isEmpty()) {
    int failures = 0;
    T t = elementsQueue.poll();
    while (true) {
      try {
        // write to DB
        break;
      } catch (Exception e) {
        //Log error
        //Wait for 10 seconds and retry
        Thread.sleep(10000);
        if (++failures > 10) {
          throw new Exception(
              String.format("[TID: %d] Connection repeatedly failed.", threadId));
        }
      }
    }

  }

  //log more stuff
  return (long)total;
}
}

这是我调用该方法的方式

      for (int i = 0; i < N_THREADS; i++) {
           completionService.submit(
             new WriteRunner<T>(elementsQueue ,...));
       }
       long total = 0;
       for (int i = 0; i < N_THREADS; i++) {
            total += completionService.take().get();
       }

我错过了尽可能多的条目,因为有很多例外。我的想法是,重试与最新民意调查相同的“t”元素。

异常有什么问题?

标签: javamultithreadingqueuetry-catch

解决方案


推荐阅读