首页 > 解决方案 > java-nats-streaming:服务器重新连接后发布消息

问题描述

我有一个设置了 3 个节点的 NATS 流集群。我的 java 应用程序在服务器停机期间发布的 NATS 消息似乎丢失了(即当我的服务器备份并运行时不会再次重新发布)。

更详细的描述:

  1. NATS 集群在线。发布者和订阅者应用程序上线。Publisher 开始每秒发布一条消息。订阅者接收消息。
  2. NATS 服务器已关闭。Publisher 继续发布消息(我们称这些消息为“离线消息”)。订阅者停止接收任何内容
  3. NATS 服务器重新联机。订阅者再次开始接收消息,但从未收到“离线消息”。

我的发布者和订阅者应用程序都配置为尝试重新连接到 NATS 服务器并且不会超时。我没有得到任何例外。

NATS 连接:

Options options = new Options.Builder().servers(serverList).maxReconnects(-1).build();

Connection nc = Nats.connect(options);

StreamingConnectionFactory cf = new StreamingConnectionFactory(natsProperties.getClusterId(), natsProperties.getClientId());
cf.setNatsConnection(nc);
streamingConnection = cf.createConnection();

出版商:

// subject and message String variables are passed in
streamingConnection.publish(subject, message.getBytes());

订户:

streamingConnection.subscribe(subject, new MessageHandler() {
    public void onMessage(Message m) {
        System.out.prinf("Received msg: %s\n", m.getData())
    }
},  new SubscriptionOptions.Builder().durableName(durableName).build());

从文档来看,Java NATS 客户端似乎内置了一个重新连接缓冲区。我尝试将缓冲区增加 10 倍但无济于事(另外,我的消息仅包含 2 位数字)。如何让它重新发送这些“离线消息”?

标签: javanats-streaming-server

解决方案


我有同样的问题,我看到另一种订阅方法被占用的唯一解决方案,保存消息序列但我认为这不是最好的

   // Receive messages starting at a specific sequence number
   sc.subscribe("foo", new MessageHandler() {
   public void onMessage(Message m) {
     logger.info("Sequence message " +  m.getSequence());
     System.out.printf("Received a message: %s\n", m.getData());
   }
   }, new SubscriptionOptions.Builder().startAtSequence(22).build());

推荐阅读