scala - 异步处理 akka 流并写入文件接收器
问题描述
我正在尝试编写一段代码,该代码将使用一系列代码(公司的证券交易所代码)并从每个代码的 REST API 中获取公司信息。
我想异步获取多家公司的信息。
我想以连续的方式将结果保存到文件中,因为整个数据集可能不适合内存。
根据我可以在此主题上搜索到的 akka 流和资源的文档,我提出了以下代码(为简洁起见,省略了某些部分):
implicit val actorSystem: ActorSystem = ActorSystem("stock-fetcher-system")
implicit val materializer: ActorMaterializer = ActorMaterializer(None, Some("StockFetcher"))(actorSystem)
implicit val context = system.dispatcher
import CompanyJsonMarshaller._
val parallelism = 10
val connectionPool = Http().cachedHostConnectionPoolHttps[String](s"api.iextrading.com")
val listOfSymbols = symbols.toList
val outputPath = "out.txt"
Source(listOfSymbols)
.mapAsync(parallelism) {
stockSymbol => Future(HttpRequest(uri = s"https://api.iextrading.com/1.0/stock/${stockSymbol.symbol}/company"), stockSymbol.symbol)
}
.via(connectionPool)
.map {
case (Success(response), _) => Unmarshal(response.entity).to[Company]
case (Failure(ex), symbol) => println(s"Unable to fetch char data for $symbol") "x"
}
.runWith(FileIO.toPath(new File(outputPath).toPath, Set(StandardOpenOption.APPEND)))
.onComplete { _ =>
bufferedSource.close
actorSystem.terminate()
}
这是有问题的行:
runWith(FileIO.toPath(new File(outputPath).toPath, Set(StandardOpenOption.APPEND)))
它不能编译,编译器给了我这个看起来很神秘的错误:
Type mismatch, expected Graph[SinkShape[Any, NotInferedMat2], actual Sink[ByeString, Future[IOResult]]
如果我将接收器更改为 Sink.ignore 或 println(_) 它可以工作。
我会很感激一些更详细的解释。
解决方案
正如编译器所指示的那样,类型不匹配。在调用.map
...
.map {
case (Success(response), _) =>
Unmarshal(response.entity).to[Company]
case (Failure(ex), symbol) =>
println(s"Unable to fetch char data for $symbol")
"x"
}
...您正在返回一个Company
实例或 a String
,因此编译器将最接近的超类型(或“最小上限”)推断为Any
. 需要类型的Sink
输入元素ByteString
,而不是Any
。
一种方法是在不解组响应的情况下将响应发送到文件接收器:
Source(listOfSymbols)
.mapAsync(parallelism) {
...
}
.via(connectionPool)
.map(_.entity.dataBytes) // entity.dataBytes is a Source[ByteString, _]
.flatMapConcat(identity)
.runWith(FileIO.toPath(...))
推荐阅读
- android - 将算法从 Java 转换为 NDK
- firebase - 函数中的 Firebase 消息传递错误
- android - Cordova Plugin 自定义应用程序类
- c# - 禁止对 Visual Studio 中的所有项目发出警告
- jenkins - 增加 Jenkins 注销计时器不起作用
- html - 解析 HTML 表格单元 ID
- javascript - 如何使用 Node js 和 Multer 将保存在文件系统上的文件返回到 Angular 前端?
- javascript - 如何在同一行记录控制台或清除节点中的控制台?
- python - 如何在订阅频道之前获取发布到 Redis 的消息
- python - 如何将命令行参数传递给 url 字符串