首页 > 解决方案 > Spring Webflux - R2dbc:如何在迭代结果集时运行子查询和更新值

问题描述

我是 Reactive 存储库和 webflux 的新手。我正在从 DB 中获取数据列表,使用它来迭代它map()以构建 DTO 类对象,在此过程中,我需要运行另一个查询来获取计数值并更新相同的 DTO 对象。当我尝试如下时,计数设置为 null

@Repository
public class CandidateGroupCustomRepo {
  public Flux<CandidateGroupListDTO> getList(BigInteger userId){
        final String sql = "SELECT gp.CANDIDATE_GROUP_ID,gp.NAME  ,gp.GROUP_TYPE   \n" +
                "                             ,gp.CREATED_DATE  ,cd.DESCRIPTION STATUS ,COUNT(con.CANDIDATE_GROUP_ID)\n" +
                "                             FROM  ........" +
                "                             WHERE gp.CREATED_BY_USER_ID = :userId  GROUP BY gp.CANDIDATE_GROUP_ID,gp.NAME  ,gp.GROUP_TYPE   \n" +
                "                             ,gp.CREATED_DATE  ,cd.DESCRIPTION";
        return dbClient.execute(sql)
                .bind("userId", userId)
                .map(row ->{
                            CandidateGroupListDTO info = new CandidateGroupListDTO();
                            info.setGroupId(row.get(0, BigInteger.class));
                            info.setGroupName(row.get(1, String.class)) ;
                            info.setGroupType(row.get(2, String.class));
                            info.setCreatedDate( row.get(3, LocalDateTime.class));
                            info.setStatus(row.get(4, String.class));

                            if(info.getGroupType().equalsIgnoreCase("static")){
                                info.setContactsCount(row.get(5, BigInteger.class));
                            }else{
                getGroupContactCount(info.getGroupId()).subscribe(count ->{
                    System.out.println(">>>>>"+count);
                    info.setContactsCount(count);
                
                        });
                            }
                            return info;
                            }
                        )
                .all() ;
    }
    
    Mono<BigInteger> getGroupContactCount(BigInteger groupId){
            final String sql = "SELECT 3 WHERE :groupId IS NOT NULL;";
            return dbClient.execute(sql)
                    .bind("groupId", groupId)
                    .map(row -> {
                        System.out.println(row.get(0, BigInteger.class));
                        return row.get(0, BigInteger.class);
                    }  ).one();
    }
    
}

当我打电话getGroupContactCount时,我试图从中提取计数Mono<BigInteger>并将其设置在我的 DTO.... sys out 正确打印计数值,但我仍然得到 null 作为响应。

标签: springspring-webfluxspring-data-r2dbcr2dbc

解决方案


subscribe在中间跟注,这实际上是阻塞的。一个订阅者通常是最终消费者,我猜你的 spring 应用程序不是,最终消费者很可能是发起调用的网页。您的服务器是生产者。

调用数据库,flatMap然后返回。

return dbClient.execute(sql)
    .bind("userId", userId)
    .flatMap(row ->{
        CandidateGroupListDTO info = new CandidateGroupListDTO();
        info.setGroupId(row.get(0, BigInteger.class));
        info.setGroupName(row.get(1, String.class)) ;
        info.setGroupType(row.get(2, String.class));
        info.setCreatedDate( row.get(3, LocalDateTime.class));
        info.setStatus(row.get(4, String.class));

        if(info.getGroupType().equalsIgnoreCase("static")){
            return Mono.just(info.setContactsCount(row.get(5, BigInteger.class)));
        } else {
            return getGroupContactCount(info.getGroupId()).flatMap(count -> {
                info.setContactsCount(count);
                return Mono.just(info)
            });
        }
    }).all();

如果顺序很重要,请使用map,否则尝试使用flatMap来执行异步工作。


推荐阅读