首页 > 解决方案 > 如何使用属性值在 Scala 中声明 KafkaListener 的主题

问题描述

我有一个创建生产者的简单 Kafka/Scala 项目。现在我正在尝试创建消费者,但是,当我使用以下代码时......

@Service
class KafkaService @Autowired()(producer: KafkaTemplate[String, Array[Byte]]){

  @Value("${spring.kafka.topic}") val topic : String = null

  def sendMessage(msg: String): Unit = {
    System.out.println(s"Writing the message $msg to the topic ${this.topic}")
    producer.send(topic, msg.getBytes());
  }

  @KafkaListener(id="test", topics="${this.topic}")
  def consume(record: ConsumerRecord[String, String]): Unit = {
    System.out.println(s"Consumed Strinsg Message : ${record.value()}")
  }

}

我收到以下错误...

[ERROR] ...\service\KafkaService.scala:26: error: type mismatch;
[ERROR]  found   : String("${this.topic}")
[ERROR]  required: Array[String]
[ERROR]   @KafkaListener(id="test", topics="${this.topic}")

我错过了什么?

我还尝试了以下...

@Configuration
public class CommonConfiguration{
    ...
    @Value("${spring.kafka.topic}")
    public String topic;
    ...
}
@Service
class KafkaService @Autowired()(producer: KafkaTemplate[String, Array[Byte]], config: CommonConfiguration){

  def sendMessage(msg: String): Unit = {
    val topics : Array[String] = config.getTopics();
    println(s"Writing the message $msg ${topics.mkString(" ")}")
    producer.send(config.topic, msg.getBytes());
  }

  @KafkaListener(id="test", topics="#{config.topic.split(',')}")
  def consume(record: ConsumerRecord[String, String]): Unit = {
    System.out.println(s"Consumed Strinsg Message : ${record.value()}")
  }

}

仍然没有运气,但生产者的控制台日志获得了正确的价值。

标签: scalaapache-kafka

解决方案


@KafkaListener(id="scala", topics=Array("#{'${spring.kafka.topic}'.split(',')}"))

有关更多信息,请参阅此问题此问题


推荐阅读