首页 > 解决方案 > 如何在 ZStream (ZIO) 的帮助下构建批处理请求并理解响应?

问题描述

我有 api 得到这样的请求:

case class UsersRequest(ids: List[Long])

并返回这样的响应:

case class UsersInfoResponse(info: List[Info])
case class Info(userId: Long, info: String)

另外,我有发送此请求并创建用户的方法:

def createUser(id: Long): IO[Throwable, User] = {
 getUserInfo(id)
   .map(info => User(id, info))
}

def getUserInfo(id:Long): IO[Throwable, String] = {
   here i call grpc service
   service.getUserInfo(UsersRequest(List(id)))
}

我想:

  1. 编写创建 ids 批处理的 ZStream
  2. 每 1 秒需要 10 个 id 并创建 UsersRequest
  3. 获取用户信息响应
  4. 使用 id 了解必须获取的信息
  5. 返回信息

所以我可以做到这一点,我应该创建类似的东西:

def getUserInfo(id:Long): IO[Throwable, String] = {
   Stream
      .fromQueue()
      .groupedWithin(10, Duration.Zero)
      .????
      .runDrain
      .forkManaged

   AND

      p <- Promise.make[Throwable, String]
      interrupted <- Promise.make[Nothing, Unit]
      env <- ZIO.environment[R]
}

我不知道我怎么能做到。如何构建批处理并发送请求并在匹配后通过 id 获得结果?

标签: scalaziozio-streams

解决方案


推荐阅读