首页 > 解决方案 > RSocket 如何向多个客户端发出租约?

问题描述

我可以为单个客户端创建服务器租约,如下所示:</p>

    @Slf4j
    public class LeaseServer {
    
        private static final String SERVER_TAG = "server";
    
        public static void main(String[] args) throws InterruptedException {
            // Queue for incoming messages represented as Flux
            // Imagine that every fireAndForget that is pushed is processed by a worker
            int queueCapacity = 50;
            BlockingQueue<String> messagesQueue = new ArrayBlockingQueue<>(queueCapacity);
            // emulating a worker that process data from the queue
            Thread workerThread =
                    new Thread(
                            () -> {
                                try {
                                    while (!Thread.currentThread().isInterrupted()) {
                                        String message = messagesQueue.take();
                                        System.out.println("consume message:&quot; + message);
                                        Thread.sleep(100000); // emulating processing
                                    }
                                } catch (InterruptedException e) {
                                    throw new RuntimeException(e);
                                }
                            });
            workerThread.start();
            CloseableChannel server = getFireAndForgetServer(messagesQueue, workerThread);
            TimeUnit.MINUTES.sleep(10);
            server.dispose();
        }
    
        private static CloseableChannel getFireAndForgetServer(BlockingQueue<String> messagesQueue, Thread workerThread) {
            CloseableChannel server =
                    RSocketServer.create((setup, sendingSocket) ->
                            Mono.just(new RSocket() {
                                @Override
                                public Mono<Void> fireAndForget(Payload payload) {
                                    // add element. if overflows errors and terminates execution
                                    // specifically to show that lease can limit rate of fnf requests in
                                    // that example
                                    try {
                                        if (!messagesQueue.offer(payload.getDataUtf8())) {
                                            System.out.println("Queue has been overflowed. Terminating execution");
                                            sendingSocket.dispose();
                                            workerThread.interrupt();
                                        }
                                    } finally {
                                        payload.release();
                                    }
                                    return Mono.empty();
                                }
                            }))
                            .lease(() -> Leases.create().sender(new LeaseCalculator(SERVER_TAG, messagesQueue)))
                            .bindNow(TcpServerTransport.create("localhost", 7000));
            return server;
        }
    }

但是如何向连接到该服务器的多个客户端发出租约?

否则我的队列会被多个客户端多次写入,导致服务溢出。

我在公开的文件和资料中找不到详细信息。

非常感谢您的帮助。

标签: backpressurersocket

解决方案


推荐阅读