replication - ActiveMQ Artemis 复制无法正常工作
问题描述
我有一个 ActiveMQ Artemis 集群,并将它们配置为通过网络共存的复制。
这是它们的两个配置:
经纪人1
<?xml version='1.0'?>
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xi="http://www.w3.org/2001/XInclude"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">
<name>10.1.1.130</name>
<persistence-enabled>true</persistence-enabled>
<!-- this could be ASYNCIO, MAPPED, NIO
ASYNCIO: Linux Libaio
MAPPED: mmap files
NIO: Plain Java Files
-->
<journal-type>ASYNCIO</journal-type>
<paging-directory>data/paging</paging-directory>
<bindings-directory>data/bindings</bindings-directory>
<journal-directory>data/journal</journal-directory>
<large-messages-directory>data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
<journal-file-size>10M</journal-file-size>
<journal-buffer-timeout>23940000</journal-buffer-timeout>
<journal-max-io>1</journal-max-io>
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period>
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
that won't support flow control. -->
<max-disk-usage>90</max-disk-usage>
<!-- should the broker detect dead locks and other issues -->
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<!-- Clustering configuration -->
<connectors>
<connector name="netty-connector">tcp://localhost:61616</connector>
<!-- connector to the server2 -->
<connector name="server2-connector">tcp://10.1.1.131:61616</connector>
</connectors>
<ha-policy>
<replication>
<colocated>
<backup-request-retries>-1</backup-request-retries>
<backup-request-retry-interval>2000</backup-request-retry-interval>
<excludes>
<connector-ref>server2-connector</connector-ref>
<connector-ref>netty-connector</connector-ref>
</excludes>
<max-backups>1</max-backups>
<request-backup>true</request-backup>
<master>
</master>
<slave>
</slave>
</colocated>
</replication>
</ha-policy>
<cluster-user>ACTIVEMQ.CLUSTER.ADMIN.USER</cluster-user>
<cluster-password>123456</cluster-password>
<cluster-connections>
<cluster-connection name="my-cluster">
<connector-ref>netty-connector</connector-ref>
<retry-interval>500</retry-interval>
<use-duplicate-detection>true</use-duplicate-detection>
<message-load-balancing>STRICT</message-load-balancing>
<max-hops>1</max-hops>
<static-connectors>
<connector-ref>server2-connector</connector-ref>
</static-connectors>
</cluster-connection>
</cluster-connections>
<acceptors>
<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
<!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
<acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://0.0.0.0:4883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
经纪人2:
<?xml version='1.0'?>
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xi="http://www.w3.org/2001/XInclude"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">
<name>10.1.1.131</name>
<persistence-enabled>true</persistence-enabled>
<!-- this could be ASYNCIO, MAPPED, NIO
ASYNCIO: Linux Libaio
MAPPED: mmap files
NIO: Plain Java Files
-->
<journal-type>ASYNCIO</journal-type>
<paging-directory>data/paging</paging-directory>
<bindings-directory>data/bindings</bindings-directory>
<journal-directory>data/journal</journal-directory>
<large-messages-directory>data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
<journal-file-size>10M</journal-file-size>
<journal-buffer-timeout>23940000</journal-buffer-timeout>
<journal-max-io>1</journal-max-io>
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period>
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
that won't support flow control. -->
<max-disk-usage>90</max-disk-usage>
<!-- should the broker detect dead locks and other issues -->
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<!-- Clustering configuration -->
<connectors>
<connector name="netty-connector">tcp://localhost:61616</connector>
<!-- connector to the server1 -->
<connector name="server1-connector">tcp://10.1.1.130:61616</connector>
</connectors>
<ha-policy>
<replication>
<colocated>
<backup-request-retries>-1</backup-request-retries>
<backup-request-retry-interval>2000</backup-request-retry-interval>
<excludes>
<connector-ref>server1-connector</connector-ref>
<connector-ref>netty-connector</connector-ref>
</excludes>
<max-backups>1</max-backups>
<request-backup>true</request-backup>
<master>
</master>
<slave>
</slave>
</colocated>
</replication>
</ha-policy>
<cluster-user>ACTIVEMQ.CLUSTER.ADMIN.USER</cluster-user>
<cluster-password>123456</cluster-password>
<cluster-connections>
<cluster-connection name="my-cluster">
<connector-ref>netty-connector</connector-ref>
<retry-interval>500</retry-interval>
<use-duplicate-detection>true</use-duplicate-detection>
<message-load-balancing>STRICT</message-load-balancing>
<max-hops>1</max-hops>
<static-connectors>
<connector-ref>server1-connector</connector-ref>
</static-connectors>
</cluster-connection>
</cluster-connections>
<acceptors>
<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
<!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
<acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://0.0.0.0:4883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
这是发送消息的 Srping Boot java 代码:
@Configuration
public class ArtemisProducerConfig extends BaseObject {
@Value("${artemis.broker-url}")
private String brokerUrl;
@Bean
public ActiveMQConnectionFactory senderActiveMQConnectionFactory() {
return new ActiveMQConnectionFactory(brokerUrl);
}
@Bean
public CachingConnectionFactory cachingConnectionFactory() {
return new CachingConnectionFactory(senderActiveMQConnectionFactory());
}
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate template = new JmsTemplate(cachingConnectionFactory());
template.setExplicitQosEnabled(true);
template.setDeliveryPersistent(true);
return template;
}
}
jmsTemplate.convertAndSend("test.address::test.queue", inputData.getData(), new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws JMSException {
// TODO Auto-generated method stub
message.setJMSCorrelationID(inputData.getCorrID());
return message;
}
});
消息发送到 Broker1 成功,我可以在网站http://10.1.1.130:8161上查看。但是在 Broker2 上现在没有可用的消息。我知道消息必须备份到 Broker2 才能满足 HA。
有人可以帮我举个例子,将 Artemis 配置为通过网络进行共置复制吗?谢谢!
解决方案
ActiveMQ Artemis 使用主动/被动方案来实现高可用性。因此,在复制用例中,一个主动的“活动”代理有一个被动的“从属”代理,它会将消息复制到该代理。在复制和托管配置中,每个 JVM 实际上有 2 个代理(即两个代理位于同一个 JVM 中)。一个代理处于活动状态,另一个代理充当集群中另一个代理的备份。你不会“看到”从服务器上的消息,直到实时代理失败,此时从服务器将激活并成为主服务器。
要确认复制按预期进行,您可以检查主服务器和从服务器上的日志文件。奴隶首先会有这样的东西:
INFO [org.apache.activemq.artemis.core.server] AMQ221109: Apache ActiveMQ Artemis Backup Server version X.X.X [null] started, waiting live to fail before it gets active
然后live会有这样的东西:
INFO [org.apache.activemq.artemis.core.server] AMQ221025: Replication: sending NIOSequentialFile /path/to/data/journal/activemq-data-2.amq (size=10,485,760) to replica.
INFO [org.apache.activemq.artemis.core.server] AMQ221025: Replication: sending NIOSequentialFile /path/to/data/bindings/activemq-bindings-3.bindings (size=1,048,576) to replica.
INFO [org.apache.activemq.artemis.core.server] AMQ221025: Replication: sending NIOSequentialFile /path/to/data/bindings/activemq-bindings-2.bindings (size=1,048,576) to replica.
INFO [org.apache.activemq.artemis.core.server] AMQ221024: Backup server ActiveMQServerImpl::serverUUID=8d7477d0-1518-11ea-abd1-a0afbd82eaba is synchronized with live-server.
然后最终奴隶将拥有:
INFO [org.apache.activemq.artemis.core.server] AMQ221031: backup announced
推荐阅读
- javascript - 持续时间超过 24 小时的 Jquery 输入掩码
- cmake - 使用 CMake,如何在 gtest_discover_tests --gtest_list_tests 调用上设置环境属性?
- c# - 无法加载文件或程序集 'System.ComponentModel.Annotations,版本 = 4.2.0.0,
- javascript - 从本地存储中获取数据(最喜欢的项目)
- azure - 在 Azure 的数据工厂中为 Postgresql(源)创建链接服务时无法连接服务器
- python - 如何在 Visual Studio Code 中获取 Code Runner 扩展以使用选定的 Python 解释器?
- javascript - 为什么我无法收到服务器的响应
- ios - 如何在 Swift 中从图像中检测条形码?
- bluetooth-lowenergy - 如何找到BLE设备不同UUID对应的句柄
- apache-spark - 此查询不支持从检查点位置恢复。删除检查点/testmemeory/offsets 重新开始