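java - Spring Integration gets stuck in the gateway channel and does not return to the polling channel
Problem description
I have a requirement to read files from an SFTP location to the local machine, process and post-process them, and then move them to an archive folder on the SFTP location.
I am using Spring Integration as follows:
I have an InboundChannelAdapter with a cron poller that watches the SFTP location and pulls files to the local machine for processing.
I have a corresponding ServiceActivator that invokes the processing service. It then calls a gateway method, which invokes another ServiceActivator that archives the file.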
The flow is:
InboundChannelAdapter (with a cron poller that pulls every 10 seconds)
ServiceActivator (processing)
    processes the file
    invokes the MessagingGateway method to archive it
    deletes the file from the local directory
ServiceActivator (archiving)
FileWatcher:
@Configuration
class FileWatcher {
    Logger logger = LoggerFactory.getLogger(FileWatcher.class)

    @Autowired
    XmlParserService xmlParserService
    @Autowired
    UploadGateway gateway

    @Value("\${ssftp-server}")
    String sftpHost
    @Value("\${sftp-user}")
    String sftpUser
    @Value("\${sftp-password}")
    String sftpPassword
    @Value("\${sftp-filter}")
    String sftpFileFilter
    @Value("\${sftp-port}")
    Integer sftpPort
    @Value("\${sftp-source-path}")
    String sourcePath
    @Value("\${sftp-local-path}")
    String localPath
    @Value("\${sftp-archive-path}")
    String archivePath
    @Value("\${sftp-error-path}")
    String errorPath
    @Value("\${cron.expression}")
    String cronPoller
    @Value("\${sessionTimeout}")
    String sftpSessionTimeOut
    @Value("\${channelTimeout}")
    String sftpChannelTimeOut

    @Bean
    SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
        // false = do not share one underlying session between callers
        def factory = new DefaultSftpSessionFactory(false)
        factory.setHost(sftpHost)
        factory.setPort(sftpPort)
        factory.setUser(sftpUser)
        factory.setPassword(sftpPassword)
        factory.setAllowUnknownKeys(true)
        return new CachingSessionFactory<ChannelSftp.LsEntry>(factory)
    }

    @Bean
    SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
        def fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory())
        // delete the remote file once it has been copied to the local directory
        fileSynchronizer.setDeleteRemoteFiles(true)
        fileSynchronizer.setRemoteDirectory(sourcePath)
        fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter(sftpFileFilter))
        return fileSynchronizer
    }

    @Bean
    @InboundChannelAdapter(channel = "inputChannel", poller = [@Poller(cron = "*/10 * * * * *")])
    MessageSource<File> source() {
        def source = new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer())
        source.setLocalDirectory(new File(localPath))
        source.setAutoCreateLocalDirectory(true)
        source.setLocalFilter(new AcceptOnceFileListFilter())
        // fetch at most one remote file per poll
        source.maxFetchSize = 1
        return source
    }

    @Bean
    @ServiceActivator(inputChannel = "inputChannel")
    MessageHandler messageHandler() {
        def message = new MessageHandler() {
            @Override
            void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException {
                def messageFile = message.payload as File
                try {
                    if (messageFile.name.endsWith(".xml")) {
                        logger.info("processing the file: " + messageFile.name)
                        xmlParserService.process(messageFile)
                        if (messageFile.exists()) {
                            println("to archive the file")
                            // hand the file to the archive flow through the gateway
                            gateway.archiveSpsAsnFile(messageFile)
                            println("archived the file successfully")
                            // remove the local copy once it has been archived
                            messageFile.delete()
                        }
                    }
                } catch (IOException ioe) {
                    logger.error("file could not be processed: " + messageFile.name)
                }
            }
        }
        return message
    }

    @Bean
    @ServiceActivator(inputChannel = "toArchiveLocation")
    MessageHandler moveFileToArchive() {
        SftpMessageHandler sftpHandler = new SftpMessageHandler(sftpSessionFactory())
        sftpHandler.setRemoteDirectoryExpression(new LiteralExpression(archivePath))
        // upload the file under its local file name
        sftpHandler.setFileNameGenerator(new FileNameGenerator() {
            @Override
            String generateFileName(org.springframework.messaging.Message<?> message) {
                if (((File) message.getPayload()).exists()) {
                    println("message: " + message.getPayload())
                    println("message: " + ((File) message.getPayload()).getAbsolutePath())
                    println("message: " + ((File) message.getPayload()).getName())
                    return ((File) message.getPayload()).getName()
                } else {
                    throw new IllegalArgumentException("file expected as payload")
                }
            }
        })
        return sftpHandler
    }
}
Gateway:
@MessagingGateway
interface UploadGateway {
    @Gateway(requestChannel = "toArchiveLocation")
    archiveSpsAsnFile(File file)
}
Logs:
2021-06-18 12:11:00.788 INFO 15653 --- [ask-scheduler-1] com.jcraft.jsch : Authentication succeeded (keyboard-interactive).
2021-06-18 12:11:00.862 DEBUG 15653 --- [ask-scheduler-1] o.s.integration.util.SimplePool : Obtained new org.springframework.integration.sftp.session.SftpSession@6dd7b63a.
2021-06-18 12:11:01.123 DEBUG 15653 --- [ask-scheduler-1] o.s.i.s.i.SftpInboundFileSynchronizer : deleted remote file: //856POMRMN00407684_24100158471_202106140820.xml
2021-06-18 12:11:01.124 DEBUG 15653 --- [ask-scheduler-1] o.s.i.f.r.session.CachingSessionFactory : Releasing Session org.springframework.integration.sftp.session.SftpSession@6dd7b63a back to the pool.
2021-06-18 12:11:01.124 DEBUG 15653 --- [ask-scheduler-1] o.s.integration.util.SimplePool : Releasing org.springframework.integration.sftp.session.SftpSession@6dd7b63a back to the pool
2021-06-18 12:11:01.124 DEBUG 15653 --- [ask-scheduler-1] o.s.i.s.i.SftpInboundFileSynchronizer : 1 files transferred from '/'
2021-06-18 12:11:01.125 DEBUG 15653 --- [ask-scheduler-1] sageSource$LocalFileReadingMessageSource : Added to queue: [/Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml]
2021-06-18 12:11:01.131 DEBUG 15653 --- [ask-scheduler-1] o.s.i.e.SourcePollingChannelAdapter : Poll resulted in Message: GenericMessage [payload=/Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml, headers={file_remoteHostPort=xfer2.fingerhut.com:22, file_name=856POMRMN00407684_24100158471_202106140820.xml, file_remoteDirectory=//, file_originalFile=/Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml, id=766bb7de-3bed-698b-f683-ccccdb6ff6fb, file_relativePath=856POMRMN00407684_24100158471_202106140820.xml, file_remoteFile=856POMRMN00407684_24100158471_202106140820.xml, timestamp=1624036261130}]
2021-06-18 12:11:01.133 DEBUG 15653 --- [ask-scheduler-1] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'inputChannel'', message: GenericMessage [payload=/Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml, headers={file_remoteHostPort=xfer2.fingerhut.com:22, file_name=856POMRMN00407684_24100158471_202106140820.xml, file_remoteDirectory=//, file_originalFile=/Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml, id=766bb7de-3bed-698b-f683-ccccdb6ff6fb, file_relativePath=856POMRMN00407684_24100158471_202106140820.xml, file_remoteFile=856POMRMN00407684_24100158471_202106140820.xml, timestamp=1624036261130}]
2021-06-18 12:11:01.134 DEBUG 15653 --- [ask-scheduler-1] .i.h.ReplyProducingMessageHandlerWrapper : bean 'fileWatcher.messageHandler.serviceActivator.handler' received message: GenericMessage [payload=/Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml, headers={file_remoteHostPort=xfer2.fingerhut.com:22, file_name=856POMRMN00407684_24100158471_202106140820.xml, file_remoteDirectory=//, file_originalFile=/Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml, id=766bb7de-3bed-698b-f683-ccccdb6ff6fb, file_relativePath=856POMRMN00407684_24100158471_202106140820.xml, file_remoteFile=856POMRMN00407684_24100158471_202106140820.xml, timestamp=1624036261130}]
2021-06-18 12:11:01.158 INFO 15653 --- [ask-scheduler-1] c.b.f.s.watcher.FileWatcher : processing the file: 856POMRMN00407684_24100158471_202106140820.xml
to archive the file
2021-06-18 12:11:01.191 DEBUG 15653 --- [ask-scheduler-1] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'toArchiveLocation'', message: GenericMessage [payload=/Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2e34aad9, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2e34aad9, id=ceee7c55-2a76-3335-7df2-7270d3273ac3, timestamp=1624036261191}]
2021-06-18 12:11:01.192 DEBUG 15653 --- [ask-scheduler-1] .i.h.ReplyProducingMessageHandlerWrapper : bean 'fileWatcher.moveFileToArchive.serviceActivator.handler' received message: GenericMessage [payload=/Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2e34aad9, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2e34aad9, id=ceee7c55-2a76-3335-7df2-7270d3273ac3, timestamp=1624036261191}]
2021-06-18 12:11:01.192 DEBUG 15653 --- [ask-scheduler-1] o.s.i.sftp.outbound.SftpMessageHandler : bean 'moveFileToArchive'; defined in: 'class path resource [com/bsb/fms/spsasnprocessor/watcher/FileWatcher.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@3047254d' received message: GenericMessage [payload=/Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2e34aad9, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2e34aad9, id=ceee7c55-2a76-3335-7df2-7270d3273ac3, timestamp=1624036261191}]
2021-06-18 12:11:01.194 DEBUG 15653 --- [ask-scheduler-1] o.s.integration.util.SimplePool : Obtained org.springframework.integration.sftp.session.SftpSession@6dd7b63a from pool.
message: /Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml
message: /Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml
message: 856POMRMN00407684_24100158471_202106140820.xml
2021-06-18 12:11:01.326 DEBUG 15653 --- [ask-scheduler-1] o.s.i.sftp.session.SftpSession : File: /Archive/856POMRMN00407684_24100158471_202106140820.xml.writing was successfully renamed to /Archive/856POMRMN00407684_24100158471_202106140820.xml
2021-06-18 12:11:01.326 DEBUG 15653 --- [ask-scheduler-1] o.s.i.f.r.session.CachingSessionFactory : Releasing Session org.springframework.integration.sftp.session.SftpSession@6dd7b63a back to the pool.
2021-06-18 12:11:01.326 DEBUG 15653 --- [ask-scheduler-1] o.s.integration.util.SimplePool : Releasing org.springframework.integration.sftp.session.SftpSession@6dd7b63a back to the pool
2021-06-18 12:11:01.327 DEBUG 15653 --- [ask-scheduler-1] .i.h.ReplyProducingMessageHandlerWrapper : handler 'bean 'fileWatcher.moveFileToArchive.serviceActivator.handler'' produced no reply for request Message: GenericMessage [payload=/Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2e34aad9, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2e34aad9, id=ceee7c55-2a76-3335-7df2-7270d3273ac3, timestamp=1624036261191}]
2021-06-18 12:11:01.327 DEBUG 15653 --- [ask-scheduler-1] o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'toArchiveLocation'', message: GenericMessage [payload=/Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2e34aad9, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2e34aad9, id=ceee7c55-2a76-3335-7df2-7270d3273ac3, timestamp=1624036261191}]
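Solution
A MessagingGateway method with a non-void return type is treated as a request-reply scenario. Judging by your archive flow, you do not expect a reply: it is a one-way flow, and the gateway contract should say so. However, a Groovy method declared without a return type is not void but Object (see "Groovy method signature without return type"). Your gateway contract is therefore no longer one-way: the gateway waits for a reply, while the SftpMessageHandler never produces one (the log shows "produced no reply for request Message"), so the polling thread blocks inside gateway.archiveSpsAsnFile(...) and never gets back to the poller. Declare the return type as void explicitly and you are good. See more about the messaging gateway in the docs: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#gateway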
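As a minimal sketch of that fix, the gateway from the question could be declared as follows. Only the return type changes; the interface name, method name, and channel name are the ones from the question, and the imports assume Spring Integration's standard annotation package.

```groovy
import org.springframework.integration.annotation.Gateway
import org.springframework.integration.annotation.MessagingGateway

@MessagingGateway
interface UploadGateway {

    // An explicit void return type makes this a one-way (send-only) gateway
    // method: the call returns once the send to 'toArchiveLocation' completes
    // and does not wait on a temporary reply channel.
    @Gateway(requestChannel = "toArchiveLocation")
    void archiveSpsAsnFile(File file)
}
```

Since toArchiveLocation is a DirectChannel in the log above, the send should return only after the SftpMessageHandler has handled the message, so the "archived the file successfully" line and the local delete in messageHandler would run after the upload has finished.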