elasticsearch - Producer.send 在 .map 内不起作用
问题描述
我正在制作从 elasticsearch 获取数据并将其发送到 kafka 的应用程序。但是 producer.send() 函数在 map 内部不起作用,但是在它外部,一切正常
val f1 = ElasticsearchSource
.create(
indexName = "products",
typeName = "product",
query = """{"match_all": {}}"""
)
.map { message: OutgoingMessage[spray.json.JsObject] =>
val product = message.source
producer.send(new ProducerRecord("test", product))
println("publishing message ")
IncomingMessage(Some(message.id), message.source)
}
.runWith(Sink.seq)
可能是什么原因造成的?
解决方案
推荐阅读
- asp.net - 如何将数据从 clickOnce 应用程序传回给调用者
- machine-learning - 有条件抽样
- apache-spark - 获取数组中项目的索引,该数组是 Spark 数据框中的一列
- html - 导航文章和侧边宽度总和为 100%,但侧边换行
- vb.net - WebForms 标签文本未从 OnInit() 更新
- android - 正确使用反射获取Class方法
- amazon-web-services - 当 lambda 死亡时会发生什么?
- typescript - 指定参数必须具有 keyof 属性
- listview - 切换切换时隐藏数据单元格中的行
- java - 只有创建视图层次结构的原始线程才能在 asynctask 中触及其视图错误