apache-kafka - apache flink 与 Kafka:InvalidTypesException
问题描述
我有以下代码:
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyCustomClassDeserializer.class.getName());
FlinkKafkaConsumer<MyCustomClass> kafkaConsumer = new FlinkKafkaConsumer(
"test-kafka-topic",
new SimpleStringSchema(),
properties);
final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyCustomClass> kafkaInputStream = streamEnv.addSource(kafkaConsumer);
DataStream<String> stringStream = kafkaInputStream
.map(new MapFunction<MyCustomClass,String>() {
@Override
public String map(MyCustomClass message) {
logger.info("--- Received message : " + message.toString());
return message.toString();
}
});
streamEnv.execute("Published messages");
MyCustomClassDeserializer 用于将字节数组转换为 java 对象。
当我在本地运行此程序时,出现错误:
原因:org.apache.flink.api.common.functions.InvalidTypesException:输入不匹配:预期基本类型。
我得到这个代码行:
.map(new MapFunction<MyCustomClass,String>() {
不知道为什么我会得到这个?
解决方案
因此,您有一个返回 POJO 的反序列化器,但您告诉 Flink 它byte[]
应该String
使用SimpleStringSchema
. 现在看到问题了吗?:)
我认为您通常不应该使用自定义的 Kafka 反序列化器FlinkKafkaConsumer
。相反,您应该瞄准的是创建一个DeserializationSchema
从 Flink 扩展的自定义类。它在类型安全性和可测试性方面应该要好得多。
推荐阅读
- ios - 如何从 xcode 仪器时间分析器获取挂钟时间?
- ruby - Ruby:如何遍历数据库记录以返回真正的键/值对
- javascript - 提升如何应用于此 JavaScript 代码?
- javascript - 为什么 Chrome 没有收到我的 cookie?
- r - 如何从与现有数据集不匹配的变量创建新数据表
- react-native - 当 TextInput 获得焦点时,按钮不接收 onPress 事件
- javascript - Array.toString() 返回数组而不是字符串 - 为什么?Javascript
- python - QClipboard.mimedata() 总是有黑色背景
- javascript - 检测浏览器推送通知已启用并显示警报
- swift - 如何阻止 MOYA/Alamfire 转义我的身体 json 参数?