go - 如何在 nats 中回复已接受的连接。在 golang 中订阅
问题描述
我正在尝试在 golang 中创建一个 tcp 服务器,它接受连接,读取数据,然后通过 nats.Publish 发布它。但我也想回复并关闭订阅者内部的连接,而不是在主服务器脚本中我看到两种可能的方式:发布连接或在订阅者中创建一个新连接,但我无法实现提前谢谢你
UPD 关于 I. Kozlovic 回答的评论:不完全是我想要的,但可能会有所帮助:) 我正在写关于关闭 tcp 连接 - 不是 nats 连接并在另一个 go 进程中关闭 - 而不是在它被接受的那个。但是由于我没有成功,我可以根据您的回答执行以下操作:我可以通过 nats 发布我需要的信息,对其进行处理,然后回复“OK。关闭连接”并关闭发布者中的 TCP 连接,它是一个 TCP服务器。但是,如果我不能从这个 go 过程而是从另一个过程中回复,那就太好了。我理解这很棘手,但我想做与我在上一条消息中写的相同的操作,但需要进行一些修改。我有 TCP 服务器,它接受连接,然后连接 nats 并发布消息,
解决方案
根据您更新的问题,这是一种可能的方法。请注意,这两个额外的进程在这里由 go-routines 表示,但在实际情况下,您可以让它们成为单独的进程。我也省略了错误检查。
// This represent what would be the last process in your
// example.
go func() {
nc, _ := nats.Connect(nats.DefaultURL)
nc.Subscribe("bar", func(m *nats.Msg) {
fmt.Printf("Received request: %s, final stop, sending back to %v\n", m.Data, m.Reply)
nc.Publish(m.Reply, []byte("I'm here to help!"))
})
nc.Flush()
runtime.Goexit()
}()
// This would be the in-between process that receives
// the message triggered by the TCP accept
go func() {
nc, _ := nats.Connect(nats.DefaultURL)
nc.Subscribe("foo", func(m *nats.Msg) {
fmt.Printf("Received request: %s, forward to bar\n", m.Data)
nc.PublishRequest("bar", m.Reply, []byte(fmt.Sprintf("got %s", m.Data)))
})
nc.Flush()
runtime.Goexit()
}()
// This would be your TCP server
l, _ := net.Listen("tcp", "127.0.0.1:1234")
for {
c, _ := l.Accept()
go func(c net.Conn) {
// Close socket when done
defer c.Close()
// Connect to NATS
nc, _ := nats.Connect(nats.DefaultURL)
// Close NATS connection when done
defer nc.Close()
// Sends the request to first process. Note that this
// has a timeout and so if no response is received, the
// go-routine will exit, closing the TCP connection.
reply, err := nc.Request("foo", []byte("help"), 10*time.Second)
if err != nil {
fmt.Printf("Got error: %v\n", err)
} else {
fmt.Printf("Got reply: %s\n", reply.Data)
}
}(c)
}
请注意,通常不建议创建寿命很短的 NATS 连接。如果适合您的模型,您可能希望重用 NATS 连接。
推荐阅读
- php - 在 php 数组中将键 [0] 更改为 [Name]?
- python - 字母频率替换密码
- python - lxml/python 读取带有 CDATA 部分的 xml
- elasticsearch - 无痛:检查单个文档是否包含密钥
- android - 在 Gradle 构建中引用变量
- javascript - 如何在 React 16+ 中动态添加类?
- python - 为什么 DataFrame 将所有输入的数据类型更改为对象?
- python - Python:如何从每个元素中包含 2 位数字的 str 数字创建列表?
- javascript - 在 React 中访问 API JSON 数据
- linux - 您将如何删除 makefile 变量中每个字符串的基本文件夹以外的所有内容?