首页 > 解决方案 > 使用弹簧批处理将表从一个数据库复制到另一个数据库

问题描述

我需要将表从两个不同的数据库复制到单个数据库(目标数据库)。
例如,DB1 中的 Table1 和 DB3 中的 Table1 以及 DB2 中的 Table2 到 DB3 中的 Table2。所有表都共享相同的结构。
我创建了三个数据源和一个阅读器

// Datasource Config

@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(entityManagerFactoryRef = "db3EntityManagerFactory",
    transactionManagerRef = "db3TransactionManager",
    basePackages = { "demo.sample.dao" })
public class Db3Config {

    @Primary
    @Bean(name = "db3DataSource")
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource dataSource() {
    return DataSourceBuilder.create()
        .build();
    }

    @Primary
    @Bean(name = "db3EntityManagerFactory")
    public LocalContainerEntityManagerFactoryBean db3EntityManagerFactory(EntityManagerFactoryBuilder builder,
        @Qualifier("db3DataSource") DataSource dataSource) {
    return builder.dataSource(dataSource)
        .packages("demo.sample.dao")
        .persistenceUnit("db3")
        .build();
    }

    @Primary
    @Bean(name = "db3TransactionManager")
    public PlatformTransactionManager db3TransactionManager(
        @Qualifier("db3EntityManagerFactory") EntityManagerFactory db3EntityManagerFactory) {
    return new JpaTransactionManager(db3EntityManagerFactory);
    }

}

@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(entityManagerFactoryRef = "db2EntityManagerFactory",
    transactionManagerRef = "db2TransactionManager",
    basePackages = { "demo.sample.dao" })
public class Db2Config {

    @Bean(name = "db2DataSource")
    @ConfigurationProperties(prefix = "db2.datasource")
    public DataSource dataSource() {
    return DataSourceBuilder.create()
        .build();
    }

    @Bean(name = "db2EntityManagerFactory")
    public LocalContainerEntityManagerFactoryBean db3EntityManagerFactory(EntityManagerFactoryBuilder builder,
        @Qualifier("db2DataSource") DataSource dataSource) {
    return builder.dataSource(dataSource)
        .packages("demo.sample.dao")
        .persistenceUnit("db2")
        .build();
    }


    @Bean(name = "db2TransactionManager")
    public PlatformTransactionManager db2TransactionManager(
        @Qualifier("db2EntityManagerFactory") EntityManagerFactory db2EntityManagerFactory) {
    return new JpaTransactionManager(db2EntityManagerFactory);
    }

}

@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(entityManagerFactoryRef = "db1EntityManagerFactory",
    transactionManagerRef = "db1TransactionManager",
    basePackages = { "demo.sample.dao" })
public class Db1Config {
    @Bean(name = "db1DataSource")
    @ConfigurationProperties(prefix = "db1.datasource")
    public DataSource dataSource() {
    return DataSourceBuilder.create()
        .build();
    }
     @Bean(name = "db1EntityManagerFactory")
    public LocalContainerEntityManagerFactoryBean db1EntityManagerFactory(EntityManagerFactoryBuilder builder,
        @Qualifier("db1DataSource") DataSource dataSource) {
    return builder.dataSource(dataSource)
        .packages("demo.sample.dao")
        .persistenceUnit("db1")
        .build();
    }

    @Bean(name = "db1TransactionManager")
    public PlatformTransactionManager db1TransactionManager(
        @Qualifier("db1EntityManagerFactory") EntityManagerFactory db1EntityManagerFactory) {
    return new JpaTransactionManager(db1EntityManagerFactory);
    }

}

-- Job Config

@Configuration
public class DBImportJob{

    @Autowired
    Db1Config db1Config ;

    @Autowired
    Db2Config db2Config ;

    @Autowired
    Db3Config db3Config ;

 @Bean(name = "DBImportJob")
    public Job dbImportJob(final JobBuilderFactory jobBuilderFactory, final StepBuilderFactory stepBuilderFactory) {


Step person1= stepBuilderFactory.get("Person1_FROM_DB1_TABLE")
        .<Person, Person>chunk(1000)
        .reader(personReader(db1Config.dataSource(),
            "select * from person"))
        .writer(writer(db3Config.dataSource(),
            "insert into person1"))
        .build();

    Step person2= stepBuilderFactory.get("Person2_FROM_DB2_TABLE")
        .<Person, Person>chunk(1000)
        .reader(personReader1(db2Config.dataSource(),
            "select * from person"))
        .writer(writer1(db3Config.dataSource(),
            "insert into person2"))     
            .build();


    return jobBuilderFactory.get("personImportJob")
        .incrementer(new RunIdIncrementer())
        .start(person1)
        .next(person2)      
        .build();

    }

    @Bean
    JdbcCursorItemReader<Person> personReader(DataSource dataSource, String sql) {
    JdbcCursorItemReader<Person> databaseReader = new JdbcCursorItemReader<>();
    databaseReader.setDataSource(dataSource);
    databaseReader.setSql(sql);
    databaseReader.setRowMapper(new BeanPropertyRowMapper<>(Person.class));
    return databaseReader;
    }

    @Bean
    public JdbcBatchItemWriter<Person> writer(DataSource dataSource, String sql) {
    JdbcBatchItemWriter<Person> itemWriter = new JdbcBatchItemWriter<>();
    itemWriter.setSql(sql);
    itemWriter.setDataSource(dataSource);
    itemWriter
        .setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
    return itemWriter;
    }
 @Bean
    JdbcCursorItemReader<Person> personReader1(DataSource dataSource, String sql) {
    JdbcCursorItemReader<Person> databaseReader = new JdbcCursorItemReader<>();
    databaseReader.setDataSource(dataSource);
    databaseReader.setSql(sql);
    databaseReader.setRowMapper(new BeanPropertyRowMapper<>(Person.class));
    return databaseReader;
    }

    @Bean
    public JdbcBatchItemWriter<Person> writer1(DataSource dataSource, String sql) {
    JdbcBatchItemWriter<Person> itemWriter = new JdbcBatchItemWriter<>();
    itemWriter.setSql(sql);
    itemWriter.setDataSource(dataSource);
    itemWriter
        .setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
    return itemWriter;
    }

}

Main classes
@SpringBootApplication
@EnableBatchProcessing
public class CopyDbApplication {

    public static void main(String[] args) throws JobExecutionException, JobRestartException,
        JobInstanceAlreadyCompleteException, JobParametersInvalidException {

    ConfigurableApplicationContext ctx = SpringApplication.run(CopyDbApplication.class, args);

    JobLauncher jobLauncher = (JobLauncher) ctx.getBean("jobLauncher");
    Job dbImportJob= (Job) ctx.getBean("DBImportJob");
    jobLauncher.run(dbImportJob, newExecution("DBImportJob"));

    }

    private static JobParameters newExecution(String jobName) {
    return new JobParametersBuilder().addDate("date", new Date())
        .addLong("time", System.currentTimeMillis())
        .addString("jobName", jobName)
        .toJobParameters();

    }
}

执行此程序后,所有记录都被插入到 DB3 中的 person1 表中,而 DB3 中的 person2 表没有被填充。

例外输出:
来自 DB1 的 person1 -> DB3 中的
person1 来自 DB2 的 person2 -> DB3 中的 person2

标签: spring-batch

解决方案


以下程序会将表从两个不同的数据库复制到单个数据库(目标数据库)。例如,DB1 中的 Table1 和 DB3 中的 Table1 和 DB2 中的 Table2 到 DB3 中的 Table2

 // Datasource Config
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(entityManagerFactoryRef = "db3EntityManagerFactory",
    transactionManagerRef = "db3TransactionManager",
    basePackages = { "demo.sample.dao" })
public class Db3Config {

    @Primary
    @Bean(name = "db3DataSource")
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource dataSource() {
    return DataSourceBuilder.create()
        .build();
    }

    @Primary
    @Bean(name = "db3EntityManagerFactory")
    public LocalContainerEntityManagerFactoryBean db3EntityManagerFactory(EntityManagerFactoryBuilder builder,
        @Qualifier("db3DataSource") DataSource dataSource) {
    return builder.dataSource(dataSource)
        .packages("demo.sample.dao")
        .persistenceUnit("db3")
        .build();
    }

    @Primary
    @Bean(name = "db3TransactionManager")
    public PlatformTransactionManager db3TransactionManager(
        @Qualifier("db3EntityManagerFactory") EntityManagerFactory db3EntityManagerFactory) {
    return new JpaTransactionManager(db3EntityManagerFactory);
    }

}

@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(entityManagerFactoryRef = "db2EntityManagerFactory",
    transactionManagerRef = "db2TransactionManager",
    basePackages = { "demo.sample.dao" })
public class Db2Config {

    @Bean(name = "db2DataSource")
    @ConfigurationProperties(prefix = "db2.datasource")
    public DataSource dataSource() {
    return DataSourceBuilder.create()
        .build();
    }

    @Bean(name = "db2EntityManagerFactory")
    public LocalContainerEntityManagerFactoryBean db3EntityManagerFactory(EntityManagerFactoryBuilder builder,
        @Qualifier("db2DataSource") DataSource dataSource) {
    return builder.dataSource(dataSource)
        .packages("demo.sample.dao")
        .persistenceUnit("db2")
        .build();
    }


    @Bean(name = "db2TransactionManager")
    public PlatformTransactionManager db2TransactionManager(
        @Qualifier("db2EntityManagerFactory") EntityManagerFactory db2EntityManagerFactory) {
    return new JpaTransactionManager(db2EntityManagerFactory);
    }

}

@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(entityManagerFactoryRef = "db1EntityManagerFactory",
    transactionManagerRef = "db1TransactionManager",
    basePackages = { "demo.sample.dao" })
public class Db1Config {
    @Bean(name = "db1DataSource")
    @ConfigurationProperties(prefix = "db1.datasource")
    public DataSource dataSource() {
    return DataSourceBuilder.create()
        .build();
    }
     @Bean(name = "db1EntityManagerFactory")
    public LocalContainerEntityManagerFactoryBean db1EntityManagerFactory(EntityManagerFactoryBuilder builder,
        @Qualifier("db1DataSource") DataSource dataSource) {
    return builder.dataSource(dataSource)
        .packages("demo.sample.dao")
        .persistenceUnit("db1")
        .build();
    }

    @Bean(name = "db1TransactionManager")
    public PlatformTransactionManager db1TransactionManager(
        @Qualifier("db1EntityManagerFactory") EntityManagerFactory db1EntityManagerFactory) {
    return new JpaTransactionManager(db1EntityManagerFactory);
    }

}

-- Job Config

@Configuration
public class DBImportJob{

    @Autowired
    Db1Config db1Config ;

    @Autowired
    Db2Config db2Config ;

    @Autowired
    Db3Config db3Config ;

 @Bean(name = "DBImportJob")
    public Job dbImportJob(final JobBuilderFactory jobBuilderFactory, final StepBuilderFactory stepBuilderFactory) {


Step person1= stepBuilderFactory.get("Person1_FROM_DB1_TABLE")
        .<Person, Person>chunk(1000)
        .reader(personReader(db1Config.dataSource(),
            "select * from person"))
        .writer(writer(db3Config.dataSource(),
            "insert into person1"))
        .build();

    Step person2= stepBuilderFactory.get("Person2_FROM_DB2_TABLE")
        .<Person, Person>chunk(1000)
        .reader(personReader(db2Config.dataSource(),
            "select * from person"))
        .writer(writer(db3Config.dataSource(),
            "insert into person2"))     
            .build();


    return jobBuilderFactory.get("personImportJob")
        .incrementer(new RunIdIncrementer())
        .start(person1)
        .next(person2)      
        .build();

    }

    @Bean
    @StepScope
    @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    JdbcCursorItemReader<Person> personReader(DataSource dataSource, String sql) {
    JdbcCursorItemReader<Person> databaseReader = new JdbcCursorItemReader<>();
    databaseReader.setDataSource(dataSource);
    databaseReader.setSql(sql);
    databaseReader.setRowMapper(new BeanPropertyRowMapper<>(Person.class));
    return databaseReader;
    }

    @Bean
    @StepScope
    @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public JdbcBatchItemWriter<Person> writer(DataSource dataSource, String sql) {
    JdbcBatchItemWriter<Person> itemWriter = new JdbcBatchItemWriter<>();
    itemWriter.setSql(sql);
    itemWriter.setDataSource(dataSource);
    itemWriter
        .setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
    return itemWriter;
    }

}

Main classes
@SpringBootApplication
@EnableBatchProcessing
public class CopyDbApplication {

    public static void main(String[] args) throws JobExecutionException, JobRestartException,
        JobInstanceAlreadyCompleteException, JobParametersInvalidException {

    ConfigurableApplicationContext ctx = SpringApplication.run(CopyDbApplication.class, args);

    JobLauncher jobLauncher = (JobLauncher) ctx.getBean("jobLauncher");
    Job dbImportJob= (Job) ctx.getBean("DBImportJob");
    jobLauncher.run(dbImportJob, newExecution("DBImportJob"));

    }

    private static JobParameters newExecution(String jobName) {
    return new JobParametersBuilder().addDate("date", new Date())
        .addLong("time", System.currentTimeMillis())
        .addString("jobName", jobName)
        .toJobParameters();

    }
}

推荐阅读