首页 > 解决方案 > 发布订阅频道可能存在问题

问题描述

我有一个应用程序可以监视特定 csv 文件的 FTP 文件夹foo.csv,一旦找到该文件,它就会将其拉到我的本地并生成新的输出格式 bar.csv,然后应用程序会将新文件发送finalBEY.csv回 FTP 文件夹并擦除它来自本地。

既然我已经介绍了一个过程,它使用publishSubscribeChannel它将文件转换为消息然后使用jobLaunchingGateway它将finalBEY.csv使用批处理读取并将其打印到控制台,它不起作用,因为finalBEY.csv在将其发送回 FTP 后从本地文件夹中删除,我在第一个中使用.channel("nullChannel")jobLaunchingGatewaysubscribe假设保留它直到收到批处理的回复,然后移动到下一个subscribe将它发送到 ftp 并将其从本地删除,但似乎情况并非如此从本地删除它,因此找不到批处理finalBEY.csv并引发我在下面粘贴代码的错误。

如果我从第二个中删除建议,subscribe它可以正常工作,因为这将不再从本地删除它。

你能帮忙解决这个问题吗?

public IntegrationFlow localToFtpFlow(Branch myBranch) {

        return IntegrationFlows.from(Files.inboundAdapter(new File(myBranch.getBranchCode()))
                        .filter(new ChainFileListFilter<File>()
                                .addFilter(new RegexPatternFileListFilter("final" + myBranch.getBranchCode() + ".csv"))
                                .addFilter(new FileSystemPersistentAcceptOnceFileListFilter(metadataStore(dataSource), "foo"))),//FileSystemPersistentAcceptOnceFileListFilter
                e -> e.poller(Pollers.fixedDelay(10_000)))
                .enrichHeaders(h ->h.headerExpression("file_originalFile", "new java.io.File('"+ myBranch.getBranchCode() +"/FEFOexport" + myBranch.getBranchCode() + ".csv')",true))
                .transform(p -> {
                    LOG.info("Sending file " + p + " to FTP branch " + myBranch.getBranchCode());
                    return p;
                })

                .log()
                .transform(m -> {
                            this.defaultSessionFactoryLocator.addSessionFactory(myBranch.getBranchCode(),createNewFtpSessionFactory(myBranch));
                            LOG.info("Adding factory to delegation");
                            return m;
                })
                .publishSubscribeChannel(s ->
                        s.subscribe(f ->f.transform(fileMessageToJobRequest())
                                        .handle(jobLaunchingGateway()).channel("nullChannel"))
                        .subscribe(h -> h.handle(Ftp.outboundAdapter(createNewFtpSessionFactory(myBranch), FileExistsMode.REPLACE)
                                         .useTemporaryFileName(true)
                                         .autoCreateDirectory(false)
                                         .remoteDirectory(myBranch.getFolderPath()), e -> e.advice(expressionAdvice()))))

                .get();
    }

    @Bean
    public FileMessageToJobRequest fileMessageToJobRequest(){
        FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
        fileMessageToJobRequest.setFileParameterName("file_path");
        fileMessageToJobRequest.setJob(orderJob);
        return fileMessageToJobRequest;
    }

    @Bean
    public JobLaunchingGateway jobLaunchingGateway() {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository);
        simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
        JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);

        return jobLaunchingGateway;
    }

    /**
    * Creating the advice for routing the payload of the outbound message on different expressions (success, failure)
    * @return Advice
    */

    @Bean
    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("success.input");
        advice.setOnSuccessExpressionString("payload.delete() + ' was successful'");
        //advice.setOnSuccessExpressionString("inputMessage.headers['file_originalFile'].renameTo(new java.io.File(payload.absolutePath + '.success.to.send'))");
        //advice.setFailureChannelName("failure.input");
        advice.setOnFailureExpressionString("payload + ' was bad, with reason: ' + #exception.cause.message");
        advice.setTrapException(true);
        return advice;
    }

这是错误,如您所见,第一行显示它已传输到 FTP,然后启动批处理作业,而在订阅中应先执行批处理...

 INFO 10452 --- [ask-scheduler-2] o.s.integration.ftp.session.FtpSession   : File has been successfully renamed from: /ftp/erbranch/EDMS/FEFO/finalBEY.csv.writing to /ftp/erbranch/EDMS/FEFO/finalBEY.csv

Caused by: java.lang.IllegalStateException: Input resource must exist (reader is in 'strict' mode): file [C:\Java Programs\spring4ftpappftp\BEY\finalBEY.csv]
    at org.springframework.batch.item.file.FlatFileItemReader.doOpen(FlatFileItemReader.java:251) ~[spring-batch-infrastructure-4.0.1.RELEASE.jar:4.0.1.RELEASE]
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:146) ~[spring-batch-infrastructure-4.0.1.RELEASE.jar:4.0.1.RELEASE]
    ... 123 common frames omitted

调试代码:

2019-07-15 10:43:02.838  INFO 4280 --- [ask-scheduler-2] o.s.integration.ftp.session.FtpSession   : File has been successfully transferred to: /ftp/erbranch/EDMS/FEFO/finalBEY.csv.writing
2019-07-15 10:43:02.845  INFO 4280 --- [ask-scheduler-2] o.s.integration.ftp.session.FtpSession   : File has been successfully renamed from: /ftp/erbranch/EDMS/FEFO/finalBEY.csv.writing to /ftp/erbranch/EDMS/FEFO/finalBEY.csv
2019-07-15 10:43:02.845 DEBUG 4280 --- [ask-scheduler-2] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'integrationEvaluationContext'
2019-07-15 10:43:02.848 DEBUG 4280 --- [ask-scheduler-2] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'success.input'
2019-07-15 10:43:02.849 DEBUG 4280 --- [ask-scheduler-2] o.s.integration.channel.DirectChannel    : preSend on channel 'success.input', message: AdviceMessage [payload=true was successful, headers={id=eca55e1d-918e-3334-afce-66f8ab650748, timestamp=1563176582848}, inputMessage=GenericMessage [payload=BEY\finalBEY.csv, headers={file_originalFile=BEY\FEFOexportBEY.csv, id=a2f029b0-2609-1a11-67ef-4f56c7dd0752, file_name=finalBEY.csv, file_relativePath=finalBEY.csv, timestamp=1563176582787}]]
2019-07-15 10:43:02.849 DEBUG 4280 --- [ask-scheduler-2] o.s.i.t.MessageTransformingHandler       : success.org.springframework.integration.transformer.MessageTransformingHandler#0 received message: AdviceMessage [payload=true was successful, headers={id=eca55e1d-918e-3334-afce-66f8ab650748, timestamp=1563176582848}, 

inputMessage=GenericMessage [payload=BEY\finalBEY.csv, headers={file_originalFile=BEY\FEFOexportBEY.csv, id=a2f029b0-2609-1a11-67ef-4f56c7dd0752, file_name=finalBEY.csv, file_relativePath=finalBEY.csv, timestamp=1563176582787}]]

    2019-07-15 10:43:02.951 DEBUG 4280 --- [ask-scheduler-2] o.s.b.i.launch.JobLaunchingGateway       : jobLaunchingGateway received message: GenericMessage [payload=JobLaunchRequest: orderJob, parameters={file_path=C:\Java Programs\spring4ftpappftp\BEY\finalBEY.csv, dummy=1563176582946}, headers={file_originalFile=BEY\FEFOexportBEY.csv, id=c98ad6cb-cced-c911-1b93-9d054baeb9d0, file_name=finalBEY.csv, file_relativePath=finalBEY.csv, timestamp=1563176582951}]

2019-07-16 08:35:29.442  INFO 10208 --- [nio-8081-exec-3] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.FTPOutp' has 1 subscriber(s).
2019-07-16 08:35:29.442  INFO 10208 --- [nio-8081-exec-3] o.s.i.endpoint.EventDrivenConsumer       : started 1o.org.springframework.integration.config.ConsumerEndpointFactoryBean#3
2019-07-16 08:35:29.442  INFO 10208 --- [nio-8081-exec-3] o.s.i.endpoint.EventDrivenConsumer       : Adding {message-handler} as a subscriber to the '1o.subFlow#1.channel#0' channel
2019-07-16 08:35:29.442  INFO 10208 --- [nio-8081-exec-3] o.s.integration.channel.DirectChannel    : Channel 'application.1o.subFlow#1.channel#0' has 1 subscriber(s).
2019-07-16 08:35:29.442  INFO 10208 --- [nio-8081-exec-3] o.s.i.endpoint.EventDrivenConsumer       : started 1o.subFlow#1.org.springframework.integration.config.ConsumerEndpointFactoryBean#1
2019-07-16 08:35:29.442  INFO 10208 --- [nio-8081-exec-3] o.s.i.endpoint.EventDrivenConsumer       : Adding {bridge} as a subscriber to the 'FTPOutp' channel
2019-07-16 08:35:29.442  INFO 10208 --- [nio-8081-exec-3] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.FTPOutp' has 2 subscriber(s).

标签: spring-integrationspring-batch

解决方案


由于您使用的SyncTaskExecutor是批处理作业,因此应该在调用线程上运行,然后是 FTP 适配器。

使用 DEBUG 日志记录并按照消息流查看为什么没有发生。


推荐阅读