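spring - ElasticsearchItemReader keeps reading the same records
Problem description
I am a complete beginner with Spring, and I have to develop an application with spring-batch. The application must read from an Elasticsearch index and write all the records to a file.
When I run the program I get no errors, and the application reads the records and writes them to the file correctly. The problem is that the application never stops: it keeps reading, processing and writing data. In the image below you can see the same records being processed multiple times.
I think there must be something wrong in my code or in the design of the software, so I have attached the most important parts of the code below.
I developed the following ElasticsearchItemReader: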
public class ElasticsearchItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {

    private final Logger logger;
    private final ElasticsearchOperations elasticsearchOperations;
    private final SearchQuery query;
    private final Class<? extends T> targetType;

    public ElasticsearchItemReader(ElasticsearchOperations elasticsearchOperations, SearchQuery query, Class<? extends T> targetType) {
        setName(getShortName(getClass()));
        logger = getLogger(getClass());
        this.elasticsearchOperations = elasticsearchOperations;
        this.query = query;
        this.targetType = targetType;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        state(elasticsearchOperations != null, "An ElasticsearchOperations implementation is required.");
        state(query != null, "A query is required.");
        state(targetType != null, "A target type to convert the input into is required.");
    }

    @Override
    @SuppressWarnings("unchecked")
    protected Iterator<T> doPageRead() {
        logger.debug("executing query {}", query.getQuery());
        return (Iterator<T>)elasticsearchOperations.queryForList(query, targetType).iterator();
    }
}
I also wrote the following ReadWriterConfig:
@Configuration
public class ReadWriterConfig {

    @Bean
    public ElasticsearchItemReader<AnotherElement> elasticsearchItemReader() {
        return new ElasticsearchItemReader<>(elasticsearchOperations(), query(), AnotherElement.class);
    }

    @Bean
    public SearchQuery query() {
        NativeSearchQueryBuilder builder = new NativeSearchQueryBuilder()
                .withQuery(matchAllQuery());
        return builder.build();
    }

    @Bean
    public ElasticsearchOperations elasticsearchOperations() {
        Client client = null;
        try {
            Settings settings = Settings.builder()
                    .build();
            client = new PreBuiltTransportClient(settings)
                    .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
            return new ElasticsearchTemplate(client);
        } catch (UnknownHostException e) {
            e.printStackTrace();
            return null;
        }
    }
}
I wrote the batch configuration that calls the reader, the writer and the processor:
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    // tag::readerwriterprocessor[]
    @Bean
    public ElasticsearchItemReader<AnotherElement> reader() {
        return new ReadWriterConfig().elasticsearchItemReader();
    }

    @Bean
    public PersonItemProcessor processor() {
        return new PersonItemProcessor();
    }

    @Bean
    public FlatFileItemWriter itemWriter() {
        return new FlatFileItemWriterBuilder<AnotherElement>()
                .name("itemWriter")
                .resource(new FileSystemResource("target/output.txt"))
                .lineAggregator(new PassThroughLineAggregator<>())
                .build();
    }
    // end::readerwriterprocessor[]

    // tag::jobstep[]
    @Bean
    public Job importUserJob(JobCompletionNotificationListener listener, Step stepA) {
        return jobBuilderFactory.get("importUserJob")
                .flow(stepA)
                .end()
                .build();
    }

    @Bean
    public Step stepA(FlatFileItemWriter<AnotherElement> writer) {
        return stepBuilderFactory.get("stepA")
                .<AnotherElement, AnotherElement> chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(itemWriter())
                .build();
    }
    // end::jobstep[]
}
Here are some of the sites I followed while writing this code:
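Solution
You need to make sure that your item reader returns null at some point, to signal that there is no more data to process and to end the job. As written, doPageRead() in the reader above runs the same query on every call and never takes the current page into account, so it never returns an empty page and the step never finishes.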
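The contract can be illustrated with a minimal sketch in plain Java. It assumes a base class that asks for a new page whenever the current one runs out and returns null on the first empty page. `PagedReader`, `PagedListReader` and the in-memory list are hypothetical stand-ins for illustration only, not Spring Batch or Elasticsearch classes. In the reader from the question, the analogous change would presumably be to apply the `page` and `pageSize` fields inherited from `AbstractPaginatedDataItemReader` to the query inside `doPageRead()`, rather than running the same unpaged query each time.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Simplified stand-in for the paging contract (NOT the Spring Batch class):
// read() asks for a new page whenever the current one is exhausted and
// returns null as soon as a page comes back empty.
abstract class PagedReader<T> {
    protected int page = 0;          // index of the next page to fetch
    protected final int pageSize;
    private Iterator<T> results;

    PagedReader(int pageSize) {
        this.pageSize = pageSize;
    }

    // Fetches the page identified by the current value of 'page'.
    protected abstract Iterator<T> doPageRead();

    public T read() {
        if (results == null || !results.hasNext()) {
            results = doPageRead();
            page++;
            if (results == null || !results.hasNext()) {
                return null; // empty page: no more data
            }
        }
        return results.next();
    }
}

// Hypothetical in-memory source standing in for the Elasticsearch index.
class PagedListReader<T> extends PagedReader<T> {
    private final List<T> source;

    PagedListReader(List<T> source, int pageSize) {
        super(pageSize);
        this.source = source;
    }

    @Override
    protected Iterator<T> doPageRead() {
        // Use page and pageSize to select a different slice on every call.
        int from = page * pageSize;
        if (from >= source.size()) {
            return Collections.emptyIterator();
        }
        int to = Math.min(from + pageSize, source.size());
        return source.subList(from, to).iterator();
    }
}

class PagedReaderDemo {
    public static void main(String[] args) {
        PagedReader<String> reader =
                new PagedListReader<>(Arrays.asList("a", "b", "c", "d", "e"), 2);
        String item;
        // The loop ends because read() eventually returns null.
        while ((item = reader.read()) != null) {
            System.out.println(item);
        }
    }
}
```

If `doPageRead()` ignored `page` and always returned the first slice, the loop in `main` would never terminate, which matches the behaviour described in the question.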
As requested in the comments, here is an example of how to import the reader:
@Configuration
@org.springframework.context.annotation.Import(ReadWriterConfig.class)
@EnableBatchProcessing
public class BatchConfiguration {

    // other bean definitions

    @Bean
    public Step stepA(ElasticsearchItemReader<AnotherElement> reader, FlatFileItemWriter<AnotherElement> writer) {
        return stepBuilderFactory.get("stepA")
                .<AnotherElement, AnotherElement> chunk(10)
                .reader(reader)
                .processor(processor())
                .writer(writer)
                .build();
    }
}