首页 > 解决方案 > 如何使用 JMS API 从 Solace 侦听器向 Solace 队列发送 NACK?

问题描述

需要您的帮助才能找到解决方案。当前实施细节:

SolConnectionFactory connectionFactory = SolJmsUtility.createConnectionFactory(); // for create connection factory using host , vpn,trust store,keystore with auth scheme AUTHENTICATION_SCHEME_CLIENT_CERTIFICATE
this.connection = connectionFactory.createConnection(); // connection creation
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); // session creation using client acknowledge

使用 MessageListener 监听队列

public class MyListener implements MessageListener 

    public void onMessage(Message message) // receive the message and process
    { 
        /* in this method validating the message (poison message) 
         * and publish to kafka, once receive the success message
         * from kafka acknowledge the message
         */
        message.acknowledge();
    }

问题:如果 Kafka 代理宕机了如何重试?我们与内部 Solace 团队进行了讨论。

我们假设如果客户端没有调用message.acknowledge()Solace 将进行重试,但内部 Solace 团队澄清它正在使用窗口机制,因此如果后续消息是确认,那么之前的消息也会确认并删除。

他们的建议是发回失败消息的 NACK(否定确认),然后 Solace 可以重试该消息。如何使用 Java API 发送 NACK?

Max_Un_Ack_Message和的建议值是Max_Redeliver_Count多少?

Solace 的 Maven 依赖项:

<dependency>
    <groupId>com.solacesystems</groupId>
    <artifactId>sol-jms</artifactId>
    <version>10.0.0</version>
</dependency>

标签: javajmssolaceretry-logic

解决方案


如果您使用 JMS API 并且想要触发重新交付,那么您有几个选择。

您可以使用事务处理会话。当处理消息成功时,您确认消息commit()javax.jms.Session. 当处理不成功时,您rollback()调用javax.jms.Session.

或者,您可以使用javax.jms.Session.recover(). 这是 JavaDoc 所说的:

在此会话中停止消息传递,并使用最旧的未确认消息重新开始消息传递。

所有消费者都按顺序传递消息。确认收到的消息会自动确认已传递给客户端的所有消息。

重新启动会话会导致它执行以下操作:

  • 停止消息传递
  • 将所有可能已送达但未确认的邮件标记为“重新送达”
  • 重新启动传递序列,包括之前已传递的所有未确认消息。重新传递的消息不必完全按照其原始传递顺序传递。

当然,这假定 Solace JMS 客户端实际上实现了此处的指定行为。


推荐阅读