首页 > 解决方案 > 从 akka-stream 到 fs2 的旅程 - 如何使用 http4s 在 fs2 中定义类似阶段的 akka-stream http 流

问题描述

我正在加深我对 fs2 的了解,并想尝试 fs2-kafka 来替换 akka 流的用例。这个想法很简单,从 kafka 读取并通过 http 请求将数据发布到接收器,然后在成功时提交回 kafka。到目前为止,我无法真正弄清楚 http 部分。在 akka 流/akka http 中,您有一个开箱即用的流https://doc.akka.io/docs/akka-http/current/client-side/host-level.html#using-a-host-连接池

Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool]

它与 akka 流完美集成。

我试图看看我是否可以用 http4s 和 fs2 做类似的事情。

有没有人有任何参考、代码示例、博客以及没有显示如何进行这种集成的东西。到目前为止,我唯一能想到的是,将流包装到客户端资源的使用方法中,即

BlazeClientBuilder[IO](IORuntime.global.compute).resource.use { ..... run stream here ..... }

即使那样我也不确定整个事情

标签: scalahttp4sfs2

解决方案


类型级生态系统的事情是一切都只是一个库,您不需要示例说明它们中有多少交互在一起,您只需要了解每个库的工作原理和组合的基本规则即可。

def createClient(/** whatever arguments you need */): Resource[IO, Client[IO]] = {
  // Fill this based on the documentation of the client of your choice:
  // I would recommend the ember client from http4s:
  // https://http4s.org/v0.23/api/org/http4s/ember/client/emberclientbuilder 
}


def sendHttpRequest(client: Client[IO])(data: Data): IO[Result] = {
  // Fill this based on the documentation of your client:
  // https://http4s.org/v0.23/client/
  // https://http4s.org/v0.23/api/org/http4s/client/client
}

def getStreamOfRecords(/** whatever arguments you need */): Stream[IO, CommittableConsumerRecord[IO, Key, Data]] = {
  // Fill this based on the documentation of fs2-kafka:
  // https://fd4s.github.io/fs2-kafka/docs/consumers
}

def program(/** whatever arguments you need */): Stream[IO, Unit] = {
  // Based on the documentation of fs2 and fs2-kafka I would guess something like this:
  Stream.fromResource(createClient(...)).flatMap { client =>
    getStreamOfRecords(...).evalMapFilter { committable =>
      sendHttpRequest(client)(data = committable.record).map { result =>
        if (result.isSuccess) Some(committable.offset)
        else None
      }
    }.through(commitBatchWithin(...))
  }
}

object Main extends IOApp.Simple {
  override final val run: IO[Unit] =
    program(...).compile.drain
}

请注意,我在头脑中编写了所有这些内容,并且只需快速浏览一下文档,您就需要更改许多内容(尤其是类型,例如Data& Result。以及调整诸如错误处理和何时提交回Kafka之类的东西。
但是,我希望这可以帮助您了解如何构建代码。


推荐阅读