首页 > 解决方案 > 异步处理 akka 流并写入文件接收器

问题描述

我正在尝试编写一段代码,该代码将使用一系列代码(公司的证券交易所代码)并从每个代码的 REST API 中获取公司信息。

我想异步获取多家公司的信息。

我想以连续的方式将结果保存到文件中,因为整个数据集可能不适合内存。

根据我可以在此主题上搜索到的 akka 流和资源的文档,我提出了以下代码(为简洁起见,省略了某些部分):

  implicit val actorSystem: ActorSystem = ActorSystem("stock-fetcher-system")
  implicit val materializer: ActorMaterializer = ActorMaterializer(None, Some("StockFetcher"))(actorSystem)
  implicit val context = system.dispatcher

  import CompanyJsonMarshaller._
  val parallelism = 10
  val connectionPool = Http().cachedHostConnectionPoolHttps[String](s"api.iextrading.com")
  val listOfSymbols = symbols.toList

  val outputPath = "out.txt"  


  Source(listOfSymbols)
    .mapAsync(parallelism) {
      stockSymbol => Future(HttpRequest(uri = s"https://api.iextrading.com/1.0/stock/${stockSymbol.symbol}/company"), stockSymbol.symbol)
    }
    .via(connectionPool)
    .map {
      case (Success(response), _) => Unmarshal(response.entity).to[Company]
      case (Failure(ex), symbol)       => println(s"Unable to fetch char data for $symbol") "x"
    }
    .runWith(FileIO.toPath(new File(outputPath).toPath, Set(StandardOpenOption.APPEND)))
    .onComplete { _ =>
      bufferedSource.close
      actorSystem.terminate()
    }

这是有问题的行:

runWith(FileIO.toPath(new File(outputPath).toPath, Set(StandardOpenOption.APPEND)))

它不能编译,编译器给了我这个看起来很神秘的错误:

Type mismatch, expected Graph[SinkShape[Any, NotInferedMat2], actual Sink[ByeString, Future[IOResult]]

如果我将接收器更改为 Sink.ignore 或 println(_) 它可以工作。

我会很感激一些更详细的解释。

标签: scalaakkaakka-stream

解决方案


正如编译器所指示的那样,类型不匹配。在调用.map...

.map {
  case (Success(response), _) =>
    Unmarshal(response.entity).to[Company]
  case (Failure(ex), symbol)  =>
    println(s"Unable to fetch char data for $symbol")
    "x"
}

...您正在返回一个Company实例或 a String,因此编译器将最接近的超类型(或“最小上限”)推断为Any. 需要类型的Sink输入元素ByteString,而不是Any

一种方法是在不解组响应的情况下将响应发送到文件接收器:

Source(listOfSymbols)
  .mapAsync(parallelism) {
    ...
  }
  .via(connectionPool)
  .map(_.entity.dataBytes) // entity.dataBytes is a Source[ByteString, _]
  .flatMapConcat(identity)
  .runWith(FileIO.toPath(...))

推荐阅读