首页 > 解决方案 > 使用带有 zmq 的计时器

问题描述

我正在做一个必须使用 zmq_poll 的项目。但我并不完全明白它的作用。

所以我也尝试实现它:

zmq_pollitem_t timer_open(void){

  zmq_pollitem_t items[1];


    if( items[0].socket  == nullptr ){
         printf("error socket %s: %s\n", zmq_strerror(zmq_errno()));
         return;        
    }
else{
    items[0].socket = gsock; 
} 


items[0].fd = -1;   
items[0].events = ZMQ_POLLIN;  


 // get a timer
items[0].fd  = timerfd_create( CLOCK_REALTIME, 0 );
    if( items[0].fd  == -1 )
    {
    printf("timerfd_create() failed: errno=%d\n", errno);
            items[0].socket  = nullptr;

            return;
    }

int rc = zmq_poll(items,1,-1);

if(rc == -1){
    printf("error poll %s: %s\n", zmq_strerror(zmq_errno()));
    return;
} 
else
     return items[0];
}

我对这个主题很陌生,我必须修改一个旧的现有项目并将功能替换为 zmq 的功能。在其他网站上,我看到了他们在无限循环中使用两个项目和 zmq_poll 函数的示例。我已经阅读了文档,但仍然无法正确理解它是如何工作的。这些是我实现的另外两个功能。我不知道这是否是这样实现它的正确方法:

   void timer_set(zmq_pollitem_t items[] , long msec, ipc_timer_mode_t mode ) {


    struct itimerspec t;

    ...

    timerfd_settime( items[0].fd , 0, &t, NULL );

}


void timer_close(zmq_pollitem_t items[]){

if( items[0].fd != -1 )
       close(items[0].fd);

items[0].socket = nullptr; 

}

我不确定是否需要 zmq_poll 函数,因为我使用的是计时器。

编辑:

void some_function_timer_example() {
   // We want to wait on two timers
   zmq_pollitem_t items[2] ;

   // Setup first timer
   ipc_timer_open_(&items[0]);
   ipc_timer_set_(&items[0], 1000, IPC_TIMER_ONE_SHOT);
   // Setup second timer
   ipc_timer_open_(&items[1]);
   ipc_timer_set_(&items[1], 1000, IPC_TIMER_ONE_SHOT);

   // Now wait for the timers in a loop
   while (1) {
        //ipc_timer_set_(&items[0], 1000, IPC_TIMER_REPEAT);
        //ipc_timer_set_(&items[1], 5000, IPC_TIMER_REPEAT);

      int rc = zmq_poll (items, 2, -1);
      assert (rc >= 0); /* Returned events will be stored in items[].revents */

        if (items [0].revents & ZMQ_POLLIN) {
            //  Process task
            std::cout << "revents: 1" << std::endl;
        }
        if (items [1].revents & ZMQ_POLLIN) {
            //  Process weather update

            std::cout << "revents: 2" << std::endl;

        }
   }
}

现在它仍然打印得非常快并且没有等待。它仍然只是在开始等待。并且当 timer_set 在循环内时,它会正确等待,仅当等待时间相同时:ipc_timer_set(&items[1], 1000,...) and ipctimer_set(&items[0], 1000,...)

那么我该如何改变呢?或者这是正确的行为?

标签: c++csocketstimerzeromq

解决方案


zmq_poll像 select 一样工作,但它允许一些额外的东西。例如,您可以在常规同步文件描述符和特殊异步套接字之间进行选择。

在您的情况下,您可以像尝试那样使用计时器 fd,但您需要进行一些小的更改。

首先,您必须考虑如何调用这些计时器。我认为用例是如果您想创建多个计时器并等待它们。这通常是您当前代码中的函数,该函数可能正在为计时器使用循环(使用 select() 或他们可能正在做的任何其他事情)。它会是这样的:

void some_function() {
   // We want to wait on two timers
   zmq_pollitem items[2];

   // Setup first timer
   ipc_timer_open(&item[0]);
   ipc_timer_set(&item[0], 1000, IPC_TIMER_ONE_REPEAT);
   // Setup second timer
   ipc_timer_open(&item[1]);
   ipc_timer_set(&item[1], 5000, IPC_TIMER_ONE_SHOT);

   // Now wait for the timers in a loop
   while (1) {
      int rc = zmq_poll (items, 2, -1);
      assert (rc >= 0); /* Returned events will be stored in items[].revents */
   }
}

现在,您需要修复 ipc_timer_open。这将非常简单 - 只需创建计时器 fd。

// Takes a pointer to pre-allocated zmq_pollitem_t and returns 0 for success, -1 for error
int ipc_timer_open(zmq_pollitem_t *items){
    items[0].socket = NULL; 
    items[0].events = ZMQ_POLLIN;  
    // get a timer
    items[0].fd  = timerfd_create( CLOCK_REALTIME, 0 );
    if( items[0].fd  == -1 )
    {
        printf("timerfd_create() failed: errno=%d\n", errno);
        return -1; // error
    }
    return 0;
}

编辑:添加为对评论的回复,因为这很长:来自文档: If both socket and fd are set in a single zmq_pollitem_t, the ØMQ socket referenced by socket shall take precedence and the value of fd shall be ignored.

因此,如果您要传递 fd,则必须将 socket 设置为 NULL。我什至不清楚gsock来自哪里。这是在文档中吗?我找不到它。

它什么时候会脱离 while(1) 循环?

这是应用程序逻辑,您必须根据需要编写代码。zmq_poll 只是在每次计时器命中时不断返回。在此示例中,zmq_poll 每秒返回一次,因为第一个计时器(它是重复的)不断触发。但是在 5 秒时,它也会因为第二个计时器(这是一次)而返回。何时退出循环由您决定。你想让这个无限吗?您是否需要检查不同的条件才能退出循环?你想这样做 100 次然后返回吗?您可以在此代码之上编写任何您想要的逻辑。

以及返回什么样的事件

ZMQ_POLLIN 因为计时器 fds 的行为类似于可读文件描述符。


推荐阅读