首页 > 解决方案 > 将 Doobie 流从数据库保存到文件

问题描述

Doobie select 返回一个fs2.Stream(doobie.ConnectionIO, String). 如果我们需要将其写入文件,显而易见的选择是调用stream.compile.toList.transact(transactor)然后将此列表保存到文件中。

有没有办法以流方式保存结果而不将其转换为列表?

标签: scalascala-catsfs2cats-effectdoobie

解决方案


诀窍是将cats.IO操作转换为doobie.ConnectionIOwith Async[doobie.ConnectionIO].liftIO(IO(...))。这允许将文件操作与数据库操作很好地结合起来。这是一个将结果流式传输到文件的完整示例程序。

package com.example

import java.io.BufferedWriter

import better.files.File
import cats.effect._
import cats.implicits._
import doobie._
import doobie.implicits._
import fs2.Stream


object Example extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val xa = Transactor.fromDriverManager[IO](
      "org.postgresql.Driver",     // driver classname
      "jdbc:postgresql:example_db",     // connect URL (driver-specific)
      "postgres",                  // user
      ""                          // password
    )

    val drop = sql"drop table if exists example".update.run
    val create =
      sql"create table if not exists example (id serial primary key, string_value text not null)".update.run
    val insert = Update[String]("insert into example (string_value) values (?)")
      .updateMany(List("one", "two", "three", "four", "five"))

    val setup = for {
      _ <- drop.transact(xa)
      _ <- create.transact(xa)
      _ <- insert.transact(xa)
    } yield ()

    val select: Stream[doobie.ConnectionIO, String] =
      sql"select string_value from example".query[String].stream
    val output = writeToFile(select).compile.drain.transact(xa)

    for {
      _ <- setup
      _ <- output
    } yield ExitCode.Success
  }

  private def writeToFile(result: Stream[doobie.ConnectionIO, String]): Stream[doobie.ConnectionIO, Unit] = {
    Stream.resource(writer("./example.txt")).flatMap { writer =>
      result.intersperse("\n").chunks.evalMap { chunk =>
        Async[doobie.ConnectionIO].liftIO(IO(
          chunk.foreach(writer.write)
        ))
      }
    }
  }

  private def writer(path: String): Resource[doobie.ConnectionIO, BufferedWriter] = {
    Resource.make {
      Async[doobie.ConnectionIO].liftIO(IO(
        File(path).newBufferedWriter
      ))
    } { outStream =>
      Async[doobie.ConnectionIO].liftIO(IO(
        outStream.close())
      )
    }
  }
}

推荐阅读