spring-batch - 将块侦听器用于指标模式
问题描述
我正在尝试使用处理器指示器模式使我的工作具有幂等性,我尝试使用 Write Listener、AfterWrite 通过设置字段 Processed: true 来更新 mongo 文档。但是,当有大量块时,就会出现问题。
MongoDB Item Reader(10000 Docs) ---chunk(1000)--> JDBC Batch Item Writer(Step完成后表中只保存5000个)
以下代码是关于该步骤的:
@Bean
public MongoItemReader<X> Reader() throws Exception {
MongoItemReader<X> reader = new MongoItemReader<>();
reader.setTemplate(mongoTemplate);
reader.setCollection("MY_COLLECTION");
reader.setTargetType(X.class);
reader.setQuery("{PROCESSED: {$exists: false}}");
reader.setSort(new HashMap<String, Sort.Direction>() {{
put("_id", Sort.Direction.ASC);
}});
reader.afterPropertiesSet();
return reader;
}
@Bean
public XItemProcessor x_item_processor() {
return new XItemProcessor();
}
@Bean
public X_Item_Listener item_listener() {
return new X_Item_Listener();
}
@Bean
public X_Step_Listener step_listener() {
return new X_Step_Listener();
}
@Bean
public JdbcBatchItemWriter<Y> YWriter() {
JdbcBatchItemWriter<Y> Y_Writer = new JdbcBatchItemWriter<>();
Y_Writer.setDataSource(dataSource);
Y_Writer.setAssertUpdates(true);
Y_Writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
Y_Writer.setSql("INSERT INTO Y (Y1,Y2,Y3,Y4) VALUES (:y1, :y2, :y3, :y4)");
Y_Writer.afterPropertiesSet();
return Y_Writer;
}
@Bean
public Step XY_Step() throws Exception {
return stepBuilderFactory.get("XY")
.<X, Y>chunk(1000)
.reader(Reader())
.processor(x_item_processor())
.writer(YWriter())
.faultTolerant()
.skipLimit(Integer.MAX_VALUE)
.skip(Exception.class)
.listener((ItemProcessListener<? super X, ? super Y>) item_listener())
.listener(step_listener())
.build();
}
这是 After Write Listener 中用于更新 mongo 文档的代码片段。
@Autowired
private MongoTemplate mongoTemplate;
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void afterWrite(List<? extends Y> items) {
BulkOperations ops=mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED,"MY_COLLECTION");
for (Y item : items) {
Update update = new Update().set("PROCESSED", true);
ops.updateOne(new Query(Criteria.where("_id").is(item.getID())), update);
}
ops.execute();
}
解决方案
推荐阅读
- python - 如何在 matplotlib 中制作带注释的分组堆叠条形图?
- python - Export single pandas dataframe to multiple SQL tables (automatic normalization)
- orbeon - 如何计算 Orbeon Forms 中的数据集?
- postgresql - How to read a zip entry into a postgres table using the JDBC copymanager
- angular - RXJS 如何处理返回的布尔值或 Observable
在开关图中 - python - Mongoengine documents with DataFrames
- mongodb - MongoDB Official GoLang Driver Comparision Operator
- docker - How to find url of docker-in-docker running in gitlab
- python - Will Thread be garbage collected in this example after thread.join()?
- javascript - React Js textbox adding addition backslashes to the input