首页 > 解决方案 > 为什么 zmq ( inproc:// )-connection 的顺序很重要,不像 for ( tcp:// )?

问题描述

当启动 zmq 服务器和客户端时,以任何随机顺序通过tcp://传输类进行通信,它们足够聪明,可以连接/重新连接,无论顺序如何。

但是,当尝试在inproc://传输类上运行相同的程序时,我发现它仅在客户端在服务器之后启动时才有效。我们怎样才能避免这种情况?


MCVE 代码:

这是一些 kotlin MCVE 代码示例,用于重现声明(这是众所周知的天气示例的修改版本)

server.kt- 运行它以独立运行服务器

package sandbox.zmq

import org.zeromq.ZMQ
import org.zeromq.ZMQ.Context
import sandbox.util.Util.sout
import java.util.*

fun main(args: Array<String>) {
   server(
      context = ZMQ.context(1),
//      publishTo = "tcp://localhost:5556"
      publishTo = "tcp://localhost:5557"
   )
}

fun server(context: Context, publishTo: String) {
   val publisher = context.socket(ZMQ.PUB)
   publisher.bind(publishTo)

   //  Initialize random number generator
   val srandom = Random(System.currentTimeMillis())
   while (!Thread.currentThread().isInterrupted) {
      //  Get values that will fool the boss
      val zipcode: Int
      val temperature: Int
      val relhumidity: Int
      zipcode = 10000 + srandom.nextInt(10)
      temperature = srandom.nextInt(215) - 80 + 1
      relhumidity = srandom.nextInt(50) + 10 + 1

      //  Send message to all subscribers
      val update = String.format("%05d %d %d", zipcode, temperature, relhumidity)
      println("server >> $update")
      publisher.send(update, 0)
      Thread.sleep(500)
   }

   publisher.close()
   context.term()
}

client.kt- 为客户端独立运行

package sandbox.zmq

import org.zeromq.ZMQ
import org.zeromq.ZMQ.Context
import java.util.*

fun main(args: Array<String>) {
   client(
      context = ZMQ.context(1),
      readFrom = "tcp://localhost:5557"
   )
}

fun client(context: Context, readFrom: String) {
   //  Socket to talk to server
   println("Collecting updates from weather server")
   val subscriber = context.socket(ZMQ.SUB)
   //        subscriber.connect("tcp://localhost:");
   subscriber.connect(readFrom)

   //  Subscribe to zipcode, default is NYC, 10001
   subscriber.subscribe("".toByteArray())

   //  Process 100 updates
   var update_nbr: Int
   var total_temp: Long = 0
   update_nbr = 0
   while (update_nbr < 10000) {
      //  Use trim to remove the tailing '0' character
      val string = subscriber.recvStr(0).trim { it <= ' ' }
      println("client << $string")
      val sscanf = StringTokenizer(string, " ")
      val zipcode = Integer.valueOf(sscanf.nextToken())
      val temperature = Integer.valueOf(sscanf.nextToken())
      val relhumidity = Integer.valueOf(sscanf.nextToken())

      total_temp += temperature.toLong()
      update_nbr++

   }
   subscriber.close()
}

inproc.kt- 运行它并修改为inproc://场景调用的样本

package sandbox.zmq

import org.zeromq.ZMQ
import kotlin.concurrent.thread


fun main(args: Array<String>) {
//   clientFirst()
   clientLast()
}

fun println(string: String) {
   System.out.println("${Thread.currentThread().name} : $string")
}

fun clientFirst() {

   val context = ZMQ.context(1)

   val client = thread {
      client(
         context = context,
         readFrom = "inproc://backend"
      )
   }

   // use this to maintain order
   Thread.sleep(10)

   val server = thread {
      server(
         context = context,
         publishTo = "inproc://backend"
      )
   }

   readLine()
   client.interrupt()
   server.interrupt()
}

fun clientLast() {

   val context = ZMQ.context(1)

   val server = thread {
      server(
         context = context,
         publishTo = "inproc://backend"
      )
   }

   // use this to maintain order
   Thread.sleep(10)

   val client = thread {
      client(
         context = context,
         readFrom = "inproc://backend"
      )
   }

   readLine()
   client.interrupt()
   server.interrupt()
}

标签: zeromq

解决方案


为什么 zmqinproc://连接顺序很重要,不像 for tcp://

好吧,这是一种设计行为

鉴于本机 ZeroMQ API 会警告这种设计行为(自此以后),所以问题不是问题,而是预期的属性。

另外还必须满足一个额外的属性:

名称 [an_endpoint_name中的意思.connect("inproc://<_an_endpoint_name_>")]
必须是先前通过将其分配给与正在连接的套接字相同的ØMQ 上下文中的
至少一个套接字而创建的。

较新版本的本机 ZeroMQ API(4.0 后),如果确实部署在各自的语言绑定/包装器下,可能允许释放这些要求中的前者:

zmq_bind()从 4.0 版开始,和tcp 传输类型的顺序zmq_connect()无关紧要。


我们怎样才能避免这种情况?

好吧,一个更难的部分......

如果在 ZeroMQ 原生 API v4.2+ 之上还没有一种简单的方法,可以卷起袖子,重新分解 4.x 之前的语言包装器/绑定,以使引擎到达那里,或者,也许,测试一下Martin SUSTRIK的第二个可爱的孩子,nanomsg是否适合实现这一目标的场景。


推荐阅读