首页 > 解决方案 > 将块侦听器用于指标模式

问题描述

我正在尝试使用处理器指示器模式使我的工作具有幂等性,我尝试使用 Write Listener、AfterWrite 通过设置字段 Processed: true 来更新 mongo 文档。但是,当有大量块时,就会出现问题。

MongoDB Item Reader(10000 Docs) ---chunk(1000)--> JDBC Batch Item Writer(Step完成后表中只保存5000个)

以下代码是关于该步骤的:

@Bean
    public MongoItemReader<X> Reader() throws Exception {
        MongoItemReader<X> reader = new MongoItemReader<>();
        reader.setTemplate(mongoTemplate);
        reader.setCollection("MY_COLLECTION");
        reader.setTargetType(X.class);
        reader.setQuery("{PROCESSED: {$exists: false}}");
        reader.setSort(new HashMap<String, Sort.Direction>() {{
            put("_id", Sort.Direction.ASC);
        }});
        reader.afterPropertiesSet();
        return reader;
    }

    @Bean
    public XItemProcessor x_item_processor() {
        return new XItemProcessor();
    }

    @Bean
    public X_Item_Listener item_listener() {
        return new X_Item_Listener();
    }

    @Bean
    public X_Step_Listener step_listener() {
        return new X_Step_Listener();
    }

    @Bean
    public JdbcBatchItemWriter<Y> YWriter() {
        JdbcBatchItemWriter<Y> Y_Writer = new JdbcBatchItemWriter<>();
        Y_Writer.setDataSource(dataSource);
        Y_Writer.setAssertUpdates(true);
        Y_Writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        Y_Writer.setSql("INSERT INTO Y (Y1,Y2,Y3,Y4) VALUES (:y1, :y2, :y3, :y4)");
        Y_Writer.afterPropertiesSet();
        return Y_Writer;
    }


    @Bean
    public Step XY_Step() throws Exception {
        return stepBuilderFactory.get("XY")
                .<X, Y>chunk(1000)
                .reader(Reader())
                .processor(x_item_processor())
                .writer(YWriter())
                .faultTolerant()
                .skipLimit(Integer.MAX_VALUE)
                .skip(Exception.class)
                .listener((ItemProcessListener<? super X, ? super Y>) item_listener())
                .listener(step_listener())
                .build();
    }

这是 After Write Listener 中用于更新 mongo 文档的代码片段。

@Autowired
private MongoTemplate mongoTemplate;

@Transactional(propagation = Propagation.REQUIRES_NEW)
public void afterWrite(List<? extends Y> items) {
    BulkOperations ops=mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED,"MY_COLLECTION");
    for (Y item : items) {
        Update update = new Update().set("PROCESSED", true);
        ops.updateOne(new Query(Criteria.where("_id").is(item.getID())), update);
    }
    ops.execute();
}

标签: spring-batchidempotent

解决方案


推荐阅读