scala - 如何将大流分组为子流
问题描述
我想用最多元素组合Stream[F, A]
成 内部流。Stream[Stream[F, A]]
n
这就是我所做的,基本上是通过管道将块放入Queue[F, Queue[F, Chunk[A]]
,然后将队列元素作为结果流产生。
implicit class StreamSyntax[F[_], A](s: Stream[F, A])(
implicit F: Concurrent[F]) {
def groupedPipe(
lastQRef: Ref[F, Queue[F, Option[Chunk[A]]]],
n: Int): Pipe[F, A, Stream[F, A]] = { in =>
val initQs =
Queue.unbounded[F, Option[Queue[F, Option[Chunk[A]]]]].flatMap { qq =>
Queue.bounded[F, Option[Chunk[A]]](1).flatMap { q =>
lastQRef.set(q) *> qq.enqueue1(Some(q)).as(qq -> q)
}
}
Stream.eval(initQs).flatMap {
case (qq, initQ) =>
def newQueue = Queue.bounded[F, Option[Chunk[A]]](1).flatMap { q =>
qq.enqueue1(Some(q)) *> lastQRef.set(q).as(q)
}
val evalStream = {
in.chunks
.evalMapAccumulate((0, initQ)) {
case ((i, q), c) if i + c.size >= n =>
val (l, r) = c.splitAt(n - i)
q.enqueue1(Some(l)) >> q.enqueue1(None) >> q
.enqueue1(None) >> newQueue.flatMap { nq =>
nq.enqueue1(Some(r)).as(((r.size, nq), c))
}
case ((i, q), c) if (i + c.size) < n =>
q.enqueue1(Some(c)).as(((i + c.size, q), c))
}
.attempt ++ Stream.eval {
lastQRef.get.flatMap { last =>
last.enqueue1(None) *> last.enqueue1(None)
} *> qq.enqueue1(None)
}
}
qq.dequeue.unNoneTerminate
.map(
q =>
q.dequeue.unNoneTerminate
.flatMap(Stream.chunk)
.onFinalize(
q.dequeueChunk(Int.MaxValue).unNoneTerminate.compile.drain))
.concurrently(evalStream)
}
}
def grouped(n: Int) = {
Stream.eval {
Queue.unbounded[F, Option[Chunk[A]]].flatMap { empty =>
Ref.of[F, Queue[F, Option[Chunk[A]]]](empty)
}
}.flatMap { ref =>
val p = groupedPipe(ref, n)
s.through(p)
}
}
}
但这很复杂,有没有更简单的方法?
解决方案
fs2 有可以帮助分组的方法chunkN
chunkLimit
stream.chunkN(n).map(Stream.chunk)
stream.chunkLimit(n).map(Stream.chunk)
chunkN
生成大小为 n 的块,直到流结束
chunkLimit
拆分现有块并可以生成大小可变的块。
scala> Stream(1,2,3).repeat.chunkN(2).take(5).toList
res0: List[Chunk[Int]] = List(Chunk(1, 2), Chunk(3, 1), Chunk(2, 3), Chunk(1, 2), Chunk(3, 1))
scala> (Stream(1) ++ Stream(2, 3) ++ Stream(4, 5, 6)).chunkLimit(2).toList
res0: List[Chunk[Int]] = List(Chunk(1), Chunk(2, 3), Chunk(4, 5), Chunk(6))
推荐阅读
- javascript - Favicon 可以在 localhost 上工作,但不能在服务器上工作?
- c - 递增指针数组
- angular - 使用 Angular 7 创建年份选择器
- python - 使用 Python 抓取网页并写入 CSV
- python - 正则表达式完整单词模式
- php - 如何显示 wp 对象数组中的内容?
- javascript - 使用滚动条为 DataTables 动态创建页脚 - 插入两个页脚
- android - 进度条在 Android 中显示错误的值?
- javascript - 巴西货币 去除掩码正则表达式
- python - 将 html 解析为文本并将链接保留在 python 中