multithreading - 编写测试以在多线程环境中使用 Kafka 消费者
问题描述
我正在尝试在一个单独的线程中创建一个 kafka 消费者,该线程使用来自 kafka 主题的数据。为此,我扩展ShutdownableThread
了抽象类并提供了doWork
方法的实现。我的代码是这样的 -
abstract class MyConsumer(topic: String) extends ShutdownableThread(topic) {
val props: Properties = ???
private val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(List(topic).asJava)
def process(value: String): Unit // Abstract method defining what to do with each record
override def doWork(): Unit = {
for (record <- consumer.poll(Duration.ofMillis(1000)).asScala)
process(record.value())
}
}
现在在我的测试中,我创建了消费者提供process()
方法的实现,它只是改变一个变量,然后调用start()
它的方法来启动线程。
var mutVar = "initial_value"
val consumer = new MyConsumer("test_topic") {
override def process(value: String): Unit = mutVar = "updated_value"
}
consumer.start()
assert(mutVar === "updated_value")
Consumer 确实使用了来自 kafka 的消息,但在测试完成之前它没有更新它,因此测试失败。所以,我试图让主线程进入睡眠状态。但它会抛出ConcurrentModificationException
异常消息 -KafkaConsumer is not safe for multi-threaded access
知道我的方法有什么问题吗?提前致谢。
解决方案
必须让主线程休眠几秒钟,以允许消费者使用来自 kafka 主题的消息并将其存储在可变变量中。添加 -Thread.sleep(5000)
启动消费者后。
推荐阅读
- java - 在某些 pdf 查看器应用程序中无法在 android 上打开 pdf 文档的意图视图
- javascript - React table v7 动态过滤/排序
- bash - 比较同一文件Linux中两个位置之间的2个值
- java - 带有实例变量的 Spring Boot 中的线程安全
- c# - 将组合框的值插入 WPF 中的数据库
- python - 运行 python -m pytest tests 找不到包
- javascript - 从另一个反应应用程序重新渲染反应应用程序
- html - 修复以使 CSS 中的过渡平滑(动画搜索栏)
- python - enumerate(zip(*k_fold(dataset, folds))) 是如何工作的?
- java - 上下文初始化期间遇到异常