java - 如何使用 QueueChannel 和 ServiceActivator 正确配置 TCP inboundAdapter
问题描述
我正在尝试配置一个 TCP 套接字,它name,value
以不同消息的格式接收数据。这些消息平均每秒到达,有时更快,有时更慢。
我能够设置一个工作配置,但我对 Spring Integration 中实际发生的事情缺乏基本的了解。
我的配置文件如下所示:
@Configuration
@EnableIntegration
public class TCPSocketServerConfig
{
@Bean
public IntegrationFlow server(
final CSVProcessingService csvProcessingService,
@Value("${tcp.socket.server.port}") final int port
)
{
return IntegrationFlows.from(
Tcp.inboundAdapter(
Tcp.nioServer(port)
.deserializer(serializer())
.leaveOpen(true)
)
.autoStartup(true)
.outputChannel(queueChannel())
).transform(new ObjectToStringTransformer())
.handle(csvProcessingService)
.get();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller()
{
return Pollers.fixedDelay(50, TimeUnit.MILLISECONDS).get();
}
@Bean
public MessageChannel queueChannel()
{
return MessageChannels.queue("queue", 50).get();
}
@Bean
public ByteArrayLfSerializer serializer()
{
final ByteArrayLfSerializer serializer = new ByteArrayLfSerializer();
serializer.setMaxMessageSize(10240);
return serializer;
}
}
CSVProcessingService
看起来像这样(缩写)
:
@Slf4j
@Service
public class CSVProcessingService
{
@ServiceActivator
public void process(final String message)
{
log.debug("DATA RECEIVED: \n" + message);
final CsvMapper csvMapper = new CsvMapper();
final CsvSchema csvSchema = csvMapper.schemaFor(CSVParameter.class);
if (StringUtils.contains(message, StringUtils.LF))
{
processMultiLineInput(message, csvMapper, csvSchema);
}
else
{
processSingleLineInput(message, csvMapper, csvSchema);
}
}
}
我对此配置的目标如下:
- 在配置的端口上接收消息
- 在不丢失消息的情况下承受更高的负载
- 反序列化消息
- 将它们放入队列通道
- (理想情况下也记录错误)
- 队列通道每 50 毫秒轮询一次,来自队列通道的消息传递到
ObjectToStringTransformer
- 在转换器之后,转换后的消息被传递给
CSVProcessingService
进一步处理
我是正确地实现了所有这些目标,还是因为我误解了 Spring Integration 而犯了错误?是否有可能以某种方式结合Poller
和@ServiceActivator
?
此外,我在可视化我配置的 IntegrationFlow 如何实际“流动”时遇到问题,也许有人可以帮助我更好地理解这一点。
编辑:
在 Artems 发表评论后,我重新设计了我的配置。现在看起来像这样:
@Configuration
@EnableIntegration
public class TCPSocketServerConfig
{
@Value("${tcp.socket.server.port}") int port;
@Bean
public IntegrationFlow server(
final CSVProcessingService csvProcessingService
)
{
return IntegrationFlows.from(
Tcp.inboundAdapter(
tcpNioServer()
)
.autoStartup(true)
.errorChannel(errorChannel())
)
.transform(new ObjectToStringTransformer())
.handle(csvProcessingService)
.get();
}
@Bean
public AbstractServerConnectionFactory tcpNioServer()
{
return Tcp.nioServer(port)
.deserializer(serializer())
.leaveOpen(true)
.taskExecutor(
new ThreadPoolExecutor(0, 20,
30L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new DefaultThreadFactory("TCP-POOL"))
).get();
}
@Bean
public MessageChannel errorChannel()
{
return MessageChannels.direct("errors").get();
}
@Bean
public IntegrationFlow errorHandling()
{
return IntegrationFlows.from(errorChannel()).log(LoggingHandler.Level.DEBUG).get();
}
@Bean
public ByteArrayLfSerializer serializer()
{
final ByteArrayLfSerializer serializer = new ByteArrayLfSerializer();
serializer.setMaxMessageSize(10240);
return serializer;
}
}
我还@ServiceActivator
从方法中删除了注释CSVProcessingService#process
。
解决方案
不确定是什么让您感到困惑,但您的配置和逻辑看起来不错。
您可能会错过中间不需要 a 的事实QueueChannel
,因为 anAbstractConnectionFactory.processNioSelections()
已经是多线程的,它会安排一个任务从套接字读取消息。所以,你只需要配置一个合适Executor
的 for Tcp.nioServer()
. 尽管无论如何它都是Executors.newCachedThreadPool()
默认的。
另一方面,在内存中QueueChannel
,您确实可能会丢失消息,因为它们已经从网络中读取。
当你做 Java DSL 时,你应该考虑poller()
在端点上使用选项。如果您在那里有属性,则将起作用,但在将覆盖该属性时使用相同,@Poller
因此您将不会被应用。不要将 Java DSL 和注解配置混为一谈!@ServiceActivator
inputChannel
handle()
inputChannel
@Poller
您的配置中的其他一切都很好。
推荐阅读
- python - Python seaborn kdeplot 看起来与使用 R 的相同密度曲线非常不同?
- php - 为什么所有的php代码都可以解码
- angularjs - AngularJs - 在 ui-router 视图中看不到传单地图
- javascript - 如何在 html 中获取数组的所有部分?
- javascript - github 中的 Codacy 功能给了我一些我无法理解的错误
- function - Unix shell - 加载从子脚本可见的函数
- c# - 将非托管方法作为回调传递给托管 C++/CLI 类
- debugging - 如果我在调试应用程序时留下所有打印语句以供将来参考,已发布的应用程序是否有任何问题?
- sass - 在 mixin 中使用本地 Sass 变量
- javascript - 在 React 中使用 setState 更新对象(solidity 事件过滤器)