scala - 从 akka-stream 到 fs2 的旅程 - 如何使用 http4s 在 fs2 中定义类似阶段的 akka-stream http 流
问题描述
我正在加深我对 fs2 的了解,并想尝试 fs2-kafka 来替换 akka 流的用例。这个想法很简单,从 kafka 读取并通过 http 请求将数据发布到接收器,然后在成功时提交回 kafka。到目前为止,我无法真正弄清楚 http 部分。在 akka 流/akka http 中,您有一个开箱即用的流https://doc.akka.io/docs/akka-http/current/client-side/host-level.html#using-a-host-连接池
Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool]
它与 akka 流完美集成。
我试图看看我是否可以用 http4s 和 fs2 做类似的事情。
有没有人有任何参考、代码示例、博客以及没有显示如何进行这种集成的东西。到目前为止,我唯一能想到的是,将流包装到客户端资源的使用方法中,即
BlazeClientBuilder[IO](IORuntime.global.compute).resource.use { ..... run stream here ..... }
即使那样我也不确定整个事情
解决方案
类型级生态系统的事情是一切都只是一个库,您不需要示例说明它们中有多少交互在一起,您只需要了解每个库的工作原理和组合的基本规则即可。
def createClient(/** whatever arguments you need */): Resource[IO, Client[IO]] = {
// Fill this based on the documentation of the client of your choice:
// I would recommend the ember client from http4s:
// https://http4s.org/v0.23/api/org/http4s/ember/client/emberclientbuilder
}
def sendHttpRequest(client: Client[IO])(data: Data): IO[Result] = {
// Fill this based on the documentation of your client:
// https://http4s.org/v0.23/client/
// https://http4s.org/v0.23/api/org/http4s/client/client
}
def getStreamOfRecords(/** whatever arguments you need */): Stream[IO, CommittableConsumerRecord[IO, Key, Data]] = {
// Fill this based on the documentation of fs2-kafka:
// https://fd4s.github.io/fs2-kafka/docs/consumers
}
def program(/** whatever arguments you need */): Stream[IO, Unit] = {
// Based on the documentation of fs2 and fs2-kafka I would guess something like this:
Stream.fromResource(createClient(...)).flatMap { client =>
getStreamOfRecords(...).evalMapFilter { committable =>
sendHttpRequest(client)(data = committable.record).map { result =>
if (result.isSuccess) Some(committable.offset)
else None
}
}.through(commitBatchWithin(...))
}
}
object Main extends IOApp.Simple {
override final val run: IO[Unit] =
program(...).compile.drain
}
请注意,我在头脑中编写了所有这些内容,并且只需快速浏览一下文档,您就需要更改许多内容(尤其是类型,例如Data
& Result
)。以及调整诸如错误处理和何时提交回Kafka之类的东西。
但是,我希望这可以帮助您了解如何构建代码。
推荐阅读
- rust - 在以 nom 分隔的标签之间捕获字符串
- sql - 对同一属性使用两次选择
- linux - 如何将多行粘贴到 scala stdIn 进程的命令行提示符?
- r - 在R中用姓交换名字和反之亦然
- asp.net - 嵌套内联运算符 <% 给出换行错误
- html - 使用 CSS 给我的网页一个白色的前景,灰色的背景(模仿 MS Word)
- iterm2 - iTerm2 3.3 Python API——如何创建一个 2x2 的会话网格?
- r - 在 R 中(以及安装 ROracle 包时),如何设置 OCI_LIB64?
- php - Python Xampp 错误:标头之前的脚本输出结束
- angular - 在一系列返回后完成一个 observable