websocket - Send message only to certain client using websockets with Rsocket and Spring Webflux
Question
I am trying to use RSocket over WebSocket in one of my POC projects. In my case, user login is not required. When I receive a message from another service, I would like to send it only to certain clients. My flow looks like this:
                                 Service A                                Service B
|--------|    websocket     |------------------|   Queue based comm   |---------------|
|  Web   |----------------->| Rsocket server   |--------------------->|  Another      |
|        |<-----------------| using Websocket  |<---------------------|  service      |
|--------|    websocket     |------------------|   Queue based comm   |---------------|
I am thinking of using a unique id for each connection and for each request, merging both identifiers into a correlation id, and sending that along with the message to Service B. When the reply comes back from Service B, I would use the correlation id to work out which client it belongs to and send it there. I understand that I may not need two services for this, but I have other reasons for the split. I have a rough idea of how to implement the other pieces, but I am new to RSocket. Is it possible to send a message only to a specific client, identified by a certain id, using Spring Boot WebFlux, RSocket, and WebSocket?
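The correlation id described above could be sketched as follows. This is only an illustration: the `CorrelationId` type, the `:` separator, and the use of random UUIDs are assumptions, not something the question specifies.

```kotlin
import java.util.UUID

// Hypothetical value type: the name and the "connectionId:requestId" layout are assumptions.
data class CorrelationId(val connectionId: String, val requestId: String) {
    // Encode both ids into one string that travels with the message to Service B.
    fun encode(): String = "$connectionId$SEPARATOR$requestId"

    companion object {
        private const val SEPARATOR = ":"

        // Create a correlation id for a new request on an existing connection.
        fun forRequest(connectionId: String): CorrelationId =
            CorrelationId(connectionId, UUID.randomUUID().toString())

        // Parse the value echoed back by Service B; returns null if it is malformed.
        fun decode(raw: String): CorrelationId? {
            val parts = raw.split(SEPARATOR, limit = 2)
            if (parts.size != 2 || parts.any { it.isEmpty() }) return null
            return CorrelationId(parts[0], parts[1])
        }
    }
}

fun main() {
    val connectionId = UUID.randomUUID().toString()
    val sent = CorrelationId.forRequest(connectionId)
    // Service B is assumed to echo the encoded id back unchanged.
    val received = CorrelationId.decode(sent.encode())
    println(received?.connectionId == connectionId)
}
```

The decoded `connectionId` is then the key for looking up which client should receive the reply.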
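Answer
Basically, I think you have two options here. The first is to filter the Flux coming from Service B. The second is to use RSocketRequester and a Map, as @NikolaB described.
First option: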
import java.time.Duration
import org.slf4j.LoggerFactory
import org.springframework.http.MediaType
import org.springframework.web.bind.annotation.CrossOrigin
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
import reactor.core.publisher.DirectProcessor
import reactor.core.publisher.Flux

data class News(val category: String, val news: String)
data class PrivateNews(val destination: String, val news: News)

class NewsProvider {
    private val duration: Long = 250
    // Note: DirectProcessor is deprecated since Reactor 3.4 in favor of Sinks.
    private val externalNewsProcessor = DirectProcessor.create<News>().serialize()
    private val sink = externalNewsProcessor.sink()

    // Merges the sample feeds with news pushed in from outside.
    fun allNews(): Flux<News> {
        return Flux
            .merge(
                carNews(), bikeNews(), cosmeticsNews(),
                externalNewsProcessor)
            .delayElements(Duration.ofMillis(duration))
    }

    fun externalNews(): Flux<News> {
        return externalNewsProcessor
    }

    // Pushes one item into the stream returned by allNews().
    fun addExternalNews(news: News) {
        sink.next(news)
    }

    fun carNews(): Flux<News> {
        return Flux
            .just("new lambo!!", "amazing ferrari!", "great porsche", "very cool audi RS4 Avant", "Tesla is smarter than you")
            .map { News("CAR", it) }
            .delayElements(Duration.ofMillis(duration))
            .log()
    }

    fun bikeNews(): Flux<News> {
        return Flux
            .just("specialized enduro still the biggest dream", "giant anthem fast as hell", "gravel long distance test")
            .map { News("BIKE", it) }
            .delayElements(Duration.ofMillis(duration))
            .log()
    }

    fun cosmeticsNews(): Flux<News> {
        return Flux
            .just("nivea - no one wants to hear about that", "rexona anti-odor test")
            .map { News("COSMETICS", it) }
            .delayElements(Duration.ofMillis(duration))
            .log()
    }
}

@RestController
@RequestMapping("/sse")
@CrossOrigin("*")
class NewsRestController {
    private val log = LoggerFactory.getLogger(NewsRestController::class.java)
    val newsProvider = NewsProvider()

    // Streams only the news whose category matches the path variable.
    @GetMapping(value = ["/news/{category}"], produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun allNewsByCategory(@PathVariable category: String): Flux<News> {
        log.info("hello, getting all news by category: {}!", category)
        return newsProvider
            .allNews()
            .filter { it.category == category }
    }
}
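The NewsProvider class simulates your Service B, which should return a Flux<>. Whenever you call addExternalNews, the News you pass in is pushed into the Flux returned by allNews. In the NewsRestController class, we filter the news by category. Open localhost:8080/sse/news/CAR in a browser to see only car news.
If you want to use RSocket instead, you can use a method like this: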
@MessageMapping("news.{category}")
fun allNewsByCategory(@DestinationVariable category: String): Flux<News> {
    log.info("RSocket, getting all news by category: {}!", category)
    return newsProvider
        .allNews()
        .filter { it.category == category }
}
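Applied to the question, the same filtering idea could key on a client id instead of a category. The following is a minimal sketch under several assumptions: `PrivateNewsBridge`, `publish`, and `newsFor` are hypothetical names, `destination` is assumed to hold exactly one client id, and `Sinks` (Reactor 3.4 or later) stands in for the deprecated `DirectProcessor`. The two data classes are repeated only so the block stands alone.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks

data class News(val category: String, val news: String)
data class PrivateNews(val destination: String, val news: News)

// Hypothetical bridge between the queue listener (Service B side) and connected clients.
class PrivateNewsBridge {
    // Multicast sink: each subscriber only sees items emitted after it subscribed.
    private val sink = Sinks.many().multicast().directBestEffort<PrivateNews>()

    // Called by the queue listener when a message arrives from Service B.
    // tryEmitNext returns an EmitResult; a real listener should inspect it,
    // because concurrent emissions from several threads can be rejected.
    fun publish(message: PrivateNews): Sinks.EmitResult = sink.tryEmitNext(message)

    // Stream of news addressed to one client id; everything else is filtered out.
    fun newsFor(clientId: String): Flux<News> =
        sink.asFlux()
            .filter { it.destination == clientId }
            .map { it.news }
}
```

A `@MessageMapping` method with a `@DestinationVariable` client id could then return `newsFor(clientId)`. Since no login is involved, nothing in this sketch stops a client from subscribing with someone else's id, so the id should be hard to guess.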
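Second option:
Let's store each RSocketRequester in a HashMap, registering it with @ConnectMapping. The map used below is immutable (put and remove return a new map), as in io.vavr's HashMap.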
import io.vavr.collection.HashMap
import io.vavr.collection.Map
import org.slf4j.LoggerFactory
import org.springframework.messaging.handler.annotation.MessageMapping
import org.springframework.messaging.rsocket.RSocketRequester
import org.springframework.messaging.rsocket.annotation.ConnectMapping
import org.springframework.stereotype.Controller

@Controller
class RSocketConnectionController {
    private val log = LoggerFactory.getLogger(RSocketConnectionController::class.java)
    // Immutable map: every put/remove returns a new map that replaces this reference.
    private var requesterMap: Map<String, RSocketRequester> = HashMap.empty()

    @Synchronized
    private fun getRequesterMap(): Map<String, RSocketRequester> {
        return requesterMap
    }

    @Synchronized
    private fun addRequester(rSocketRequester: RSocketRequester, clientId: String) {
        log.info("adding requester {}", clientId)
        requesterMap = requesterMap.put(clientId, rSocketRequester)
    }

    @Synchronized
    private fun removeRequester(clientId: String) {
        log.info("removing requester {}", clientId)
        requesterMap = requesterMap.remove(clientId)
    }

    // Invoked for SETUP frames routed to "client-id"; the payload carries the client id.
    @ConnectMapping("client-id")
    fun onConnect(rSocketRequester: RSocketRequester, clientId: String) {
        val clientIdFixed = clientId.replace("\"", "") // check the serializer: why does it add quotes to strings?
        // rSocketRequester.rsocket().dispose() // to reject the connection
        rSocketRequester
            .rsocket()
            .onClose()
            .subscribe(null, null, {
                log.info("{} just disconnected", clientIdFixed)
                removeRequester(clientIdFixed)
            })
        addRequester(rSocketRequester, clientIdFixed)
    }

    // Forwards the news to every connected client listed in news.destination.
    @MessageMapping("private.news")
    fun privateNews(news: PrivateNews, rSocketRequesterParam: RSocketRequester) {
        getRequesterMap()
            .filterKeys { key -> checkDestination(news, key) }
            .values()
            .forEach { requester -> sendMessage(requester, news) }
    }

    // Fire-and-forget to the client's responder on route "news.<category>".
    private fun sendMessage(requester: RSocketRequester, news: PrivateNews) {
        requester
            .route("news.${news.news.category}")
            .data(news.news)
            .send()
            .subscribe()
    }

    private fun checkDestination(news: PrivateNews, key: String): Boolean {
        val list = destinations(news)
        return list.contains(key)
    }

    // destination is a comma-separated list of client ids.
    private fun destinations(news: PrivateNews): List<String> {
        return news.destination
            .split(",")
            .map { it.trim() }
    }
}
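As an alternative to the synchronized immutable map, the registry part of the controller could be sketched with `java.util.concurrent.ConcurrentHashMap`. This is a hedged variation, not the answer's own approach: `ClientRegistry` and its method names are hypothetical, and the type parameter `T` stands in for `RSocketRequester` so that the sketch runs without Spring.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical registry; T stands in for RSocketRequester.
class ClientRegistry<T : Any> {
    // ConcurrentHashMap makes single put/remove/get calls thread-safe without @Synchronized.
    private val clients = ConcurrentHashMap<String, T>()

    // Call on connect (for example from a @ConnectMapping method).
    fun register(clientId: String, client: T) {
        clients[clientId] = client
    }

    // Call when the connection closes.
    fun unregister(clientId: String) {
        clients.remove(clientId)
    }

    // Resolve a comma-separated destination such as "a, b" to the clients still connected.
    fun resolve(destination: String): List<T> =
        destination.split(",")
            .map { it.trim() }
            .distinct()
            .mapNotNull { clients[it] }
}

fun main() {
    val registry = ClientRegistry<String>()
    registry.register("a", "requester-a")
    registry.register("b", "requester-b")
    registry.unregister("b")
    println(registry.resolve("a, b, c")) // prints [requester-a]
}
```

Looking up the listed ids directly avoids scanning every connected client for each message, which matters once many clients are connected.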
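Note that we have to add two things to the rsocket-js client: a payload in the SETUP frame that provides the client ID, and a registered responder that handles the messages sent by RSocketRequester.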
const client = new RSocketClient({
  // send/receive JSON objects instead of strings/buffers
  serializers: {
    data: JsonSerializer,
    metadata: IdentitySerializer
  },
  setup: {
    // for connection mapping on server
    payload: {
      data: "provide-unique-client-id-here",
      metadata: String.fromCharCode("client-id".length) + "client-id"
    },
    // ms btw sending keepalive to server
    keepAlive: 60000,
    // ms timeout if no keepalive response
    lifetime: 180000,
    // format of `data`
    dataMimeType: "application/json",
    // format of `metadata`
    metadataMimeType: "message/x.rsocket.routing.v0"
  },
  responder: responder,
  transport
});
For more on this, see the question: How to handle message sent from server to client with RSocket?