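java - Asynchronous queue-based file writing
Problem description
I'm writing a multithreaded program in Java, with a writer thread running alongside the worker threads. Once a thread has processed a chunk of data, it adds the results to a `LinkedBlockingQueue` in the `writer` thread through the `synchronized` `writeToFile` method.
The idea is that once the queue reaches a certain size, the threads are blocked from appending to the queue and the data is written out to the file. I'm processing large amounts of data (20-50 GB at a time), and this helps reduce RAM usage. (If there is a better way to do this, I'm open to suggestions!)
The problem I'm having is that, despite making the `writeToFile` method synchronized and writing to the file via `emptyQueues` inside `synchronized` blocks, the threads are still appending to the queue while the writer thread is writing to the file.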
```java
@Component("writer")
public class WriterImpl implements Writer {
    private boolean isRunning;
    private PrintWriter fastQWriter1, fastQWriter2;
    private final Queue<FastQRecord> fastQQueue1 = new LinkedBlockingQueue<>();
    private final Queue<FastQRecord> fastQQueue2 = new LinkedBlockingQueue<>();
    private final int MAX_QUEUE_SIZE = 5000;

    @Override
    public void setOutputFiles(File fastQ1, File fastQ2) {
        try {
            fastQWriter1 = new PrintWriter(new FileOutputStream(fastQ1));
            fastQWriter2 = new PrintWriter(new FileOutputStream(fastQ2));
        } catch (IOException ioe) {
            System.out.println(ioe.getMessage());
        }
    }

    @Override
    public synchronized void writeToFile(FastQRecord one, FastQRecord two) {
        fastQQueue1.add(one);
        fastQQueue2.add(two);
    }

    @Override
    public void close() {
        isRunning = false;
        emptyQueues();
        fastQWriter1.flush();
        fastQWriter1.close();
        fastQWriter2.flush();
        fastQWriter2.close();
    }

    @Override
    public void run() {
        isRunning = true;
        while (isRunning) {
            //do stuff
            if (fastQQueue1.size() > MAX_QUEUE_SIZE) { //empty queues - 5000 record pairs at a time
                synchronized (fastQQueue1) {
                    synchronized (fastQQueue2) {
                        emptyQueues();
                    }
                }
            }
        }
    }

    private void emptyQueues() {
        while (fastQQueue1.size() > 0) {
            FastQRecord one = fastQQueue1.poll();
            fastQWriter1.println(one.getId());
            fastQWriter1.println(one.getRawSequence());
            fastQWriter1.println(one.getPlus());
            fastQWriter1.println(one.getQualityString());
        }
        while (fastQQueue2.size() > 0) {
            FastQRecord two = fastQQueue2.poll();
            fastQWriter2.println(two.getId());
            fastQWriter2.println(two.getRawSequence());
            fastQWriter2.println(two.getPlus());
            fastQWriter2.println(two.getQualityString());
        }
    }
}
```
`FastQRecord` is just a simple POJO that holds the data I need to write to the file:
```java
public class FastQRecord {
    private String id;
    private String rawSequence;
    private char plus;
    private String qualityString;

    public FastQRecord(String id, String rawSequence, char plus, String qualityString) {
        this.id = id;
        this.rawSequence = rawSequence;
        this.plus = plus;
        this.qualityString = qualityString;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getRawSequence() {
        return rawSequence;
    }

    public void setRawSequence(String rawSequence) {
        this.rawSequence = rawSequence;
    }

    public char getPlus() {
        return plus;
    }

    public void setPlus(char plus) {
        this.plus = plus;
    }

    public String getQualityString() {
        return qualityString;
    }

    public void setQualityString(String qualityString) {
        this.qualityString = qualityString;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        FastQRecord that = (FastQRecord) o;
        return id.equals(that.id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    @Override
    public String toString() {
        return "FastQRecord{" +
                "id=" + id + '\n' +
                ", rawSequence=" + rawSequence + '\n' +
                ", plus=" + plus + '\n' +
                ", qualityString=" + qualityString + '\n' +
                '}';
    }
}
```
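A note on why the two `synchronized` regions in the question do not exclude each other: a `synchronized` instance method such as `writeToFile` locks the `WriterImpl` instance (`this`), while `synchronized (fastQQueue1)` locks the queue object. Because they use different monitors, a worker inside `writeToFile` and the writer inside `emptyQueues` can run at the same time. The minimal sketch below (the class and method names are hypothetical) uses `Thread.holdsLock` to show which monitor each style of `synchronized` acquires.

```java
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; the method names only mirror the question's methods.
public class MonitorDemo {
    private final Queue<String> queue = new LinkedBlockingQueue<>();

    // A synchronized instance method is equivalent to synchronized (this) { ... }
    public synchronized boolean[] writeToFileLocks() {
        queue.add("record");
        return new boolean[] { Thread.holdsLock(this), Thread.holdsLock(queue) };
    }

    // A synchronized block on the queue acquires the queue's monitor instead.
    public boolean[] emptyQueuesLocks() {
        synchronized (queue) {
            return new boolean[] { Thread.holdsLock(this), Thread.holdsLock(queue) };
        }
    }

    public static void main(String[] args) {
        MonitorDemo demo = new MonitorDemo();
        boolean[] w = demo.writeToFileLocks();
        boolean[] e = demo.emptyQueuesLocks();
        // prints "writeToFile holds this=true, queue=false"
        System.out.println("writeToFile holds this=" + w[0] + ", queue=" + w[1]);
        // prints "emptyQueues holds this=false, queue=true"
        System.out.println("emptyQueues holds this=" + e[0] + ", queue=" + e[1]);
    }
}
```

Since the two code paths never contend for the same lock, making `writeToFile` synchronized cannot stop producers while `emptyQueues` runs.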
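Solution
You can take advantage of `BlockingQueue` by using its `put()` method instead of `add()`, which is inherited from `Collection`: `put()` waits for space to become available, whereas `add()` throws an `IllegalStateException` when a bounded queue is full.
But for threads to wait on `put()`, your queue must know its maximum size, so declare it as `LinkedBlockingQueue<>(MAX_QUEUE_SIZE)`. If you don't specify a capacity, it defaults to `Integer.MAX_VALUE`. Note also that `put()` and `remainingCapacity()` are declared on `BlockingQueue`, not `Queue`, so the fields need to be declared as `BlockingQueue<FastQRecord>`.
I would also suggest synchronizing access to the queue before checking its size (or whether it is full); your `run()` method would then look something like this: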
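To illustrate how `add()`, `offer()` and `put()` differ on a bounded queue, here is a small sketch; `CapacityDemo` and `fill` are hypothetical names used only for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class CapacityDemo {
    // Hypothetical helper: a bounded queue filled up to its capacity.
    static BlockingQueue<String> fill(int capacity) {
        BlockingQueue<String> q = new LinkedBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            q.add("record" + i); // succeeds while there is still room
        }
        return q;
    }

    public static void main(String[] args) {
        // No capacity given: the bound is Integer.MAX_VALUE.
        BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
        System.out.println(unbounded.remainingCapacity() == Integer.MAX_VALUE); // true

        BlockingQueue<String> full = fill(2);
        System.out.println(full.remainingCapacity()); // 0
        System.out.println(full.offer("extra"));      // false: offer() gives up immediately

        try {
            full.add("extra");                        // add() throws when the queue is full
        } catch (IllegalStateException e) {
            System.out.println("add() threw IllegalStateException");
        }
        // full.put("extra") would block here until another thread removed an element.
    }
}
```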
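```java
@Override
public void run() {
    isRunning = true;
    while (isRunning) {
        //do stuff
        // Requires fastQQueue1/fastQQueue2 to be declared as BlockingQueue, not Queue.
        // put() uses the queue's internal lock, not this monitor: the capacity bound
        // is what actually blocks the producers.
        synchronized (fastQQueue1) {
            if (fastQQueue1.remainingCapacity() == 0) { //empty queues - 5000 record pairs at a time
                synchronized (fastQQueue2) {
                    emptyQueues();
                }
            }
        }
    }
}
```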
A similar change would apply to your `emptyQueues()` method.
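For comparison, here is a minimal sketch of the same idea without any explicit `synchronized` blocks. It assumes a single bounded queue of output lines rather than the question's two queues of `FastQRecord` pairs: workers call `put()` and block while the queue is full, and the writer thread uses `take()` plus `drainTo()` to write batches, stopping at a sentinel value. `BoundedWriter`, `submit`, `finish` and the sentinel are hypothetical names; in the question's setting you would wrap the `PrintWriter`s instead of a `StringWriter`.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BoundedWriter implements Runnable {
    // Hypothetical sentinel; compared by identity, so a data line with the
    // same text is never mistaken for it.
    private static final String POISON = new String("EOF");

    private final BlockingQueue<String> queue;
    private final Writer out;

    public BoundedWriter(int capacity, Writer out) {
        this.queue = new LinkedBlockingQueue<>(capacity); // bounded: put() blocks when full
        this.out = out;
    }

    // Called by worker threads; blocks while the queue is full.
    public void submit(String line) throws InterruptedException {
        queue.put(line);
    }

    // Called once, after every worker has finished submitting.
    public void finish() throws InterruptedException {
        queue.put(POISON);
    }

    @Override
    public void run() {
        List<String> batch = new ArrayList<>();
        try {
            while (true) {
                batch.add(queue.take()); // wait for at least one element
                queue.drainTo(batch);    // then grab whatever else is ready
                for (String line : batch) {
                    if (line == POISON) { // nothing is queued after the sentinel
                        out.flush();
                        return;
                    }
                    out.write(line);
                    out.write('\n');
                }
                batch.clear();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        StringWriter sink = new StringWriter();
        BoundedWriter writer = new BoundedWriter(2, sink);
        Thread writerThread = new Thread(writer);
        writerThread.start();

        Thread[] workers = new Thread[3];
        for (int w = 0; w < workers.length; w++) {
            final int id = w;
            workers[w] = new Thread(() -> {
                try {
                    for (int i = 0; i < 5; i++) {
                        writer.submit("worker" + id + "-line" + i);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[w].start();
        }
        for (Thread t : workers) t.join();
        writer.finish();
        writerThread.join();
        System.out.println(sink.toString().split("\n").length); // 15
    }
}
```

Because `LinkedBlockingQueue` already does its own locking, the capacity bound alone keeps memory in check, and the writer never has to poll `size()` in a busy loop.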