首页 > 解决方案 > 使用超类型创建 MapFunction 时编译失败

问题描述

我正在使用 Flink 1.12.0,并且我有以下简单的测试用例:

我定义了两个模型类(AbstractDataModel 是超类型,而 ConcreteModel 是子类型):

public interface AbstractDataModel {
    public String getValue();
}

public class ConcreteModel implements AbstractDataModel {
    private String key;
    private String value;

    public ConcreteModel() {
    }

    public ConcreteModel(String key, String value) {
        this.key = key;
        this.value = value;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }
}

然后,我定义一个简单的应用程序如下,将 ConcreteModel 映射到字符串,

MapFunction 使用超类型 AbstractDataModel,但出现编译错误抱怨:

Required type:
MapFunction<com.ConcreteModel,java.lang.String>
Provided:
MyMapFunction

如果我仍然想使用 AbstractDataModel 作为 MapFunction 中的泛型类型,我会问如何解决这个问题

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

class MyMapFunction implements MapFunction<AbstractDataModel, String> {

    public String map(AbstractDataModel model) throws Exception {
        return model.getValue();
    }
}

public class ConcreteModelTest {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //        env.registerType(ConcreteModel.class);
        //        env.registerType(AbstractDataModel.class);
        //
        DataStream<String> ds = env.fromElements(new ConcreteModel("a", "1"), new ConcreteModel("b", "2")).map(new MyMapFunction());
        ds.print();
        env.execute();
    }
}

标签: javaapache-flinkflink-streaming

解决方案


这基本上是因为 FlinkPOJO由于其分布式环境而无法处理对象。这是文档所说的:

15:45:51,460 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class … 不能用作 POJO 类型,因为并非所有字段都是有效的 POJO 字段,必须作为 GenericType 处理。有关对性能影响的详细信息,请阅读 Flink 文档中的“数据类型和序列化”。

您可以使用ResultTypeQueryableTypeInformation.of(AbstractDataModel.class)并使用方法定义返回类型public TypeInformation getProducedType()

这个接口可以通过函数和输入格式来实现,以告诉框架它们产生的数据类型。此方法可替代以其他方式执行的反射分析,并且在生成的数据类型可能因参数化而变化的情况下很有用。

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

public class MyMapFunction implements MapFunction<AbstractDataModel, String>, 
    ResultTypeQueryable {

    @Override
    public String map(AbstractDataModel value) throws Exception {
        return value.getValue();
    }

    @Override
    public TypeInformation getProducedType() {
        return TypeInformation.of(AbstractDataModel.class);
    }
}
public class ConcreteModelTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        AbstractDataModel concreteModel01 = new ConcreteModel("a", "1");
        AbstractDataModel concreteModel02 = new ConcreteModel("a", "2");
        DataStream<String> ds = env
                .fromElements(concreteModel01, concreteModel02)
                .map(new MyMapFunction());
        ds.print();
        env.execute();
    }
}

或者一个简单的方法是调用map. TypeInformation.of(String.class)然后你不需要实现ResultTypeQueryableat MyMapFunction

DataStream<String> ds = env
    .fromElements(concreteModel01, concreteModel02)
    .map(new MyMapFunction(), TypeInformation.of(String.class));

然后只需将您的接口与它的类实现一起使用。

public interface AbstractDataModel {
    public String getValue();
}
public class ConcreteModel implements AbstractDataModel {
    private String key;
    private String value;
    public ConcreteModel() {
    }
    public ConcreteModel(String key, String value) {
        this.key = key;
        this.value = value;
    }
    public String getKey() {
        return key;
    }
    public void setKey(String key) {
        this.key = key;
    }
    public String getValue() {
        return value;
    }
    public void setValue(String value) {
        this.value = value;
    }
}

推荐阅读