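java - Spring Batch: looping a reader while changing the SQL query and writing to different files
Problem description
First of all, thank you very much for taking the time to read this question. I have started implementing a project with Spring Batch, and my question is as follows. I have a reader that fills in the fields of a query according to the value of a variable, returns the results and goes on to write them to a file. The problem is that one of the possible values of the variable is "ALL", in which case the reader should run three times, changing the query fields for CUSTOMER, ACCOUNT and PRODUCT. Each run gives different results, since they are different queries, and each should be written to a different file.
Is there any way to do this? From what I have researched, I cannot modify the job while it is running, so I cannot add another step to it based on the value "ALL".
Thank you very much for your time.
Resource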
public Resource outputResource() {
String outputDir = env.getProperty("uy.com.antel.up.data.folder.out");
Date date = new Date();
DateFormat hourdateFormat = new SimpleDateFormat("dd-MM-yyyy");
String outputName = outputDir + "webUserbatch-" + hourdateFormat.format(date) + ".txt";
File file = new File(outputName);
if (!file.exists()) {
try {
file.createNewFile();
} catch (IOException e) {
e.printStackTrace();
}
}
Resource resource = new FileSystemResource(file);
return resource;
}
Reader
@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean(destroyMethod = "")
@StepScope
public JdbcCursorItemReader reader(DataSource dataSource, @Value("#{jobParameters}") Map<String, Object> map) {
JdbcCursorItemReader reader = new JdbcCursorItemReader();
reader.setDataSource(dataSource);
reader.setRowMapper(new UsuariosProductosRowMapper());
String urn = "", fechaInicio = "", fechaFin = "", ultimafecha = "", tabla_consultar = "";
if (map.get("urn") != null) {
urn = map.get("urn").toString();
}
if (map.get("fechaInicio") != null) {
fechaInicio = map.get("fechaInicio").toString();
}
if (map.get("fechaFin") != null) {
fechaFin = map.get("fechaFin").toString();
}
if (map.get("ultimafecha") != null) {
ultimafecha = map.get("ultimafecha").toString();
}
if (map.get("tabla_consultar") != null) {
tabla_consultar = map.get("tabla_consultar").toString();
}
String tipoGeneracion = env.getProperty("uy.com.antel.up.tipo.carga");
Date date = new Date();
// Set this to "" to allow custom searches; otherwise only the initial load is handled
//tipoGeneracion = "";
LOG.info("URN: " + urn);
LOG.info("Fecha desde: " + fechaInicio);
LOG.info("Fecha hasta: " + fechaFin);
LOG.info("Ultima fecha: " + ultimafecha);
LOG.info("Tabla consultar: " + tabla_consultar);
// remove this so that it takes a value other than the initial one
// table accreditation_event
// missing data: user_name, accreditation_level
String table_online_id = "";
String table_reference = "";
String shorcut_table_online = "";
String shorcut_table_reference = "";
String attribute_table_online ="";
if (tabla_consultar == null || tabla_consultar.equals(CommandArgument.ALL.name()) ) {
//
//At this point the program should run the query for CUSTOMER, ACCOUNT and PRODUCT
//
}else{
if (tabla_consultar.equals(CommandArgument.CUSTOMER.name())) {
table_online_id = "online_id_customer";
table_reference = "customer_reference";
shorcut_table_online = "oidc";
attribute_table_online = "customer";
shorcut_table_reference = "cr";
}else if (tabla_consultar.equals(CommandArgument.PRODUCT.name())) {
table_online_id = "online_id_product";
table_reference = "product_reference";
shorcut_table_online = "oidp";
attribute_table_online = "product";
shorcut_table_reference = "pr";
}else {
table_online_id = "online_id_account";
table_reference = "account_reference";
shorcut_table_online = "oida";
attribute_table_online = "account";
shorcut_table_reference = "ar";
}
}
if (tipoGeneracion.equalsIgnoreCase("inicial")) {
reader.setSql("select ul.user_name as \"userName\"," + shorcut_table_reference +".urn as \"urn\", ace.wbu_action as \"action\", al.name as \"accreditation_level\", ace.date as \"fecha\"\r\n"
+ " from " + table_online_id +" "+ shorcut_table_online +", online_entity_association oea, accreditation_event ace, " + table_reference + " " + shorcut_table_reference +", user_login ul, acreditation_level al \r\n"
+ " where " + shorcut_table_online+".id=oea.id and \r\n" + " oea.acreditacion=ace.id and \r\n"
+ " "+shorcut_table_online+"."+attribute_table_online+"=" + shorcut_table_reference +".id and \r\n" + " ul.online_identity="+shorcut_table_online+".online_identity and \r\n" + " ace.level=al.id and \r\n"
+ " oea.valid=true and " + "ace.date<='" + date + "'");
} else if (urn.isEmpty() && fechaInicio.isEmpty() && fechaFin.isEmpty()) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
Calendar cal = Calendar.getInstance();
cal.add(Calendar.MONTH, -1);
cal.set(Calendar.DATE, 1);
Date firstDateOfPreviousMonth = cal.getTime();
String fechaDesde = sdf.format(firstDateOfPreviousMonth);
cal.set(Calendar.DATE, cal.getActualMaximum(Calendar.DATE));
Date lastDateOfPreviousMonth = cal.getTime();
String fechaHasta = sdf.format(lastDateOfPreviousMonth);
reader.setSql("select ul.user_name as \"userName\"," + shorcut_table_reference +".urn as \"urn\", ace.wbu_action as \"action\", al.name as \"accreditation_level\", ace.date as \"fecha\"\r\n"
+ " from " + table_online_id +" "+ shorcut_table_online +", online_entity_association oea, accreditation_event ace, " + table_reference + " " + shorcut_table_reference +", user_login ul, acreditation_level al \r\n"
+ " where " + shorcut_table_online+".id=oea.id and \r\n" + " oea.acreditacion=ace.id and \r\n"
+ " "+shorcut_table_online+"."+attribute_table_online+"=" + shorcut_table_reference +".id and \r\n" + " ul.online_identity="+shorcut_table_online+".online_identity and \r\n" + " ace.level=al.id and \r\n"
+ " oea.valid=true and ace.date>='" + fechaDesde + "' and ace.date<='" + fechaHasta + "'");
} else if (!urn.isEmpty() && !fechaInicio.isEmpty() && !fechaFin.isEmpty()) {
reader.setSql("select ul.user_name as \"userName\"," + shorcut_table_reference +".urn as \"urn\", ace.wbu_action as \"action\", al.name as \"accreditation_level\", ace.date as \"fecha\"\r\n"
+ " from " + table_online_id +" "+ shorcut_table_online +", online_entity_association oea, accreditation_event ace, " + table_reference + " " + shorcut_table_reference +", user_login ul, acreditation_level al \r\n"
+ " where " + shorcut_table_online+".id=oea.id and \r\n" + " oea.acreditacion=ace.id and \r\n"
+ " "+shorcut_table_online+"."+attribute_table_online+"=" + shorcut_table_reference +".id and \r\n" + " ul.online_identity="+shorcut_table_online+".online_identity and \r\n" + " ace.level=al.id and \r\n"
+ " oea.valid=true and " + " ace.date>='" + map.get("fechaInicio") + "' and \r\n"
+ " ace.date<='" + map.get("fechaFin") + "' and " + shorcut_table_reference + ".urn like '%" + map.get("urn") + "%'");
} else if (!urn.isEmpty() && fechaInicio.isEmpty() && fechaFin.isEmpty()) {
reader.setSql("select ul.user_name as \"userName\"," + shorcut_table_reference +".urn as \"urn\", ace.wbu_action as \"action\", al.name as \"accreditation_level\", ace.date as \"fecha\"\r\n"
+ " from " + table_online_id +" "+ shorcut_table_online +", online_entity_association oea, accreditation_event ace, " + table_reference + " " + shorcut_table_reference +", user_login ul, acreditation_level al \r\n"
+ " where " + shorcut_table_online+".id=oea.id and \r\n" + " oea.acreditacion=ace.id and \r\n"
+ " "+shorcut_table_online+"."+attribute_table_online+"=" + shorcut_table_reference +".id and \r\n" + " ul.online_identity="+shorcut_table_online+".online_identity and \r\n" + " ace.level=al.id and \r\n"
+ " oea.valid=true and " + shorcut_table_reference + ".urn like '%" + map.get("urn") + "%'");
} else if (urn.isEmpty() && !fechaInicio.isEmpty() && !fechaFin.isEmpty()) {
reader.setSql("select ul.user_name as \"userName\"," + shorcut_table_reference +".urn as \"urn\", ace.wbu_action as \"action\", al.name as \"accreditation_level\", ace.date as \"fecha\"\r\n"
+ " from " + table_online_id +" "+ shorcut_table_online +", online_entity_association oea, accreditation_event ace, " + table_reference + " " + shorcut_table_reference +", user_login ul, acreditation_level al \r\n"
+ " where " + shorcut_table_online+".id=oea.id and \r\n" + " oea.acreditacion=ace.id and \r\n"
+ " "+shorcut_table_online+"."+attribute_table_online+"=" + shorcut_table_reference +".id and \r\n" + " ul.online_identity="+shorcut_table_online+".online_identity and \r\n" + " ace.level=al.id and \r\n"
+ " oea.valid=true and " + " ace.date>='" + map.get("fechaInicio") + "' and \r\n"
+ " ace.date<='" + map.get("fechaFin") + "'");
} else if (urn.isEmpty() && !fechaInicio.isEmpty() && fechaFin.isEmpty()) {
reader.setSql("select ul.user_name as \"userName\"," + shorcut_table_reference +".urn as \"urn\", ace.wbu_action as \"action\", al.name as \"accreditation_level\", ace.date as \"fecha\"\r\n"
+ " from " + table_online_id +" "+ shorcut_table_online +", online_entity_association oea, accreditation_event ace, " + table_reference + " " + shorcut_table_reference +", user_login ul, acreditation_level al \r\n"
+ " where " + shorcut_table_online+".id=oea.id and \r\n" + " oea.acreditacion=ace.id and \r\n"
+ " "+shorcut_table_online+"."+attribute_table_online+"=" + shorcut_table_reference +".id and \r\n" + " ul.online_identity="+shorcut_table_online+".online_identity and \r\n" + " ace.level=al.id and \r\n"
+ " oea.valid=true and " + " ace.date>='" + map.get("fechaInicio") + "'");
} else if (!urn.isEmpty() && !fechaInicio.isEmpty() && fechaFin.isEmpty()) {
reader.setSql("select ul.user_name as \"userName\"," + shorcut_table_reference +".urn as \"urn\", ace.wbu_action as \"action\", al.name as \"accreditation_level\", ace.date as \"fecha\"\r\n"
+ " from " + table_online_id +" "+ shorcut_table_online +", online_entity_association oea, accreditation_event ace, " + table_reference + " " + shorcut_table_reference +", user_login ul, acreditation_level al \r\n"
+ " where " + shorcut_table_online+".id=oea.id and \r\n" + " oea.acreditacion=ace.id and \r\n"
+ " "+shorcut_table_online+"."+attribute_table_online+"=" + shorcut_table_reference +".id and \r\n" + " ul.online_identity="+shorcut_table_online+".online_identity and \r\n" + " ace.level=al.id and \r\n"
+ " oea.valid=true and " + " ace.date>='" + map.get("fechaInicio") + "' and \r\n"
+ " " + shorcut_table_reference + ".urn like '%" + map.get("urn") + "%'");
} else if (!urn.isEmpty() && fechaInicio.isEmpty() && !fechaFin.isEmpty()) {
reader.setSql("select ul.user_name as \"userName\"," + shorcut_table_reference +".urn as \"urn\", ace.wbu_action as \"action\", al.name as \"accreditation_level\", ace.date as \"fecha\"\r\n"
+ " from " + table_online_id +" "+ shorcut_table_online +", online_entity_association oea, accreditation_event ace, " + table_reference + " " + shorcut_table_reference +", user_login ul, acreditation_level al \r\n"
+ " where " + shorcut_table_online+".id=oea.id and \r\n" + " oea.acreditacion=ace.id and \r\n"
+ " "+shorcut_table_online+"."+attribute_table_online+"=" + shorcut_table_reference +".id and \r\n" + " ul.online_identity="+shorcut_table_online+".online_identity and \r\n" + " ace.level=al.id and \r\n"
+ " oea.valid=true and " + " ace.date<='" + map.get("fechaFin") + "' and " + shorcut_table_reference + ".urn like '%"
+ map.get("urn") + "%'");
} else if (urn.isEmpty() && fechaInicio.isEmpty() && !fechaFin.isEmpty()) {
reader.setSql("select ul.user_name as \"userName\"," + shorcut_table_reference +".urn as \"urn\", ace.wbu_action as \"action\", al.name as \"accreditation_level\", ace.date as \"fecha\"\r\n"
+ " from " + table_online_id +" "+ shorcut_table_online +", online_entity_association oea, accreditation_event ace, " + table_reference + " " + shorcut_table_reference +", user_login ul, acreditation_level al \r\n"
+ " where " + shorcut_table_online+".id=oea.id and \r\n" + " oea.acreditacion=ace.id and \r\n"
+ " "+shorcut_table_online+"."+attribute_table_online+"=" + shorcut_table_reference +".id and \r\n" + " ul.online_identity="+shorcut_table_online+".online_identity and \r\n" + " ace.level=al.id and \r\n"
+ " oea.valid=true and " + " ace.date<='" + map.get("fechaFin") + "'");
} else if (urn.isEmpty() && fechaInicio.isEmpty() && fechaFin.isEmpty() && !ultimafecha.isEmpty()) {
reader.setSql("select ul.user_name as \"userName\"," + shorcut_table_reference +".urn as \"urn\", ace.wbu_action as \"action\", al.name as \"accreditation_level\", ace.date as \"fecha\"\r\n"
+ " from " + table_online_id +" "+ shorcut_table_online +", online_entity_association oea, accreditation_event ace, " + table_reference + " " + shorcut_table_reference +", user_login ul, acreditation_level al \r\n"
+ " where " + shorcut_table_online+".id=oea.id and \r\n" + " oea.acreditacion=ace.id and \r\n"
+ " "+shorcut_table_online+"."+attribute_table_online+"=" + shorcut_table_reference +".id and \r\n" + " ul.online_identity="+shorcut_table_online+".online_identity and \r\n" + " ace.level=al.id and \r\n"
+ " oea.valid=true and " + " ace.date='" + map.get("ultimafecha") + "'");
}
LOG.info("SQL: " + reader.getSql());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
reader.open(executionContext);
Object customerCredit = new Object();
while (customerCredit != null) {
try {
customerCredit = reader.read();
counter++;
} catch (Exception e) {
e.printStackTrace();
}
}
LOG.info("Registros procesados: " + (counter - 1));
reader.close();
return reader;
}
Writer
@Bean
@StepScope
public ItemWriter<UsuariosProductos> writer() throws Exception {
FlatFileItemWriter<UsuariosProductos> writer = new FlatFileItemWriter<>();
try {
writer.setResource(this.outputResource());
writer.open(new ExecutionContext());
BeanWrapperFieldExtractor<UsuariosProductos> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[] { "username", "urn", "fecha", "action", "accreditation_level"});
DelimitedLineAggregator<UsuariosProductos> delLineAgg = new DelimitedLineAggregator<UsuariosProductos>();
delLineAgg.setDelimiter(",");
delLineAgg.setFieldExtractor(fieldExtractor);
writer.setLineAggregator(delLineAgg);
// add header
writer.setHeaderCallback(new FlatFileHeaderCallback() {
@Override
public void writeHeader(Writer writer) throws IOException {
writer.write("------USERNAME------,--------URN--------,------------------FECHA------------------,--------ACTION--------,-------------------------ACCREDITATION_LEVEL-----");
writer.write("----------------------------------------");
}
});
// remove footer
writer.setFooterCallback(new FlatFileFooterCallback() {
@Override
public void writeFooter(Writer writer) throws IOException {
// writer.write("Archivos procesados: ");
writer.write("----------------------------------------\r\n" +
"----- FIN ARCHIVO DE PROCESADOS OK -----");
}
});
writer.close();
} catch (WriterNotOpenException e) {
LOG.info("Excepcion encontrada: " + e.getMessage());
}
return writer;
}
Job
@Bean
public Job usuariosProductosJob(JobListener listener, Step step1) {
return jobBuilderFactory.get("usuariosProductosJob").incrementer(new RunIdIncrementer()).listener(listener)
.flow(step1).end().build();
}
@Bean
public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<UsuariosProductos> reader,
ItemWriter<UsuariosProductos> writer, ItemProcessor<UsuariosProductos, UsuariosProductos> processor) {
return stepBuilderFactory.get("step1").<UsuariosProductos, UsuariosProductos>chunk(obtenerChunk())
.reader(reader).processor(processor).writer(writer).build();
}
public Integer obtenerChunk() {
Integer chunk = Integer.valueOf(env.getProperty("uy.com.antel.up.chunk.step"));
return chunk;
}
UsuariosProductosRowMapper.class
public class UsuariosProductosRowMapper implements RowMapper<UsuariosProductos>{
private static final Logger LOG = LoggerFactory.getLogger(UsuariosProductosRowMapper.class);
private static final String USERNAME = "userName";
private static final String URN = "urn";
private static final String FECHA = "fecha";
private static final String ACTION = "action";
private static final String ACCREDITATION_LEVEL = "accreditation_level";
@Override
public UsuariosProductos mapRow(ResultSet rs, int rowNum) throws SQLException {
UsuariosProductos up = new UsuariosProductos();
up.setUsername(rs.getString(USERNAME));
up.setUrn(rs.getString(URN));
up.setAction(rs.getString(ACTION));
up.setAccreditation_level(rs.getString(ACCREDITATION_LEVEL));
LOG.info("Estoy en UsuariosProductosRow");
LOG.info("resultSet: " + rs.toString());
LOG.info("action: " + rs.getString(ACTION));
LOG.info("accreditation_level: " + rs.getString(ACCREDITATION_LEVEL));
try {
java.sql.Timestamp fecha = rs.getTimestamp(FECHA);
LOG.info("Fecha java sql: " + fecha);
up.setFecha(fecha.toString());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if(rs.isLast()) {
up.setProcesados(rs.getRow());
}
return up;
}
}
Processor
public class UsuariosProductosProcessor implements ItemProcessor<UsuariosProductos, UsuariosProductos>{
private static final Logger LOG = LoggerFactory.getLogger(UsuariosProductosProcessor.class);
@Override
public UsuariosProductos process(UsuariosProductos item) throws Exception {
UsuariosProductos up = new UsuariosProductos();
if(item.getUsername() != null)
up.setUsername("userName: " + item.getUsername() + " ");
else
up.setUsername("userName: Not found ");
// no igual, nada. null
up.setUrn("urn: " + item.getUrn() + " ");
up.setFecha("fechaAcreditación: " + item.getFecha() + " ");
up.setAction("action: " + item.getAction() + " ");
up.setAccreditation_level("accreditation_level: " + item.getAccreditation_level() + " ");
return up;
}
}
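Solution
Following Mahmoud Ben Hassine's comment, I decided to use several jobs for the different tasks, but without breaking the structure already present in the code, because the software I am working on belongs to a client and I do not know how much of it I am allowed to modify.
What I did was the following: I defined a variable that takes the parameter passed to the jar and runs the job associated with it. In the case of ALL, it runs each of the jobs one after another.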
if (map.get("tabla_consultar") != null) {
type_job = map.get("tabla_consultar").toString();
if (type_job.equalsIgnoreCase(CommandArgument.ALL.name())) {
map.put("tabla_consultar", new JobParameter(CommandArgument.ACCOUNT.name()));
executeJob(ctx.getBean("usuariosAccountJob", Job.class), jobLauncher, map);
map.put("tabla_consultar", new JobParameter(CommandArgument.CUSTOMER.name()));
executeJob(ctx.getBean("usuariosCustomerJob", Job.class), jobLauncher, map);
map.put("tabla_consultar", new JobParameter(CommandArgument.PRODUCT.name()));
executeJob(ctx.getBean("usuariosProductosJob", Job.class), jobLauncher, map);
}else if (type_job.equalsIgnoreCase(CommandArgument.CUSTOMER.name())) {
executeJob(ctx.getBean("usuariosCustomerJob", Job.class), jobLauncher, map);
}else if (type_job.equalsIgnoreCase(CommandArgument.ACCOUNT.name())) {
executeJob(ctx.getBean("usuariosAccountJob", Job.class), jobLauncher, map);
}else {
executeJob(ctx.getBean("usuariosProductosJob", Job.class), jobLauncher, map);
}
}
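The dispatch above can also be read as "expand the parameter into an ordered list of targets, then launch one job per target". The following is only a minimal sketch of that expansion in plain Java, without Spring, so it can be run on its own. The class name `TargetResolver` is hypothetical, and `CommandArgument` is redeclared here under the assumption that it has at least the four values used in the post.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class TargetResolver {
    // Redeclared so the sketch compiles alone; assumed to match the post's CommandArgument.
    enum CommandArgument { ALL, CUSTOMER, ACCOUNT, PRODUCT }

    /** Expands the "tabla_consultar" value into the ordered list of targets to launch. */
    static List<CommandArgument> resolve(String tablaConsultar) {
        if (tablaConsultar == null) {
            // The original code launches nothing when the parameter is missing.
            return Collections.emptyList();
        }
        if (tablaConsultar.equalsIgnoreCase(CommandArgument.ALL.name())) {
            // Same order as the original: ACCOUNT, then CUSTOMER, then PRODUCT.
            return Arrays.asList(CommandArgument.ACCOUNT, CommandArgument.CUSTOMER, CommandArgument.PRODUCT);
        }
        if (tablaConsultar.equalsIgnoreCase(CommandArgument.CUSTOMER.name())) {
            return Collections.singletonList(CommandArgument.CUSTOMER);
        }
        if (tablaConsultar.equalsIgnoreCase(CommandArgument.ACCOUNT.name())) {
            return Collections.singletonList(CommandArgument.ACCOUNT);
        }
        // Any other value falls through to PRODUCT, like the original else branch.
        return Collections.singletonList(CommandArgument.PRODUCT);
    }

    public static void main(String[] args) {
        for (CommandArgument target : resolve("ALL")) {
            System.out.println("would launch the job for " + target);
        }
    }
}
```

Each returned value would then be put into the parameter map under "tabla_consultar" and passed to executeJob with the matching job bean, as in the snippet above.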
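In BatchConfiguration, I have a variable that checks which type of job is to be executed and, from it, fills in the fields of the query that refer to the tables and attributes related to that job.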
String table_online_id = "";
String table_reference = "";
String shorcut_table_online = "";
String shorcut_table_reference = "";
String attribute_table_online ="";
if (tabla_consultar.equalsIgnoreCase(CommandArgument.CUSTOMER.name())) {
table_online_id = "online_id_customer";
table_reference = "customer_reference";
shorcut_table_online = "oidc";
attribute_table_online = "customer";
shorcut_table_reference = "cr";
}else if (tabla_consultar.equalsIgnoreCase(CommandArgument.PRODUCT.name())) {
table_online_id = "online_id_product";
table_reference = "product_reference";
shorcut_table_online = "oidp";
attribute_table_online = "product";
shorcut_table_reference = "pr";
}else {
table_online_id = "online_id_account";
table_reference = "account_reference";
shorcut_table_online = "oida";
attribute_table_online = "account";
shorcut_table_reference = "ar";
}
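One possible way to keep the five strings for each target together is an enum that carries them as fields. This is a sketch only, not the code used in the project: the enum name `TableTarget` and its method names are hypothetical, while the table names and aliases are copied from the block above. Unlike the original, the lookup also tolerates a null value by falling back to ACCOUNT.

```java
enum TableTarget {
    CUSTOMER("online_id_customer", "oidc", "customer_reference", "cr", "customer"),
    PRODUCT("online_id_product", "oidp", "product_reference", "pr", "product"),
    ACCOUNT("online_id_account", "oida", "account_reference", "ar", "account");

    final String onlineTable;
    final String onlineAlias;
    final String referenceTable;
    final String referenceAlias;
    final String attribute;

    TableTarget(String onlineTable, String onlineAlias, String referenceTable,
                String referenceAlias, String attribute) {
        this.onlineTable = onlineTable;
        this.onlineAlias = onlineAlias;
        this.referenceTable = referenceTable;
        this.referenceAlias = referenceAlias;
        this.attribute = attribute;
    }

    /** Case-insensitive lookup; unknown or null values fall back to ACCOUNT. */
    static TableTarget from(String tablaConsultar) {
        if (tablaConsultar != null) {
            for (TableTarget target : values()) {
                if (target.name().equalsIgnoreCase(tablaConsultar)) {
                    return target;
                }
            }
        }
        return ACCOUNT;
    }

    /** FROM fragment for the two target-specific tables. */
    String fromClause() {
        return onlineTable + " " + onlineAlias + ", " + referenceTable + " " + referenceAlias;
    }

    /** Join condition between the online-id table and its reference table. */
    String joinCondition() {
        return onlineAlias + "." + attribute + "=" + referenceAlias + ".id";
    }

    public static void main(String[] args) {
        TableTarget target = TableTarget.from("customer");
        System.out.println(target.fromClause());
        System.out.println(target.joinCondition());
    }
}
```

Because the values are fixed constants rather than user input, concatenating them into the SQL text is safe, and the reference alias is always the one that matches the chosen table.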
For example:
reader.setSql("select ul.user_name as \"userName\"," + shorcut_table_reference +".urn as \"urn\", ace.wbu_action as \"action\", al.code as \"accreditation_level\", ace.date as \"fecha\"\r\n"
+ " from " + table_online_id +" "+ shorcut_table_online +", online_entity_association oea, accreditation_event ace, " + table_reference + " " + shorcut_table_reference +", user_login ul, acreditation_level al \r\n"
+ " where " + shorcut_table_online+".id=oea.id and \r\n" + " oea.id=ace.id and \r\n"
+ " "+shorcut_table_online+"."+attribute_table_online+"=" + shorcut_table_reference +".id and \r\n" + " ul.online_identity="+shorcut_table_online+".online_identity and \r\n" + " ace.level=al.id and \r\n"
+ " ace.date<'" + date + "'");
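The reader in the question repeats the same select for every combination of urn, fechaInicio and fechaFin and concatenates the values straight into the SQL text. A possible alternative, sketched below in plain Java, appends only the filters that are present and collects their values for `?` placeholders. The class `AccreditationQuery` and its members are hypothetical names. The dates are bound as the strings that were supplied; depending on the database they would probably need to be converted to `java.sql.Date` or `java.sql.Timestamp` first, which is left out here because the parameter format is not stated in the post.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class AccreditationQuery {
    final String sql;
    final List<Object> params;

    AccreditationQuery(String sql, List<Object> params) {
        this.sql = sql;
        this.params = params;
    }

    /**
     * baseSql must already end with a WHERE clause holding the join conditions.
     * referenceAlias is the alias of the reference table ("cr", "pr" or "ar").
     * Null or blank filter values are skipped.
     */
    static AccreditationQuery build(String baseSql, String referenceAlias,
                                    String urn, String fechaInicio, String fechaFin) {
        StringBuilder sql = new StringBuilder(baseSql);
        List<Object> params = new ArrayList<>();
        if (hasText(fechaInicio)) {
            sql.append(" and ace.date >= ?");
            params.add(fechaInicio); // assumption: convert to a SQL date type if the driver requires it
        }
        if (hasText(fechaFin)) {
            sql.append(" and ace.date <= ?");
            params.add(fechaFin);
        }
        if (hasText(urn)) {
            sql.append(" and ").append(referenceAlias).append(".urn like ?");
            params.add("%" + urn + "%");
        }
        return new AccreditationQuery(sql.toString(), Collections.unmodifiableList(params));
    }

    private static boolean hasText(String value) {
        return value != null && !value.trim().isEmpty();
    }

    public static void main(String[] args) {
        AccreditationQuery query = build("select 1 from t where oea.valid=true", "cr", "abc", "2020-01-01", "");
        System.out.println(query.sql);
        System.out.println(query.params);
    }
}
```

The resulting SQL could be passed to `reader.setSql(...)`, and the collected values bound through the reader's `setPreparedStatementSetter(...)`, which `JdbcCursorItemReader` provides for exactly this kind of parameterised query.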
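Finally, the variable "tabla_consultar" is used as part of the file name, so I can tell which query produced a file: if the query is for customers the file will be xxxx-Customer-xxxx, and it will differ for ACCOUNT or PRODUCT. This keeps the files from overwriting each other, which is also helped by adding the hh:mm:ss value to the name.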
DateFormat hourdateFormat = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");
String outputName = outputDir + "wxxxxxxx-"+ tabla_consultar +"-"+ hourdateFormat.format(date) + ".txt";
File file = new File(outputName);
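The same naming scheme can be sketched with `java.time`, whose `DateTimeFormatter` is immutable and thread-safe, unlike `SimpleDateFormat`. The class `OutputNames` and the `prefix` parameter are hypothetical; the real prefix is elided in the post and is not guessed here.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

class OutputNames {
    // Same pattern as the snippet above: date plus hours, minutes and seconds.
    private static final DateTimeFormatter STAMP = DateTimeFormatter.ofPattern("yyyy-MM-dd HH-mm-ss");

    /** Builds <outputDir><prefix>-<tablaConsultar>-<timestamp>.txt */
    static String outputName(String outputDir, String prefix, String tablaConsultar, LocalDateTime now) {
        return outputDir + prefix + "-" + tablaConsultar + "-" + STAMP.format(now) + ".txt";
    }

    public static void main(String[] args) {
        System.out.println(outputName("/tmp/", "report", "CUSTOMER", LocalDateTime.now()));
    }
}
```

Passing the timestamp in as an argument keeps the method deterministic, which makes the file name easy to check in a unit test.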