java - 使用超类型创建 MapFunction 时编译失败
问题描述
我正在使用 Flink 1.12.0,并且我有以下简单的测试用例:
我定义了两个模型类(AbstractDataModel 是超类型,而 ConcreteModel 是子类型):
public interface AbstractDataModel {
public String getValue();
}
public class ConcreteModel implements AbstractDataModel {
private String key;
private String value;
public ConcreteModel() {
}
public ConcreteModel(String key, String value) {
this.key = key;
this.value = value;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
然后,我定义一个简单的应用程序如下,将 ConcreteModel 映射到字符串,
MapFunction 使用超类型 AbstractDataModel,但出现编译错误抱怨:
Required type:
MapFunction<com.ConcreteModel,java.lang.String>
Provided:
MyMapFunction
如果我仍然想使用 AbstractDataModel 作为 MapFunction 中的泛型类型,我会问如何解决这个问题
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
class MyMapFunction implements MapFunction<AbstractDataModel, String> {
public String map(AbstractDataModel model) throws Exception {
return model.getValue();
}
}
public class ConcreteModelTest {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.registerType(ConcreteModel.class);
// env.registerType(AbstractDataModel.class);
//
DataStream<String> ds = env.fromElements(new ConcreteModel("a", "1"), new ConcreteModel("b", "2")).map(new MyMapFunction());
ds.print();
env.execute();
}
}
解决方案
这基本上是因为 FlinkPOJO
由于其分布式环境而无法处理对象。这是文档所说的:
15:45:51,460 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class … 不能用作 POJO 类型,因为并非所有字段都是有效的 POJO 字段,必须作为 GenericType 处理。有关对性能影响的详细信息,请阅读 Flink 文档中的“数据类型和序列化”。
您可以使用ResultTypeQueryableTypeInformation.of(AbstractDataModel.class)
并使用方法定义返回类型public TypeInformation getProducedType()
。
这个接口可以通过函数和输入格式来实现,以告诉框架它们产生的数据类型。此方法可替代以其他方式执行的反射分析,并且在生成的数据类型可能因参数化而变化的情况下很有用。
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
public class MyMapFunction implements MapFunction<AbstractDataModel, String>,
ResultTypeQueryable {
@Override
public String map(AbstractDataModel value) throws Exception {
return value.getValue();
}
@Override
public TypeInformation getProducedType() {
return TypeInformation.of(AbstractDataModel.class);
}
}
public class ConcreteModelTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
AbstractDataModel concreteModel01 = new ConcreteModel("a", "1");
AbstractDataModel concreteModel02 = new ConcreteModel("a", "2");
DataStream<String> ds = env
.fromElements(concreteModel01, concreteModel02)
.map(new MyMapFunction());
ds.print();
env.execute();
}
}
或者一个简单的方法是调用map
. TypeInformation.of(String.class)
然后你不需要实现ResultTypeQueryable
at MyMapFunction
。
DataStream<String> ds = env
.fromElements(concreteModel01, concreteModel02)
.map(new MyMapFunction(), TypeInformation.of(String.class));
然后只需将您的接口与它的类实现一起使用。
public interface AbstractDataModel {
public String getValue();
}
public class ConcreteModel implements AbstractDataModel {
private String key;
private String value;
public ConcreteModel() {
}
public ConcreteModel(String key, String value) {
this.key = key;
this.value = value;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
推荐阅读
- google-cloud-platform - 我可以在我的购买状态变为活跃之前创建承诺使用实例吗
- python - 为什么我的测试通过了冲突的断言?
- python - 从 URL 获取协议和域(没有子域)
- excel - Excel webservice 在一个单元格中返回超过 32k 个字符
- python-3.x - 如何实现广度优先和深度优先搜索网络爬虫?
- python - 连接相关字段并在数据框中替换
- emacs - 如何让 emacs 在启动时拥有如同阅读过我的 bashrc 的环境?
- python - 如何更改分配值给传递给函数的数组?
- r - PowerBI 自定义视觉对象中的 R plot_ly 行或数据框
- c# - 有效地多次使用具有不同参数的方法