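spring - Writer or classify method is not called when using CompositeItemWriter
Problem description
I am writing a Spring Batch job with Spring Boot, and I need to write to two different tables depending on a condition, so I am trying to do this with CompositeItemWriter.
But when I run the batch, the writer is not called.
Here is my job configuration class.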
@Configuration
public class JobConfiguration {
    ...
    ...
    ...
    @Bean
    public JdbcCursorItemReader<Notification> reader() {
        JdbcCursorItemReader<Notification> reader = new JdbcCursorItemReader<Notification>();
        reader.setDataSource(dataSource);
        ...
        ...
        reader.setRowMapper(new BeanPropertyRowMapper<>(Notification.class));
        return reader;
    }

    @Bean
    public NotificationItemProcessor notificatonProcessor() {
        return new NotificationItemProcessor();
    }

    @Bean
    public JdbcBatchItemWriter<Notification> updateWriter() {
        JdbcBatchItemWriter<Notification> writer = new JdbcBatchItemWriter<Notification>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Notification>());
        ...
        writer.setDataSource(dataSource);
        return writer;
    }

    /**
     * Composite Exchange Writer
     * @return
     * @throws InstantiationException
     * @throws IllegalAccessException
     */
    @SuppressWarnings("unchecked")
    @Bean
    public CompositeItemWriter<Notification> compositeExchangeWriter() throws InstantiationException, IllegalAccessException {
        HashMap<String, Object> map = new HashMap<String, Object>();
        map.put(ExchangeRouter.INSERT_EXCHANGE_FOR_NOTIFICATION.getActionName(), exchangeWorkflowWriter());
        map.put(ExchangeRouter.INSERT_EXCHANGE_FOR_PACK.getActionName(), exchangeWriter());
        map.put(ExchangeRouter.DO_NOTHING.getActionName(), doNothing());
        return new CompositeItemWriterBuilder(map, ExchangeWriterRouterClassifier.class).build();
    }

    @Bean
    public JdbcBatchItemWriter<Notification> exchangeWorkflowWriter() {
        JdbcBatchItemWriter<Notification> writer = new JdbcBatchItemWriter<Notification>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Notification>());
        writer.setSql(" INSERT INTO SOME TABLE..");
        writer.setDataSource(dataSource);
        return writer;
    }

    @Bean
    public JdbcBatchItemWriter<Notification> exchangeWriter() {
        JdbcBatchItemWriter<Notification> writer = new JdbcBatchItemWriter<Notification>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Notification>());
        writer.setSql("INSERT INTO SOME OTHER TABLE.");
        writer.setDataSource(dataSource);
        return writer;
    }

    @Bean
    public ItemWriter<Document> doNothing() {
        return new DummyWriter();
    }

    @Bean
    public Job generatePdf(JobCompletionNotificationListener listener) throws InstantiationException, IllegalAccessException {
        return jobBuilderFactory.get("generatePdf")
                .incrementer(new RunIdIncrementer())
                .flow(treatStock())
                .end()
                .build();
    }

    @Bean
    public Step treatStock() throws InstantiationException, IllegalAccessException {
        return stepBuilderFactory.get("treatStock")
                .<Notification, Notification>chunk(1)
                .reader(reader())
                .processor(notificatonProcessor())
                .writer(compositeExchangeWriter())
                .writer(updateWriter())
                .build();
    }
}
CompositeItemWriterBuilder.java
public class CompositeItemWriterBuilder extends CompositeItemBuilder<CompositeItemWriter> {

    public CompositeItemWriterBuilder(HashMap<String, Object> matcherMap, Class<?> routerDelegate) throws InstantiationException, IllegalAccessException {
        BackToBackPatternClassifier classif = new BackToBackPatternClassifier();
        classif.setRouterDelegate(routerDelegate.newInstance());
        classif.setMatcherMap(matcherMap);
        ClassifierCompositeItemWriter classifier = new ClassifierCompositeItemWriter();
        classifier.setClassifier(classif);
        this.delegates.add(classifier);
    }

    public CompositeItemWriterBuilder(List<Object> delegates) {
        this.delegates = delegates;
    }

    @Override
    protected Class<?> getCompositeItem() {
        return CompositeItemWriter.class;
    }
}
CompositeItemBuilder.java
public abstract class CompositeItemBuilder<T> {

    protected List<Object> delegates = new ArrayList<Object>();

    @SuppressWarnings("unchecked")
    public T build() throws InstantiationException, IllegalAccessException {
        Object compositeItem = getCompositeItem().newInstance();
        Method setDelegates = ReflectionUtils.findMethod(compositeItem.getClass(), "setDelegates", List.class);
        ReflectionUtils.invokeMethod(setDelegates, compositeItem, delegates);
        return (T) compositeItem;
    }

    abstract protected Class<?> getCompositeItem();
}
ExchangeWriterRouterClassifier.java (the classify method is not called)
public class ExchangeWriterRouterClassifier {

    @Classifier
    public String classify(Notification notification) {
        return notification.getExchangesWorkflow().getRouter().getActionName();
    }
}
How does Spring call the Classifier? Am I missing something?
Solution
"I am trying CompositeItemWriter, but when I run the batch, the writer is not called."
The problem is in your step definition:
@Bean
public Step treatStock() throws InstantiationException, IllegalAccessException {
    return stepBuilderFactory.get("treatStock")
            .<Notification, Notification>chunk(1)
            .reader(reader())
            .processor(notificatonProcessor())
            .writer(compositeExchangeWriter())
            .writer(updateWriter())
            .build();
}
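You call the writer() method twice, so updateWriter() overrides compositeExchangeWriter(). You need to call the method only once, passing the composite writer on which you have already set the delegate writers.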
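To illustrate the point, here is a minimal plain-Java sketch of the "last call wins" behavior. It is a model only, not Spring Batch code: StepSketch, composite, routingWriter and the other names are hypothetical stand-ins, and writers are modeled as Consumer<List<T>> rather than ItemWriter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class WriterOverrideSketch {

    /** Hypothetical stand-in for a step builder that keeps a single writer. */
    static class StepSketch<T> {
        private Consumer<List<T>> writer;

        StepSketch<T> writer(Consumer<List<T>> writer) {
            this.writer = writer; // a second call replaces the first writer
            return this;
        }

        void run(List<T> chunk) {
            writer.accept(chunk);
        }
    }

    /** Fans a chunk out to every delegate, in order. */
    static <T> Consumer<List<T>> composite(List<Consumer<List<T>>> delegates) {
        return chunk -> delegates.forEach(delegate -> delegate.accept(chunk));
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Consumer<List<String>> routingWriter = chunk -> log.add("route " + chunk);
        Consumer<List<String>> updateWriter = chunk -> log.add("update " + chunk);

        // Two writer() calls: only the last writer is kept.
        new StepSketch<String>().writer(routingWriter).writer(updateWriter).run(List.of("a"));
        System.out.println(log); // [update [a]]

        log.clear();

        // One writer() call with a composite: both delegates run.
        new StepSketch<String>()
                .writer(composite(List.of(routingWriter, updateWriter)))
                .run(List.of("a"));
        System.out.println(log); // [route [a], update [a]]
    }
}
```

In the real job, the equivalent would be a single .writer(...) call whose argument delegates to both the routing writer and updateWriter().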
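As a side note, when using a composite writer, make sure that any delegate implementing the ItemStream interface is registered as a stream if it is not wired into the step directly. More details here: https://docs.spring.io/spring-batch/4.0.x/reference/html/readersAndWriters.html#delegatePatternAndRegistering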
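How does Spring call the Classifier?
When a ClassifierCompositeItemWriter is configured correctly, Spring Batch calls the classifier on each item to determine which writer to use, then calls that writer to write the item.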
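The routing idea can be sketched in plain Java as follows. This is a simplified model under stated assumptions, not the library's implementation: classifierWriter, writersByAction and the "N"/"P" action names are hypothetical, writers are modeled as Consumer<List<T>>, and the action name is looked up by exact key, whereas BackToBackPatternClassifier matches patterns.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

public class ClassifierRoutingSketch {

    /** Groups a chunk by the writer the classifier picks, then calls each writer once. */
    static <T> Consumer<List<T>> classifierWriter(Function<T, Consumer<List<T>>> classifier) {
        return chunk -> {
            Map<Consumer<List<T>>, List<T>> itemsByWriter = new LinkedHashMap<>();
            for (T item : chunk) {
                itemsByWriter
                        .computeIfAbsent(classifier.apply(item), writer -> new ArrayList<>())
                        .add(item);
            }
            itemsByWriter.forEach((writer, items) -> writer.accept(items));
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Consumer<List<String>> notificationWriter = items -> log.add("notification " + items);
        Consumer<List<String>> packWriter = items -> log.add("pack " + items);

        // Second stage: action name -> writer (plays the role of the matcher map).
        Map<String, Consumer<List<String>>> writersByAction =
                Map.of("N", notificationWriter, "P", packWriter);

        // First stage: item -> action name (plays the role of the classify method).
        Function<String, Consumer<List<String>>> classifier =
                item -> writersByAction.get(item.substring(0, 1));

        classifierWriter(classifier).accept(List.of("N1", "P1", "N2"));
        System.out.println(log); // [notification [N1, N2], pack [P1]]
    }
}
```

The two-stage lookup mirrors the setup in the question: the classify method returns an action name, and the matcher map turns that name into the writer to call.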
In your configuration, the ClassifierCompositeItemWriter is not configured correctly here:
@SuppressWarnings("unchecked")
public T build() throws InstantiationException, IllegalAccessException {
    Object compositeItem = getCompositeItem().newInstance();
    Method setDelegates = ReflectionUtils.findMethod(compositeItem.getClass(), "setDelegates", List.class);
    ReflectionUtils.invokeMethod(setDelegates, compositeItem, delegates);
    return (T) compositeItem;
}
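I would not use reflection to set the delegates. The problem is that you expect the compositeExchangeWriter method to register a ClassifierCompositeItemWriter, but its return type is CompositeItemWriter. As a result, the composite writer is not treated as a classifier.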
You can find an example of how to use ClassifierCompositeItemWriter here: https://github.com/spring-projects/spring-batch/blob/master/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/ClassifierCompositeItemWriterTests.java#L44