zeromq - 为什么 zmq ( inproc:// )-connection 的顺序很重要,不像 for ( tcp:// )?
问题描述
当启动 zmq 服务器和客户端时,以任何随机顺序通过tcp://
传输类进行通信,它们足够聪明,可以连接/重新连接,无论顺序如何。
但是,当尝试在inproc://
传输类上运行相同的程序时,我发现它仅在客户端在服务器之后启动时才有效。我们怎样才能避免这种情况?
MCVE 代码:
这是一些 kotlin MCVE 代码示例,用于重现声明(这是众所周知的天气示例的修改版本)
server.kt
- 运行它以独立运行服务器
package sandbox.zmq
import org.zeromq.ZMQ
import org.zeromq.ZMQ.Context
import sandbox.util.Util.sout
import java.util.*
fun main(args: Array<String>) {
server(
context = ZMQ.context(1),
// publishTo = "tcp://localhost:5556"
publishTo = "tcp://localhost:5557"
)
}
fun server(context: Context, publishTo: String) {
val publisher = context.socket(ZMQ.PUB)
publisher.bind(publishTo)
// Initialize random number generator
val srandom = Random(System.currentTimeMillis())
while (!Thread.currentThread().isInterrupted) {
// Get values that will fool the boss
val zipcode: Int
val temperature: Int
val relhumidity: Int
zipcode = 10000 + srandom.nextInt(10)
temperature = srandom.nextInt(215) - 80 + 1
relhumidity = srandom.nextInt(50) + 10 + 1
// Send message to all subscribers
val update = String.format("%05d %d %d", zipcode, temperature, relhumidity)
println("server >> $update")
publisher.send(update, 0)
Thread.sleep(500)
}
publisher.close()
context.term()
}
client.kt
- 为客户端独立运行
package sandbox.zmq
import org.zeromq.ZMQ
import org.zeromq.ZMQ.Context
import java.util.*
fun main(args: Array<String>) {
client(
context = ZMQ.context(1),
readFrom = "tcp://localhost:5557"
)
}
fun client(context: Context, readFrom: String) {
// Socket to talk to server
println("Collecting updates from weather server")
val subscriber = context.socket(ZMQ.SUB)
// subscriber.connect("tcp://localhost:");
subscriber.connect(readFrom)
// Subscribe to zipcode, default is NYC, 10001
subscriber.subscribe("".toByteArray())
// Process 100 updates
var update_nbr: Int
var total_temp: Long = 0
update_nbr = 0
while (update_nbr < 10000) {
// Use trim to remove the tailing '0' character
val string = subscriber.recvStr(0).trim { it <= ' ' }
println("client << $string")
val sscanf = StringTokenizer(string, " ")
val zipcode = Integer.valueOf(sscanf.nextToken())
val temperature = Integer.valueOf(sscanf.nextToken())
val relhumidity = Integer.valueOf(sscanf.nextToken())
total_temp += temperature.toLong()
update_nbr++
}
subscriber.close()
}
inproc.kt
- 运行它并修改为inproc://
场景调用的样本
package sandbox.zmq
import org.zeromq.ZMQ
import kotlin.concurrent.thread
fun main(args: Array<String>) {
// clientFirst()
clientLast()
}
fun println(string: String) {
System.out.println("${Thread.currentThread().name} : $string")
}
fun clientFirst() {
val context = ZMQ.context(1)
val client = thread {
client(
context = context,
readFrom = "inproc://backend"
)
}
// use this to maintain order
Thread.sleep(10)
val server = thread {
server(
context = context,
publishTo = "inproc://backend"
)
}
readLine()
client.interrupt()
server.interrupt()
}
fun clientLast() {
val context = ZMQ.context(1)
val server = thread {
server(
context = context,
publishTo = "inproc://backend"
)
}
// use this to maintain order
Thread.sleep(10)
val client = thread {
client(
context = context,
readFrom = "inproc://backend"
)
}
readLine()
client.interrupt()
server.interrupt()
}
解决方案
为什么 zmq
inproc://
连接顺序很重要,不像 fortcp://
?
好吧,这是一种设计行为
鉴于本机 ZeroMQ API 会警告这种设计行为(自此以后),所以问题不是问题,而是预期的属性。
另外还必须满足一个额外的属性:
名称 [
an_endpoint_name
中的意思.connect("inproc://<_an_endpoint_name_>")
]
必须是先前通过将其分配给与正在连接的套接字相同的ØMQ 上下文中的
至少一个套接字而创建的。
较新版本的本机 ZeroMQ API(4.0 后),如果确实部署在各自的语言绑定/包装器下,可能允许释放这些要求中的前者:
zmq_bind()
从 4.0 版开始,和tcp 传输类型的顺序zmq_connect()
无关紧要。
我们怎样才能避免这种情况?
好吧,一个更难的部分......
如果在 ZeroMQ 原生 API v4.2+ 之上还没有一种简单的方法,可以卷起袖子,重新分解 4.x 之前的语言包装器/绑定,以使引擎到达那里,或者,也许,测试一下Martin SUSTRIK的第二个可爱的孩子,nanomsg
是否适合实现这一目标的场景。
推荐阅读
- nlp - WordNet 语料库中的单词说明
- python - json 模块安装错误我使用的是 windows 10 pro 64 位,请给出解决方案来纠正这个问题
- javascript - v-toggle 的自定义按钮激活
- mongodb - mongodb管道聚合组结果成数组
- git - 如何将 GitLab FOSS 从 12.7.5 升级到 12.8.5(综合包)?
- c# - 我在 C# 中创建家谱时遇到了麻烦
- arm - 检测 Thumb-2 指令和 PC 偏移的位置
- matlab - 找到线性系统精确解的最快方法是什么,matlab
- c# - 为什么此 LINQ 表达式返回只有一项的子列表
- redux - Immutable.js:创建一个包含一个元素的列表