spring-integration - 使用 Spring Integration 和 Batch 读取 CSV 文件并将其插入数据库
问题描述
我有一个应用程序可以监视特定 csv 文件的 FTP 文件夹foo.csv
,一旦找到该文件,它会将其拉到我的本地并生成新的输出格式bar.csv
,然后应用程序会将新文件发送bar.csv
回 FTP 文件夹并从本地删除它.
现在我想介绍一个进程,它会bar.csv
在再次发送到 FTP 服务器之前读取并将其插入到数据库表中。
我假设这可以使用 Spring Batch Integration 来完成,但我找不到如何去做。
以下是我的应用程序代码供参考和建议。
public IntegrationFlow localToFtpFlow(Branch myBranch) {
return IntegrationFlows.from(Files.inboundAdapter(new File(myBranch.getBranchCode()))
.filter(new ChainFileListFilter<File>()
.addFilter(new RegexPatternFileListFilter("final" + myBranch.getBranchCode() + ".csv"))
.addFilter(new FileSystemPersistentAcceptOnceFileListFilter(metadataStore(dataSource), "foo"))),//FileSystemPersistentAcceptOnceFileListFilter
e -> e.poller(Pollers.fixedDelay(10_000)))
.enrichHeaders(h ->h.headerExpression("file_originalFile", "new java.io.File('"+ myBranch.getBranchCode() +"/FEFOexport" + myBranch.getBranchCode() + ".csv')",true))
.transform(p -> {
LOG.info("Sending file " + p + " to FTP branch " + myBranch.getBranchCode());
return p;
})
.log()
.transform(m -> {
this.defaultSessionFactoryLocator.addSessionFactory(myBranch.getBranchCode(),createNewFtpSessionFactory(myBranch));
LOG.info("Adding factory to delegation");
return m;
})
.publishSubscribeChannel(s ->
s.subscribe(f -> f.transform(fileMessageToJobRequest()).handle(jobLaunchingGateway()).channel("nullChannel"))
.subscribe(h -> h.handle(Ftp.outboundAdapter(createNewFtpSessionFactory(myBranch), FileExistsMode.REPLACE)
.useTemporaryFileName(true)
.autoCreateDirectory(false)
.remoteDirectory(myBranch.getFolderPath()), e -> e.advice(expressionAdvice()))))
/*.handle(Ftp.outboundAdapter(createNewFtpSessionFactory(myBranch), FileExistsMode.REPLACE)
.useTemporaryFileName(true)
.autoCreateDirectory(false)
.remoteDirectory(myBranch.getFolderPath()), e -> e.advice(expressionAdvice()))*/
.get();
}
@Bean
public FileMessageToJobRequest fileMessageToJobRequest(){
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setFileParameterName("input.file.name");
fileMessageToJobRequest.setJob(orderJob);
return fileMessageToJobRequest;
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
//simpleJobLauncher.setJobRepository(jobRepository);
simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
return jobLaunchingGateway;
}
将消息归档到作业类
public class FileMessageToJobRequest {
private Job job;
private String fileParameterName;
public void setFileParameterName(String fileParameterName) {
this.fileParameterName = fileParameterName;
}
public void setJob(Job job) {
this.job = job;
}
@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
JobParametersBuilder jobParametersBuilder =
new JobParametersBuilder();
jobParametersBuilder.addString(fileParameterName,
message.getPayload().getAbsolutePath());//message.getPayload().getAbsolutePath()
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
}
在运行时添加流时出现错误。
2019-07-02 12:42:49.292 INFO 7476 --- [ask-scheduler-4] o.s.i.file.FileReadingMessageSource : Created message: [GenericMessage [payload=BEY\finalBEY.csv, headers={file_originalFile=BEY\finalBEY.csv, id=38827020-c9c1-aa35-526c-cc6848ca5e11, file_name=finalBEY.csv, file_relativePath=finalBEY.csv, timestamp=1562060569292}]]
2019-07-02 12:42:49.292 INFO 7476 --- [ask-scheduler-4] o.s.integration.handler.LoggingHandler : GenericMessage [payload=BEY\finalBEY.csv, headers={file_originalFile=BEY\FEFOexportBEY.csv, id=abbe6169-7f46-fcb9-be9e-44533ab05ec8, file_name=finalBEY.csv, file_relativePath=finalBEY.csv, timestamp=1562060569292}]
2019-07-02 12:42:49.295 ERROR 7476 --- [ask-scheduler-4] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [jobLaunchingGateway]; nested exception is java.lang.NullPointerException, failedMessage=GenericMessage [payload=JobLaunchRequest: orderJob, parameters={input.file.name=C:\Java Programs\spring4ftpappftp\BEY\finalBEY.csv}, headers={file_originalFile=BEY\FEFOexportBEY.csv, id=f7b2560e-e435-009b-2b8e-27ef168a6767, file_name=finalBEY.csv, file_relativePath=finalBEY.csv, timestamp=1562060569293}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:184)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:175)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:180)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:220)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:277)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.lambda$run$0(AbstractPollingEndpoint.java:378)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:53)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:372)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:98)
at org.springframework.batch.integration.launch.JobLaunchingMessageHandler.launch(JobLaunchingMessageHandler.java:50)
at org.springframework.batch.integration.launch.JobLaunchingGateway.handleRequestMessage(JobLaunchingGateway.java:76)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
... 100 more
解决方案
您需要考虑publishSubscribeChannel()
在IntegtrationFlow
定义中使用 a 。将您的订阅者作为一个订阅者.handle(Ftp.outboundAdapter())
(最好是第二个)。作为第一个,它应该类似于.handle(jobLaunchingGateway).channel("nullChannel")
.
JobLaunchingGateway
您可以在 Spring Batch参考手册中阅读有关内容。
关键是您想将相同的消息发送到多个地方。所以,PublishSubscribeChannel
最好的方法是:https ://docs.spring.io/spring-integration/docs/5.1.6.RELEASE/reference/html/#java-dsl-subflows
我建议您完全按照该顺序拥有这些订阅者,因为您真的想在发送到 FTP 之前存储到 DB 中。没有executor
第二个订阅者将等到第一个订阅者完成其工作。
这.channel("nullChannel")
是必要的,因为JobLaunchingGateway
它确实是一个网关,它返回 aJobExecution
作为回复。由于您对此不感兴趣,因此您只需要忽略即可。当然,handle()
在该网关之后,您可能还有另一个以JobExecution
某种方式处理此问题。关键是不要从第一个订阅者那里返回任何内容作为回复。它会以某种方式阻止你的主流。
更新
我认为.transform(fileMessageToJobRequest())
必须在之前转到下面的第一个订阅者jobLaunchingGateway
:
.publishSubscribeChannel(s ->
s.subscribe(f -> f.transform(fileMessageToJobRequest()).handle(jobLaunchingGateway()).channel("nullChannel"))
.subscribe(h -> h.handle(Ftp.outboundAdapter(createNewFtpSessionFactory(myBranch), ...))
关键是您想将相同的文件发送到下一个handle()
,但是在上游的转换器之后,它将被更改为JobLaunchRequest
,这对 有好处jobLaunchingGateway
,但对Ftp.outboundAdapter()
.
你的例外:
Caused by: java.lang.NullPointerException
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:98)
导致这段代码:Assert.notNull(jobParameters, "The JobParameters must not be null.");
. 所以,不知何故,jobParametersBuilder.toJobParameters()
评估为null
。
我无法为您提供这么多自定义代码。
更新2
好的。看起来您的问题出在JobLaunchingGateway
bean 定义中。你在那里做一个new SimpleJobLauncher()
没有JobRepository
注入的显式。
貌似Spring BootBasicBatchConfigurer
的里面有一个,BatchConfigurerConfiguration
可以注入到这个里面JobLaunchingGateway
。因此,我们将通过一个 NPE。之后还有其他错误,但这已经是另一回事了......
推荐阅读
- excel - VB脚本按行循环Excel数据
- iframe - Apex 在 IFrame 中添加的额外样式属性
- python - 如果在 Pandas DataFrame 中找到单词,则样式化单元格?
- python - 如何创建可索引的 map() 或修饰的 list()?
- angular - 角度 4-如何在 POST 中将文件发送到后端
- mongodb - 了解 MongoDB 块拆分
- c# - 从没有标头的资源中提取特定数据
- algorithm - Cracking the Coding Interview 中的算法似乎做错了什么
- python - Scrapy CrawlSpider 未加入
- python - Glade 3 导入不适用于 gtk 和 gi