Writer or classify method is not called when using CompositeItemWriter

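Question

I am writing a Spring Batch job with Spring Boot, and I need to write to two different tables depending on a condition, so I am trying to use CompositeItemWriter. However, when I launch the batch, the writer is not called.

Here is my job configuration class.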

@Configuration
public class JobConfiguration {

    ...
    ...
    ...

    @Bean
    public JdbcCursorItemReader<Notification> reader() {
        JdbcCursorItemReader<Notification> reader = new JdbcCursorItemReader<Notification>();
        reader.setDataSource(dataSource);

    ...
    ...
        reader.setRowMapper(new BeanPropertyRowMapper<>(Notification.class));
        return reader;
    }

    @Bean
    public NotificationItemProcessor notificatonProcessor() {
        return new NotificationItemProcessor();
    }

    @Bean
    public JdbcBatchItemWriter<Notification> updateWriter() {
        JdbcBatchItemWriter<Notification> writer = new JdbcBatchItemWriter<Notification>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Notification>());
        ...
        writer.setDataSource(dataSource);
        return writer;
    }


    /**
     * Composite Exchange Writer
     * @return
     * @throws InstantiationException
     * @throws IllegalAccessException
     */
    @SuppressWarnings("unchecked")
    @Bean
    public CompositeItemWriter<Notification> compositeExchangeWriter() throws InstantiationException, IllegalAccessException {
        HashMap<String, Object> map = new HashMap<String, Object>();
        map.put(ExchangeRouter.INSERT_EXCHANGE_FOR_NOTIFICATION.getActionName(), exchangeWorkflowWriter());
        map.put(ExchangeRouter.INSERT_EXCHANGE_FOR_PACK.getActionName(), exchangeWriter());
        map.put(ExchangeRouter.DO_NOTHING.getActionName(), doNothing());
        return new CompositeItemWriterBuilder(map, ExchangeWriterRouterClassifier.class).build();
    }

    @Bean
    public JdbcBatchItemWriter<Notification> exchangeWorkflowWriter() {
        JdbcBatchItemWriter<Notification> writer = new JdbcBatchItemWriter<Notification>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Notification>());

        writer.setSql(" INSERT INTO SOME TABLE..");

        writer.setDataSource(dataSource);
        return writer;
    }

    @Bean
    public JdbcBatchItemWriter<Notification> exchangeWriter() {
        JdbcBatchItemWriter<Notification> writer = new JdbcBatchItemWriter<Notification>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Notification>());

        writer.setSql("INSERT INTO SOME OTHER TABLE.");

        writer.setDataSource(dataSource);
        return writer;
    }

    @Bean
    public ItemWriter<Document> doNothing() {
        return new DummyWriter();
    }

    @Bean
    public Job generatePdf(JobCompletionNotificationListener listener) throws InstantiationException, IllegalAccessException {
        return jobBuilderFactory.get("generatePdf")
                .incrementer(new RunIdIncrementer())
                .flow(treatStock())
                .end()
                .build();
    }

    @Bean
    public Step treatStock() throws InstantiationException, IllegalAccessException {
        return stepBuilderFactory.get("treatStock")
                .<Notification, Notification>chunk(1)
                .reader(reader())
                .processor(notificatonProcessor())
                .writer(compositeExchangeWriter())
                .writer(updateWriter())
                .build();
    }

}

CompositeItemWriterBuilder.java

public class CompositeItemWriterBuilder extends CompositeItemBuilder<CompositeItemWriter> {

    public CompositeItemWriterBuilder(HashMap<String, Object> matcherMap, Class<?> routerDelegate) throws InstantiationException, IllegalAccessException {
        BackToBackPatternClassifier classif = new BackToBackPatternClassifier();
        classif.setRouterDelegate(routerDelegate.newInstance());
        classif.setMatcherMap(matcherMap);

        ClassifierCompositeItemWriter classifier = new ClassifierCompositeItemWriter();
        classifier.setClassifier(classif);

        this.delegates.add(classifier);

    }

    public CompositeItemWriterBuilder(List<Object> delegates) {
        this.delegates = delegates;
    }

    @Override
    protected Class<?> getCompositeItem() {
        return CompositeItemWriter.class;
    }
}

CompositeItemBuilder.java

public abstract class CompositeItemBuilder<T> {

    protected List<Object> delegates = new ArrayList<Object>();

    @SuppressWarnings("unchecked")
    public T build() throws InstantiationException, IllegalAccessException {

        Object compositeItem = getCompositeItem().newInstance();
        Method setDelegates = ReflectionUtils.findMethod(compositeItem.getClass(), "setDelegates", List.class);
        ReflectionUtils.invokeMethod(setDelegates,compositeItem, delegates);

        return (T) compositeItem;
    }

    abstract protected Class<?> getCompositeItem();
}

ExchangeWriterRouterClassifier.java (the classify method is not called)

public class ExchangeWriterRouterClassifier {

    @Classifier
    public String classify(Notification notification) {
        return notification.getExchangesWorkflow().getRouter().getActionName();
    }

}

How does Spring call the Classifier? Am I missing something?

Tags: spring, spring-batch

Answer


I am trying to use CompositeItemWriter, but when I launch the batch, the writer is not called.

The problem is in your step definition:

@Bean
public Step treatStock() throws InstantiationException, IllegalAccessException {
    return stepBuilderFactory.get("treatStock")
            .<Notification, Notification>chunk(1)
            .reader(reader())
            .processor(notificatonProcessor())
            .writer(compositeExchangeWriter())
            .writer(updateWriter())
            .build();
}

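You call the writer() method twice, so updateWriter() overrides compositeExchangeWriter(). You need to call the method once, passing the composite writer on which you have already set the delegate writers.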
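
As a rough illustration of that advice, the fragment below chains both writers behind one CompositeItemWriter and passes it to writer() exactly once. It is only a sketch: it assumes Spring Batch 4.x, it is meant to sit inside the JobConfiguration class from the question, and the bean name stepWriter is hypothetical.

```java
// Fragment for the question's JobConfiguration class (Spring Batch 4.x assumed).
// Requires imports of java.util.Arrays and
// org.springframework.batch.item.support.CompositeItemWriter.
// reader(), notificatonProcessor(), compositeExchangeWriter(), updateWriter()
// and stepBuilderFactory are the members already defined in the question.

// Hypothetical bean: a single writer that calls both delegates, in order, for every chunk.
@Bean
public CompositeItemWriter<Notification> stepWriter() throws InstantiationException, IllegalAccessException {
    CompositeItemWriter<Notification> writer = new CompositeItemWriter<>();
    writer.setDelegates(Arrays.asList(compositeExchangeWriter(), updateWriter()));
    return writer;
}

@Bean
public Step treatStock() throws InstantiationException, IllegalAccessException {
    return stepBuilderFactory.get("treatStock")
            .<Notification, Notification>chunk(1)
            .reader(reader())
            .processor(notificatonProcessor())
            .writer(stepWriter()) // writer() is called once, so nothing is overridden
            .build();
}
```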

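As a side note, when using a composite writer, the step does not know about the delegates, so any delegate that implements the ItemStream interface may need to be registered as a stream on the step. More details here: https://docs.spring.io/spring-batch/4.0.x/reference/html/readersAndWriters.html#delegatePatternAndRegistering

How does Spring call the Classifier?

When a ClassifierCompositeItemWriter is configured correctly, Spring Batch calls the classifier on each item to determine which writer to use, and then calls the appropriate writer to write the item.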
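
To make the per-item routing concrete, here is a dependency-free sketch of the idea in plain Java. It is not Spring Batch's implementation: RoutingWriterSketch, its Writer interface, and the string items are hypothetical stand-ins chosen so that the example runs without any library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class RoutingWriterSketch {

    /** Hypothetical stand-in for an item writer. */
    public interface Writer<T> {
        void write(List<T> items);
    }

    /**
     * Asks the classifier which writer each item belongs to, groups the
     * items of the chunk by that writer, then calls each writer once.
     */
    public static <T> void write(List<T> chunk, Function<T, Writer<T>> classifier) {
        Map<Writer<T>, List<T>> itemsByWriter = new LinkedHashMap<>();
        for (T item : chunk) {
            Writer<T> target = classifier.apply(item);
            itemsByWriter.computeIfAbsent(target, w -> new ArrayList<>()).add(item);
        }
        itemsByWriter.forEach((writer, items) -> writer.write(items));
    }

    public static void main(String[] args) {
        // Two in-memory "tables" standing in for the two database tables.
        List<String> tableA = new ArrayList<>();
        List<String> tableB = new ArrayList<>();
        Writer<String> writerA = tableA::addAll;
        Writer<String> writerB = tableB::addAll;

        // The classifier routes items starting with "a" to writerA, the rest to writerB.
        write(Arrays.asList("a1", "b1", "a2"),
                item -> item.startsWith("a") ? writerA : writerB);

        System.out.println(tableA + " " + tableB); // prints "[a1, a2] [b1]"
    }
}
```

The classifier is only consulted when the routing writer's write method runs, so if the step never calls that writer, the classify method is never called either.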

In your configuration, the ClassifierCompositeItemWriter is not configured correctly here:

@SuppressWarnings("unchecked")
public T build() throws InstantiationException, IllegalAccessException {

    Object compositeItem = getCompositeItem().newInstance();
    Method setDelegates = ReflectionUtils.findMethod(compositeItem.getClass(), "setDelegates", List.class);
    ReflectionUtils.invokeMethod(setDelegates,compositeItem, delegates);

    return (T) compositeItem;
}

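I would not use reflection to set the delegates. The problem is that you expect the compositeExchangeWriter method to register a ClassifierCompositeItemWriter, but its return type is CompositeItemWriter. So the composite writer is not treated as a classifier.

You can find an example of how to use ClassifierCompositeItemWriter here: https://github.com/spring-projects/spring-batch/blob/master/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/ClassifierCompositeItemWriterTests.java#L44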
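
One possible way to declare the routing writer without reflection is sketched below. It assumes Spring Batch 4.x with the BackToBackPatternClassifier from spring-retry that the question already uses. The bean name exchangeRoutingWriter and the inline no-op writer are hypothetical; the no-op stands in for the question's doNothing() bean, whose ItemWriter<Document> type may not be compatible with Notification items.

```java
// Fragment for the question's JobConfiguration class (Spring Batch 4.x assumed).
// Requires imports of java.util.HashMap, java.util.Map,
// org.springframework.batch.item.ItemWriter,
// org.springframework.batch.item.support.ClassifierCompositeItemWriter and
// org.springframework.classify.BackToBackPatternClassifier.

// Hypothetical bean: the return type is the classifier-based writer itself.
@Bean
public ClassifierCompositeItemWriter<Notification> exchangeRoutingWriter() {
    // Assumption: a no-op writer typed for Notification items.
    ItemWriter<Notification> noOp = items -> { };

    // Action name returned by the router -> writer that handles it.
    Map<String, ItemWriter<? super Notification>> writersByAction = new HashMap<>();
    writersByAction.put(ExchangeRouter.INSERT_EXCHANGE_FOR_NOTIFICATION.getActionName(), exchangeWorkflowWriter());
    writersByAction.put(ExchangeRouter.INSERT_EXCHANGE_FOR_PACK.getActionName(), exchangeWriter());
    writersByAction.put(ExchangeRouter.DO_NOTHING.getActionName(), noOp);

    // The router delegate's @Classifier method yields the action name,
    // which is then matched against the keys of the map.
    BackToBackPatternClassifier<Notification, ItemWriter<? super Notification>> classifier =
            new BackToBackPatternClassifier<>();
    classifier.setRouterDelegate(new ExchangeWriterRouterClassifier());
    classifier.setMatcherMap(writersByAction);

    ClassifierCompositeItemWriter<Notification> writer = new ClassifierCompositeItemWriter<>();
    writer.setClassifier(classifier);
    return writer;
}
```

Declaring the types explicitly lets the compiler check that every mapped writer accepts Notification items, which the reflective builder cannot do.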

