首页 > 解决方案 > 基于异步队列的文件写入

问题描述

我正在用 Java 编写一个多线程程序,并有一个编写器线程在旁边运行。一旦线程处理了一大块数据,它们就会LinkedBlockingQueue通过synchronized writeToFile.writer

这个想法是,一旦队列达到一定大小,线程就会被阻止附加到队列,并且数据会输出到文件中。我正在处理大量数据(一次 20-50GB),这有助于减少使用的 RAM。(如果有更好的方法来做到这一点,我愿意接受建议!)

我遇到的问题是,尽管使方法同步,并通过块中的writeToFile写入文件,线程仍在附加到队列中,而线程正在写入文件。emptyQueuessynchonrized

@Component("writer")
public class WriterImpl implements Writer {

private boolean isRunning;
private PrintWriter fastQWriter1, fastQWriter2;
private final Queue<FastQRecord> fastQQueue1 = new LinkedBlockingQueue<>();
private final Queue<FastQRecord> fastQQueue2 = new LinkedBlockingQueue<>();
private final int MAX_QUEUE_SIZE = 5000;

@Override
public void setOutputFiles(File fastQ1, File fastQ2) {
    try{
        fastQWriter1 = new PrintWriter(new FileOutputStream(fastQ1));
        fastQWriter2 = new PrintWriter(new FileOutputStream(fastQ2));
    }catch (IOException ioe){
        System.out.println(ioe.getMessage());
    }
}

@Override
public synchronized void writeToFile(FastQRecord one, FastQRecord two) {
    fastQQueue1.add(one);
    fastQQueue2.add(two);
}

@Override
public void close() {
    isRunning = false;

    emptyQueues();

    fastQWriter1.flush();
    fastQWriter1.close();
    fastQWriter2.flush();
    fastQWriter2.close();
}

@Override
public void run() {
    isRunning = true;

    while(isRunning){
        //do stuff
        if(fastQQueue1.size() > MAX_QUEUE_SIZE){ //empty queues - 5000 record pairs at a time

            synchronized (fastQQueue1){
                synchronized (fastQQueue2){
                    emptyQueues();
                }
            }
        }
    }
}

private void emptyQueues() {
    while(fastQQueue1.size() > 0){
        FastQRecord one = fastQQueue1.poll();

        fastQWriter1.println(one.getId());
        fastQWriter1.println(one.getRawSequence());
        fastQWriter1.println(one.getPlus());
        fastQWriter1.println(one.getQualityString());
    }

    while(fastQQueue2.size() > 0){

        FastQRecord two = fastQQueue2.poll();
        fastQWriter2.println(two.getId());
        fastQWriter2.println(two.getRawSequence());
        fastQWriter2.println(two.getPlus());
        fastQWriter2.println(two.getQualityString());

    }
}
}  

FastQRecord只是一个简单的 POJO,它包含我需要写入文件的数据:

public class FastQRecord {

private String id;
private String rawSequence;
private char plus;
private String qualityString;

public FastQRecord(String id, String rawSequence, char plus, String qualityString) {
    this.id = id;
    this.rawSequence = rawSequence;
    this.plus = plus;
    this.qualityString = qualityString;
}

public String getId() {
    return id;
}

public void setId(String id) {
    this.id = id;
}

public String getRawSequence() {
    return rawSequence;
}

public void setRawSequence(String rawSequence) {
    this.rawSequence = rawSequence;
}

public char getPlus() {
    return plus;
}

public void setPlus(char plus) {
    this.plus = plus;
}

public String getQualityString() {
    return qualityString;
}

public void setQualityString(String qualityString) {
    this.qualityString = qualityString;
}

@Override
public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;

    FastQRecord that = (FastQRecord) o;

    return id.equals(that.id);
}

@Override
public int hashCode() {
    return id.hashCode();
}

@Override
public String toString() {
    return "FastQRecord{" +
            "id=" + id + '\n' +
            ", rawSequence=" + rawSequence + '\n' +
            ", plus=" + plus + '\n' +
            ", qualityString=" + qualityString + '\n' +
            '}';
}
}

标签: javamultithreadingio

解决方案


您可以BlockingQueue通过使用put()方法而不是add()继承自Collection.

但是为了让线程等待put()操作,您的队列必须知道它的最大大小,将其声明为LinkedBlockingQueue<>(MAX_QUEUE_SIZE). 如果您不指定队列的最大容量,则会假定它是Integer.MAX_VALUE

我还建议您在检查队列大小(或者是否已满)之前同步对队列的访问,并且您的run()方法看起来像这样:

@Override
public void run() {
    isRunning = true;

    while(isRunning){
        //do stuff
        synchronized(fastQQueue1){
            if(fastQQueue1.remainingCapacity() == 0){ //empty queues - 5000 record pairs at a time

                synchronized (fastQQueue1){
                    synchronized (fastQQueue2){
                        emptyQueues();
                    }
                }
            }
        }
    }
}

类似的更改将应用​​于您的emptyQueues()方法。


推荐阅读