java - 等待发布者完成并将其用于另一个发布者,该发布者将所需数据返回给 webflux
问题描述
我有一个简单的代码,它将一些数据保存在数据库中并返回保存的值
public Flux<Person> savePersons(Flux<Person> persons){
Mono<Integer> lastPersonId = getLastPersonId();
AtomicInteger idNum = new AtomicInteger();
lastPersonId.log().subscribe(idNum::set);
System.out.println("id num "+idNum.get()); //Obviously zero since the data is not available yet from lastPersonId Mono
Flux<Person> personsWithId = persons.map(person->{
person.setId(idNum.getAndIncrement());
return person;
});
StringBuilder builder = new StringBuilder();
//Ignore the Sql Injection, was left as such for brevity.
personsWithId.subscribe(person-> builder.append("insert into person VALUES("+person.getId()+",\""+person.getName()+"\""));
return dbClient.execute(builder.toString()).fetch().rowsUpdated();//This fails as well, the builder has insert statements albeit with null Ids. since Id is not set yet.
}
在保存数据之前,我试图检索最后一个 ID 号并将其用于插入语句。我如何确保这些Publisher
是链式的。
PS:我了解 SQL 注入的风险。我将这些查询格式发送到外部实用程序以清理输入。为简洁起见,我省略了那段代码。我没有使用批处理,因为 mssql-r2dbc 似乎不支持批处理。
更新:这个问题更像是一个新手问题,因为有人从 Imperatve 编码转换,应该被忽略。在一些评论的帮助下,在 SO 中回答,在写了一些代码之后,我写了一些我能理解的东西。
public Mono<Integer> savePersons(Flux<Person> persons){
AtomicInteger idNum = new AtomicInteger();
StringBuilder builder = new StringBuilder();
return getLastPersonId() //returns a Mono<Integer>
.map(AtomicInteger::new)
.flatMapMany(val->assignIds(val, persons))
.map(person->Sqllib.escapedSql(builder, person))
.last()
.map(val->dbClient.execute(builder.toString()))
.fetch()
.rowsUpdated();
}
private Flux<Person> assignIds(AtomicInteger val, Flux<Person> persons) {
return persons.map(person-> {
person.setId(val.getAndIncrement());
return person;
});
}
解决方案
如果您真的不关心 SQL 注入或让数据库处理 PK 增量,您可以这样做
String query = "insert into person VALUES (%d,'%s');";
AtomicInteger idNum = new AtomicInteger();
return people
//for every flux element, join on the result of lastPersonId()
.join(lastPersonId(),s-> Flux.never(),s-> Flux.never(), Tuples::of)
//Create your query
.map(t ->
String.format(query,t.getT2() + idNum.incrementAndGet(),
t.getT1().getName()))
//Merge it together
.reduce(String::concat)
//Create execution spec
.map(client::execute)
//Execute spec
.map(DatabaseClient.GenericExecuteSpec::fetch)
//Get the number of updated rows
.flatMap(UpdatedRowsFetchSpec::rowsUpdated);
推荐阅读
- ssl-certificate - 无法连接到 Citrix Receiver(ICA 客户端 13.10.x)中的“0.0.0.2 - 已发布的应用程序名称”
- android - 从服务器检索数据太慢
- node.js - 同一项目中 NodeJS 和前端应用程序的单独 TypeScript 配置
- php - 在同名中使用多个 Where
- jquery - Jquery用自定义ID名称多次附加?有人可以向我发送正确的方向吗
- push-notification - Firebase 通知无法使用 FCM 服务正常工作
- mysql - MySQL 的地理分散/复制 Azure 数据库
- python - 我可以只从数据库中已经存在的日期时间字段数据中提取日期吗?
- sql-server - 当发现重复值时
- javascript - React Function Components with hooks vs Class Components