首页 > 解决方案 > 在scala中的while循环中使用队列中的结果填充列表

问题描述

我正在尝试在 scala 中以一种功能性的方式编写一个 while 循环。我想要做的是用队列中的消息填充一个列表(在这种情况下是Kafka,但并不重要)。

我这样做是为了进行集成测试,并且由于在 CI 中运行测试时 Kafka 正在远程运行,因此测试有时会失败,因为 Kafka 不返回任何消息。所以我写了一个循环来查询 Kafka,直到我得到我期望的所有结果(否则测试会在一段时间后超时并失败)。我现在有这个:

var result = List[Int]()
while (result.size < expectedNumberOfMessages) {
    result = result ++ kafkaConsumer.poll(Duration.ofSeconds(10)).records(KAFKA_TOPIC).iterator().toList.map(_.value.getPayload)
}

这很好用,但对我来说看起来很可怕。另外,如果它是生产代码,它也将是低效的。任何人都可以提出一种更好的方法来实现这一点吗?

标签: scalafunctional-programming

解决方案


如果您打算保留while循环,我首先建议您使用 ascala.collection.mutable.ListBuffer而不是 immutable List。这将防止在每次迭代时在内存中复制整个列表。

如果您想要一种更“实用”的方式来编写上述代码,同时保留 Consumer API(而不是 Kafka Streams API),您可以Stream像这样手动定义一个 scala:

import scala.util.Random

// mock Kafka's "poll", returns a random number of Ints (max 10)
def poll(): List[Int] = {
    val size = Random.nextInt(10)
    println("fetching messages")
    Thread.sleep(1000)
    (1 to size).map(_ => Random.nextInt(10)).toList
}

lazy val s: Stream[Int] = Stream.continually(poll()).flatten

// s is now a Stream that will be evaluated when a certain number of messages is requested
// for example, fetching 40 results:

/*
scala> s.take(40).toList
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
res0: List[Int] = List(3, 6, 2, 7, 7, 8, 0, 4, 6, 2, 0, 3, 8, 9, 5, 8, 2, 9, 2, 7, 9, 2, 6, 1, 6, 7, 2, 4, 4, 6, 6, 3, 5, 7, 2, 0, 9, 4, 9, 4)
*/

推荐阅读