scala - Kafka Streams Scala groupBy 类型转换问题
问题描述
我正在尝试在 Scala 中实现一个简单的流处理应用程序,但遇到了类型转换问题。任何指针高度赞赏
这是我试图让Scala编译编译的代码片段
case class ItemValue(LOCATION:String,
U_ID:String,
UOM:String,
R_ID:String,
ITEM_TYPE:String,
B_ID:String,
RECORDED_TIMESTAMP:String,
P_ID:String,
VALUE:String,
RECORDED_DTM:String,
DATA_TYPE:String)
val valueSerde : JSONSerde[ItemValue] = new JSONSerde[ItemValue]
val consumed = Consumed.`with`(Serdes.String(), valueSerde)
val items: KStream[String, ItemValue] = builder.stream("raw_topic", consumed)
.groupBy(
new KeyValueMapper[String, ItemValue, KeyValue[String, ItemValue]] {
override def apply(key: String, value: ItemValue) : KeyValue[String, ItemValue] = {
new KeyValue(value.P_ID+"_"+value.U_ID+"_"+value.R_ID+"_"+value.B_ID+"_"+value.ITEM_TYPE,value)
}
},
Serialized.`with`(
Serdes.String(),
new JSONSerde[ItemValue])
)
这是我无法理解的编译器错误
type mismatch;
found : org.apache.kafka.streams.kstream.KeyValueMapper[String,com.example.streams.ItemValue,org.apache.kafka.streams.KeyValue[String,com.example.streams.ItemValue]]
required: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: String, _ >: com.example.streams.ItemValue, Object]
Note: String <: Any, but Java-defined trait KeyValueMapper is invariant in type K.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
Note: com.example.streams.ItemValue <: Any, but Java-defined trait KeyValueMapper is invariant in type V.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
Note: org.apache.kafka.streams.KeyValue[String,com.example.streams.ItemValue] <: Object, but Java-defined trait KeyValueMapper is invariant in type VR.
You may wish to investigate a wildcard type such as `_ <: Object`. (SLS 3.2.10)
new KeyValueMapper[String, ItemValue, KeyValue[String, ItemValue]] {
^
ItemStatsApp.scala:59: type mismatch;
found : org.apache.kafka.streams.kstream.Serialized[String,com.example.streams.ItemValue]
required: org.apache.kafka.streams.kstream.Serialized[Object,com.example.streams.ItemValue]
Note: String <: Object, but Java-defined class Serialized is invariant in type K.
You may wish to investigate a wildcard type such as `_ <: Object`. (SLS 3.2.10)
Serialized.`with`(
^
two errors found
Kafka FAQ(来自 Confluent)和这里的答案说如果没有在 Scala 中明确定义类型,这可能会发生,但我看不出我在哪里没有明确说明。
解决方案
推荐阅读
- python - Python RegEx 从地址中拆分街道和号码
- android - Android Activity Lifecycle 中的 Flutter configureFlutterEngine 方法
- xml - XML标记的XPath取决于文本中的位置?
- spring - 没有 maxlen 选项来限制弹簧数据反应 Redis 模板中的流大小
- windows - 此 Microsoft 文档中的字节/位顺序是什么?
- java - Aspose PDF:是否可以使对象“不可打印”?
- graphics - hlsl 非插值行为
- swift - 在小部件上禁用暗模式?
- sql - 如何分隔字符串并转换为列标题?
- r - 如何创建重复次数在一定范围值内的热图