java - java-nats-streaming:服务器重新连接后发布消息
问题描述
我有一个设置了 3 个节点的 NATS 流集群。我的 java 应用程序在服务器停机期间发布的 NATS 消息似乎丢失了(即当我的服务器备份并运行时不会再次重新发布)。
更详细的描述:
- NATS 集群在线。发布者和订阅者应用程序上线。Publisher 开始每秒发布一条消息。订阅者接收消息。
- NATS 服务器已关闭。Publisher 继续发布消息(我们称这些消息为“离线消息”)。订阅者停止接收任何内容
- NATS 服务器重新联机。订阅者再次开始接收消息,但从未收到“离线消息”。
我的发布者和订阅者应用程序都配置为尝试重新连接到 NATS 服务器并且不会超时。我没有得到任何例外。
NATS 连接:
Options options = new Options.Builder().servers(serverList).maxReconnects(-1).build();
Connection nc = Nats.connect(options);
StreamingConnectionFactory cf = new StreamingConnectionFactory(natsProperties.getClusterId(), natsProperties.getClientId());
cf.setNatsConnection(nc);
streamingConnection = cf.createConnection();
出版商:
// subject and message String variables are passed in
streamingConnection.publish(subject, message.getBytes());
订户:
streamingConnection.subscribe(subject, new MessageHandler() {
public void onMessage(Message m) {
System.out.prinf("Received msg: %s\n", m.getData())
}
}, new SubscriptionOptions.Builder().durableName(durableName).build());
从文档来看,Java NATS 客户端似乎内置了一个重新连接缓冲区。我尝试将缓冲区增加 10 倍但无济于事(另外,我的消息仅包含 2 位数字)。如何让它重新发送这些“离线消息”?
解决方案
我有同样的问题,我看到另一种订阅方法被占用的唯一解决方案,保存消息序列但我认为这不是最好的
// Receive messages starting at a specific sequence number
sc.subscribe("foo", new MessageHandler() {
public void onMessage(Message m) {
logger.info("Sequence message " + m.getSequence());
System.out.printf("Received a message: %s\n", m.getData());
}
}, new SubscriptionOptions.Builder().startAtSequence(22).build());
推荐阅读
- view - 以演示模式打开 WebStorm
- logging - 在用户登录时在 zend 记录器中设置额外字段
- c++ - (ROS) 如何用相对路径保存日志文件?
- php - 如何在 PHP 的邮件中使用 PHP 三元条件运算符?
- swift - Swift,隐藏/删除窗口(查看?)
- c - 指数函数图
- docker - Mysql 通过 ssh 远程连接到 kubernetes pod
- swift - 如何在 UITableiew 标题中添加自定义控件
- laravel - 在 nova 资源索引视图中显示特定的 eloquent 查询
- javascript - How do I change the format for my uptime command its currently hours I would like to change it to days