java - 如何使用 JMS API 从 Solace 侦听器向 Solace 队列发送 NACK?
问题描述
需要您的帮助才能找到解决方案。当前实施细节:
SolConnectionFactory connectionFactory = SolJmsUtility.createConnectionFactory(); // for create connection factory using host , vpn,trust store,keystore with auth scheme AUTHENTICATION_SCHEME_CLIENT_CERTIFICATE
this.connection = connectionFactory.createConnection(); // connection creation
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); // session creation using client acknowledge
使用 MessageListener 监听队列
public class MyListener implements MessageListener
public void onMessage(Message message) // receive the message and process
{
/* in this method validating the message (poison message)
* and publish to kafka, once receive the success message
* from kafka acknowledge the message
*/
message.acknowledge();
}
问题:如果 Kafka 代理宕机了如何重试?我们与内部 Solace 团队进行了讨论。
我们假设如果客户端没有调用message.acknowledge()
Solace 将进行重试,但内部 Solace 团队澄清它正在使用窗口机制,因此如果后续消息是确认,那么之前的消息也会确认并删除。
他们的建议是发回失败消息的 NACK(否定确认),然后 Solace 可以重试该消息。如何使用 Java API 发送 NACK?
Max_Un_Ack_Message
和的建议值是Max_Redeliver_Count
多少?
Solace 的 Maven 依赖项:
<dependency>
<groupId>com.solacesystems</groupId>
<artifactId>sol-jms</artifactId>
<version>10.0.0</version>
</dependency>
解决方案
如果您使用 JMS API 并且想要触发重新交付,那么您有几个选择。
您可以使用事务处理会话。当处理消息成功时,您确认消息commit()
和javax.jms.Session
. 当处理不成功时,您rollback()
调用javax.jms.Session
.
或者,您可以使用javax.jms.Session.recover()
. 这是 JavaDoc 所说的:
在此会话中停止消息传递,并使用最旧的未确认消息重新开始消息传递。
所有消费者都按顺序传递消息。确认收到的消息会自动确认已传递给客户端的所有消息。
重新启动会话会导致它执行以下操作:
- 停止消息传递
- 将所有可能已送达但未确认的邮件标记为“重新送达”
- 重新启动传递序列,包括之前已传递的所有未确认消息。重新传递的消息不必完全按照其原始传递顺序传递。
当然,这假定 Solace JMS 客户端实际上实现了此处的指定行为。
推荐阅读
- orm - Toplink 到 Eclipselink 迁移支持
- powershell - 在 PowerShell DSC 资源中捕获异常
- c# - 使用导航属性的 T-SQL 到 LINQ 到 SQL
- php - 无法将 PHP 会话变量返回给 JQuery
- vba - (预期表达式)vba 错误
- python - struct staticfield 数据类型的新构造 2.9 语法
- javascript - 多个 Var 值 - jQuery
- google-cloud-platform - 如何在云数据流 python 管道中读取多种数据存储类型
- python - 使用 Python 通过 ftp 上传 csv 文件
- python - 为什么使用 print(iterable) 时输出不打印?