首页 > 解决方案 > 一段时间后 CPU 使用率过高

问题描述

我有以下代码在基于 linux 的设备中作为服务运行。

  1. 它有一个 mqtt 回调,只要有人在订阅的主题上发布消息,它就会接收消息。
  2. 一个线程是处理队列中的传入消息。一旦他们处理结果消息将被推送到 out_message 队列。
  3. 另一个线程是处理传出消息队列。
  4. 我已经使用 condition_variable 在线程之间共享资源。

问题是一段时间后(随机时间)此应用程序的 CPU 利用率达到 100%。给定代码中的任何问题以纠正我的过程。请帮我 !!非常感谢您提前。

void pushMessage(std::string rData) {
    in_mutex.lock();
    in_queue.push(rData);
    in_mutex.unlock();
    in_cv.notify_all();
}

void pushOutGoingMessage(Json::Value data) {
    out_mutex.lock();
    out_queue.push(data);
    out_mutex.unlock();
    out_cv.notify_all();
}
void processOutGoingMessages() {
    while (true) {
        Json::Value data;
        {
            std::unique_lock<std::mutex> lock(out_mutex);
            while (out_queue.empty()) {
                out_cv.wait(lock);
            }
            data = out_queue.front();
            out_queue.pop();
            lock.unlock();
        }
        if (!data.isNull()) {
            parseOutGoingMessages(data);
        }
    }
}

void processMessage() {
    while (true) {
        std::string data = "NO_DATA";
        {
            std::unique_lock<std::mutex> lock(in_mutex, std::try_to_lock);
            if (!lock.owns_lock()) {

            } else {
                while (in_queue.empty()) {
                    in_cv.wait(lock);
                }
                data = in_queue.front();
                in_queue.pop();
                lock.unlock();
            }
        }
        if (data.compare("NO_DATA") != 0) {
            parseMessage(data);
        }
    }
}

void parseOutGoingMessages(Json::Value rJsonMessage) {
    // mqtt client send method
    mqtt_client.push_message(rJsonMessage.toStyledString(),
            rJsonMessage["destination"].asString());
}

void parseMessage(std::string rMessage) {
    try {
        debug(rMessage);
        // application logic
    } catch (std::exception &e) {
        debug("ERRO HANDLED IN PARSING ::" + std::string(e.what()));
    }
}

void connectMQTT() {
    // connection params
}

void OnConnectionLost(void *context, char *cause) {
    // retry logic
    connectMQTT();
}

void delivered(void *context, MQTTClient_deliveryToken dt) {

}
int OnMessageArrived(void *context, char *topicName, int topicLen,
        MQTTClient_message *message) {
    if (!message->retained) {
        std::string msg((char *) message->payload, message->payloadlen);
        pushMessage(msg);
    }
    MQTTClient_freeMessage(&message);
    MQTTClient_free(topicName);
    return 1;
}
void send(Json::Value rData,std::string rDestination) {
    Json::Value jsonNotification;
    jsonNotification["destination"] = rDestination;
    jsonNotification["data"] = rData;
    pushOutGoingMessage(jsonNotification);
}
int main(int argc, char **argv) {
    connectMQTT();
    std::thread procInMessage(processMessage);
    std::thread procOutMessage(processOutGoingMessages);
    procInMessage.join();
    procOutMessage.join();
}

标签: c++linuxmultithreadingc++11mqtt

解决方案


谢谢@MatthewFisher 和@Mike Vine。我只是修改了两个队列的推送方法。

void pushMessage(std::string rData) {
//  in_mutex.lock();
//  in_queue.push(rData);
//  in_mutex.unlock();
//  in_cv.notify_all();
    std::unique_lock<std::mutex> lock(in_mutex);
    in_queue.push(rData);
    lock.unlock();
    in_cv.notify_all();
}

void pushOutGoingMessage(Json::Value data) {
//  out_mutex.lock();
//  out_queue.push(data);
//  out_mutex.unlock();
//  out_cv.notify_all();
    std::unique_lock<std::mutex> lock(out_mutex);
    out_queue.push(data);
    lock.unlock();
    out_cv.notify_all();
}

我猜这个问题已经解决了。由于两个队列的推送方法中的 (!lock.owns_lock()) {} 是purley。


推荐阅读