spring-batch - 使用弹簧批处理将表从一个数据库复制到另一个数据库
问题描述
我需要将表从两个不同的数据库复制到单个数据库(目标数据库)。
例如,DB1 中的 Table1 和 DB3 中的 Table1 以及 DB2 中的 Table2 到 DB3 中的 Table2。所有表都共享相同的结构。
我创建了三个数据源和一个阅读器
// Datasource Config
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(entityManagerFactoryRef = "db3EntityManagerFactory",
transactionManagerRef = "db3TransactionManager",
basePackages = { "demo.sample.dao" })
public class Db3Config {
@Primary
@Bean(name = "db3DataSource")
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource dataSource() {
return DataSourceBuilder.create()
.build();
}
@Primary
@Bean(name = "db3EntityManagerFactory")
public LocalContainerEntityManagerFactoryBean db3EntityManagerFactory(EntityManagerFactoryBuilder builder,
@Qualifier("db3DataSource") DataSource dataSource) {
return builder.dataSource(dataSource)
.packages("demo.sample.dao")
.persistenceUnit("db3")
.build();
}
@Primary
@Bean(name = "db3TransactionManager")
public PlatformTransactionManager db3TransactionManager(
@Qualifier("db3EntityManagerFactory") EntityManagerFactory db3EntityManagerFactory) {
return new JpaTransactionManager(db3EntityManagerFactory);
}
}
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(entityManagerFactoryRef = "db2EntityManagerFactory",
transactionManagerRef = "db2TransactionManager",
basePackages = { "demo.sample.dao" })
public class Db2Config {
@Bean(name = "db2DataSource")
@ConfigurationProperties(prefix = "db2.datasource")
public DataSource dataSource() {
return DataSourceBuilder.create()
.build();
}
@Bean(name = "db2EntityManagerFactory")
public LocalContainerEntityManagerFactoryBean db3EntityManagerFactory(EntityManagerFactoryBuilder builder,
@Qualifier("db2DataSource") DataSource dataSource) {
return builder.dataSource(dataSource)
.packages("demo.sample.dao")
.persistenceUnit("db2")
.build();
}
@Bean(name = "db2TransactionManager")
public PlatformTransactionManager db2TransactionManager(
@Qualifier("db2EntityManagerFactory") EntityManagerFactory db2EntityManagerFactory) {
return new JpaTransactionManager(db2EntityManagerFactory);
}
}
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(entityManagerFactoryRef = "db1EntityManagerFactory",
transactionManagerRef = "db1TransactionManager",
basePackages = { "demo.sample.dao" })
public class Db1Config {
@Bean(name = "db1DataSource")
@ConfigurationProperties(prefix = "db1.datasource")
public DataSource dataSource() {
return DataSourceBuilder.create()
.build();
}
@Bean(name = "db1EntityManagerFactory")
public LocalContainerEntityManagerFactoryBean db1EntityManagerFactory(EntityManagerFactoryBuilder builder,
@Qualifier("db1DataSource") DataSource dataSource) {
return builder.dataSource(dataSource)
.packages("demo.sample.dao")
.persistenceUnit("db1")
.build();
}
@Bean(name = "db1TransactionManager")
public PlatformTransactionManager db1TransactionManager(
@Qualifier("db1EntityManagerFactory") EntityManagerFactory db1EntityManagerFactory) {
return new JpaTransactionManager(db1EntityManagerFactory);
}
}
-- Job Config
@Configuration
public class DBImportJob{
@Autowired
Db1Config db1Config ;
@Autowired
Db2Config db2Config ;
@Autowired
Db3Config db3Config ;
@Bean(name = "DBImportJob")
public Job dbImportJob(final JobBuilderFactory jobBuilderFactory, final StepBuilderFactory stepBuilderFactory) {
Step person1= stepBuilderFactory.get("Person1_FROM_DB1_TABLE")
.<Person, Person>chunk(1000)
.reader(personReader(db1Config.dataSource(),
"select * from person"))
.writer(writer(db3Config.dataSource(),
"insert into person1"))
.build();
Step person2= stepBuilderFactory.get("Person2_FROM_DB2_TABLE")
.<Person, Person>chunk(1000)
.reader(personReader1(db2Config.dataSource(),
"select * from person"))
.writer(writer1(db3Config.dataSource(),
"insert into person2"))
.build();
return jobBuilderFactory.get("personImportJob")
.incrementer(new RunIdIncrementer())
.start(person1)
.next(person2)
.build();
}
@Bean
JdbcCursorItemReader<Person> personReader(DataSource dataSource, String sql) {
JdbcCursorItemReader<Person> databaseReader = new JdbcCursorItemReader<>();
databaseReader.setDataSource(dataSource);
databaseReader.setSql(sql);
databaseReader.setRowMapper(new BeanPropertyRowMapper<>(Person.class));
return databaseReader;
}
@Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource, String sql) {
JdbcBatchItemWriter<Person> itemWriter = new JdbcBatchItemWriter<>();
itemWriter.setSql(sql);
itemWriter.setDataSource(dataSource);
itemWriter
.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
return itemWriter;
}
@Bean
JdbcCursorItemReader<Person> personReader1(DataSource dataSource, String sql) {
JdbcCursorItemReader<Person> databaseReader = new JdbcCursorItemReader<>();
databaseReader.setDataSource(dataSource);
databaseReader.setSql(sql);
databaseReader.setRowMapper(new BeanPropertyRowMapper<>(Person.class));
return databaseReader;
}
@Bean
public JdbcBatchItemWriter<Person> writer1(DataSource dataSource, String sql) {
JdbcBatchItemWriter<Person> itemWriter = new JdbcBatchItemWriter<>();
itemWriter.setSql(sql);
itemWriter.setDataSource(dataSource);
itemWriter
.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
return itemWriter;
}
}
Main classes
@SpringBootApplication
@EnableBatchProcessing
public class CopyDbApplication {
public static void main(String[] args) throws JobExecutionException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException {
ConfigurableApplicationContext ctx = SpringApplication.run(CopyDbApplication.class, args);
JobLauncher jobLauncher = (JobLauncher) ctx.getBean("jobLauncher");
Job dbImportJob= (Job) ctx.getBean("DBImportJob");
jobLauncher.run(dbImportJob, newExecution("DBImportJob"));
}
private static JobParameters newExecution(String jobName) {
return new JobParametersBuilder().addDate("date", new Date())
.addLong("time", System.currentTimeMillis())
.addString("jobName", jobName)
.toJobParameters();
}
}
执行此程序后,所有记录都被插入到 DB3 中的 person1 表中,而 DB3 中的 person2 表没有被填充。
例外输出:
来自 DB1 的 person1 -> DB3 中的
person1 来自 DB2 的 person2 -> DB3 中的 person2
解决方案
以下程序会将表从两个不同的数据库复制到单个数据库(目标数据库)。例如,DB1 中的 Table1 和 DB3 中的 Table1 和 DB2 中的 Table2 到 DB3 中的 Table2。
// Datasource Config
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(entityManagerFactoryRef = "db3EntityManagerFactory",
transactionManagerRef = "db3TransactionManager",
basePackages = { "demo.sample.dao" })
public class Db3Config {
@Primary
@Bean(name = "db3DataSource")
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource dataSource() {
return DataSourceBuilder.create()
.build();
}
@Primary
@Bean(name = "db3EntityManagerFactory")
public LocalContainerEntityManagerFactoryBean db3EntityManagerFactory(EntityManagerFactoryBuilder builder,
@Qualifier("db3DataSource") DataSource dataSource) {
return builder.dataSource(dataSource)
.packages("demo.sample.dao")
.persistenceUnit("db3")
.build();
}
@Primary
@Bean(name = "db3TransactionManager")
public PlatformTransactionManager db3TransactionManager(
@Qualifier("db3EntityManagerFactory") EntityManagerFactory db3EntityManagerFactory) {
return new JpaTransactionManager(db3EntityManagerFactory);
}
}
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(entityManagerFactoryRef = "db2EntityManagerFactory",
transactionManagerRef = "db2TransactionManager",
basePackages = { "demo.sample.dao" })
public class Db2Config {
@Bean(name = "db2DataSource")
@ConfigurationProperties(prefix = "db2.datasource")
public DataSource dataSource() {
return DataSourceBuilder.create()
.build();
}
@Bean(name = "db2EntityManagerFactory")
public LocalContainerEntityManagerFactoryBean db3EntityManagerFactory(EntityManagerFactoryBuilder builder,
@Qualifier("db2DataSource") DataSource dataSource) {
return builder.dataSource(dataSource)
.packages("demo.sample.dao")
.persistenceUnit("db2")
.build();
}
@Bean(name = "db2TransactionManager")
public PlatformTransactionManager db2TransactionManager(
@Qualifier("db2EntityManagerFactory") EntityManagerFactory db2EntityManagerFactory) {
return new JpaTransactionManager(db2EntityManagerFactory);
}
}
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(entityManagerFactoryRef = "db1EntityManagerFactory",
transactionManagerRef = "db1TransactionManager",
basePackages = { "demo.sample.dao" })
public class Db1Config {
@Bean(name = "db1DataSource")
@ConfigurationProperties(prefix = "db1.datasource")
public DataSource dataSource() {
return DataSourceBuilder.create()
.build();
}
@Bean(name = "db1EntityManagerFactory")
public LocalContainerEntityManagerFactoryBean db1EntityManagerFactory(EntityManagerFactoryBuilder builder,
@Qualifier("db1DataSource") DataSource dataSource) {
return builder.dataSource(dataSource)
.packages("demo.sample.dao")
.persistenceUnit("db1")
.build();
}
@Bean(name = "db1TransactionManager")
public PlatformTransactionManager db1TransactionManager(
@Qualifier("db1EntityManagerFactory") EntityManagerFactory db1EntityManagerFactory) {
return new JpaTransactionManager(db1EntityManagerFactory);
}
}
-- Job Config
@Configuration
public class DBImportJob{
@Autowired
Db1Config db1Config ;
@Autowired
Db2Config db2Config ;
@Autowired
Db3Config db3Config ;
@Bean(name = "DBImportJob")
public Job dbImportJob(final JobBuilderFactory jobBuilderFactory, final StepBuilderFactory stepBuilderFactory) {
Step person1= stepBuilderFactory.get("Person1_FROM_DB1_TABLE")
.<Person, Person>chunk(1000)
.reader(personReader(db1Config.dataSource(),
"select * from person"))
.writer(writer(db3Config.dataSource(),
"insert into person1"))
.build();
Step person2= stepBuilderFactory.get("Person2_FROM_DB2_TABLE")
.<Person, Person>chunk(1000)
.reader(personReader(db2Config.dataSource(),
"select * from person"))
.writer(writer(db3Config.dataSource(),
"insert into person2"))
.build();
return jobBuilderFactory.get("personImportJob")
.incrementer(new RunIdIncrementer())
.start(person1)
.next(person2)
.build();
}
@Bean
@StepScope
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
JdbcCursorItemReader<Person> personReader(DataSource dataSource, String sql) {
JdbcCursorItemReader<Person> databaseReader = new JdbcCursorItemReader<>();
databaseReader.setDataSource(dataSource);
databaseReader.setSql(sql);
databaseReader.setRowMapper(new BeanPropertyRowMapper<>(Person.class));
return databaseReader;
}
@Bean
@StepScope
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public JdbcBatchItemWriter<Person> writer(DataSource dataSource, String sql) {
JdbcBatchItemWriter<Person> itemWriter = new JdbcBatchItemWriter<>();
itemWriter.setSql(sql);
itemWriter.setDataSource(dataSource);
itemWriter
.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
return itemWriter;
}
}
Main classes
@SpringBootApplication
@EnableBatchProcessing
public class CopyDbApplication {
public static void main(String[] args) throws JobExecutionException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException {
ConfigurableApplicationContext ctx = SpringApplication.run(CopyDbApplication.class, args);
JobLauncher jobLauncher = (JobLauncher) ctx.getBean("jobLauncher");
Job dbImportJob= (Job) ctx.getBean("DBImportJob");
jobLauncher.run(dbImportJob, newExecution("DBImportJob"));
}
private static JobParameters newExecution(String jobName) {
return new JobParametersBuilder().addDate("date", new Date())
.addLong("time", System.currentTimeMillis())
.addString("jobName", jobName)
.toJobParameters();
}
}
推荐阅读
- python - 使用重采样获取数据框python中的工作日总和
- java - 哪个reference Finalizer(FinalReference)或Weak/Phantom/Soft Reference对GC有更高的优先级
- python - 如何选择带有硒的切片机?
- javascript - 如何将内部html的文本更改为id的文本
- qt - 如何使 QTreeView 显示不在第一列的子树
- python - 如何在 python 中处理 TypeError?
- c# - C# POST 甚至多次运行方法,但应该只运行一次
- c# - 生成 ListViewItem 时,如何更改此“签入”按钮的颜色?
- java - 如何在 intellij 中查看静态字符串常量的连接值
- json - 如何在 Swift 中从 JSON 中解码大量数字