首页 > 解决方案 > 等待发布者完成并将其用于另一个发布者,该发布者将所需数据返回给 webflux

问题描述

我有一个简单的代码,它将一些数据保存在数据库中并返回保存的值

public Flux<Person> savePersons(Flux<Person> persons){
        Mono<Integer> lastPersonId = getLastPersonId();
        AtomicInteger idNum = new AtomicInteger();

        lastPersonId.log().subscribe(idNum::set);
        System.out.println("id num "+idNum.get()); //Obviously zero since the data is not available yet from lastPersonId Mono

        Flux<Person> personsWithId = persons.map(person->{
            person.setId(idNum.getAndIncrement());
            return person;
        });

        StringBuilder builder = new StringBuilder();
        //Ignore the Sql Injection, was left as such for brevity.
        personsWithId.subscribe(person-> builder.append("insert into person VALUES("+person.getId()+",\""+person.getName()+"\""));

        return dbClient.execute(builder.toString()).fetch().rowsUpdated();//This fails as well, the builder has insert statements albeit with null Ids. since Id is not set yet. 
    }

在保存数据之前,我试图检索最后一个 ID 号并将其用于插入语句。我如何确保这些Publisher是链式的。

PS:我了解 SQL 注入的风险。我将这些查询格式发送到外部实用程序以清理输入。为简洁起见,我省略了那段代码。我没有使用批处理,因为 mssql-r2dbc 似乎不支持批处理。

更新:这个问题更像是一个新手问题,因为有人从 Imperatve 编码转换,应该被忽略。在一些评论的帮助下,在 SO 中回答,在写了一些代码之后,我写了一些我能理解的东西。

public Mono<Integer> savePersons(Flux<Person> persons){
    AtomicInteger idNum = new AtomicInteger();
    StringBuilder builder = new StringBuilder();

    return getLastPersonId() //returns a Mono<Integer>
            .map(AtomicInteger::new)
            .flatMapMany(val->assignIds(val, persons))
            .map(person->Sqllib.escapedSql(builder, person))
            .last()
            .map(val->dbClient.execute(builder.toString()))
            .fetch()
            .rowsUpdated();

}

private Flux<Person> assignIds(AtomicInteger val, Flux<Person> persons) {
    return persons.map(person-> {
      person.setId(val.getAndIncrement());
      return person;
    });
}

标签: javaspring-webfluxproject-reactor

解决方案


如果您真的不关心 SQL 注入或让数据库处理 PK 增量,您可以这样做

    String query = "insert into person VALUES (%d,'%s');";
    AtomicInteger idNum = new AtomicInteger();

    return  people
            //for every flux element, join on the result of lastPersonId()
            .join(lastPersonId(),s-> Flux.never(),s-> Flux.never(), Tuples::of)
            //Create your query
            .map(t -> 
                String.format(query,t.getT2() + idNum.incrementAndGet(),
                t.getT1().getName()))
            //Merge it together
            .reduce(String::concat)
            //Create execution spec
            .map(client::execute)
            //Execute spec
            .map(DatabaseClient.GenericExecuteSpec::fetch)
            //Get the number of updated rows
            .flatMap(UpdatedRowsFetchSpec::rowsUpdated);

推荐阅读