首页 > 解决方案 > Producer.send 在 .map 内不起作用

问题描述

我正在制作从 elasticsearch 获取数据并将其发送到 kafka 的应用程序。但是 producer.send() 函数在 map 内部不起作用,但是在它外部,一切正常

val f1 = ElasticsearchSource
  .create(
    indexName = "products",
    typeName = "product",
    query = """{"match_all": {}}"""
  )
  .map { message: OutgoingMessage[spray.json.JsObject] =>
    val product = message.source
    producer.send(new ProducerRecord("test", product))
    println("publishing message ")
    IncomingMessage(Some(message.id), message.source)
  }
  .runWith(Sink.seq)

可能是什么原因造成的?

标签: elasticsearchapache-kafka

解决方案


推荐阅读