首页 > 解决方案 > apache flink 与 Kafka:InvalidTypesException

问题描述

我有以下代码:

Properties properties = new Properties();
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyCustomClassDeserializer.class.getName());

FlinkKafkaConsumer<MyCustomClass> kafkaConsumer = new FlinkKafkaConsumer(
                    "test-kafka-topic",
                    new SimpleStringSchema(),
                    properties);

final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyCustomClass> kafkaInputStream = streamEnv.addSource(kafkaConsumer);

DataStream<String> stringStream = kafkaInputStream
                    .map(new MapFunction<MyCustomClass,String>() {
                        @Override
                        public String map(MyCustomClass message) {
                            logger.info("--- Received message : " + message.toString());
                            return message.toString();
                        }
                    });

streamEnv.execute("Published messages");

MyCustomClassDeserializer 用于将字节数组转换为 java 对象。

当我在本地运行此程序时,出现错误:

原因:org.apache.flink.api.common.functions.InvalidTypesException:输入不匹配:预期基本类型。

我得到这个代码行:

.map(new MapFunction<MyCustomClass,String>() {

不知道为什么我会得到这个?

标签: apache-kafkaapache-flink

解决方案


因此,您有一个返回 POJO 的反序列化器,但您告诉 Flink 它byte[]应该String使用SimpleStringSchema. 现在看到问题了吗?:)

我认为您通常不应该使用自定义的 Kafka 反序列化器FlinkKafkaConsumer。相反,您应该瞄准的是创建一个DeserializationSchema从 Flink 扩展的自定义类。它在类型安全性和可测试性方面应该要好得多。


推荐阅读