首页 > 解决方案 > 为什么 ZeroMQ ROUTER-DEALER 模式具有高延迟?

问题描述

在 centos 7 上使用 libzmq 4.2.5。当消息从DEALERtoROUTER甚至从ROUTERto发送时,延迟非常高DEALER。所以我使用 tcp 编写了一个简单的客户端-服务器程序,并在它们之间发送消息只是为了比较。Tcp 似乎很快。

DEALER从toROUTER发送单个字节zmq需要 900 微秒。

从客户端向服务器发送单个字节,tcp 需要 150 微秒。

我究竟做错了什么。我认为zmq至少会和 tcp 一样快。我可以做任何调整来zmq加快速度吗?

更新

路由器.cpp

#include <zmq.hpp>
    struct data
    {
    char one[21];
    unsigned long two;
   };
data * pdata;
std::size_t counter=0;

 int main()
{
   zmq::context_t context(1);
   zmq::socket_t Device(context,ZMQ_ROUTER);

   int iHighWaterMark=0;

  Device.setsockopt(ZMQ_SNDHWM,&iHighWaterMark,sizeof(int));
  Device.setsockopt(ZMQ_RCVHWM,&iHighWaterMark,sizeof(int));

  Device.bind("tcp://0.0.0.0:5555");

  pdata=new data[10000];

 struct timespec ts_dtime;
 unsigned long sec;

  zmq::message_t message;

  zmq::pollitem_t arrPollItems[]={{Device, 0, ZMQ_POLLIN, 0},{NULL, 
                                                            0, ZMQ_POLLIN, 0}};

    while(counter < 10000)
      {
        try
      {
        int iAssert = zmq::poll(arrPollItems, 1, -1);
        if (iAssert <= 0)
          {
             if (-1 == iAssert)
          {
             printf("zmq_poll failed errno: %d error:%s", errno, 
                                 zmq_strerror(errno));
          }
            continue;
           }

          if (arrPollItems[0].revents == ZMQ_POLLIN)
           {
             while(true)
            {
               if(! Device.recv(&message,ZMQ_DONTWAIT))
                    break;

               Device.recv(&message);

                strncpy(pdata[counter].one, 
                                      (char*)message.data(),message.size());
               clock_gettime(CLOCK_REALTIME, &ts_dtime);
              pdata[counter].two = (ts_dtime.tv_sec*1e9)+ 
                                                              ts_dtime.tv_nsec;
              ++counter;
            }

           }
      }
          catch(...)
        {

         }

          }

         for(int i=0;i<counter;++i)
         printf("%d %s %lu\n",i+1,pdata[i].one,pdata[i].two);

         return 0;
        }

经销商.cpp

#include <zmq.hpp>
#include<unistd.h>

int main()
{
  zmq::context_t context(1);
  zmq::socket_t Device(context,ZMQ_DEALER);

  int iHighWaterMark=0;

  Device.setsockopt(ZMQ_SNDHWM,&iHighWaterMark,sizeof(int));
  Device.setsockopt(ZMQ_RCVHWM,&iHighWaterMark,sizeof(int));
  Device.setsockopt(ZMQ_IDENTITY,"TEST",4);

   Device.connect("tcp://0.0.0.0:5555");

    usleep(100000);

   struct timespec ts_dtime;
   unsigned long sec;

   for(std::size_t i=0;i<10000;++i)
    {
      clock_gettime(CLOCK_REALTIME, &ts_dtime);
      sec=(ts_dtime.tv_sec*1e9)+ ts_dtime.tv_nsec;
      zmq::message_t message(21);
      sprintf((char *)message.data(),"%lu",sec);
      Device.send(message);
     usleep(500);
    }

  return 0;
 }

更新2:

路由器.cpp

#include <zmq.hpp>
#include <stdio.h>
#include <stdlib.h>

int main (int argc, char *argv[])
{
    const char *bind_to;
    int roundtrip_count;
    size_t message_size;

    int rc;
    int i;


    if (argc != 4) {
        printf ("usage: local_lat <bind-to> <message-size> "
                "<roundtrip-count>\n");
        return 1;
    }
    bind_to = argv[1];
    message_size = atoi (argv[2]);
    roundtrip_count = atoi (argv[3]);

    zmq::context_t ctx(1);
  zmq::socket_t s(ctx,ZMQ_ROUTER);

  zmq::message_t msg,id;

    int iHighWaterMark=0;
    s.setsockopt(ZMQ_SNDHWM , &iHighWaterMark,
                         sizeof (int));
    s.setsockopt(ZMQ_RCVHWM , &iHighWaterMark,
                                              sizeof (int));
    s.bind( bind_to);
    struct timespec ts_dtime;
    unsigned long sec;
for (i = 0; i != roundtrip_count; i++) {
      rc =s.recv(&id);
        if (rc < 0) {
            printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
            return -1;
        }

        rc = s.recv(&msg, 0);
        if (rc < 0) {
            printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
            return -1;
        }

        clock_gettime(CLOCK_REALTIME, &ts_dtime);
        sec=((ts_dtime.tv_sec*1e9)+ ts_dtime.tv_nsec);
        printf("%.*s %lu\n",20,(char *)msg.data(),sec);
}




    s.close();



    return 0;
}

经销商.cpp

#include <zmq.hpp>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
int main (int argc, char *argv[])
{
    const char *connect_to;
    int roundtrip_count;
    size_t message_size;

    int rc;
    int i;

    void *watch;
    unsigned long elapsed;
    double latency;

    if (argc != 4) {
        printf ("usage: remote_lat <connect-to> <message-size> "
                "<roundtrip-count>\n");
        return 1;
    }
    connect_to = argv[1];
    message_size = atoi (argv[2]);
    roundtrip_count = atoi (argv[3]);

    zmq::context_t ctx(1);
  zmq::socket_t s(ctx,ZMQ_DEALER);

  struct timespec ts_dtime;
  unsigned long sec;
int iHighWaterMark=0;
    s.setsockopt(ZMQ_SNDHWM , &iHighWaterMark,
                         sizeof (int));
    s.setsockopt(ZMQ_RCVHWM , &iHighWaterMark,
                                              sizeof (int));

    s.connect(connect_to);


    for (i = 0; i != roundtrip_count; i++) {
      zmq::message_t msg(message_size+20);
      clock_gettime(CLOCK_REALTIME, &ts_dtime);
      sec=(ts_dtime.tv_sec*1e9)+ ts_dtime.tv_nsec;
      sprintf((char *)msg.data(),"%lu",sec);
      rc = s.send(msg);
        if (rc < 0) {
            printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
            return -1;
        }

        sleep(1);
}
s.close();


    return 0;
}

输出 :

1562125527489432576 1562125527489773568
1562125528489582848 1562125528489961472
1562125529489740032 1562125529490124032
1562125530489944832 1562125530490288896
1562125531490101760 1562125531490439424
1562125532490261248 1562125532490631680
1562125533490422272 1562125533490798080
1562125534490555648 1562125534490980096
1562125535490745856 1562125535491161856
1562125536490894848 1562125536491245824
1562125537491039232 1562125537491416320
1562125538491229184 1562125538491601152
1562125539491375872 1562125539491764736
1562125540491517184 1562125540491908352
1562125541491657984 1562125541492027392
1562125542491816704 1562125542492193536
1562125543491963136 1562125543492338944
1562125544492103680 1562125544492564992
1562125545492248832 1562125545492675328
1562125546492397312 1562125546492783616
1562125547492543744 1562125547492926720
1562125564495211008 1562125564495629824
1562125565495372032 1562125565495783168
1562125566495515904 1562125566495924224
1562125567495660800 1562125567496006144
1562125568495806464 1562125568496160000
1562125569495896064 1562125569496235520
1562125570496080128 1562125570496547584
1562125571496235008 1562125571496666624
1562125572496391424 1562125572496803584
1562125573496532224 1562125573496935680
1562125574496652800 1562125574497053952
1562125575496843776 1562125575497277184
1562125576496997120 1562125576497417216
1562125577497182208 1562125577497726976
1562125578497336832 1562125578497726464
1562125579497549312 1562125579497928704
1562125580497696512 1562125580498115328
1562125581497847808 1562125581498198528
1562125582497998336 1562125582498340096
1562125583498140160 1562125583498622464
1562125584498295296 1562125584498680832
1562125585498445312 1562125585498842624
1562125586498627328 1562125586499025920

都在 350-450us 的范围内

标签: c++11ubuntu-16.04centos7zeromqlatency

解决方案


Q1:我做错了什么?
我认为zmq至少会和 tcp 一样快。

代码方面,什么都没有。

就性能而言,ZeroMQ 非常棒,而且有很多tcp开箱即用的特性没有也不会提供:

在此处输入图像描述

测试设置“发送单字节......”似乎直接进入了高性能/低延迟消息服务的左边缘:

在此处输入图像描述

让我们首先了解延迟以及它来自哪里:

观察到的延迟数字是资源使用(资源分配 + 资源池管理操作 + 数据操作)和处理工作量(我们尝试对数据做的所有事情,包括时间,我们的任务有)的总和的乘积在等待队列中花费,由于 system-scheduler 计划的多任务工作单元调度,这不是来自我们的测试工作量,而是操作系统必须根据公平调度策略和实际流程来调度和执行-优先级设置)和通信通道传输延迟(通信 E2E 传输延迟)

接下来让我们了解我们尝试与什么进行比较:

tcp传输控制协议 ( raw ) 与智能可扩展正式通信原型的 ZeroMQ框架之间的差异zmq具有一组丰富的高级分布式行为,大约有几个星系那么大。

ZeroMQ 被设计为一个信令和消息传递基础设施,使用了一些这些功能丰富的行为集,这些行为组合在一起——通常由一些类似人类的行为原型来描述:

One PUSH-es,任意数量的加入交易对手PULL

一个- ests REQ,电话另一端的某个小组中的某个人 - 撒谎REP

一个,甚至可能是来自一些更大的代理组的一个,PUB-lishes,任何数量的已经订阅的订阅者都会收到这样的SUB-scribed 消息。

有关详细信息,请阅读关于[ ZeroMQ 层次结构在不到 5 秒内] 部分中主要概念差异的简要概述。

这不是 TCP 协议本身所能提供的。

这是一种舒适,人们喜欢通过一些可以忽略不计的延迟来付出代价。微不足道?是的,与许多人*多年的终极软件工艺相比,任何人都必须为设计另一个至少类似的智能消息传递框架来与 ZeroMQ 竞争而付出的代价可以忽略不计。

Q2:我可以做一些调整来zmq加快速度吗?

可能是可能不是。

更新:
- 尝试避免身份管理(tcp 也没有这样的东西,所以测量的 RTT-s 的可比性或意义较小)
- 尝试避免 HWM 配置的阻塞方式(tcp 也没有这样的东西)
- 可以尝试在非 tcp 协议上测量相同(PAIR/PAIR正式可扩展通信原型,最好在最不复杂的协议数据泵inproc://上,或者ipc://如果您的沙盒测试台仍需要保留分布式非本地副本等)ZeroMQ context-instance 的内部开销分别花在了上面.send().receive()方法
- 可能会尝试通过使用更多可用于Context实例的线程来稍微提高性能
(其他性能去掩码技巧取决于实际使用的性质——作为对丢弃消息的鲁棒性、使用混合操作模式的可行性、与 O/S 更好的缓冲区对齐、零拷贝技巧——所有这些都值得关注在这里,还必须让分布式行为的智能 ZeroMQ 基础设施保持运行,这是迄今为止执行的复杂得多的任务,比其他盲目和隔离的 tcp-socket 字节级操作的简单串行序列 - 所以,比较时间是可能,但将单个龙类赛车(好吧,更好的车辆,甚至不是汽车)与分布式行为的全球运营基础设施(如 Taxify 或优步)进行比较,此处命名只是为了利用大致相同数量级的平凡(不)相似性)留下了报告有关现象的数字,这些数字不提供类似的舒适性、用例的可伸缩性、几乎线性的性能伸缩性和鲁棒性实际使用)
- 可以通过使Context-instance 的各自IoTHREADs-hard-wired 到各自的 CPU 内核来增加更多的调度确定性,以便整体 I/O 性能永远不会从 CPU 调度中被驱逐并保持确定性映射/预锁定甚至是专门用于管理的 CPU 核心 - 如果尝试进行这种终极性能破解,则取决于一定程度的需求和管理策略

对于任何与性能相关的调整,都需要发布一个 MCVE + 一个完整描述的基准测试套件。ZeroMQ 延迟测试结果报告显示:

结论

在受控环境中RDTSC,指令可用于快速测量时间。这使我们能够测量单个消息的延迟/密度,而不是计算整个测试的平均值。

我们使用这种方法来获取 ØMQ 轻量级消息传递内核(版本 0.1)的性能数据,我们得到以下结果:

-- 在低容量情况下,延迟几乎与底层传输 (TCP) 的延迟相同:50 微秒。
-- 延迟的平均抖动最小:0.225 微秒。
-- 发送方的吞吐量是每秒 480 万条消息。
-- 发送方的密度大多约为 0.140 微秒,但偶尔出现峰值,平均密度为 0.208 微秒。
-- 接收方的吞吐量为每秒 270 万条消息。
-- 接收端的密度大多在 0.3 微秒左右。大约每收到 100 条新批次的消息,导致密度增长到 3-6 微秒。平均密度为 0.367 微秒。

如果最终需要延迟剃须,可以尝试,ZeroMQ 的妹妹,由 ZeroMQ 的共同父亲 Martin SUSTRIK 发起(现在由其他人维护 afaik)


推荐阅读