Spring Integration gets stuck on the gateway channel and does not return to the polling channel

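Problem description

I have a requirement to read files from an SFTP location to the local machine, process and post-process them, and then move them to an archive folder on the SFTP location.

I am using Spring Integration as follows:

I have an InboundChannelAdapter with a cron poller that watches the SFTP location and pulls files to the local machine for processing.

I have a ServiceActivator that calls the processing service. It then invokes a gateway method, which in turn calls another ServiceActivator that archives the file.

The flow is: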

InboundChannelAdapter (with cron polling to pull every 10 seconds)
ServiceActivator
     processes file
     invokes MessagingGateway method to archive
     deletes file from local
ServiceActivator (archiving)

FileWatcher:

@Configuration
class FileWatcher {
    Logger logger = LoggerFactory.getLogger(FileWatcher.class)

    @Autowired
    XmlParserService xmlParserService

    @Autowired
    UploadGateway gateway

    @Value("\${ssftp-server}")
    String sftpHost

    @Value("\${sftp-user}")
    String sftpUser

    @Value("\${sftp-password}")
    String sftpPassword

    @Value("\${sftp-filter}")
    String sftpFileFilter

    @Value("\${sftp-port}")
    Integer sftpPort

    @Value("\${sftp-source-path}")
    String sourcePath

    @Value("\${sftp-local-path}")
    String localPath

    @Value("\${sftp-archive-path}")
    String archivePath

    @Value("\${sftp-error-path}")
    String errorPath

    @Value("\${cron.expression}")
    String cronPoller

    @Value("\${sessionTimeout}")
    String sftpSessionTimeOut

    @Value("\${channelTimeout}")
    String sftpChannelTimeOut

    @Bean
    SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory(){
        def factory = new DefaultSftpSessionFactory(false)
        factory.setHost(sftpHost)
        factory.setPort(sftpPort)
        factory.setUser(sftpUser)
        factory.setPassword(sftpPassword)
        factory.setAllowUnknownKeys(true)
        return new CachingSessionFactory<ChannelSftp.LsEntry>(factory)
    }

    @Bean
    SftpInboundFileSynchronizer sftpInboundFileSynchronizer(){
        def fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory())
        fileSynchronizer.setDeleteRemoteFiles(true)
        fileSynchronizer.setRemoteDirectory(sourcePath)
        fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter(sftpFileFilter))
        return fileSynchronizer
    }

    @Bean
    @InboundChannelAdapter(channel = "inputChannel",poller = [@Poller(cron="*/10 * * * * *")])
    MessageSource<File> source(){
        def source = new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer())
        source.setLocalDirectory(new File(localPath))
        source.setAutoCreateLocalDirectory(true)
        source.setLocalFilter(new AcceptOnceFileListFilter())
        source.maxFetchSize = 1
        return source
    }

    @Bean
    @ServiceActivator(inputChannel = "inputChannel")
    MessageHandler messageHandler(){
        def message = new MessageHandler() {
            @Override
            void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException {
                def messageFile = message.payload as File
                try{
                    if (messageFile.name.endsWith(".xml")){
                        logger.info("processing the file: " + messageFile.name)
                        xmlParserService.process(messageFile)
                        if(messageFile.exists()){
                            println("to archive the file")
                            gateway.archiveSpsAsnFile(messageFile)
                            println("archived the file successfully")
                            messageFile.delete()
                        }
                    }
                }catch (IOException ioe){
                    logger.error("file could not be processed: " + messageFile.name)
                }
            }
        }
        return message
    }

    @Bean
    @ServiceActivator(inputChannel = "toArchiveLocation")
    MessageHandler moveFileToArchive(){
        SftpMessageHandler sftpHandler = new SftpMessageHandler(sftpSessionFactory())
        sftpHandler.setRemoteDirectoryExpression(new LiteralExpression(archivePath))
        sftpHandler.setFileNameGenerator(new FileNameGenerator() {
            @Override
            String generateFileName(org.springframework.messaging.Message<?> message) {
                if (((File) message.getPayload()).exists()) {
                    println("message: " + message.getPayload())
                    println("message: " + ((File) message.getPayload()).getAbsolutePath())
                    println("message: " + ((File) message.getPayload()).getName())
                    return ((File) message.getPayload()).getName()
                } else {
                    throw new IllegalArgumentException("file expected as payload")
                }
            }
        })
        return sftpHandler
    }
}

Gateway:

@MessagingGateway
interface UploadGateway {


    @Gateway(requestChannel ="toArchiveLocation")
    archiveSpsAsnFile(File file)
}

Logs:

2021-06-18 12:11:00.788  INFO 15653 --- [ask-scheduler-1] com.jcraft.jsch                          : Authentication succeeded (keyboard-interactive).
2021-06-18 12:11:00.862 DEBUG 15653 --- [ask-scheduler-1] o.s.integration.util.SimplePool          : Obtained new org.springframework.integration.sftp.session.SftpSession@6dd7b63a.
2021-06-18 12:11:01.123 DEBUG 15653 --- [ask-scheduler-1] o.s.i.s.i.SftpInboundFileSynchronizer    : deleted remote file: //856POMRMN00407684_24100158471_202106140820.xml
2021-06-18 12:11:01.124 DEBUG 15653 --- [ask-scheduler-1] o.s.i.f.r.session.CachingSessionFactory  : Releasing Session org.springframework.integration.sftp.session.SftpSession@6dd7b63a back to the pool.
2021-06-18 12:11:01.124 DEBUG 15653 --- [ask-scheduler-1] o.s.integration.util.SimplePool          : Releasing org.springframework.integration.sftp.session.SftpSession@6dd7b63a back to the pool
2021-06-18 12:11:01.124 DEBUG 15653 --- [ask-scheduler-1] o.s.i.s.i.SftpInboundFileSynchronizer    : 1 files transferred from '/'
2021-06-18 12:11:01.125 DEBUG 15653 --- [ask-scheduler-1] sageSource$LocalFileReadingMessageSource : Added to queue: [/Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml]
2021-06-18 12:11:01.131 DEBUG 15653 --- [ask-scheduler-1] o.s.i.e.SourcePollingChannelAdapter      : Poll resulted in Message: GenericMessage [payload=/Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml, headers={file_remoteHostPort=xfer2.fingerhut.com:22, file_name=856POMRMN00407684_24100158471_202106140820.xml, file_remoteDirectory=//, file_originalFile=/Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml, id=766bb7de-3bed-698b-f683-ccccdb6ff6fb, file_relativePath=856POMRMN00407684_24100158471_202106140820.xml, file_remoteFile=856POMRMN00407684_24100158471_202106140820.xml, timestamp=1624036261130}]
2021-06-18 12:11:01.133 DEBUG 15653 --- [ask-scheduler-1] o.s.integration.channel.DirectChannel    : preSend on channel 'bean 'inputChannel'', message: GenericMessage [payload=/Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml, headers={file_remoteHostPort=xfer2.fingerhut.com:22, file_name=856POMRMN00407684_24100158471_202106140820.xml, file_remoteDirectory=//, file_originalFile=/Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml, id=766bb7de-3bed-698b-f683-ccccdb6ff6fb, file_relativePath=856POMRMN00407684_24100158471_202106140820.xml, file_remoteFile=856POMRMN00407684_24100158471_202106140820.xml, timestamp=1624036261130}]
2021-06-18 12:11:01.134 DEBUG 15653 --- [ask-scheduler-1] .i.h.ReplyProducingMessageHandlerWrapper : bean 'fileWatcher.messageHandler.serviceActivator.handler' received message: GenericMessage [payload=/Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml, headers={file_remoteHostPort=xfer2.fingerhut.com:22, file_name=856POMRMN00407684_24100158471_202106140820.xml, file_remoteDirectory=//, file_originalFile=/Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml, id=766bb7de-3bed-698b-f683-ccccdb6ff6fb, file_relativePath=856POMRMN00407684_24100158471_202106140820.xml, file_remoteFile=856POMRMN00407684_24100158471_202106140820.xml, timestamp=1624036261130}]
2021-06-18 12:11:01.158  INFO 15653 --- [ask-scheduler-1] c.b.f.s.watcher.FileWatcher              : processing the file: 856POMRMN00407684_24100158471_202106140820.xml
to archive the file
2021-06-18 12:11:01.191 DEBUG 15653 --- [ask-scheduler-1] o.s.integration.channel.DirectChannel    : preSend on channel 'bean 'toArchiveLocation'', message: GenericMessage [payload=/Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2e34aad9, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2e34aad9, id=ceee7c55-2a76-3335-7df2-7270d3273ac3, timestamp=1624036261191}]
2021-06-18 12:11:01.192 DEBUG 15653 --- [ask-scheduler-1] .i.h.ReplyProducingMessageHandlerWrapper : bean 'fileWatcher.moveFileToArchive.serviceActivator.handler' received message: GenericMessage [payload=/Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2e34aad9, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2e34aad9, id=ceee7c55-2a76-3335-7df2-7270d3273ac3, timestamp=1624036261191}]
2021-06-18 12:11:01.192 DEBUG 15653 --- [ask-scheduler-1] o.s.i.sftp.outbound.SftpMessageHandler   : bean 'moveFileToArchive'; defined in: 'class path resource [com/bsb/fms/spsasnprocessor/watcher/FileWatcher.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@3047254d' received message: GenericMessage [payload=/Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2e34aad9, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2e34aad9, id=ceee7c55-2a76-3335-7df2-7270d3273ac3, timestamp=1624036261191}]
2021-06-18 12:11:01.194 DEBUG 15653 --- [ask-scheduler-1] o.s.integration.util.SimplePool          : Obtained org.springframework.integration.sftp.session.SftpSession@6dd7b63a from pool.
message: /Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml
message: /Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml
message: 856POMRMN00407684_24100158471_202106140820.xml
2021-06-18 12:11:01.326 DEBUG 15653 --- [ask-scheduler-1] o.s.i.sftp.session.SftpSession           : File: /Archive/856POMRMN00407684_24100158471_202106140820.xml.writing was successfully renamed to /Archive/856POMRMN00407684_24100158471_202106140820.xml
2021-06-18 12:11:01.326 DEBUG 15653 --- [ask-scheduler-1] o.s.i.f.r.session.CachingSessionFactory  : Releasing Session org.springframework.integration.sftp.session.SftpSession@6dd7b63a back to the pool.
2021-06-18 12:11:01.326 DEBUG 15653 --- [ask-scheduler-1] o.s.integration.util.SimplePool          : Releasing org.springframework.integration.sftp.session.SftpSession@6dd7b63a back to the pool
2021-06-18 12:11:01.327 DEBUG 15653 --- [ask-scheduler-1] .i.h.ReplyProducingMessageHandlerWrapper : handler 'bean 'fileWatcher.moveFileToArchive.serviceActivator.handler'' produced no reply for request Message: GenericMessage [payload=/Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2e34aad9, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2e34aad9, id=ceee7c55-2a76-3335-7df2-7270d3273ac3, timestamp=1624036261191}]
2021-06-18 12:11:01.327 DEBUG 15653 --- [ask-scheduler-1] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'bean 'toArchiveLocation'', message: GenericMessage [payload=/Users/user.name/New/856POMRMN00407684_24100158471_202106140820.xml, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2e34aad9, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2e34aad9, id=ceee7c55-2a76-3335-7df2-7270d3273ac3, timestamp=1624036261191}]

Tags: java, spring-integration

Solution


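A MessagingGateway method with a non-void return type is treated as a request-reply scenario. According to your archive flow logic, you really expect no reply: it is a one-way flow, and the gateway contract should say so. In Groovy, however, a method declared without a return type is not void; its signature returns Object. Your gateway contract is therefore no longer one-way. The gateway sends the file to toArchiveLocation and then waits on the temporary reply channel, but SftpMessageHandler produces no reply (see "produced no reply for request Message" in your log), so the poller thread stays inside gateway.archiveSpsAsnFile() and never reaches "archived the file successfully". Declare the method explicitly as void archiveSpsAsnFile(File file) and you are good.

See the messaging gateway section of the documentation for more information: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#gateway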
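
The difference in return types can be checked with a small plain Groovy script. This is only a sketch, not part of the original answer. The interface names UntypedGateway and OneWayGateway are hypothetical, the Spring annotations are left out so the script runs without Spring on the classpath, and def is used on the assumption that it compiles the same way as the omitted return type in the question.

```groovy
// Plain Groovy script: it only inspects the compiled method signatures.

// Mirrors the gateway method from the question. `def` stands in for the
// omitted return type (assumption: both compile to java.lang.Object).
interface UntypedGateway {
    def archiveSpsAsnFile(File file)
}

// The one-way contract: an explicit void return type.
interface OneWayGateway {
    void archiveSpsAsnFile(File file)
}

def untypedReturn = UntypedGateway.getMethod('archiveSpsAsnFile', File).returnType
def oneWayReturn = OneWayGateway.getMethod('archiveSpsAsnFile', File).returnType

// Object return type: a messaging gateway would treat this as request-reply.
assert untypedReturn == Object

// void return type: a messaging gateway would treat this as send-only.
assert oneWayReturn == Void.TYPE

println "untyped: ${untypedReturn.name}, one-way: ${oneWayReturn.name}"
```

Applied to the UploadGateway from the question, the only change is adding void in front of archiveSpsAsnFile(File file); the @MessagingGateway and @Gateway annotations stay as they are.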

