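spring - Using Amazon MQ as a broker relay for Spring WebSockets + STOMP
Question
Is it possible to use Amazon MQ as an external broker for Spring + WebSockets + STOMP? I have been trying without luck. My configuration is as follows: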
@Configuration
@EnableWebSocketMessageBroker
@AllArgsConstructor
public class WebSocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // Relay /topic destinations to the external STOMP broker
        config.enableStompBrokerRelay("/topic")
                .setRelayHost("my.amazon.stomp.endpoint").setRelayPort(61614)
                .setSystemLogin("xxxxxxxxx").setSystemPasscode("xxxxxxxxx");
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws-myapp").setAllowedOrigins("*").withSockJS();
    }
}
But when I run the application, this is what I get in the (debug) log:
2018-06-13 16:16:42.290 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : [id: 0xcd26491a] REGISTERED
2018-06-13 16:16:42.291 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : [id: 0xcd26491a] CONNECT: my.amazon.stomp.endpoint:61614
2018-06-13 16:16:42.398 INFO [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : CONNECTED: [id: 0xcd26491a, L:/my.local.ip:50392 - R:my.amazon.stomp.endpoint:61614]
2018-06-13 16:16:42.399 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : [id: 0xcd26491a, L:/my.local.ip:50392 - R:my.amazon.stomp.endpoint:61614] ACTIVE
2018-06-13 16:16:42.399 DEBUG [] 17743 --- [eactor-tcp-io-5] o.s.m.s.s.StompBrokerRelayMessageHandler : TCP connection opened in session=_system_
2018-06-13 16:16:42.404 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : [id: 0xcd26491a, L:/my.local.ip:50392 - R:my.amazon.stomp.endpoint:61614] WRITE: 94B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 43 4f 4e 4e 45 43 54 0a 61 63 63 65 70 74 2d 76 |CONNECT.accept-v|
|00000010| 65 72 73 69 6f 6e 3a 31 2e 31 2c 31 2e 32 0a 6c |ersion:1.1,1.2.l|
|00000020| 6f 67 69 6e 3a 70 61 70 65 72 6c 65 73 73 0a 70 |ogin:xxxxxxxxx.p|
|00000030| 61 73 73 63 6f 64 65 3a 4d 38 7c 42 61 6e 41 47 |asscode:xxxxxxxx|
|00000040| 4c 2d 45 61 0a 68 65 61 72 74 2d 62 65 61 74 3a |xxxx.heart-beat:|
|00000050| 31 30 30 30 30 2c 31 30 30 30 30 0a 0a 00 |10000,10000... |
+--------+-------------------------------------------------+----------------+
2018-06-13 16:16:42.405 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : [id: 0xcd26491a, L:/my.local.ip:50392 - R:my.amazon.stomp.endpoint:61614] FLUSH
2018-06-13 16:16:42.406 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : [id: 0xcd26491a, L:/my.local.ip:50392 - R:my.amazon.stomp.endpoint:61614] USER_EVENT: reactor.io.net.impl.netty.NettyChannelHandlerBridge$ChannelInputSubscriberEvent@37c47287
2018-06-13 16:16:42.512 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : [id: 0xcd26491a, L:/my.local.ip:50392 - R:my.amazon.stomp.endpoint:61614] READ: 7B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 15 03 03 00 02 02 0a |....... |
+--------+-------------------------------------------------+----------------+
2018-06-13 16:16:42.513 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : [id: 0xcd26491a, L:/my.local.ip:50392 - R:my.amazon.stomp.endpoint:61614] READ COMPLETE
2018-06-13 16:16:42.514 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : [id: 0xcd26491a, L:/my.local.ip:50392 - R:my.amazon.stomp.endpoint:61614] READ COMPLETE
2018-06-13 16:16:42.623 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : [id: 0xcd26491a, L:/my.local.ip:50392 ! R:my.amazon.stomp.endpoint:61614] INACTIVE
2018-06-13 16:16:42.624 DEBUG [] 17743 --- [eactor-tcp-io-5] o.s.m.s.s.StompBrokerRelayMessageHandler : TCP connection to broker closed in session _system_
2018-06-13 16:16:42.624 DEBUG [] 17743 --- [eactor-tcp-io-5] o.s.m.s.s.StompBrokerRelayMessageHandler : Cleaning up connection state for session _system_
2018-06-13 16:16:42.624 INFO [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : CLOSED: [id: 0xcd26491a, L:/my.local.ip:50392 ! R:my.amazon.stomp.endpoint:61614]
2018-06-13 16:16:42.625 INFO [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : Failed to connect to reactor.io.net.impl.netty.tcp.NettyTcpClient$ReconnectingChannelListener$3@1ad2dfa9. Attempting reconnect in 5000ms.
2018-06-13 16:16:42.625 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : [id: 0xcd26491a, L:/my.local.ip:50392 ! R:my.amazon.stomp.endpoint:61614] UNREGISTERED
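It does not connect. Any ideas? It works perfectly when connecting to a local ActiveMQ instance. The difference is that locally it uses a tcp://host URI, whereas Amazon provides a stomp+ssl://host URI.
In any case, I am not including the protocol in the relay host. The TCP connection seems to be established, but no reply to the CONNECT frame ever arrives. I am using the same user and password that I use to log in to the management console. I can connect to the wss:// endpoint from JavaScript, but I need to set it up as the external broker for a Spring Boot application.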
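Solution
In the end, the problem was that SSL had to be enabled for the connection. This is how I did it:
1) I created a custom StompTcpFactory that creates an SSL-enabled TCP client: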
/**
 * A TCP Client Factory to enable SSL connection
 */
public class StompTcpFactory implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {

    private final Environment environment;
    private final EventLoopGroup eventLoopGroup;
    private final boolean ssl;
    private final List<String> addresses;

    public StompTcpFactory(List<String> addresses, boolean ssl) {
        this.addresses = addresses;
        this.ssl = ssl;
        this.environment = new Environment(
                () -> new ReactorConfiguration(Collections.emptyList(), "sync", new Properties()));
        this.eventLoopGroup = Reactor2TcpClient.initEventLoopGroup();
    }

    @Override
    public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(
            Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {
        Supplier<InetSocketAddress> supplier = new InetSocketAddressSupplier(addresses);
        return tcpClientSpec
                .env(environment)
                .options(new NettyClientSocketOptions().eventLoopGroup(eventLoopGroup))
                .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
                // Passing SslOptions turns SSL on; null leaves the connection in plain TCP
                .ssl(ssl ? new SslOptions() : null)
                .connect(supplier);
    }
}
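Note the .ssl call, which sets the flag to use SSL. The factory also takes a list of addresses to configure the endpoints, and a boolean so that SSL can be switched on and off (for development). The InetSocketAddressSupplier was created to alternate between the 2 addresses that Amazon gives you when using an HA setup: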
/**
 * Address supplier for failover connection
 */
@Slf4j
public class InetSocketAddressSupplier implements Supplier<InetSocketAddress> {

    // Shared by all instances, so every call to get() alternates between the two brokers
    private static int counter = 0;
    private final List<String> addresses;

    InetSocketAddressSupplier(List<String> addresses) {
        Assert.notNull(addresses, "addresses list cannot be null");
        Assert.isTrue(!addresses.isEmpty() && (addresses.size() == 2), "Addresses list must be of size 2");
        this.addresses = addresses;
    }

    @Override
    public InetSocketAddress get() {
        int serverIndex = counter % 2;
        counter++;
        // Each entry is expected in host:port form
        String[] info = addresses.get(serverIndex).split(":");
        log.debug("Returning server {} {}:{} for connection", serverIndex, info[0], info[1]);
        return new InetSocketAddress(info[0], Integer.parseInt(info[1]));
    }
}
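The supplier above keeps its position in a static, unsynchronized int and only accepts exactly two addresses. If that is a concern, a variant along the following lines might work. This is only a sketch under assumptions: the class name RoundRobinAddressSupplier is made up for illustration, and it implements java.util.function.Supplier so that it compiles on its own, whereas the factory above needs whichever Supplier type connect(...) expects in the Reactor version in use.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical round-robin supplier over any non-empty list of "host:port" entries. */
final class RoundRobinAddressSupplier implements Supplier<InetSocketAddress> {

    private final List<String> addresses;
    // Per-instance counter; AtomicInteger keeps concurrent get() calls consistent
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinAddressSupplier(List<String> addresses) {
        if (addresses == null || addresses.isEmpty()) {
            throw new IllegalArgumentException("addresses must contain at least one host:port entry");
        }
        this.addresses = new ArrayList<>(addresses);
    }

    @Override
    public InetSocketAddress get() {
        // floorMod keeps the index non-negative even after the counter overflows
        int index = Math.floorMod(counter.getAndIncrement(), addresses.size());
        String entry = addresses.get(index);
        int colon = entry.lastIndexOf(':');
        if (colon < 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + entry);
        }
        String host = entry.substring(0, colon);
        int port = Integer.parseInt(entry.substring(colon + 1));
        return new InetSocketAddress(host, port);
    }
}
```

Each call to get() returns the next entry in the list and wraps around at the end, so a reconnect attempt after a failure is directed at the other broker.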
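If you don't need an HA setup, you can simply create an inline supplier, as shown in the Spring documentation.
2) You can then configure your WebSocket endpoints to use this factory to create the client, and set the user/password for both the system and the client connections: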
@Slf4j
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer {

    @Qualifier("websocket")
    private MessageBrokerProperties config;
    private InetSocketAddressSupplier supplier;

    public WebSocketConfiguration(MessageBrokerProperties config) {
        this.config = config;
        this.supplier = new InetSocketAddressSupplier(config.getAddresses());
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        InetSocketAddress address = supplier.get();
        registry.enableStompBrokerRelay("/topic")
                .setRelayHost(address.getHostName()).setRelayPort(address.getPort())
                .setSystemLogin(config.getSystemLogin()).setSystemPasscode(config.getSystemPasscode())
                .setClientLogin(config.getClientLogin()).setClientPasscode(config.getClientPasscode())
                // Use the SSL-capable TCP client instead of the default one
                .setTcpClient(createTcpClient());
        registry.setApplicationDestinationPrefixes("/app");
    }

    private TcpOperations<byte[]> createTcpClient() {
        return new Reactor2TcpClient<>(new StompTcpFactory(config.getAddresses(), config.isUseSSL()));
    }
}
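The MessageBrokerProperties class used by this configuration is not shown in the answer. Judging only from the getters called above, it could look roughly like the plain holder below. This is a guess at its shape, not the author's class: the field names and defaults are assumptions, and in a Spring Boot application it would presumably be bound to external configuration (for example with @ConfigurationProperties), which is left out here so the sketch compiles without Spring.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical settings holder; only the getters used by the configuration are taken from the answer. */
class MessageBrokerProperties {

    // Broker endpoints in host:port form, e.g. the two addresses of an HA broker
    private List<String> addresses = new ArrayList<>();
    private String systemLogin;
    private String systemPasscode;
    private String clientLogin;
    private String clientPasscode;
    // Assumed default: SSL on, switched off only for local development
    private boolean useSSL = true;

    public List<String> getAddresses() { return addresses; }
    public void setAddresses(List<String> addresses) { this.addresses = addresses; }

    public String getSystemLogin() { return systemLogin; }
    public void setSystemLogin(String systemLogin) { this.systemLogin = systemLogin; }

    public String getSystemPasscode() { return systemPasscode; }
    public void setSystemPasscode(String systemPasscode) { this.systemPasscode = systemPasscode; }

    public String getClientLogin() { return clientLogin; }
    public void setClientLogin(String clientLogin) { this.clientLogin = clientLogin; }

    public String getClientPasscode() { return clientPasscode; }
    public void setClientPasscode(String clientPasscode) { this.clientPasscode = clientPasscode; }

    public boolean isUseSSL() { return useSSL; }
    public void setUseSSL(boolean useSSL) { this.useSSL = useSSL; }
}
```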
Hope it helps...
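As a cross-check on the SSL diagnosis, the 7 bytes that the broker sent back in the question's log (15 03 03 00 02 02 0a) have the layout of a TLS alert record: content type 0x15 (alert), record version 3.3 (the value used by TLS 1.2), length 2, alert level 2 (fatal) and description 10 (unexpected_message). That is consistent with a TLS endpoint receiving a plaintext STOMP CONNECT frame where it expected a handshake. The small decoder below is only an illustration of that reading; the class name TlsAlertPeek is made up and it is not part of the fix.

```java
/** Hypothetical helper that reads a 5-byte TLS record header followed by a 2-byte alert body. */
final class TlsAlertPeek {

    /** Describes the bytes if they form a TLS alert record, or reports that they do not. */
    static String describe(byte[] data) {
        // 0x15 is the TLS record content type for alerts
        if (data == null || data.length < 7 || (data[0] & 0xFF) != 0x15) {
            return "not a TLS alert";
        }
        int major = data[1] & 0xFF;
        int minor = data[2] & 0xFF;
        int length = ((data[3] & 0xFF) << 8) | (data[4] & 0xFF);
        int level = data[5] & 0xFF;        // 1 = warning, 2 = fatal
        int description = data[6] & 0xFF;  // 10 = unexpected_message
        return String.format("TLS alert, version %d.%d, length %d, level %d, description %d",
                major, minor, length, level, description);
    }

    public static void main(String[] args) {
        // The bytes shown in the READ: 7B entry of the debug log
        byte[] fromLog = {0x15, 0x03, 0x03, 0x00, 0x02, 0x02, 0x0a};
        System.out.println(describe(fromLog));
    }
}
```

Seeing a reply that starts with 15 03 on a connection that was opened without SSL is a quick hint that the port expects TLS.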