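java - Apache ActiveMQ Artemis: client reconnecting to the next available broker in a clustered HA (replication/shared store) setup
Question
The broker.xml for host1 is shown below. host2 uses the same file, except that the port is changed to 61616 and the HA policy is configured as slave. See also: Apache Artemis client failover discovery.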
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/largemessages</large-messages-directory>
<paging-directory>./data/paging</paging-directory>
<!-- Connectors -->
<connectors>
<connector name="netty-connector">tcp://10.64.60.100:61617</connector><!-- direct IP address of host myhost1 -->
<connector name="broker2-connector">tcp://myhost2:61616</connector> <!-- ip 10.64.60.101 <- mocked up ip for security reasons -->
</connectors>
<!-- Acceptors -->
<acceptors>
<acceptor name="amqp">tcp://0.0.0.0:61617?amqpIdleTimeout=0;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP;useEpoll=true</acceptor>
</acceptors>
<ha-policy>
<replication>
<master/>
</replication>
</ha-policy>
<cluster-connections>
<cluster-connection name="myhost1-cluster">
<connector-ref>netty-connector</connector-ref>
<retry-interval>500</retry-interval>
<use-duplicate-detection>true</use-duplicate-detection>
<message-load-balancing>ON_DEMAND</message-load-balancing>
<max-hops>1</max-hops>
<static-connectors>
<connector-ref>broker2-connector</connector-ref> <!-- defined in the connectors -->
</static-connectors>
</cluster-connection>
</cluster-connections>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<address-settings>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-delete-queues>false</auto-delete-queues>
<auto-delete-created-queues>false</auto-delete-created-queues>
<auto-delete-addresses>false</auto-delete-addresses>
</address-setting>
</address-settings>
</core>
</configuration>
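The client is a Java program that uses a ProducerTemplate to push messages to a direct endpoint, which then routes them to a queue. Call this Java-camel-producer-client: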
package com.demo.artemis;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.spring.SpringCamelContext;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ToRoute {
public static void main(String[] args) throws Exception {
ApplicationContext appContext = new ClassPathXmlApplicationContext(
"camel-context-producer.xml");
ProducerTemplate template = null;
CamelContext camelContext = SpringCamelContext.springCamelContext(
appContext, false);
try {
camelContext.start();
template = camelContext.createProducerTemplate();
String msg = null;
int loop =0;
int i = 0;
while(true) {
if (i%10000==0) {
i=1;
loop=loop+1;
}
if(loop==2) break;
msg ="---> "+(++i);
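// The endpoint name must match the <from uri="direct:toMyExample"/> route in camel-context-producer.xml
template.sendBody("direct:toMyExample", msg);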
}
} finally {
if (template != null) {
template.stop();
}
camelContext.stop();
}
}
}
The Camel context that sends messages to the queue; call this camel-producer-client:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jee="http://www.springframework.org/schema/jee"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.1.xsd
http://www.springframework.org/schema/jee
http://www.springframework.org/schema/jee/spring-jee-3.1.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="jmsConnectionFactory" class="org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory">
<constructor-arg index="0" value="(tcp://myhost:61616,tcp://myhost1:61617)?ha=true;reconnectAttempts=-1;"/>
</bean>
<bean id="jmsPooledConnectionFactory" class="org.messaginghub.pooled.jms.JmsPoolConnectionFactory" init-method="start" destroy-method="stop">
<property name="maxConnections" value="10" />
<property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="jmsPooledConnectionFactory" />
<property name="concurrentConsumers" value="5" />
</bean>
<bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
<property name="configuration" ref="jmsConfig" />
<property name="streamMessageTypeEnabled" value="true"/>
</bean>
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<endpoint id="myqueue" uri="jms:queue:myExampleQueue" />
<route>
<from uri="direct:toMyExample"/>
<transform>
<simple>MSG FRM DIRECT TO MyExampleQueue : ${bodyAs(String)}</simple>
</transform>
<to uri="ref:myqueue"/>
</route>
</camelContext>
</beans>
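The Camel consumer that reads from MyExampleQueue, transforms the message, and sends it to OutBoundQueue (this context file is started using Camel's "org.apache.camel.spring.Main -ac"); call this camel-consumer-client: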
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jee="http://www.springframework.org/schema/jee"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.1.xsd
http://www.springframework.org/schema/jee
http://www.springframework.org/schema/jee/spring-jee-3.1.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
<property name="environment">
<props>
<prop key="java.naming.factory.initial">org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory</prop>
<prop key="connectionFactory.ConnectionFactory">(tcp://myhost1:61617,tcp://myhost2:61616)?ha=true;retryInterval=1000;retryIntervalMultiplier=1.0;reconnectAttempts=-1;</prop>
<prop key="queue.queue/MyExampleQueue">MyExampleQueue</prop>
<prop key="queue.queue/OutBoundQueue">OutBoundQueue</prop>
</props>
</property>
</bean>
<bean id="jndiFactoryBean" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName" value="ConnectionFactory"/>
<property name="jndiTemplate" ref="jndiTemplate"/>
<property name="cache" value="true"/>
</bean>
<bean id="jndiDestinationResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver">
<property name="jndiTemplate" ref="jndiTemplate"/>
<property name="cache" value="true"/>
<!-- dynamic destination if the destination name is not found in JNDI -->
<property name="fallbackToDynamicDestination" value="true"/>
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="jndiFactoryBean"/>
<property name="destinationResolver" ref="jndiDestinationResolver"/>
<property name="concurrentConsumers" value="10" />
</bean>
<bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
<property name="configuration" ref="jmsConfig" />
</bean>
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<endpoint id="inqueue" uri="jms:queue:MyExampleQueue" />
<endpoint id="outqueue" uri="jms:queue:OutBoundQueue" />
<route>
<from uri="ref:inqueue" />
<convertBodyTo type="java.lang.String" />
<transform>
<simple>MSG FRM MyExampleQueue TO OutboundQueue : ${bodyAs(String)}</simple>
</transform>
<to uri="ref:outqueue" />
</route>
</camelContext>
</beans>
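I think the problem I am facing is similar to this ticket: http://activemq.2283324.n4.nabble.com/Connection-to-the-backup-after-master-shutdown-td4753770.html
With the Camel context, I see the client consumer being redirected to the slave when the master is unavailable (I am using JNDI and JmsPoolConnectionFactory).
Java-producer-client uses the ProducerTemplate to send messages to the direct endpoint, where they are transformed and routed to the queue. When the master goes down, this client fails to fail over and connect to the slave. (Question: is this the expected behavior when the brokers are in HA mode?)
After the reconnect attempts, with the master down, I see the following exception: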
Caused by: javax.jms.JMSException: AMQ219016: Connection failure detected. Unblocking a blocking call that will never get a response
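On the other hand, camel-context-consumer, which is started using Main, is able to fail over to the slave automatically. In the console I can see a consumer count of 10 on the slave host, and data is being processed. But when the master comes back up, the client does not switch back to the live master. Question: is this also expected?
The connection factory is created with the following URL: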
(tcp://myhost:61616,tcp://myhost1:61617)?ha=true;reconnectAttempts=-1;
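To make the shape of that URL explicit, here is a minimal sketch that assembles it from a list of broker addresses. The class `FailoverUrl` and its `build` method are hypothetical helpers written for this illustration, not part of the Artemis client API; only the `ha` and `reconnectAttempts` parameters come from the URL above.

```java
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper (not an Artemis class): builds a client URL of the form
// (tcp://hostA:portA,tcp://hostB:portB)?ha=true;reconnectAttempts=N
final class FailoverUrl {

    // hostPorts: "host:port" entries, one per broker the client may connect to first.
    // reconnectAttempts: -1 means retry forever, as in the URL used in the question.
    static String build(List<String> hostPorts, int reconnectAttempts) {
        if (hostPorts.isEmpty()) {
            throw new IllegalArgumentException("at least one broker is required");
        }
        StringJoiner brokers = new StringJoiner(",", "(", ")");
        for (String hostPort : hostPorts) {
            brokers.add("tcp://" + hostPort);
        }
        return brokers + "?ha=true;reconnectAttempts=" + reconnectAttempts;
    }

    public static void main(String[] args) {
        // Host names and ports taken from the consumer configuration above.
        System.out.println(build(List.of("myhost1:61617", "myhost2:61616"), -1));
    }
}
```

The resulting string could then be passed as the constructor argument of the connection factory bean, in place of a hand-written value.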
Question: do we need to configure broadcast and discovery groups even when using static connectors?
Solution
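It is expected for a client to receive an exception if failover happens while it is in the middle of a blocking call (for example, sending a durable message and waiting for the broker's acknowledgement, committing a transaction, etc.). This is discussed further in the documentation.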
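Because the exception is expected, the application has to decide what to do with it. One common option is to retry the interrupted operation once the client has reconnected. The following is a minimal sketch of that idea using only the JDK; `RetryOnFailover` and its parameters are hypothetical names for this illustration, and a real client would wrap its JMS send (for example the `template.sendBody(...)` call) and would probably catch `javax.jms.JMSException` rather than every `Exception`.

```java
import java.util.concurrent.Callable;

// Hypothetical helper: retries a blocking operation that may be interrupted by failover.
final class RetryOnFailover {

    // Runs the operation up to maxAttempts times, sleeping delayMillis between attempts.
    // Rethrows the last exception if every attempt fails.
    static <T> T call(Callable<T> operation, int maxAttempts, long delayMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMillis); // give the client time to reconnect
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated send: fails twice (as if failover were in progress), then succeeds.
        String result = call(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated connection failure");
            }
            return "sent on attempt " + calls[0];
        }, 5, 10L);
        System.out.println(result);
    }
}
```

Note that retrying a send can deliver a message the broker had already received before the connection dropped, so whether a blind retry is acceptable depends on the application.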
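Given your configuration, the fact that clients do not switch back to the master broker when it comes back is also expected. In short, you have not configured failback properly. Your master should have: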
<ha-policy>
<replication>
<master>
<check-for-live-server>true</check-for-live-server>
</master>
</replication>
</ha-policy>
Your slave should have:
<ha-policy>
<replication>
<slave>
<allow-failback>true</allow-failback>
</slave>
</replication>
</ha-policy>
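This is also discussed in the documentation.
Finally, you do not need to configure broadcast and discovery groups when using static connectors.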