Spring Batch: looping a reader while changing the SQL query and writing to different files

Problem description

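First of all, thank you very much for taking the time to read this question. I am starting to implement a project with Spring Batch, and my question is as follows. I have a reader that fills in the fields of a query based on the value of a variable, fetches the results, and then writes them to a file. The problem is that one of the possible values of that variable is "ALL", in which case the reader should run three times, changing the query fields for CUSTOMER, ACCOUNT and PRODUCT. Because these are different queries, they produce different results, and each result should be written to a different file.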

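Is there any way to do this? From what I have researched, I cannot modify a job while it is running, so I cannot add another step to it based on the value "ALL".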

Thank you very much for your time.

Resource

public Resource outputResource() {
        String outputDir = env.getProperty("uy.com.antel.up.data.folder.out");
        Date date = new Date();
        DateFormat hourdateFormat = new SimpleDateFormat("dd-MM-yyyy");
        String outputName = outputDir + "webUserbatch-" + hourdateFormat.format(date) + ".txt";
        File file = new File(outputName);
        if(!file.exists()) {
            try {
                file.createNewFile();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        Resource resource = new FileSystemResource(file);

        return resource;
    }

Reader

@SuppressWarnings({ "rawtypes", "unchecked" })
    @Bean(destroyMethod = "")
    @StepScope
    public JdbcCursorItemReader reader(DataSource dataSource, @Value("#{jobParameters}") Map<String, Object> map) {
        JdbcCursorItemReader reader = new JdbcCursorItemReader();
        reader.setDataSource(dataSource);
        reader.setRowMapper(new UsuariosProductosRowMapper());
        String urn = "", fechaInicio = "", fechaFin = "", ultimafecha = "", tabla_consultar = "";
        if (map.get("urn") != null) {
            urn = map.get("urn").toString();
        }
        if (map.get("fechaInicio") != null) {
            fechaInicio = map.get("fechaInicio").toString();
        }
        if (map.get("fechaFin") != null) {
            fechaFin = map.get("fechaFin").toString();
        }
        if (map.get("ultimafecha") != null) {
            ultimafecha = map.get("ultimafecha").toString();
        }
        if (map.get("tabla_consultar") != null) {
            tabla_consultar = map.get("tabla_consultar").toString();
        }
        String tipoGeneracion = env.getProperty("uy.com.antel.up.tipo.carga");

        Date date = new Date();

        //Set this to "" to allow custom searches; otherwise only the initial load is handled
        //tipoGeneracion = "";

        LOG.info("URN: " + urn);
        LOG.info("Fecha desde: " + fechaInicio);
        LOG.info("Fecha hasta: " + fechaFin);
        LOG.info("Ultima fecha: " + ultimafecha);
        LOG.info("Tabla consultar: " + tabla_consultar);


        //remove this so that it takes a value other than the initial one
        //table accreditation_event
        //missing data: user_name, acreditation_level

        String table_online_id = "";
        String table_reference = "";
        String shorcut_table_online = "";
        String shorcut_table_reference = "";
        String attribute_table_online ="";

        if (tabla_consultar == null || tabla_consultar.equals(CommandArgument.ALL.name()) ) {
            //
            //At this point the program should run the query for CUSTOMER, ACCOUNT and PRODUCT
            //
        }else{
            if (tabla_consultar.equals(CommandArgument.CUSTOMER.name())) {
                table_online_id = "online_id_customer";
                table_reference = "customer_reference";
                shorcut_table_online = "oidc";
                attribute_table_online = "customer";
                shorcut_table_reference = "cr";
            }else if (tabla_consultar.equals(CommandArgument.PRODUCT.name())) {
                table_online_id = "online_id_product";
                table_reference = "product_reference";
                shorcut_table_online = "oidp";
                attribute_table_online = "product";
                shorcut_table_reference = "pr";
            }else {
                table_online_id = "online_id_account";
                table_reference = "account_reference";
                shorcut_table_online = "oida";
                attribute_table_online = "account";
                shorcut_table_reference = "ar";
            }
          }





        if (tipoGeneracion.equalsIgnoreCase("inicial")) {
            reader.setSql("select ul.user_name  as \"userName\"," + shorcut_table_reference +".urn as \"urn\", ace.wbu_action as \"action\", al.name as \"accreditation_level\", ace.date as \"fecha\"\r\n" 
                    + " from " + table_online_id +" "+ shorcut_table_online +", online_entity_association oea, accreditation_event ace, " + table_reference + " " +  shorcut_table_reference +", user_login ul, acreditation_level al \r\n"
                    + " where " + shorcut_table_online+".id=oea.id and \r\n" + " oea.acreditacion=ace.id and \r\n"
                    + " "+shorcut_table_online+"."+attribute_table_online+"=" + shorcut_table_reference +".id and \r\n" + " ul.online_identity="+shorcut_table_online+".online_identity and \r\n" + " ace.level=al.id and \r\n"
                    + " oea.valid=true and " + "ace.date<='" + date + "'");
        } else if (urn.isEmpty() && fechaInicio.isEmpty() && fechaFin.isEmpty()) {

            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            Calendar cal = Calendar.getInstance();
            cal.add(Calendar.MONTH, -1);
            cal.set(Calendar.DATE, 1);

            Date firstDateOfPreviousMonth = cal.getTime();
            String fechaDesde = sdf.format(firstDateOfPreviousMonth);

            cal.set(Calendar.DATE, cal.getActualMaximum(Calendar.DATE)); 

            Date lastDateOfPreviousMonth = cal.getTime();

            String fechaHasta = sdf.format(lastDateOfPreviousMonth);

            reader.setSql("select ul.user_name  as \"userName\"," + shorcut_table_reference +".urn as \"urn\", ace.wbu_action as \"action\", al.name as \"accreditation_level\", ace.date as \"fecha\"\r\n" 
                    + " from " + table_online_id +" "+ shorcut_table_online +", online_entity_association oea, accreditation_event ace, " + table_reference + " " +  shorcut_table_reference +", user_login ul, acreditation_level al \r\n"
                    + " where " + shorcut_table_online+".id=oea.id and \r\n" + " oea.acreditacion=ace.id and \r\n"
                    + " "+shorcut_table_online+"."+attribute_table_online+"=" + shorcut_table_reference +".id and \r\n" + " ul.online_identity="+shorcut_table_online+".online_identity and \r\n" + " ace.level=al.id and \r\n"
                    + " oea.valid=true and ace.date>='" + fechaDesde + "' and ace.date<='" + fechaHasta + "'");
        } else if (!urn.isEmpty() && !fechaInicio.isEmpty() && !fechaFin.isEmpty()) {
            reader.setSql("select ul.user_name  as \"userName\"," + shorcut_table_reference +".urn as \"urn\", ace.wbu_action as \"action\", al.name as \"accreditation_level\", ace.date as \"fecha\"\r\n" 
                    + " from " + table_online_id +" "+ shorcut_table_online +", online_entity_association oea, accreditation_event ace, " + table_reference + " " +  shorcut_table_reference +", user_login ul, acreditation_level al \r\n"
                    + " where " + shorcut_table_online+".id=oea.id and \r\n" + " oea.acreditacion=ace.id and \r\n"
                    + " "+shorcut_table_online+"."+attribute_table_online+"=" + shorcut_table_reference +".id and \r\n" + " ul.online_identity="+shorcut_table_online+".online_identity and \r\n" + " ace.level=al.id and \r\n"
                    + " oea.valid=true and " + "      ace.date>='" + map.get("fechaInicio") + "' and \r\n"
                    + "   ace.date<='" + map.get("fechaFin") + "' and  " + shorcut_table_reference + ".urn like '%" + map.get("urn") + "%'");
        } else if (!urn.isEmpty() && fechaInicio.isEmpty() && fechaFin.isEmpty()) {
            reader.setSql("select ul.user_name  as \"userName\"," + shorcut_table_reference +".urn as \"urn\", ace.wbu_action as \"action\", al.name as \"accreditation_level\", ace.date as \"fecha\"\r\n" 
                    + " from " + table_online_id +" "+ shorcut_table_online +", online_entity_association oea, accreditation_event ace, " + table_reference + " " +  shorcut_table_reference +", user_login ul, acreditation_level al \r\n"
                    + " where " + shorcut_table_online+".id=oea.id and \r\n" + " oea.acreditacion=ace.id and \r\n"
                    + " "+shorcut_table_online+"."+attribute_table_online+"=" + shorcut_table_reference +".id and \r\n" + " ul.online_identity="+shorcut_table_online+".online_identity and \r\n" + " ace.level=al.id and \r\n"
                    + " oea.valid=true and " + shorcut_table_reference + ".urn like '%" + map.get("urn") + "%'");
        } else if (urn.isEmpty() && !fechaInicio.isEmpty() && !fechaFin.isEmpty()) {
            reader.setSql("select ul.user_name  as \"userName\"," + shorcut_table_reference +".urn as \"urn\", ace.wbu_action as \"action\", al.name as \"accreditation_level\", ace.date as \"fecha\"\r\n" 
                    + " from " + table_online_id +" "+ shorcut_table_online +", online_entity_association oea, accreditation_event ace, " + table_reference + " " +  shorcut_table_reference +", user_login ul, acreditation_level al \r\n"
                    + " where " + shorcut_table_online+".id=oea.id and \r\n" + " oea.acreditacion=ace.id and \r\n"
                    + " "+shorcut_table_online+"."+attribute_table_online+"=" + shorcut_table_reference +".id and \r\n" + " ul.online_identity="+shorcut_table_online+".online_identity and \r\n" + " ace.level=al.id and \r\n"
                    + " oea.valid=true and " + "      ace.date>='" + map.get("fechaInicio") + "' and \r\n"
                    + "   ace.date<='" + map.get("fechaFin") + "'");
        } else if (urn.isEmpty() && !fechaInicio.isEmpty() && fechaFin.isEmpty()) {
            reader.setSql("select ul.user_name  as \"userName\"," + shorcut_table_reference +".urn as \"urn\", ace.wbu_action as \"action\", al.name as \"accreditation_level\", ace.date as \"fecha\"\r\n" 
                    + " from " + table_online_id +" "+ shorcut_table_online +", online_entity_association oea, accreditation_event ace, " + table_reference + " " +  shorcut_table_reference +", user_login ul, acreditation_level al \r\n"
                    + " where " + shorcut_table_online+".id=oea.id and \r\n" + " oea.acreditacion=ace.id and \r\n"
                    + " "+shorcut_table_online+"."+attribute_table_online+"=" + shorcut_table_reference +".id and \r\n" + " ul.online_identity="+shorcut_table_online+".online_identity and \r\n" + " ace.level=al.id and \r\n"
                    + " oea.valid=true and " + "      ace.date>='" + map.get("fechaInicio") + "'");
        } else if (!urn.isEmpty() && !fechaInicio.isEmpty() && fechaFin.isEmpty()) {
            reader.setSql("select ul.user_name  as \"userName\"," + shorcut_table_reference +".urn as \"urn\", ace.wbu_action as \"action\", al.name as \"accreditation_level\", ace.date as \"fecha\"\r\n" 
                    + " from " + table_online_id +" "+ shorcut_table_online +", online_entity_association oea, accreditation_event ace, " + table_reference + " " +  shorcut_table_reference +", user_login ul, acreditation_level al \r\n"
                    + " where " + shorcut_table_online+".id=oea.id and \r\n" + " oea.acreditacion=ace.id and \r\n"
                    + " "+shorcut_table_online+"."+attribute_table_online+"=" + shorcut_table_reference +".id and \r\n" + " ul.online_identity="+shorcut_table_online+".online_identity and \r\n" + " ace.level=al.id and \r\n"
                    + " oea.valid=true and " + "      ace.date>='" + map.get("fechaInicio") + "' and \r\n"
                    + " " + shorcut_table_reference + ".urn like '%" + map.get("urn") + "%'");
        } else if (!urn.isEmpty() && fechaInicio.isEmpty() && !fechaFin.isEmpty()) {
            reader.setSql("select ul.user_name  as \"userName\"," + shorcut_table_reference +".urn as \"urn\", ace.wbu_action as \"action\", al.name as \"accreditation_level\", ace.date as \"fecha\"\r\n" 
                    + " from " + table_online_id +" "+ shorcut_table_online +", online_entity_association oea, accreditation_event ace, " + table_reference + " " +  shorcut_table_reference +", user_login ul, acreditation_level al \r\n"
                    + " where " + shorcut_table_online+".id=oea.id and \r\n" + " oea.acreditacion=ace.id and \r\n"
                    + " "+shorcut_table_online+"."+attribute_table_online+"=" + shorcut_table_reference +".id and \r\n" + " ul.online_identity="+shorcut_table_online+".online_identity and \r\n" + " ace.level=al.id and \r\n"
                    + " oea.valid=true and " + "      ace.date<='" + map.get("fechaFin") + "' and  " + shorcut_table_reference + ".urn like '%"
                    + map.get("urn") + "%'");
        } else if (urn.isEmpty() && fechaInicio.isEmpty() && !fechaFin.isEmpty()) {
            reader.setSql("select ul.user_name  as \"userName\"," + shorcut_table_reference +".urn as \"urn\", ace.wbu_action as \"action\", al.name as \"accreditation_level\", ace.date as \"fecha\"\r\n" 
                    + " from " + table_online_id +" "+ shorcut_table_online +", online_entity_association oea, accreditation_event ace, " + table_reference + " " +  shorcut_table_reference +", user_login ul, acreditation_level al \r\n"
                    + " where " + shorcut_table_online+".id=oea.id and \r\n" + " oea.acreditacion=ace.id and \r\n"
                    + " "+shorcut_table_online+"."+attribute_table_online+"=" + shorcut_table_reference +".id and \r\n" + " ul.online_identity="+shorcut_table_online+".online_identity and \r\n" + " ace.level=al.id and \r\n"
                    + " oea.valid=true and " + "      ace.date<='" + map.get("fechaFin") + "'");
        } else if (urn.isEmpty() && fechaInicio.isEmpty() && fechaFin.isEmpty() && !ultimafecha.isEmpty()) {
            reader.setSql("select ul.user_name  as \"userName\"," + shorcut_table_reference +".urn as \"urn\", ace.wbu_action as \"action\", al.name as \"accreditation_level\", ace.date as \"fecha\"\r\n" 
                    + " from " + table_online_id +" "+ shorcut_table_online +", online_entity_association oea, accreditation_event ace, " + table_reference + " " +  shorcut_table_reference +", user_login ul, acreditation_level al \r\n"
                    + " where " + shorcut_table_online+".id=oea.id and \r\n" + " oea.acreditacion=ace.id and \r\n"
                    + " "+shorcut_table_online+"."+attribute_table_online+"=" + shorcut_table_reference +".id and \r\n" + " ul.online_identity="+shorcut_table_online+".online_identity and \r\n" + " ace.level=al.id and \r\n"
                    + " oea.valid=true and " + "      ace.date='" + map.get("ultimafecha") + "'");
        }

        LOG.info("SQL: " + reader.getSql());

        int counter = 0;
        ExecutionContext executionContext = new ExecutionContext();
        reader.open(executionContext);
        Object customerCredit = new Object();
        while (customerCredit != null) {
            try {
                customerCredit = reader.read();
                counter++;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        LOG.info("Registros procesados: " + (counter - 1));

        reader.close();

        return reader;
    }

Writer

@Bean
    @StepScope
    public ItemWriter<UsuariosProductos> writer() throws Exception {
        FlatFileItemWriter<UsuariosProductos> writer = new FlatFileItemWriter<>();
        try {
            writer.setResource(this.outputResource());
            writer.open(new ExecutionContext());

            BeanWrapperFieldExtractor<UsuariosProductos> fieldExtractor = new BeanWrapperFieldExtractor<>();
            fieldExtractor.setNames(new String[] { "username", "urn", "fecha", "action", "accreditation_level"});

            DelimitedLineAggregator<UsuariosProductos> delLineAgg = new DelimitedLineAggregator<UsuariosProductos>();
            delLineAgg.setDelimiter(",");
            delLineAgg.setFieldExtractor(fieldExtractor);
            writer.setLineAggregator(delLineAgg);

            //add header
            writer.setHeaderCallback(new FlatFileHeaderCallback() {
                @Override
                public void writeHeader(Writer writer) throws IOException {
                    writer.write("------USERNAME------,--------URN--------,------------------FECHA------------------,--------ACTION--------,-------------------------ACCREDITATION_LEVEL-----");
                    writer.write("----------------------------------------");
                }
            });

            //remove footer
            writer.setFooterCallback(new FlatFileFooterCallback() {
                @Override
                public void writeFooter(Writer writer) throws IOException {
//                  writer.write("Archivos procesados: ");
                    writer.write("----------------------------------------\r\n" + 
                            "----- FIN ARCHIVO DE PROCESADOS OK -----");
                }
            });

            writer.close();
        } catch (WriterNotOpenException e) {
            LOG.info("Excepcion encontrada: " + e.getMessage());
        }
        return writer;

    }

Job

@Bean
    public Job usuariosProductosJob(JobListener listener, Step step1) {
        return jobBuilderFactory.get("usuariosProductosJob").incrementer(new RunIdIncrementer()).listener(listener)
                .flow(step1).end().build();
    }

    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<UsuariosProductos> reader,
            ItemWriter<UsuariosProductos> writer, ItemProcessor<UsuariosProductos, UsuariosProductos> processor) {

        return stepBuilderFactory.get("step1").<UsuariosProductos, UsuariosProductos>chunk(obtenerChunk())
                .reader(reader).processor(processor).writer(writer).build();
    }


    public Integer obtenerChunk() {
        Integer chunk = Integer.valueOf(env.getProperty("uy.com.antel.up.chunk.step"));

        return chunk;
    }

UsuariosProductosRowMapper.class

public class UsuariosProductosRowMapper implements RowMapper<UsuariosProductos>{

    private static final Logger LOG = LoggerFactory.getLogger(UsuariosProductosRowMapper.class);

    private static final String USERNAME = "userName";
    private static final String URN = "urn";
    private static final String FECHA = "fecha";
    private static final String ACTION = "action";
    private static final String ACCREDITATION_LEVEL = "accreditation_level";


    @Override
    public UsuariosProductos mapRow(ResultSet rs, int rowNum) throws SQLException {
        UsuariosProductos up = new UsuariosProductos();
        up.setUsername(rs.getString(USERNAME));
        up.setUrn(rs.getString(URN));
        up.setAction(rs.getString(ACTION));
        up.setAccreditation_level(rs.getString(ACCREDITATION_LEVEL));

        LOG.info("Estoy en UsuariosProductosRow");

        LOG.info("resultSet: " + rs.toString());
        LOG.info("action: " + rs.getString(ACTION));
        LOG.info("accreditation_level: " + rs.getString(ACCREDITATION_LEVEL));

        try {
            java.sql.Timestamp fecha = rs.getTimestamp(FECHA);
            LOG.info("Fecha java sql: " + fecha);
            up.setFecha(fecha.toString());
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        if(rs.isLast()) {
            up.setProcesados(rs.getRow());
        }
        return up;
    }

}

Processor

public class UsuariosProductosProcessor implements ItemProcessor<UsuariosProductos, UsuariosProductos>{

    private static final Logger LOG = LoggerFactory.getLogger(UsuariosProductosProcessor.class);

    @Override
    public UsuariosProductos process(UsuariosProductos item) throws Exception {
        UsuariosProductos up = new UsuariosProductos();
        if(item.getUsername() != null)
            up.setUsername("userName: " + item.getUsername() + " ");
        else
            up.setUsername("userName: Not found ");
        // no igual, nada. null

        up.setUrn("urn: " + item.getUrn() + " ");
        up.setFecha("fechaAcreditación: " + item.getFecha() + " ");
        up.setAction("action: " + item.getAction() + " ");
        up.setAccreditation_level("accreditation_level: " + item.getAccreditation_level() + " ");

        return up;
    }

}

Tags: java, mysql, spring, spring-batch

Solution


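Following Mahmoud Ben Hassine's comment, I decided to use several jobs, one for each task, without breaking the structure that already exists in the code, because the software I am working on belongs to a client and I do not know how much of it I am allowed to modify.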

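What I did is the following: I define a variable that takes the parameter passed to the jar and run the job associated with it. In the case of ALL, each job is executed one after the other.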

if (map.get("tabla_consultar") != null) {
            type_job = map.get("tabla_consultar").toString();
            if (type_job.equalsIgnoreCase(CommandArgument.ALL.name())) {
                map.put("tabla_consultar", new JobParameter(CommandArgument.ACCOUNT.name())); 
                executeJob(ctx.getBean("usuariosAccountJob", Job.class), jobLauncher, map);
                map.put("tabla_consultar", new JobParameter(CommandArgument.CUSTOMER.name()));
                executeJob(ctx.getBean("usuariosCustomerJob", Job.class), jobLauncher, map);
                map.put("tabla_consultar", new JobParameter(CommandArgument.PRODUCT.name()));
                executeJob(ctx.getBean("usuariosProductosJob", Job.class), jobLauncher, map);
            }else if (type_job.equalsIgnoreCase(CommandArgument.CUSTOMER.name())) {
                executeJob(ctx.getBean("usuariosCustomerJob", Job.class), jobLauncher, map);
            }else if (type_job.equalsIgnoreCase(CommandArgument.ACCOUNT.name())) {
                executeJob(ctx.getBean("usuariosAccountJob", Job.class), jobLauncher, map);
            }else {
                executeJob(ctx.getBean("usuariosProductosJob", Job.class), jobLauncher, map);
            }
        }
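
The branching above can be reduced to a single question: which targets should run for a given argument, and in which order? The following is only a minimal sketch in plain Java under stated assumptions. The CommandArgument enum is re-declared here because its real definition is not shown in the post, and JobTargets with its expand method is a hypothetical helper, not part of Spring Batch.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical re-declaration: the real CommandArgument enum is not shown in the post.
enum CommandArgument { ALL, CUSTOMER, ACCOUNT, PRODUCT }

// Hypothetical helper that turns the command-line value into the targets to run.
final class JobTargets {
    private JobTargets() {}

    /** Expands the "tabla_consultar" value into the ordered list of targets. */
    static List<CommandArgument> expand(String tablaConsultar) {
        if (tablaConsultar == null) {
            // The snippet above launches nothing when the parameter is missing.
            return Collections.emptyList();
        }
        if (tablaConsultar.equalsIgnoreCase(CommandArgument.ALL.name())) {
            // Same order as in the snippet above: ACCOUNT, CUSTOMER, PRODUCT.
            return Arrays.asList(CommandArgument.ACCOUNT,
                                 CommandArgument.CUSTOMER,
                                 CommandArgument.PRODUCT);
        }
        if (tablaConsultar.equalsIgnoreCase(CommandArgument.CUSTOMER.name())) {
            return Collections.singletonList(CommandArgument.CUSTOMER);
        }
        if (tablaConsultar.equalsIgnoreCase(CommandArgument.ACCOUNT.name())) {
            return Collections.singletonList(CommandArgument.ACCOUNT);
        }
        // Any other value falls back to PRODUCT, like the final else branch above.
        return Collections.singletonList(CommandArgument.PRODUCT);
    }
}
```

The caller would then loop over the returned list, put the target name into the parameter map and launch the matching job once per element, which keeps the ALL case and the single-target cases on the same code path.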

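In BatchConfiguration, I have a variable that checks which type of job is to be executed and, based on it, fills in the fields of the query that refer to the tables and attributes related to that job.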

    String table_online_id = "";
    String table_reference = "";
    String shorcut_table_online = "";
    String shorcut_table_reference = "";
    String attribute_table_online ="";

    if (tabla_consultar.equalsIgnoreCase(CommandArgument.CUSTOMER.name())) {
        table_online_id = "online_id_customer";
        table_reference = "customer_reference";
        shorcut_table_online = "oidc";
        attribute_table_online = "customer";
        shorcut_table_reference = "cr";
    }else if (tabla_consultar.equalsIgnoreCase(CommandArgument.PRODUCT.name())) {
        table_online_id = "online_id_product";
        table_reference = "product_reference";
        shorcut_table_online = "oidp";
        attribute_table_online = "product";
        shorcut_table_reference = "pr";
    }else {
        table_online_id = "online_id_account";
        table_reference = "account_reference";
        shorcut_table_online = "oida";
        attribute_table_online = "account";
        shorcut_table_reference = "ar";
    }
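
One way to keep these five values together is an enum that carries them as fields. This is only a sketch: TableTarget and its field names are hypothetical and do not exist in the original code, and the ACCOUNT fallback simply mirrors the else branch above. Unlike the original, a null argument also falls back to ACCOUNT here instead of throwing.

```java
// Hypothetical enum: groups the per-table values from the if/else chain above.
enum TableTarget {
    CUSTOMER("online_id_customer", "customer_reference", "oidc", "cr", "customer"),
    PRODUCT("online_id_product", "product_reference", "oidp", "pr", "product"),
    ACCOUNT("online_id_account", "account_reference", "oida", "ar", "account");

    final String tableOnlineId;   // e.g. online_id_customer
    final String tableReference;  // e.g. customer_reference
    final String onlineAlias;     // e.g. oidc
    final String referenceAlias;  // e.g. cr
    final String attribute;       // join column on the online_id table

    TableTarget(String tableOnlineId, String tableReference,
                String onlineAlias, String referenceAlias, String attribute) {
        this.tableOnlineId = tableOnlineId;
        this.tableReference = tableReference;
        this.onlineAlias = onlineAlias;
        this.referenceAlias = referenceAlias;
        this.attribute = attribute;
    }

    /** Case-insensitive lookup; unknown or null values fall back to ACCOUNT. */
    static TableTarget from(String tablaConsultar) {
        for (TableTarget target : values()) {
            if (target.name().equalsIgnoreCase(tablaConsultar)) {
                return target;
            }
        }
        return ACCOUNT;
    }
}
```

With this in place, the query code can read TableTarget.from(tabla_consultar).referenceAlias and so on instead of five separate string variables.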

For example:

reader.setSql("select ul.user_name  as \"userName\"," + shorcut_table_reference +".urn as \"urn\", ace.wbu_action as \"action\", al.code as \"accreditation_level\", ace.date as \"fecha\"\r\n" 
                    + " from " + table_online_id +" "+ shorcut_table_online +", online_entity_association oea, accreditation_event ace, " + table_reference + " " +  shorcut_table_reference +", user_login ul, acreditation_level al \r\n"
                    + " where " + shorcut_table_online+".id=oea.id and \r\n" + " oea.id=ace.id and \r\n"
                    + " "+shorcut_table_online+"."+attribute_table_online+"=" + shorcut_table_reference +".id and \r\n" + " ul.online_identity="+shorcut_table_online+".online_identity and \r\n" + " ace.level=al.id and \r\n"
                    + " ace.date<'" + date + "'");
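
The query above concatenates values directly into the SQL text, and the question repeats the same base query once per filter combination. A possible alternative, sketched here in plain Java, is to build the base query once, append only the filters that are present, and pass the values as ? placeholders. AccreditationQuery is a hypothetical helper. The joins follow the example above, while the optional date and urn filters are taken from the question. Table and alias names must still come from a fixed list such as the if/else chain, never from user input.

```java
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: builds the SQL text and collects the values for its placeholders.
final class AccreditationQuery {
    final String sql;
    final List<Object> params;

    private AccreditationQuery(String sql, List<Object> params) {
        this.sql = sql;
        this.params = params;
    }

    static AccreditationQuery build(String tableOnlineId, String onlineAlias,
                                    String tableReference, String referenceAlias,
                                    String attribute,
                                    String urn, Timestamp from, Timestamp to) {
        StringBuilder sql = new StringBuilder();
        sql.append("select ul.user_name as \"userName\", ")
           .append(referenceAlias).append(".urn as \"urn\", ")
           .append("ace.wbu_action as \"action\", al.code as \"accreditation_level\", ")
           .append("ace.date as \"fecha\"")
           .append(" from ").append(tableOnlineId).append(' ').append(onlineAlias)
           .append(", online_entity_association oea, accreditation_event ace, ")
           .append(tableReference).append(' ').append(referenceAlias)
           .append(", user_login ul, acreditation_level al")
           .append(" where ").append(onlineAlias).append(".id=oea.id")
           .append(" and oea.id=ace.id")
           .append(" and ").append(onlineAlias).append('.').append(attribute)
           .append('=').append(referenceAlias).append(".id")
           .append(" and ul.online_identity=").append(onlineAlias).append(".online_identity")
           .append(" and ace.level=al.id");

        // Append each optional filter only when a value was supplied.
        List<Object> params = new ArrayList<>();
        if (from != null) {
            sql.append(" and ace.date>=?");
            params.add(from);
        }
        if (to != null) {
            sql.append(" and ace.date<=?");
            params.add(to);
        }
        if (urn != null && !urn.isEmpty()) {
            sql.append(" and ").append(referenceAlias).append(".urn like ?");
            params.add("%" + urn + "%");
        }
        return new AccreditationQuery(sql.toString(), params);
    }
}
```

In the reader, the sql field would go to reader.setSql(...), and the params list could be bound through JdbcCursorItemReader.setPreparedStatementSetter(...), for instance with Spring's ArgumentPreparedStatementSetter. That wiring is an assumption about how this would fit into the existing configuration and has not been tried against this project.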

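Finally, the variable "tabla_consultar" is used as part of the file name. That way, if the query is for customers, the file will be xxxx-Customer-xxxx, and it will differ if I query ACCOUNT or PRODUCT, so the files do not overwrite each other. This can also be achieved by adding the hh:mm:ss value to the name.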

DateFormat hourdateFormat = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");
        String outputName = outputDir + "wxxxxxxx-"+ tabla_consultar +"-"+ hourdateFormat.format(date) + ".txt";
        File file = new File(outputName);
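
The same naming scheme can be sketched with java.time, whose DateTimeFormatter is thread-safe, unlike SimpleDateFormat. OutputNames and its parameters are hypothetical. The file prefix is passed in because the real one is redacted in the post, and outputDir is assumed to end with a path separator, as in the original concatenation.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: builds one distinct output file name per target and run time.
final class OutputNames {
    // Hyphens instead of colons in the time part, since ':' is not allowed in Windows file names.
    private static final DateTimeFormatter STAMP =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH-mm-ss");

    private OutputNames() {}

    static String outputName(String outputDir, String prefix,
                             String tablaConsultar, LocalDateTime when) {
        return outputDir + prefix + "-" + tablaConsultar + "-" + STAMP.format(when) + ".txt";
    }
}
```

Passing the timestamp as an argument, rather than reading the clock inside the method, makes the name reproducible, which helps when the same run has to find or re-open its own file.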
