MultiResourceItemReader not working as expected

Problem description

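I have been using FlatFileItemReader to process CSV files one at a time. Now I am trying to use MultiResourceItemReader and supply all the files at once; after filtering there are 3 files, and there can be up to 50. When I run the Job, even though all the files are supplied, the results show that only 1 file is processed. The data is read from the CSV files and saved to the database, and when I check the results, only 1 file's data has been saved. I can't find what I'm doing wrong. My code is as follows:

MultiResourceItemReader: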

@Bean(name = "multiItemReader")
@StepScope
public MultiResourceItemReader<CDSBrokerBOIDMappingEntity> multiResourceItemReader(@Value("#{jobParameters[filenameStartPattern]}") String filenameStartPattern
        , @Value("#{jobParameters[filenameEndPattern]}") String filenameEndPattern, @Value("#{jobParameters[localDirectory]}") String localDirectory) throws Exception {

    String[] localDirectories = localDirectory.split(",");

    List<Resource> inputResources = Collections.synchronizedList(new ArrayList<>());

    for (String localDirectory1 : localDirectories){

        try (Stream<Path> walk = Files.walk(Paths.get(localDirectory1), 1)) {
            walk.filter(Files::isRegularFile)   // is a file
                    .filter(p -> p.getFileName().toString().startsWith(filenameStartPattern) && p.getFileName().toString().endsWith(filenameEndPattern))
                    .findAny().ifPresentOrElse(f -> {
                        log.info("CSV FILE => " + f.getFileName().toString());
                        inputResources.add(new FileSystemResource(f));
                    },
                    () -> {
                        log.info("No file found");
                    });
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    log.info("No. of files => "+inputResources.size());


    MultiResourceItemReader<CDSBrokerBOIDMappingEntity> resourceItemReader = new MultiResourceItemReader<CDSBrokerBOIDMappingEntity>();
    resourceItemReader.setResources(inputResources.toArray(Resource[]::new));
    resourceItemReader.setDelegate(importReader());
    resourceItemReader.setStrict(true);
    return resourceItemReader;
}

The FlatFileItemReader code is:

@Bean
public FlatFileItemReader<CDSBrokerBOIDMappingEntity> importReader() throws Exception {       

    FlatFileItemReader<CDSBrokerBOIDMappingEntity> reader = new FlatFileItemReader<>();

    reader.setLinesToSkip(1);
    reader.setLineMapper(new DefaultLineMapper<CDSBrokerBOIDMappingEntity>() {{
        setLineTokenizer(new DelimitedLineTokenizer() {{
            setNames(new String[]{"BOID", "CLIENT_MEMBER_CODE", "BROKER_ID", "LAST_MODIFIED_DATE", "ISVALID"});
        }});
        setFieldSetMapper(new BeanWrapperFieldSetMapper<CDSBrokerBOIDMappingEntity>() {{
            setTargetType(CDSBrokerBOIDMappingEntity.class);
        }});
    }});

    reader.setStrict(true);
    reader.afterPropertiesSet();

    return reader;
}

The writer is:

@Bean
public ItemWriter<CDSBrokerBOIDMappingEntity> writer() {

    //  log.info("Writer current thread. {}", Thread.currentThread().getName());

    RepositoryItemWriter<CDSBrokerBOIDMappingEntity> writer = new RepositoryItemWriter<CDSBrokerBOIDMappingEntity>();

    writer.setRepository(cdsBrokerBOIDMappingRepository);
//    writer.setMethodName("save");

    try {
        writer.afterPropertiesSet();
    } catch (Exception e) {
        e.printStackTrace();
    }

    return writer;
}

And the Step and Job:

@Bean
public Job importUserJob(MultiResourceItemReader<CDSBrokerBOIDMappingEntity> importReader, JobCompletionNotificationListener listener) {
    return jobBuilderFactory
            .get("importUserJob")
            .incrementer(new RunIdIncrementer())
            .listener(listener)
            .flow(step1(importReader))
            .end()
            .build();
}

@Bean
public Step step1(@Qualifier("multiItemReader") MultiResourceItemReader<CDSBrokerBOIDMappingEntity> importReader) {
    return stepBuilderFactory.get("step1").<CDSBrokerBOIDMappingEntity, CDSBrokerBOIDMappingEntity>chunk(200)
            .reader(importReader)
            .processor(processor())
            .writer(writer())
            .listener(stepListener())
            .taskExecutor(taskExecutor())
            .build();
}

I have tried many times, but only 1 file is read. Is there something wrong with the code, or is my approach wrong?

Tags: spring-boot, csv, spring-batch, batch-processing, java-11

Solution


MultiResourceItemReader works as expected with Spring Batch v4.3.3. Here is a quick example:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.PassThroughLineMapper;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;

@Configuration
@EnableBatchProcessing
public class SO68125366 {

    @Bean
    public MultiResourceItemReader<String> itemReader() {
        FlatFileItemReader<String> itemReader = new FlatFileItemReaderBuilder<String>()
                .name("itemReader")
                .lineMapper(new PassThroughLineMapper())
                .build();

        MultiResourceItemReader<String> multiResourceItemReader = new MultiResourceItemReader<>();
        multiResourceItemReader.setDelegate(itemReader);
        Resource resource1 = new FileSystemResource("file1.txt");
        Resource resource2 = new FileSystemResource("file2.txt");
        multiResourceItemReader.setResources(new Resource[] {resource1, resource2});
        return multiResourceItemReader;
    }

    @Bean
    public ItemWriter<String> itemWriter() {
        return items -> items.forEach(System.out::println);
    }

    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .<String, String>chunk(5)
                        .reader(itemReader())
                        .writer(itemWriter())
                        .build())
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(SO68125366.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

}

With two files, file1.txt and file2.txt, containing hello and world respectively, the sample prints:

hello
world

This means the MultiResourceItemReader reads both resources, not just one as you mentioned. The complete sample can be found in this repo.
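
Since the reader itself behaves correctly, one place worth checking is how the resource list in the question is built. There, each directory's stream ends in findAny(), and Stream.findAny() returns at most one element, so a directory containing several matching files would contribute only one Resource. The following is a minimal sketch of collecting every match instead, using plain java.nio only. The class name MatchingFiles, the helper findAll, and the file names in main are hypothetical and chosen for illustration; they are not from the question or the answer.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MatchingFiles {

    // Hypothetical helper: returns every regular file directly inside `directory`
    // whose name starts with `prefix` and ends with `suffix`.
    static List<Path> findAll(Path directory, String prefix, String suffix) throws IOException {
        try (Stream<Path> walk = Files.walk(directory, 1)) {
            return walk.filter(Files::isRegularFile)
                    .filter(p -> {
                        String name = p.getFileName().toString();
                        return name.startsWith(prefix) && name.endsWith(suffix);
                    })
                    .sorted()                      // stable order between runs
                    .collect(Collectors.toList()); // keep all matches, not just one
        }
    }

    public static void main(String[] args) throws IOException {
        // Build a throwaway directory with three matching files and one non-matching file.
        Path dir = Files.createTempDirectory("csv-demo");
        Files.writeString(dir.resolve("cds_1.csv"), "a");
        Files.writeString(dir.resolve("cds_2.csv"), "b");
        Files.writeString(dir.resolve("cds_3.csv"), "c");
        Files.writeString(dir.resolve("notes.txt"), "d");

        List<Path> matches = findAll(dir, "cds_", ".csv");
        System.out.println(matches.size()); // prints 3
    }
}
```

In the question's bean, each returned Path could then be wrapped in a FileSystemResource and added to inputResources before calling setResources, so that the "No. of files" log line reflects every matching file rather than one per directory.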

