Spring Batch ItemWriteListener and SkipListener don't work hand in hand

Problem description

I am building a POC around Spring Batch's SkipListener and ItemWriteListener because my business requirements demand this approach. It seems to me that ItemWriteListener and SkipListener don't work hand in hand in Spring Batch.

Here is the POC code I developed; the @BeforeWrite method doesn't save anything to the DB.

MySkipListener.java

public class MySkipListener implements SkipListener<Integer, Integer> {
    
    @Override
    public void onSkipInRead(Throwable t) {
        System.out.println("@@@MySkipListener| On Skip in Read Error : " + t.getMessage());
    }

    @Override
    public void onSkipInWrite(Integer item, Throwable t) {
        System.out.println("@@@MySkipListener | Skipped in write due to : " + t.getMessage());
    }

    @Override
    public void onSkipInProcess(Integer item, Throwable t) {
        System.out.println("@@@MySkipListener | Skipped in process due to: " + t.getMessage());
    }
}

MyStepListener.java

public class MyStepListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution stepExecution) {
        System.out.println("beforeStep");
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        System.out.println("afterStep");
        return ExitStatus.COMPLETED;
    }
}

MyItemWriteListener.java

public class MyItemWriteListener {
    
    @Autowired
    private JdbcTemplate jdbcTemplate;
    
    @BeforeWrite
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void beforeWrite(List<? extends Integer> items) {
        System.out.println("########### ItemWriteListener | beforeWrite " + items);
        
        // MUST FOR ME to save data into DB here !!!
        jdbcTemplate.update("INSERT INTO `test`.`mytest`(`name`) VALUES( 'A')");
    }
}

MyJob.java

import java.util.Arrays;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyJob {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public ItemReader<Integer> itemReader() {
        return new ListItemReader<>(Arrays.asList(1, 2, 3));
    }

    @Bean
    public ItemWriter<Integer> itemWriter() {
        return items -> {
            for (Integer item : items) {
                if (item.equals(1)) {
                    throw new Exception("No 1 here!");
                }
                System.out.println("item = " + item);
            }
        };
    }

    @Bean
    public Step step() {
        return steps.get("step")
                .<Integer, Integer>chunk(5)
                .reader(itemReader())
                .writer(itemWriter())
                .faultTolerant()
                .skip(Exception.class)
                .skipLimit(10)
                .listener(mySkipListener())
                .listener(myStepListener())
                .listener(myItemWriteListener())
                .build();
    }

    @Bean
    public Job job() {
        return jobs.get("job")
                .start(step())
                .build();
    }
    
    @Bean
    public MySkipListener mySkipListener() {
        return new MySkipListener();
    }
    
    @Bean
    public MyStepListener myStepListener() {
        return new MyStepListener();
    }
    
    @Bean
    public MyItemWriteListener myItemWriteListener() {
        return new MyItemWriteListener();
    }
}

Tags: spring, spring-batch

Solution


When you declare a listener method with @BeforeWrite, Spring Batch creates a Spring AOP proxy behind the scenes that wraps the method. That proxy is not a Spring bean, so it does not know how to react to @Transactional, and the annotation has no effect.
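The general mechanism can be illustrated with a plain-JDK analogy. This is only a sketch and not Spring's actual code: the `Tx` annotation, the `WriteListener` interface and the `withTx` helper are hypothetical stand-ins. It shows that an annotation does nothing on its own; it only takes effect when the call passes through a proxy that knows how to interpret it.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ProxyDemo {

    // Hypothetical stand-in for @Transactional: a marker with no behaviour of its own.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Tx {}

    // Hypothetical stand-in for a write listener interface.
    interface WriteListener {
        void beforeWrite(List<Integer> items);
    }

    // Records what happened, in order.
    static final List<String> LOG = new ArrayList<>();

    static class MyListener implements WriteListener {
        @Tx
        @Override
        public void beforeWrite(List<Integer> items) {
            LOG.add("beforeWrite " + items);
        }
    }

    // Wraps the target in a JDK proxy that understands @Tx and brackets the
    // call with "begin"/"commit" entries (a real proxy would drive a transaction).
    static WriteListener withTx(WriteListener target) {
        InvocationHandler handler = (proxy, method, args) -> {
            Method impl = target.getClass()
                    .getMethod(method.getName(), method.getParameterTypes());
            boolean tx = impl.isAnnotationPresent(Tx.class);
            if (tx) LOG.add("begin");
            try {
                return method.invoke(target, args);
            } finally {
                if (tx) LOG.add("commit");
            }
        };
        return (WriteListener) Proxy.newProxyInstance(
                WriteListener.class.getClassLoader(),
                new Class<?>[] {WriteListener.class},
                handler);
    }

    // Calls the listener either directly or through the annotation-aware proxy.
    public static List<String> invoke(boolean viaProxy) {
        LOG.clear();
        WriteListener listener = new MyListener();
        if (viaProxy) {
            listener = withTx(listener);
        }
        listener.beforeWrite(Arrays.asList(1, 2, 3));
        return new ArrayList<>(LOG);
    }

    public static void main(String[] args) {
        System.out.println(invoke(false)); // [beforeWrite [1, 2, 3]]
        System.out.println(invoke(true));  // [begin, beforeWrite [1, 2, 3], commit]
    }
}
```

In the direct call the `@Tx` marker is silently ignored, which mirrors the symptom in the question: the method runs, but nothing transactional happens around it.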

Try implementing the ItemWriteListener interface instead:

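@Component
public class MyItemWriteListener implements ItemWriteListener<Integer> {

    // Signatures assume Spring Batch 4.x, which matches the JobBuilderFactory usage in the question.
    @Override
    public void beforeWrite(List<? extends Integer> items) {
        // Save to the DB here, e.g. with an injected JdbcTemplate.
    }

    @Override
    public void afterWrite(List<? extends Integer> items) {
        // Nothing to do after a successful write.
    }

    @Override
    public void onWriteError(Exception exception, List<? extends Integer> items) {
        // Nothing to do on a write error.
    }
}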

On the other hand, I would not manage transactions myself for the items being batch processed (i.e. the objects read by the ItemReader), because Spring Batch already manages them: it starts a new transaction for each chunk of items. Managing transactions yourself is likely to interfere with the rollback behaviour when it retries or skips a failed item.

