首页 > 解决方案 > 使用用户线程池 grpc 处理请求

问题描述

我正在浏览这个async_server示例。我对以下代码片段感兴趣,

while (true) {
  // Block waiting to read the next event from the completion queue. The
  // event is uniquely identified by its tag, which in this case is the
  // memory address of a CallData instance.
  // The return value of Next should always be checked. This return value
  // tells us whether there is any kind of event or cq_ is shutting down.
  GPR_ASSERT(cq_->Next(&tag, &ok));
  GPR_ASSERT(ok);
  /*
    How to enqueue this request on an existing thread pool
    For example,
    pool.enqueue() {
         /* what to enqueue so that when one worker thread of my thread pool
         pop one requests it can decide what handler to execute */
    }
  */
  
  // do not process request in this thread, commenting it
  // static_cast<CallData*>(tag)->Proceed();
}

在我的设计中,上述循环由一个专用线程执行,该线程基本上将请求推送到thread-safe队列中。池中的工作线程从thread_safe队列中弹出请求对象,并在多核环境中并行处理请求。

我有一些简单的处理函数,这些函数应该由工作线程从队列中弹出的特定请求触发。将被推入队列的对象应该有足够的信息,以便工作线程可以决定执行哪个处理程序。

例如; 处理函数可以是非常简单的 CPU 绑定任务。

void handler1( .... some argument ) {
    // a CPU bound for loop
    // reply someting to the client }

我的问题是:如何将标签信息排入用户定义的thread_safe队列,假设thread_safe队列提供以下API:

enqueue(someObj task) {
     // task should have tag information required for the worker threads to select 
     // proper handler function.
     // I need to know the following, what should I push to the queue or what 
     // other code changes I need to do to push something meaningful for the 
     // worker threads. Should I create multiple services? Or can I handle 
     // multiple handler functions by having the same service. 
}

当我说我的意思是以下内容时, async_server链接handler function中的现有代码。

    // inside proceed() function
    // The actual processing.
    
    std::string prefix("Hello ");
    reply_.set_message(prefix + request_.name());

谢谢你。

标签: c++multithreadinggrpc

解决方案


推荐阅读