java - Scala - 如何过滤 KStream (Kafka Streams)
问题描述
我是 Scala 的新手,我正在尝试根据第二个组件字段过滤 KStream[String, JsonNode] 。
例如,工作 Java 代码是这样的:
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
...
import com.fasterxml.jackson.databind.JsonNode;
...
...
final KStream<String, JsonNode> source = streamsBuilder.stream(inputTopic,
Consumed.with(Serdes.String(), jsonSerde));
// filter and producer preprocessed
source.filter((k, v) -> v.get("total_cost").asDouble() > 0 && v.get("num_items").asInt() > 0)
.to(outputTopic, Produced.with(Serdes.String(), jsonSerde));
我试过这个:
import org.apache.kafka.streams.kstream.{Produced,Consumed,KStream};
import org.apache.kafka.streams.StreamsBuilder;
...
import com.fasterxml.jackson.databind.JsonNode;
...
var source:KStream[String, JsonNode] = streamsBuilder.stream(inputTopic, Consumed.`with`(Serdes.String(), jsonSerde));
source.filter({
case (k:String ,v:JsonNode) =>
(v.get("total_cost").asDouble() > 0 && v.get("num_items").asInt() > 0)
})
.to(outputTopic, Produced.`with`(Serdes.String(), jsonSerde));
在上面的尝试中,我得到了:
描述 资源路径 位置 类型 扩展函数的参数类型缺失 匿名函数的参数类型必须是完全已知的。(SLS 8.5) 预期类型为:org.apache.kafka.streams.kstream.Predicate[? >:字符串,?>:com.fasterxml.jackson.databind.JsonNode]
我也试过这个:
source.filter((_._2.get("total_cost").asDouble() > 0 && _._2.get("num_items").asInt() > 0))
.to(outputTopic, Produced.`with`(Serdes.String(), jsonSerde));
我如何在 Scala 中过滤这个对象?提前致谢。
解决方案
org.apache.kafka.streams.kstream.Predicate
来自 JavaAPI,在 Scala 中2.11
(我假设你使用它)你必须显式地实现接口,因此:
source.filter(new Predicate[String, JsonNode]() {
override def test(k: String, v: JsonNode): Boolean = {
v.get("total_cost").asDouble() > 0 && v.get("num_items").asInt() > 0
}
}).to(outputTopic, Produced.`with`(Serdes.String(), jsonSerde));
应该管用。
更多关于SAM
s(单一抽象方法)的信息可以在这里找到
请注意,您不必使用 Java API - 有一流的Scala API。
推荐阅读
- docker - 挂载docker卷销毁文件?
- php - 如何创建正则表达式模式以删除特定标记后的换行符
- elasticsearch - 如何知道 Geopoint 是否在使用 elasticsearch 7 的多边形内?
- javascript - 基于 getBoundingClientRect() 问题的带有样式注入的 React.cloneElement
- qlikview - QlikView aggr 列表框中排除了多个值
- python - 使用 win32com.client 列出可用的消息属性
- python - 如何在 ggplot 中正确构建数据框以绘制不同的图表
- c# - 如何使用像素坐标和光线投射将虚拟物体放置在真实物体上?
- java - 是否可以使用 java 12 创建 spring boot maven 项目?
- java - 无法从 MySQL 数据库中检索数据并将其放在我的 android 应用程序的列表视图中