首页 > 解决方案 > 我可以从 BlockingQueue 中取出一个元素并将其添加到 Map 中吗?

问题描述

我想知道有没有办法对阻塞队列调用 take(),并把取出的每个元素添加到 Map 中?还是必须在把元素放入队列的同时就将其添加到 Map?

我正在做的是将输入文本解析为 n-gram,然后将它们放入阻塞队列;等所有元素都进入队列之后,我想用 take() 把它们从队列中取出,并添加到位于另一个类中的 Map 里。


import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.BlockingQueue;

/**
 * Producer side of the pipeline: reads a text file line by line, slices every
 * line into n-grams of {@code k} characters and puts one {@code Task} per
 * n-gram on the shared queue.
 */
public class FileParser implements Runnable{

    private final BlockingQueue<Task> queue;
    private final String fileName;
    private final int k;
    private Database dbFile = null;

    /**
     * @param queue    queue that receives one {@code Task} per n-gram
     * @param fileName path of the text file to parse
     * @param k        number of characters in each n-gram
     */
    public FileParser(BlockingQueue<Task> queue,String fileName, int k) {
        this.queue=queue;
        this.fileName = fileName;
        this.k = k;
    }


    /**
     * Stores the database reference associated with this parser.
     *
     * @param dbFile database instance to remember
     */
    public void setDbFile(Database dbFile) {
        this.dbFile = dbFile;
    }


    /**
     * Parses the configured file. An I/O failure is reported on stderr; an
     * interruption ends the run quietly with the thread's interrupt flag set.
     */
    @Override
    public void run() {
        try {
            file();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the flag that the blocking put() cleared, so the owner of
            // this thread can still observe the interruption.
            Thread.currentThread().interrupt();
        }

    }

    /**
     * Reads the file line by line and enqueues the n-grams of every line.
     * A missing file is reported on stdout and otherwise ignored.
     *
     * @throws IOException          if reading or closing the file fails
     * @throws InterruptedException if interrupted while waiting for queue space
     */
    public void file() throws IOException, InterruptedException {
        // try-with-resources closes the reader on every exit path; the original
        // never closed it and leaked the underlying file handle.
        try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(fileName)))) {
            String line;
            while ((line = br.readLine()) != null) {
                nGram(line);
            }
        } catch (FileNotFoundException e) {
            System.out.println("File not found");
        }
    }

    /**
     * Enqueues every substring of length {@code k} found in {@code j}, moving
     * the window one character at a time. Nothing is enqueued when the line is
     * shorter than {@code k}.
     *
     * @param j one line of input text
     * @throws InterruptedException if interrupted while waiting for queue space
     */
    private void nGram(String j) throws IOException, InterruptedException {
        for (int i = 0; i <= j.length() - k; i++) {
            CharSequence ngrams = j.substring(i, i + k);
            Task t=new Task(ngrams);
            queue.put(t);
            System.out.println("Adding "+t);
        }
    }

}

//Responsible for removing from the queue

import java.util.concurrent.BlockingQueue;

/**
 * Consumer side of the pipeline: takes tasks off the shared queue until a
 * {@code Poison} task arrives, forwarding every regular task to the database
 * when one has been supplied through {@link #setDbFile(Database)}.
 */
public class Consumer implements Runnable {
    private BlockingQueue<Task> queue;
    private volatile boolean keepRunning = true;
    private Database dbFile = null;

    /**
     * @param queue queue to take tasks from
     */
    public Consumer(BlockingQueue<Task> queue) {
        super();
        this.queue = queue;
    }

    /**
     * Supplies the database that receives the consumed tasks. Without this
     * call {@code dbFile} stays {@code null}, which is what made
     * {@code dbFile.addFile(t)} throw a NullPointerException. Call it before
     * the consumer thread is started.
     *
     * @param dbFile database instance to add tasks to
     */
    public void setDbFile(Database dbFile) {
        this.dbFile = dbFile;
    }

    /**
     * Drains the queue until the poison task is seen or the thread is
     * interrupted.
     */
    @Override
    public void run() {
        while (keepRunning) {
            try {
                Task t = queue.take();
                boolean poisoned = t instanceof Poison;
                if (poisoned) {
                    keepRunning = false;
                    System.out.println("Queue poisoned->" + t);
                }

                System.out.println("Removing " + t);

                // The poison task only signals shutdown, so it is not stored.
                // The null check keeps a consumer without a database working
                // as a plain drain instead of crashing.
                if (!poisoned && dbFile != null) {
                    dbFile.addFile(t);
                }

            } catch (InterruptedException e) {
                // take() cleared the interrupt flag; restore it and leave the
                // loop, otherwise the next take() would throw again at once.
                Thread.currentThread().interrupt();
                keepRunning = false;
            }
        }
    }
}
** Output **
Adding  N-gram [ ye]
Removing  N-gram [ He]
Adding  N-gram [ eh]
Adding  N-gram [ hi]
Exception in thread "Thread-2" Adding  N-gram [ ih]
Adding  N-gram [ hi]
Adding  N-gram [ ay]
java.lang.NullPointerException
    at ie.gmit.sw.Consumer.run(Consumer.java:27)
    at java.lang.Thread.run(Thread.java:748)

标签: java blockingqueue

解决方案


推荐阅读