首页 > 解决方案 > 当关联对象“陈旧”时清理 Redisson pub/sub 侦听器

问题描述

我正在尝试使用 Redis 和 Redisson 库在 Java 中实现一个简单的 Websocket 应用程序,该应用程序能够水平扩展。

Websocket 服务器基本上跟踪连接的客户端,并将收到的消息发送到 Rtopic - 这很好用。

要使用,我有在注册客户端时添加监听器的代码:它通过以下方式将客户端对象与监听器相关联:

private static RedissonClient redisson = RedissonRedisServer.createRedisConnectionWithConfig();
public static final RTopic subcriberTopic = redisson.getTopic("clientsMapTopic");

public static boolean sendToPubSub(ConnectedClient q, String message) {
        boolean[] success = {true};
        MessageListener<Message> listener = new MessageListener<Message>() {
            @Override
            public void onMessage(CharSequence channel, Message message) {
                logger.debug("The message is : " + message.getMediaId());

                try {
                    logger.debug("ConnectedClient mediaid: " + q.getMediaid() + ",Message mediaid " + message.getMediaId());
                    if (q.getMediaid().equals(message.getMediaId())) {
                        // we need to verify if the message goes to the right receiver
                        logger.debug("MESSAGE from PUBSUB to (" + q.getId() + ") @ " + q.getSession().getId() + " " + message);
                        // this is the actual message to the websocket client
                        // this executes on the wrong connected client when the connection is closed and reopened
                        q.getSession().getBasicRemote().sendText(message.getMessage());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    success[0] = false;
                }
            }
        };
        int listenerId = subcriberTopic.addListener(Message.class, listener);
}

我观察到的问题如下:

如果客户端被删除,我似乎只需要删除客户端的侦听器,但我还没有找到一个好的方法,因为虽然我在调试器中看到侦听器具有关联的连接客户端对象,但我无法在不为此添加代码的情况下检索它们。

我是否正确地观察到了这一点,什么是使它正常工作的好方法?

标签: javawebsocketredisredisson

解决方案


当我写这个问题时,我有点倾向于一个我想到的答案并尝试过,它奏效了。我添加了一个 ConcurrentHashmap 来跟踪连接的客户端和侦听器之间的关系。在我处理指向客户端删除的 websocket 错误的逻辑中,然后我删除了关联的侦听器(以及地图中的条目)。现在它按预期工作。

小片段:

int listenerId = subcriberTopic.addListener(Message.class, listener);
clientListeners.put(q,(Integer)listenerId);

然后在触发清理的 websocket onError 处理程序中:

// remove the associated listener
int listenerIdForClient = MessageContainer.clientListeners.get(cP);
MessageContainer.subcriberTopic.removeListener((Integer) listenerIdForClient);
// remove entry from map
MessageContainer.clientListeners.remove(cP);

现在侦听器已正确清理,下次创建新侦听器并处理消息时。


推荐阅读