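spring-integration - Spring Integration TCP/IP: closing the connection
Question
I am using Spring Integration as the gateway module of a full-duplex communication system. The flow is: client application <--> spring-integration-ip-module (siid) <--> server application. The problem is that when the client application closes its connection, siid does not close its connection to the server application. Here is my code: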
// siid connects to the client
@Bean
public TcpNetServerConnectionFactory server() {
    TcpNetServerConnectionFactory server = new TcpNetServerConnectionFactory(1234);
    server.setMapper(new TcpSerMapper()); // use the 'mapper' attribute in XML
    MySerializer mySeri = new MySerializer();
    server.setDeserializer(mySeri);
    server.setSerializer(mySeri);
    return server;
}
// Inbound gateway, with inChannel as the request channel
@Bean
public TcpInboundGateway inGate() {
    TcpInboundGateway inGate = new TcpInboundGateway();
    inGate.setConnectionFactory(server());
    inGate.setRequestChannelName("inChannel");
    inGate.setReplyChannelName("outputChannel");
    return inGate;
}
// Service activator that takes the payload from inChannel and sends it through a gateway.
@ServiceActivator(inputChannel = "inChannel")
public byte[] doClientForward(Message<?> msg) {
    byte[] msgPayload = (byte[]) msg.getPayload();
    byte[] sendResult = null;
    ToTCP toTcp = (ToTCP) contextBean.get("toTcpBean"); // ToTCP is a gateway
    sendResult = toTcp.sends(msgPayload, "localhost", 7779);
    QueueChannel outputChannel = (QueueChannel) contextBean.get("outputChannel");
    return sendResult;
}
public static class DynamicSerSeri extends AbstractPooledBufferByteArraySerializer {

    @Override
    protected byte[] doDeserialize(InputStream inputStream, byte[] buffer) throws IOException {
        byte[] bytes = this.copyBuffer(inputStream, buffer);
        return bytes;
    }

    @Override
    public void serialize(byte[] object, OutputStream outputStream) throws IOException {
        outputStream.write(object);
    }

    public byte[] copyBuffer(InputStream inputStream, byte[] buffer) throws IOException {
        int n = 0;
        int bite = 0;
        try {
            while (true) {
                bite = inputStream.read(); // blocks here
                this.setMaxMessageSize(inputStream.available() + 1);
                buffer = new byte[inputStream.available() + 1];
                if (bite < 0 && n == 0) {
                    throw new SoftEndOfStreamException("Stream closed between payloads");
                }
                checkClosure(bite);
                buffer[n++] = (byte) bite;
                if (bite == -1) {
                    break;
                }
                if (n == this.maxMessageSize) {
                    break;
                }
            }
            return buffer;
        } catch (SoftEndOfStreamException e) {
            // I am stuck here: when the client closes, the client connection factory
            // does not receive this exception and send a close signal to the server side.
            throw e;
        } catch (IOException e) {
            publishEvent(e, buffer, n);
            throw e;
        } catch (RuntimeException e) {
            publishEvent(e, buffer, n);
            throw e;
        }
    }
}
@MessagingGateway()
public interface ToTCP {
    @Gateway(requestChannel = "toTcp.input", replyChannel = "outputChannel")
    public byte[] sends(byte[] data, @Header("host") String host, @Header("port") int port);
}

@Bean
public IntegrationFlow toTcp() {
    return f -> f.route(new ClientTcpRouter());
}
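// I am not sure I fully understand IntegrationFlowContext, but it works
public static class ClientTcpRouter extends AbstractMessageRouter {

    // Input channels of the sub-flows registered so far, keyed by host + port
    private final Map<String, MessageChannel> subFlows = new ConcurrentHashMap<>();

    @Autowired
    private IntegrationFlowContext flowContext;

    @Override
    protected synchronized Collection<MessageChannel> determineTargetChannels(Message<?> message) {
        // "host" and "port" are the headers populated by the ToTCP gateway
        String host = message.getHeaders().get("host", String.class);
        Integer port = message.getHeaders().get("port", Integer.class);
        String hostPort = host + port;
        MessageChannel existing = this.subFlows.get(hostPort);
        if (existing != null) {
            return Collections.singletonList(existing);
        }
        // Connection to the server side.
        // ?? This connection factory is not closed when inGate's connection factory
        // throws SoftEndOfStreamException.
        TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
        TcpOutboundGateway handler = new TcpOutboundGateway();
        handler.setConnectionFactory(cf);
        cf.setDeserializer(new DynamicSerSeri());
        cf.setSerializer(new DynamicSerSeri());
        IntegrationFlow flow = f -> f.handle(handler);
        IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
                this.flowContext.registration(flow)
                        .addBean(cf)
                        .id(hostPort + ".flow")
                        .register();
        MessageChannel inputChannel = flowRegistration.getInputChannel();
        this.subFlows.put(hostPort, inputChannel);
        return Collections.singletonList(inputChannel);
    }
}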
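The TcpInboundGateway accepts the connection from the client and delivers its messages to inChannel. A service activator takes the payload from inChannel and sends it to the server side through a TcpOutboundGateway, which has its own connection factory for the server. When the client closes its connection to spring-integration-ip-module, the inbound side sees a SoftEndOfStreamException, but I don't know how to close the TcpOutboundGateway's connection to the server side.
Solution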
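Use an ApplicationListener bean or an @EventListener method to listen for TCP events.
When the outbound connection is first opened, you will get a TcpConnectionOpenEvent. By default it is published on (and will be received on) the calling thread, so you can correlate the outbound connection ID with the inbound one.
Listen for the TcpConnectionCloseEvent from the inbound connection factory; you can then close the correlated outbound connection using its connectionId: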
outboundFactory.closeConnection(connectionId);
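The bookkeeping behind that correlation can be sketched without any Spring classes. The following is a minimal, stdlib-only illustration, not Spring Integration code: `ConnectionCorrelator` and all of its method names are hypothetical, and it assumes the open event for the outbound connection arrives on the same thread that is forwarding the inbound request (the default described above).

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical helper (not part of Spring Integration): remembers which
 * outbound connection was opened on behalf of which inbound connection.
 */
public class ConnectionCorrelator {

    // Inbound connection ID being served on the current thread, if any.
    private final ThreadLocal<String> currentInbound = new ThreadLocal<>();

    // inbound connection ID -> outbound connection ID
    private final Map<String, String> outboundByInbound = new ConcurrentHashMap<>();

    /** Call before forwarding a request, passing the inbound connection ID. */
    public void beginRequest(String inboundConnectionId) {
        currentInbound.set(inboundConnectionId);
    }

    /** Call after the request has been forwarded. */
    public void endRequest() {
        currentInbound.remove();
    }

    /** Call from the listener that receives the outbound factory's open event. */
    public void onOutboundOpened(String outboundConnectionId) {
        String inbound = currentInbound.get();
        if (inbound != null) {
            outboundByInbound.put(inbound, outboundConnectionId);
        }
    }

    /**
     * Call from the listener that receives the inbound factory's close event.
     * Returns the outbound connection ID to close, if one was recorded.
     */
    public Optional<String> onInboundClosed(String inboundConnectionId) {
        return Optional.ofNullable(outboundByInbound.remove(inboundConnectionId));
    }

    public static void main(String[] args) {
        ConnectionCorrelator correlator = new ConnectionCorrelator();

        correlator.beginRequest("inbound-1");      // request arrives from the client
        correlator.onOutboundOpened("outbound-7"); // open event, same thread
        correlator.endRequest();

        // The client disconnects: look up the outbound connection to close.
        correlator.onInboundClosed("inbound-1")
                .ifPresent(id -> System.out.println("close outbound connection " + id));
    }
}
```

In the real application, the ID returned by `onInboundClosed` would be the argument passed to `outboundFactory.closeConnection(connectionId)`, and the inbound connection ID would most likely be read from the `ip_connectionId` message header.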
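EDIT
Since you are using a TcpNetServerConnectionFactory, you can use a ThreadAffinityClientConnectionFactory, which automatically associates the outgoing connection with the incoming connection.
When you get the event for the incoming connection closing, it will be on that same thread, so you can simply call releaseConnection() on that thread to close the outgoing connection.
Here is an example: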
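import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.EventListener;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Transformers;
import org.springframework.integration.ip.dsl.Tcp;
import org.springframework.integration.ip.tcp.connection.TcpConnectionCloseEvent;
import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
import org.springframework.integration.ip.tcp.connection.ThreadAffinityClientConnectionFactory;

@SpringBootApplication
public class So55207274Application {

    public static void main(String[] args) {
        SpringApplication.run(So55207274Application.class, args);
    }

    // Forward everything received on port 1234 to the thread-bound outbound connection
    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows.from(Tcp.inboundGateway(server()))
                .log()
                .handle(Tcp.outboundGateway(threadBoundClient()))
                .get();
    }

    @Bean
    public TcpNetServerConnectionFactory server() {
        return new TcpNetServerConnectionFactory(1234);
    }

    @Bean
    public ThreadAffinityClientConnectionFactory threadBoundClient() {
        return new ThreadAffinityClientConnectionFactory(client());
    }

    public TcpNetClientConnectionFactory client() {
        TcpNetClientConnectionFactory client = new TcpNetClientConnectionFactory("localhost", 1235);
        client.setSingleUse(true);
        return client;
    }

    // When an inbound connection closes, release the outbound connection bound to this thread
    @EventListener
    public void listen(TcpConnectionCloseEvent event) {
        if (event.getConnectionFactoryName().equals("server")) {
            try {
                threadBoundClient().releaseConnection();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println(event);
    }

    // Test server
    @Bean
    public IntegrationFlow test() {
        return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1235)))
                .transform(Transformers.objectToString())
                .<String, String>transform(p -> p.toUpperCase())
                .get();
    }
}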
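The example above depends on the outgoing connection being bound to the thread that serves the incoming connection. As a rough illustration of that idea only, here is a stdlib-only sketch built on `ThreadLocal`. `ThreadBoundResource` and its methods are hypothetical names, and this is not how ThreadAffinityClientConnectionFactory is actually implemented; it just shows why calling a release method on the handling thread closes that thread's connection and no other.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Hypothetical stand-in that shows the thread-binding idea only;
 * it is not Spring Integration's implementation.
 */
public class ThreadBoundResource<T> {

    private final Supplier<T> opener;
    private final Consumer<T> closer;
    private final ThreadLocal<T> bound = new ThreadLocal<>();

    public ThreadBoundResource(Supplier<T> opener, Consumer<T> closer) {
        this.opener = opener;
        this.closer = closer;
    }

    /** Returns the calling thread's resource, opening it on first use. */
    public T get() {
        T resource = bound.get();
        if (resource == null) {
            resource = opener.get();
            bound.set(resource);
        }
        return resource;
    }

    /** Closes and unbinds the calling thread's resource, if it has one. */
    public void release() {
        T resource = bound.get();
        if (resource != null) {
            bound.remove();
            closer.accept(resource);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        ThreadBoundResource<String> outbound = new ThreadBoundResource<>(
                () -> "outbound-" + counter.incrementAndGet(),
                id -> System.out.println(Thread.currentThread().getName() + " closed " + id));

        // Each "inbound" thread gets its own outbound resource and releases only that one.
        Runnable inboundHandler = () -> {
            String first = outbound.get();
            String second = outbound.get(); // same thread, same resource
            System.out.println(Thread.currentThread().getName() + " reused: " + first.equals(second));
            outbound.release();             // what the close-event listener would do
        };

        Thread a = new Thread(inboundHandler, "inbound-a");
        Thread b = new Thread(inboundHandler, "inbound-b");
        a.start();
        b.start();
        a.join();
        b.join();
    }
}
```

Here `release()` plays the role that `releaseConnection()` plays in the listener above: it must run on the thread that owns the binding, which is why the close event being delivered on the inbound connection's thread matters.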