spring-batch - MultiResourceItemReader 将最后一个文件读取锁定,因此完成处理后无法移动
问题描述
关于为什么 MultiResourceItemReader 将最后一个文件锁定并且我无法使用 Move Tasklet 移动它的想法?移动完成了所有其他文件,但最后一个文件已读取。
IO 异常有:
java.nio.file.FileSystemException: C:\Users\UGDW\MyProjects\ngsa2\oab-outside-assets-batchlauncher\input\EQ_AcctData_4321_03292020.csv -> C:\Users\UGDW\MyProjects\ngsa2\oab-outside-assets-batchlauncher\output\EQ_AcctData_4321_03292020.csv_processed: The process cannot access the file because it is being used by another process.
批处理配置(精简):
<batch:job id="stockPlanAccountDataJob">
<batch:step id="getFilesInInputDirectory" next="fileProcessing">
<tasklet ref="getFilesInInputDirectoryTasklet"/>
</batch:step>
<batch:step id="fileProcessing" next="moveFilesToOuputDirectory">
<tasklet>
<chunk reader="stockPlanAccountDataFileReader" processor="stockPlanAccountDataProcessor" writer="stockPlanConsoleItemWriter"
commit-interval="20" skip-limit="20">
<batch:skippable-exception-classes>
<batch:include class="java.lang.Exception"/>
<batch:exclude class="org.springframework.batch.item.file.FlatFileParseException"/>
</batch:skippable-exception-classes>
</chunk>
</tasklet>
</batch:step>
<batch:step id="moveFilesToOuputDirectory">
<tasklet ref="stockPlanMoveFilesTasklet"/>
</batch:step>
</batch:job>
<bean id="getFilesInInputDirectoryTasklet" class="simplepeekandmulti.GetFilesInInputDirectoryTasklet" scope="step"/>
<bean id="stockPlanAccountDataFileReader" class="simplepeekandmulti.StockPlanAccountDataFileReader" scope="step">
<property name="delegate" ref="preprocessorUsingPeekable"/>
</bean>
<bean id="preprocessorUsingPeekable" class="org.springframework.batch.item.support.SingleItemPeekableItemReader" scope="step">
<property name="delegate" ref="multiFileResourceReader"/>
</bean>
<bean name="multiFileResourceReader" class="org.springframework.batch.item.file.MultiResourceItemReader" scope="step">
<property name="resources" value="file:#{jobExecutionContext[filepattern]}" />
<property name="delegate" ref="genericFlatFileReader" />
<property name="strict" value="true" />
</bean>
<bean id="genericFlatFileReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
<property name="lineMapper" ref="genericFileLineMapper"/>
</bean>
<bean name="genericFileLineMapper" class="org.springframework.batch.item.file.mapping.PassThroughLineMapper" scope="step" />
<bean id="stockPlanAccountDataProcessor" class="simplepeekandmulti.StockPlanAccountDataProcessor" scope="step"/>
<bean id="stockPlanMoveFilesTasklet" class="simplepeekandmulti.StockPlanMoveFilesTasklet" scope="step"/>
读者(有愚蠢的逻辑):
package simplepeekandmulti;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.PeekableItemReader;
import simplepeekandmulti.StockPlanAccountData;
import simplepeekandmulti.StockPlanFileInputAccountData;
public class StockPlanAccountDataFileReader implements ItemReader<StockPlanFileInputAccountData> {
private PeekableItemReader<String> delegate;
private AtomicLong itemsRead = new AtomicLong(0L);
private static final String PIPE = "|";
private static final String PIPE_SPLIT = "\\|";
private static final int NUM_RECORDS_PER_LINE = 6;
public PeekableItemReader<String> getDelegate() {
return delegate;
}
public void setDelegate(PeekableItemReader<String> delegate) {
this.delegate = delegate;
}
@Override
public StockPlanFileInputAccountData read() throws Exception {
String currentLine = delegate.read();
StockPlanFileInputAccountData inputData = new StockPlanFileInputAccountData();
int recs = 0;
List<String> errorList = new ArrayList<>();
while (currentLine != null) {
if (currentLine.contains(PIPE)) {
recs++;
setDetailLine(currentLine, inputData, recs, errorList);
} else {
errorList.add(currentLine);
}
if ((errorList.size() % 2) == 0) {
return inputData;
}
itemsRead.incrementAndGet();
currentLine = delegate.read();
}
return null;
}
private void setDetailLine(String inputLine, StockPlanFileInputAccountData inputData,
int numRecs, List<String> errorList) {
String[] entry = inputLine.split(PIPE_SPLIT);
if (entry.length == NUM_RECORDS_PER_LINE) {
inputData.setDataRecordsPerFile(numRecs);
StockPlanAccountData data = new StockPlanAccountData();
data.setExternalClientId(entry[0]);
data.setSSN(entry[1]);
data.setExternalParticipantId(entry[2]);
data.setFirstName(entry[3]);
data.setLastName(entry[4]);
data.setDateOfBitrth(entry[5]);
inputData.addToDataList(data);
} else {
errorList.add("Detail Line Is Invalid, Does NOT have 6 columns, 5 pipes: " + inputLine);
}
}
}
处理器:
package simplepeekandmulti;
import java.util.ArrayList;
import java.util.List;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ItemProcessor;
import com.vanguard.inst.batch.oab.springboot.data.StockPlanFileInputAccountData;
public class StockPlanAccountDataProcessor implements ItemProcessor<StockPlanFileInputAccountData, StockPlanFileInputAccountData> {
private StepExecution stepExecution;
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
public StockPlanFileInputAccountData process(StockPlanFileInputAccountData item) throws Exception {
List<String> errorList = new ArrayList<>(0);
if (errorList.isEmpty()) {
return item;
} else {
//exchangeEmailService.sendEmail(fileName, errorList);
return null;
}
}
}
作家:
package simplepeekandmulti;
import java.util.List;
import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;
import com.vanguard.inst.batch.oab.springboot.data.StockPlanFileInputAccountData;
@Component
public class StockConsoleOutputItemWriter implements ItemWriter<StockPlanFileInputAccountData> {
@Override
public void write(List<? extends StockPlanFileInputAccountData> arg0) throws Exception {
// TODO Auto-generated method stub
}
}
移动文件小任务(文件名硬编码):循环中的最后一个文件总是失败。
package simplepeekandmulti;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class StockPlanMoveFilesTasklet implements Tasklet {
private static final String CLASS_NAME = StockPlanMoveFilesTasklet.class.getSimpleName();
@Value("$simplepeekandmulti-{INPUT_DIR}")
private String inputDir;
@Value("$simplepeekandmulti-{OUTPUT_DIR}")
private String outputDir;
private static final String PROCESSED = "_processed";
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
String[] fileList = {"EQ_AcctData_3210_03302020.csv", "EQ_AcctData_4321_03302020.csv"};
try {
for (String fileName : fileList) {
Path pathFrom = FileSystems.getDefault().getPath(inputDir, fileName);
Path pathTo = FileSystems.getDefault().getPath(outputDir, fileName + PROCESSED);
Files.move(pathFrom, pathTo, StandardCopyOption.REPLACE_EXISTING);
}
} catch (IOException io) {
System.out.println(io.toString());
}
return RepeatStatus.FINISHED;
}
}
CSV 文件只有;页眉日期,管道分隔的记录,页脚总记录数
2020 年 3 月 30 日
3210|59658625|12000|AADFBCJGH|LLOQMNURS|1962-03-08
3210|10124602|12001|AADFBCJGH|LLOQMNURS|1962-03-08
2
2020 年 3 月 30 日
4321|5690154|13000|AADFBCJGH|LLOQMNURS|1988-10-23
4321|745701|13001|AADFBCJGH|LLOQMNURS|1988-10-23
2
解决方案
推荐阅读
- c# - 如何通过 C# 代码获取 Kentico 的内存统计信息?
- javascript - 组合多个数组的相同索引的最佳方法
- google-apps-script - 我可以在运行分配的脚本时阻止触摸屏设备在表格中选择图像吗?
- templates - 行业认可的技术设计和需求收集模板集(IIBA、PMI 等)?
- javascript - Cheerio 只给我 img 的 src 属性值的一部分?
- .net - 使用 MSBuild 执行 MSTest
- vue.js - 在cmd中创建新项目时vue create不起作用
- python - Move all members to channel command using a foor loop discord.py
- php - centos 7上有多个php版本。php5.6不加载mysqli
- sql - SQL 等效查询