首页 > 解决方案 > 如何退出 C++ 阻塞队列?

问题描述

在阅读了一些其他文章之后,我知道我可以像这样实现一个 c++ 阻塞队列:

template<typename T>
class BlockingQueue {
public:
    std::mutex mtx;
    std::condition_variable not_full;
    std::condition_variable not_empty;
    std::queue<T> queue;
    size_t capacity{5};

    BlockingQueue()=default;
    BlockingQueue(int cap):capacity(cap) {}
    BlockingQueue(const BlockingQueue&)=delete;
    BlockingQueue& operator=(const BlockingQueue&)=delete;

    void push(const T& data) {
        std::unique_lock<std::mutex> lock(mtx);
        while (queue.size() >= capacity) {
            not_full.wait(lock, [&]{return queue.size() < capacity;});
        }
        queue.push(data);
        not_empty.notify_all();
    }

    T pop() {
        std::unique_lock<std::mutex> lock(mtx);
        while (queue.empty()) {
            not_empty.wait(lock, [&]{return !queue.empty();});
        }
        T res = queue.front();
        queue.pop();
        not_full.notify_all();
        return res;
    }

    bool empty() {
        std::unique_lock<std::mutex> lock(mtx);
        return queue.empty();
    }

    size_t size() {
        std::unique_lock<std::mutex> lock(mtx);
        return queue.size();
    }

    void set_capacity(const size_t capacity) {
        this->capacity = (capacity > 0 ? capacity : 10);
    }
};

这对我有用,但我不知道如果我在后台线程中启动它怎么能关闭它:

void main() {
    BlockingQueue<float> q;
    bool stop{false};
    auto fun = [&] {
        std::cout << "before entering loop\n";
        while (!stop) {
            q.push(1);
        }
        std::cout << "after entering loop\n";
    };
    std::thread t_bg(fun);
    t_bg.detach();
    // Some other tasks here
    stop = true;
    // How could I shut it down before quit here, or could I simply let the operation system do that when the whole program is over?
}

问题是当我想关闭后台线程时,后台线程可能一直在休眠,因为队列已满,推送操作被阻塞。当我希望后台线程停止时,我怎么能停止它?

标签: c++multithreadingqueue

解决方案


一种简单的方法是添加一个从外部设置的标志,当您想要中止pop()已被阻止的操作时。然后你必须决定 abortedpop()将返回什么。一种方法是让它抛出异常,另一种方法是返回一个std::optional<T>. 这是第一种方法(我只会写改变的部分。)

在您认为合适的地方添加此类型:

struct AbortedPopException {};

将此添加到您的课程字段中:

mutable std::atomic<bool> abort_flag = false;

还要添加这个方法:

void abort () const {
    abort_flag = true;
}

像这样更改方法while中的循环pop():(您根本不需要while,因为我相信wait()接受 lambda 的条件变量方法不会虚假地唤醒/返回;即循环已经在等待中。)

    not_empty.wait(lock, [this]{return !queue.empty() || abort_flag;});

    if (abort_flag)
        throw AbortedPopException{};

就是这样(我相信。)

在您的main()中,当您想关闭“消费者”时,您可以调用abort()您的队列。但是你也必须在那里处理抛出的异常。基本上,这是你的“退出”信号。

一些旁注:

  1. 不要脱离线程!特别是在这里,AFAICT 没有理由这样做(也有一些实际危险。)只需发出信号让他们退出(以任何适当的方式)和join()他们。

  2. 你的stop标志应该是原子的。您在后台线程中读取它并从主线程中写入它,并且这些可以(实际上确实)在时间上重叠,所以......数据竞争!

  3. 我不明白为什么您的队列中有“完整”状态和“容量”。想想它们是否有必要。

更新 1:响应 OP 关于分离的评论......这是您的主线程中发生的事情:

  1. 您产生了“生产者”线程(即,将东西推入队列的线程)
  2. 然后你做所有你想做的工作(例如消费队列中的东西)
  3. 有时,也许在 结束时main(),您发出停止线程的信号(例如,通过将stop标志设置为true
  4. 然后,只有你join()与线程。

确实,您的主线程在等待线程接收“停止”信号、退出其循环并从其线程函数返回时会阻塞,但这是一个非常非常短的等待。而你没有别的事可做。更重要的是,您将知道您的线程干净且可预测地退出,并且从那时起,您肯定知道该线程将不会运行(在这里对您来说并不重要,但对于其他一些线程任务可能很重要。)

这是您在生成循环短任务的工作线程时通常要遵循的模式。

更新 2:关于队列的“满”和“容量”。没关系。这当然是你的决定。没问题。

更新 3:关于“抛出”与返回“空”对象以表示中止的“阻塞pop()”。我不认为这样扔有什么问题;特别是因为它非常罕见(仅在生产者/消费者操作结束时发生一次。)但是,如果T您要存储的所有类型都处于Queue“无效”或“空”状态,那么您当然可以可以用那个。但是投掷更普遍,如果对某些人来说更“恶心”。


推荐阅读