scala - 如何使用属性值在 Scala 中声明 KafkaListener 的主题
问题描述
我有一个创建生产者的简单 Kafka/Scala 项目。现在我正在尝试创建消费者,但是,当我使用以下代码时......
@Service
class KafkaService @Autowired()(producer: KafkaTemplate[String, Array[Byte]]){
@Value("${spring.kafka.topic}") val topic : String = null
def sendMessage(msg: String): Unit = {
System.out.println(s"Writing the message $msg to the topic ${this.topic}")
producer.send(topic, msg.getBytes());
}
@KafkaListener(id="test", topics="${this.topic}")
def consume(record: ConsumerRecord[String, String]): Unit = {
System.out.println(s"Consumed Strinsg Message : ${record.value()}")
}
}
我收到以下错误...
[ERROR] ...\service\KafkaService.scala:26: error: type mismatch;
[ERROR] found : String("${this.topic}")
[ERROR] required: Array[String]
[ERROR] @KafkaListener(id="test", topics="${this.topic}")
我错过了什么?
我还尝试了以下...
@Configuration
public class CommonConfiguration{
...
@Value("${spring.kafka.topic}")
public String topic;
...
}
@Service
class KafkaService @Autowired()(producer: KafkaTemplate[String, Array[Byte]], config: CommonConfiguration){
def sendMessage(msg: String): Unit = {
val topics : Array[String] = config.getTopics();
println(s"Writing the message $msg ${topics.mkString(" ")}")
producer.send(config.topic, msg.getBytes());
}
@KafkaListener(id="test", topics="#{config.topic.split(',')}")
def consume(record: ConsumerRecord[String, String]): Unit = {
System.out.println(s"Consumed Strinsg Message : ${record.value()}")
}
}
仍然没有运气,但生产者的控制台日志获得了正确的价值。
解决方案
推荐阅读
- jquery - 如果多个文件字段为空,则 JQuery 禁用按钮
- c - 在 C mongoose os 中的特定时间打开 2 个 LED
- javascript - 如何总结从多选下拉框中选择的值?
- javascript - 材料设计中的easyautocomplete
- c# - 将 JsonObject 映射到类 json
- php - 使用 vue-js 和 Laravel 制作表格
- reactjs - ReactJs、Express 和 Axios,向服务器发送请求,然后响应客户端
- javascript - discordjs bot (Nodejs) 如何使用 reactjs 向 DOM 添加数据
- android - 无法在 Flutter 中使用 path_provider?
- scala - 尝试在 IntelliJ 中为 scala 启用源代码时遇到问题