scala - 如何对 kafka 消费者应用程序进行功能测试?
问题描述
我想测试一个使用 Kafka 消息并将它们写入日志的应用程序。这是它在类似 scala 的伪代码中的近似表示:
import kafka.consumer.Consumer
import kafka.consumer.ConsumerConfig
import org.slf4j.LoggerFactory
import java.util.Properties
import java.util.HashMap
object ConsumerApp extends App {
val topic = new HashMap[String, Integer]()
topic.put("test", 1)
val logger = LoggerFactory.getLogger(getClass().getName())
val messageStream = Consumer
.createJavaConsumerConnector(new ConsumerConfig(new Properties()))
.createMessageStreams(topic)
.get(topic).get(0)
for (message <- messageStream) {
val gotMessage = new String(message.message())
logger.info(gotMessage)
}
}
我想到的测试场景如下:
Kafka 服务器已启动。
应用程序启动并连接到 Kafka 服务器,开始监听特定主题的消息。
一条消息被发送到该主题。
应用程序使用消息并记录它。
这是类似 Scala 的伪代码中的测试草稿:
import uk.org.lidalia.slf4jtest.TestLoggerFactory;
import uk.org.lidalia.slf4jtest.LoggingEvent.info;
abstract class UnitSpec extends FlatSpec with Matchers with EmbeddedKafka {
}
class ConsumerAppSpec extends UnitSpec {
"ConsumerApp" should "consume and log messages from Kafka on specific topic" in {
withRunningKafka {
val consumer = ConsumerApp
// interecept logger to be able to test that the kafka message is logged
val logger = TestLoggerFactory.getTestLogger(consumer.getClass)
// start the application, but beforehand do something to prevent it infinitely blocking
???
consumer.main(Array())
// publish a test message
publishStringMessageToKafka("test", "TEST")
// Confirm that the message has been properly logged
???
}
EmbeddedKafka.stop()
}
}
我的问题是前三个问号处的测试代码。如果我执行 main() 方法,它不会终止,从而阻止执行其余的测试。
解决方案
让它发挥作用...
object ConsumerApp extends App {
def doStuff() {
val topic = ...
// more stuff ...
}
doStuff()
}
class ConsumerAppSpec {
// ...
consumer.doStuff()
// ...
publishMessage()
}
更新
我想,我误解了这个问题。而不是“不要调用main”,我应该说不要阻塞线程:)
class ConsumerAppSpec {
...
val futureResult = Future(consumer.doStuff)
...
publishMessage()
}
推荐阅读
- jquery - 在正确的 span 中呈现由 ajax 请求获取的数据
- python - 为什么这个布尔表达式的计算结果为 False?
- powershell - 使用 Powershell 搜索和删除包含字符串的注册表值
- python - 使用 mariaDB python 更新数据库
- php - 将具有不同值的多个复选框插入相应的数据库列
- java - 使用 ConcurrentHashMap 同时 PUT/GET 的问题
- bash - 如何使用 SLURM 循环遍历脚本?(批量和运行)
- indexing - nomencl 乳胶不会显示输出
- html - 如何使用语义技术在块上划分标题:例如:标题包含:徽标图像,搜索......?
- php - php7.2.11 - pthreads-unable-to-initialize-module-in-windows-10