首页 > 解决方案 > 在插入行之前,Slick run 方法返回的 Future 成功完成

问题描述

我正在使用 Slick 和 PostgreSQL 进行存储的 Scala 应用程序。我有一种foo从 CSV 文件中读取数据并将大约 10,000 行插入数据库的方法。在行插入操作的 Future 完成后,bar调用另一个方法从数据库中检索这些行并对它们执行一些操作。这就是问题所在:实际上没有从数据库中检索到任何行,因为在 Future 完成时还没有插入任何行。

从我在寻找答案和官方文档时收集到的信息来看,在插入语句成功执行之前,Future 不应该完成。如果我将以下代码添加Thread.sleep(30000)bar,允许首先执行插入语句,则这些方法会提供预期的结果。现在,出于显而易见的原因,我宁愿不这样做,所以我正在寻找替代方案。

下图说明了doStuff调用初始方法后的程序流程:

程序流程图

doStufffoo在返回 Future 之前加载数据并将其存储在数据库中的调用。然后在doStuff这个 Future 中映射并调用barbarbar 从数据库中检索行并处理它们。但是,由于在调用的位置没有插入任何行bar,因此不会处理任何数据。

doStuff方法:

def doStuff(csvFile: File): Future[Unit] = {
  fooService.foo(csvFile)
    .map(_ => {
      csvFile.delete()
      barService.bar()
    })
}

foo方法:

def foo(file: File) Future[Unit] = {
  val reader = CSVReader.open(file)
  fooStorage.truncateFooData().map(_ => {
    val foos = for (line <- reader.iterator if
    line.head != "bad1" &&
      line.head !="bad2")
      yield parseFooData(line)
    fooStorage.saveFooDataBulk(foos.toSeq)
  })
}

我如何使用 Slick 插入行:

override def saveFooDataBulk(fooSeq: Seq[Foo]): Future[Seq[Foo]] =
  db.run(DBIO.seq(fooQuery ++= fooSeq)).map(_ => fooSeq)

我希望bar在所有行都插入数据库后立即被调用,但是,目前来自 Slick 的 Future 完成得太快了。如果它是相关的:doStuff当请求发送到 Akka Http 端点时调用该方法。应用程序和数据库在两个不同的 docker 容器中运行。我究竟做错了什么?

我也无法理解我一年半前选择的用户名现在有多合适。

标签: postgresqlscalaslick-3.0

解决方案


替换map为. flatMap_ def foo否则,它将启动 a Future[Seq[Foo]],然后立即返回 a (),然后被您的doStuff. 像这样的东西:

fooStorage
  .truncateFooData()
  .flatMap(_ => {
    /* stuff... */
    fooStorage.saveFooDataBulk(foo.toSeq)
  })
  .map(_ => ())

我没有测试它,但无论如何,Future在另一个中间开始一些 sFuture.map然后立即返回 an()感觉不太对劲。


推荐阅读