ElasticsearchItemReader keeps reading the same records

Problem description

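I am a complete beginner with Spring, and I have to develop an application using spring-batch. The application must read from an Elasticsearch index and write all the records to a file.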

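When I run the program I get no errors, and the application reads the records and writes them to the file correctly. The problem is that the application never stops: it keeps reading, processing, and writing data indefinitely, and the same records are processed many times over.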

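I think there must be something wrong in my code or in the design of the software, so I have attached the most important parts of the code below.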

I developed the following ElasticsearchItemReader:

public class ElasticsearchItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {

private final Logger logger;

private final ElasticsearchOperations elasticsearchOperations;

private final SearchQuery query;

private final Class<? extends T> targetType;

public ElasticsearchItemReader(ElasticsearchOperations elasticsearchOperations, SearchQuery query, Class<? extends T> targetType) {
    setName(getShortName(getClass()));
    logger = getLogger(getClass());
    this.elasticsearchOperations = elasticsearchOperations;
    this.query = query;
    this.targetType = targetType;
}

@Override
public void afterPropertiesSet() throws Exception {
    state(elasticsearchOperations != null, "An ElasticsearchOperations implementation is required.");
    state(query != null, "A query is required.");
    state(targetType != null, "A target type to convert the input into is required.");
}

@Override
@SuppressWarnings("unchecked")
protected Iterator<T> doPageRead() {

    logger.debug("executing query {}", query.getQuery());

    return (Iterator<T>)elasticsearchOperations.queryForList(query, targetType).iterator();
}
}

I also wrote the following ReadWriterConfig:

@Configuration
public class ReadWriterConfig {

@Bean
public ElasticsearchItemReader<AnotherElement> elasticsearchItemReader() {

    return new ElasticsearchItemReader<>(elasticsearchOperations(), query(), AnotherElement.class);
}


@Bean
public SearchQuery query() {

    NativeSearchQueryBuilder builder = new NativeSearchQueryBuilder()
            .withQuery(matchAllQuery());

    return builder.build();
}

@Bean
public ElasticsearchOperations elasticsearchOperations()  {

    Client client = null;
    try {
        Settings settings = Settings.builder()
                .build();

        client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
        return new ElasticsearchTemplate(client);
    } catch (UnknownHostException e) {
        e.printStackTrace();
        return null;
    }


}
}

I wrote the batch configuration that calls the reader, the writer, and the processor:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
public StepBuilderFactory stepBuilderFactory;

// tag::readerwriterprocessor[]
@Bean
public ElasticsearchItemReader<AnotherElement> reader() {
    return  new ReadWriterConfig().elasticsearchItemReader();
}

@Bean
public PersonItemProcessor processor() {
    return new PersonItemProcessor();
}

@Bean
public FlatFileItemWriter itemWriter() {
    return  new FlatFileItemWriterBuilder<AnotherElement>()
            .name("itemWriter")
            .resource(new FileSystemResource("target/output.txt"))
            .lineAggregator(new PassThroughLineAggregator<>())
            .build();
}

// end::readerwriterprocessor[]

// tag::jobstep[]
@Bean
public Job importUserJob(JobCompletionNotificationListener listener, Step stepA) {
    return jobBuilderFactory.get("importUserJob")
            .flow(stepA)
            .end()
            .build();
}



@Bean
public Step stepA(FlatFileItemWriter<AnotherElement> writer) {
    return stepBuilderFactory.get("stepA")
            .<AnotherElement, AnotherElement> chunk(10)
            .reader(reader())
            .processor(processor())
            .writer(itemWriter())
            .build();
}
// end::jobstep[]

}

Here are some of the sites I followed while writing this code:

https://github.com/spring-projects/spring-batch-extensions/blob/master/spring-batch-elasticsearch/README.md

https://spring.io/guides/gs/batch-processing/

Tags: spring, elasticsearch, spring-batch, spring-data-elasticsearch

Solution


You need to make sure that your item reader returns null at some point, to signal that there is no more data to process and to end the job.
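
To show why the step never ends, here is a minimal, framework-free sketch of the read loop. It assumes the paginated base class works roughly like this: it asks for a new page whenever the current one is exhausted, and returns null only when a freshly fetched page is empty. `PagedReaderSketch`, `PagedReader`, `slice`, `countReads`, and the in-memory list are hypothetical stand-ins for illustration, not Spring Batch or Elasticsearch APIs.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

public class PagedReaderSketch {

    /** Hypothetical stand-in for a paginated reader; not a Spring Batch class. */
    static class PagedReader<T> {
        // (page, pageSize) -> items of that page; plays the role of doPageRead()
        private final BiFunction<Integer, Integer, List<T>> fetchPage;
        private final int pageSize;
        private int page = 0;
        private Iterator<T> results;

        PagedReader(BiFunction<Integer, Integer, List<T>> fetchPage, int pageSize) {
            this.fetchPage = fetchPage;
            this.pageSize = pageSize;
        }

        /** Returns the next item, or null once a freshly fetched page is empty. */
        T read() {
            if (results == null || !results.hasNext()) {
                results = fetchPage.apply(page, pageSize).iterator();
                page++;
                if (!results.hasNext()) {
                    return null; // no more data: this is what lets the step finish
                }
            }
            return results.next();
        }
    }

    /** Returns the requested slice of an in-memory list, like a paged query would. */
    static <T> List<T> slice(List<T> data, int page, int pageSize) {
        int from = (int) Math.min((long) page * pageSize, data.size());
        int to = Math.min(from + pageSize, data.size());
        return data.subList(from, to);
    }

    /** Counts successful reads, giving up after maxReads so a broken reader cannot hang. */
    static <T> int countReads(PagedReader<T> reader, int maxReads) {
        int count = 0;
        while (count < maxReads && reader.read() != null) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b", "c", "d", "e");

        // Ignores the page number, like a query that is re-run unchanged every time.
        PagedReader<String> samePage = new PagedReader<>((page, size) -> data, 2);
        // Honors the page number, so a page past the end comes back empty.
        PagedReader<String> paged = new PagedReader<>((page, size) -> slice(data, page, size), 2);

        System.out.println("ignoring page: " + countReads(samePage, 100) + " reads (capped)");
        System.out.println("honoring page: " + countReads(paged, 100) + " reads");
    }
}
```

The first reader only stops because of the cap of 100 reads, while the second stops by itself after 5 items. In the real reader, one way to get the second behavior might be to apply paging to the query inside doPageRead(), for example `query.setPageable(PageRequest.of(page, pageSize))`, using the page and pageSize fields inherited from AbstractPaginatedDataItemReader. Treat that as an assumption to check against the Spring Data Elasticsearch version you are using.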

As requested in the comments, here is an example of how to import the reader:

@Configuration
@org.springframework.context.annotation.Import(ReadWriterConfig.class)
@EnableBatchProcessing
public class BatchConfiguration {

   // other bean definitions

   @Bean
   public Step stepA(ElasticsearchItemReader<AnotherElement> reader, FlatFileItemWriter<AnotherElement> writer) {
      return stepBuilderFactory.get("stepA")
        .<AnotherElement, AnotherElement> chunk(10)
        .reader(reader)
        .processor(processor())
        .writer(writer)
        .build();
   }
}
