backpressure - RSocket 如何向多个客户端发出租约?
问题描述
我可以为单个客户端创建服务器租约,如下所示:</p>
@Slf4j
public class LeaseServer {
private static final String SERVER_TAG = "server";
public static void main(String[] args) throws InterruptedException {
// Queue for incoming messages represented as Flux
// Imagine that every fireAndForget that is pushed is processed by a worker
int queueCapacity = 50;
BlockingQueue<String> messagesQueue = new ArrayBlockingQueue<>(queueCapacity);
// emulating a worker that process data from the queue
Thread workerThread =
new Thread(
() -> {
try {
while (!Thread.currentThread().isInterrupted()) {
String message = messagesQueue.take();
System.out.println("consume message:" + message);
Thread.sleep(100000); // emulating processing
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
workerThread.start();
CloseableChannel server = getFireAndForgetServer(messagesQueue, workerThread);
TimeUnit.MINUTES.sleep(10);
server.dispose();
}
private static CloseableChannel getFireAndForgetServer(BlockingQueue<String> messagesQueue, Thread workerThread) {
CloseableChannel server =
RSocketServer.create((setup, sendingSocket) ->
Mono.just(new RSocket() {
@Override
public Mono<Void> fireAndForget(Payload payload) {
// add element. if overflows errors and terminates execution
// specifically to show that lease can limit rate of fnf requests in
// that example
try {
if (!messagesQueue.offer(payload.getDataUtf8())) {
System.out.println("Queue has been overflowed. Terminating execution");
sendingSocket.dispose();
workerThread.interrupt();
}
} finally {
payload.release();
}
return Mono.empty();
}
}))
.lease(() -> Leases.create().sender(new LeaseCalculator(SERVER_TAG, messagesQueue)))
.bindNow(TcpServerTransport.create("localhost", 7000));
return server;
}
}
但是如何向连接到该服务器的多个客户端发出租约?
否则我的队列会被多个客户端多次写入,导致服务溢出。
我在公开的文件和资料中找不到详细信息。
非常感谢您的帮助。
解决方案
推荐阅读
- drag-and-drop - 如何使用 ag-grid 将文件拖放到一行中?
- c++ - 在 C++11 及更高版本中,在“范围”之前进行“空”检查是否仍然是最佳选择?
- dart - Flutter:Stepper onStepContinue不触发
- r - 特定列的所有值的显示 ID 为 NA r
- java - 无法索引类 module-info.class
- nginx - 带有 Nginx conf 的未知指令“proxy_cache_revalidate on”
- swift3 - 如何使用 swift 程序从 htmlString 中删除反斜杠?
- git - 从 commit-id 中检索 gerrit 链接
- php - 查询返回错误的数组编码器
- common-lisp - 打开文件,如果文件不存在,则执行其他操作