scala - 在scala中的while循环中使用队列中的结果填充列表
问题描述
我正在尝试在 scala 中以一种功能性的方式编写一个 while 循环。我想要做的是用队列中的消息填充一个列表(在这种情况下是Kafka,但并不重要)。
我这样做是为了进行集成测试,并且由于在 CI 中运行测试时 Kafka 正在远程运行,因此测试有时会失败,因为 Kafka 不返回任何消息。所以我写了一个循环来查询 Kafka,直到我得到我期望的所有结果(否则测试会在一段时间后超时并失败)。我现在有这个:
var result = List[Int]()
while (result.size < expectedNumberOfMessages) {
result = result ++ kafkaConsumer.poll(Duration.ofSeconds(10)).records(KAFKA_TOPIC).iterator().toList.map(_.value.getPayload)
}
这很好用,但对我来说看起来很可怕。另外,如果它是生产代码,它也将是低效的。任何人都可以提出一种更好的方法来实现这一点吗?
解决方案
如果您打算保留while
循环,我首先建议您使用 ascala.collection.mutable.ListBuffer
而不是 immutable List
。这将防止在每次迭代时在内存中复制整个列表。
如果您想要一种更“实用”的方式来编写上述代码,同时保留 Consumer API(而不是 Kafka Streams API),您可以Stream
像这样手动定义一个 scala:
import scala.util.Random
// mock Kafka's "poll", returns a random number of Ints (max 10)
def poll(): List[Int] = {
val size = Random.nextInt(10)
println("fetching messages")
Thread.sleep(1000)
(1 to size).map(_ => Random.nextInt(10)).toList
}
lazy val s: Stream[Int] = Stream.continually(poll()).flatten
// s is now a Stream that will be evaluated when a certain number of messages is requested
// for example, fetching 40 results:
/*
scala> s.take(40).toList
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
res0: List[Int] = List(3, 6, 2, 7, 7, 8, 0, 4, 6, 2, 0, 3, 8, 9, 5, 8, 2, 9, 2, 7, 9, 2, 6, 1, 6, 7, 2, 4, 4, 6, 6, 3, 5, 7, 2, 0, 9, 4, 9, 4)
*/
推荐阅读
- xamarin - Xamarin Forms - ScrollView 如何检查是否滚动到底部
- ios - iOS 13 不显示俄罗斯卢布 (₽) unicode 符号
- android - 我可以对 Koin 模块进行单元测试吗?
- python-3.x - 是否可以忽略 Python3 中模块的所有成员?
- angular - 在使用 httpclient 从 @angular/common/http 以角度进行休息调用时,可以覆盖超过 2 分钟的超时值
- sql-server - How can I replace two characters in a single word with wildcard if the two characters are included in wild card definition in MSQL
- django - Django 多个用户组
- gpflow - GPflow,bvh:ValueError:平均值必须是一维的
- java - 在 Linux Mint 19x 中更改 JAVA_HOME 值
- angular - Angular6中的单元测试输入事件