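apache-flink - Flink nested class toDataStream conversion error
Problem description
I am using Flink 1.13. I am trying to convert a table result to a DataStream in the following way, but I keep getting an error.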
import static org.apache.flink.table.api.Expressions.$;

import java.time.Duration;
import java.time.LocalDateTime;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class HybridTrial {

    public static class Address {
        public String street;
        public String houseNumber;

        public Address() {}

        public Address(String street, String houseNumber) {
            this.street = street;
            this.houseNumber = houseNumber;
        }
    }

    public static class User {
        public String name;
        public Integer score;
        public LocalDateTime event_time;
        public Address address;

        // default constructor for DataStream API
        public User() {}

        // fully assigning constructor for Table API
        public User(String name, Integer score, LocalDateTime event_time, Address address) {
            this.name = name;
            this.score = score;
            this.event_time = event_time;
            this.address = address;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<User> dataStream =
                env.fromElements(
                                new User("Alice", 4, LocalDateTime.now(), new Address()),
                                new User("Bob", 6, LocalDateTime.now(), new Address("NBC", "204")),
                                new User("Alice", 10, LocalDateTime.now(), new Address("ABC", "1033")))
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy.<User>forBoundedOutOfOrderness(Duration.ofSeconds(60)));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Empty schema: all columns are derived from the stream's TypeInformation
        Table table =
                tableEnv.fromDataStream(
                        dataStream, Schema.newBuilder().build());
        table.printSchema();

        Table t = table.select($("*"));

        // This conversion is where the ValidationException is thrown
        DataStream<User> dsRow = tableEnv.toDataStream(t, User.class);
        dsRow.print();

        env.execute();
    }
}
The error I get is:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Column types of query result and sink for registered table 'default_catalog.default_database.Unregistered_DataStream_Sink_1' do not match.
Cause: Incompatible types for sink column 'event_time' at position 2.
Query schema: [name: STRING, score: INT, event_time: RAW('java.time.LocalDateTime', '...'), address: *flinkSqlExperiments.HybridTrial$Address<`street` STRING, `houseNumber` STRING>*]
Sink schema: [name: STRING, score: INT, event_time: TIMESTAMP(9), address: *flinkSqlExperiments.HybridTrial$Address<`street` STRING, `houseNumber` STRING>*]
at org.apache.flink.table.planner.connectors.DynamicSinkUtils.createSchemaMismatchException(DynamicSinkUtils.java:437)
at org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:256)
at org.apache.flink.table.planner.connectors.DynamicSinkUtils.convertSinkToRel(DynamicSinkUtils.java:198)
at org.apache.flink.table.planner.connectors.DynamicSinkUtils.convertExternalToRel(DynamicSinkUtils.java:143)
I have also tried a custom conversion from DataStream to table, but I still get an error when converting from the table back to a DataStream. I am stuck, so any help is appreciated.
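Solution
The reflection-based automatic type extraction in the DataStream API is not as powerful as the one in the Table API. This is partly due to state backwards-compatibility constraints in the DataStream API.
The event_time field is a GenericType in the DataStream API, which results in a RAW type in the Table API. That is why the query schema shows RAW('java.time.LocalDateTime', '...') while the sink schema derived from User.class expects TIMESTAMP(9). You have the following possibilities:
- Give a proper TypeInformation in fromElements
- Override the TypeInformation using a DataType in fromDataStream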
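Both possibilities can be sketched roughly as follows. This is an untested illustration, not a confirmed fix: the class name HybridTrialFixed and the helpers userTypeInfo() and userSchema() are hypothetical names introduced here. It assumes Flink 1.13 with the Table API bridge on the classpath, and it uses fromCollection rather than fromElements because fromCollection accepts an explicit TypeInformation argument. Watermarks are left out to keep the sketch short.

```java
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Hypothetical class name; mirrors the POJOs from the question.
public class HybridTrialFixed {

    public static class Address {
        public String street;
        public String houseNumber;
        public Address() {}
        public Address(String street, String houseNumber) {
            this.street = street;
            this.houseNumber = houseNumber;
        }
    }

    public static class User {
        public String name;
        public Integer score;
        public LocalDateTime event_time;
        public Address address;
        public User() {}
        public User(String name, Integer score, LocalDateTime event_time, Address address) {
            this.name = name;
            this.score = score;
            this.event_time = event_time;
            this.address = address;
        }
    }

    // Option 1 (hypothetical helper): describe the POJO fields explicitly so that
    // event_time is typed as LOCAL_DATE_TIME instead of falling back to GenericType.
    static TypeInformation<User> userTypeInfo() {
        Map<String, TypeInformation<?>> fields = new HashMap<>();
        fields.put("name", Types.STRING);
        fields.put("score", Types.INT);
        fields.put("event_time", Types.LOCAL_DATE_TIME);
        fields.put("address", Types.POJO(Address.class));
        return Types.POJO(User.class, fields);
    }

    // Option 2 (hypothetical helper): keep the extracted TypeInformation and declare
    // the column data types in the Schema passed to fromDataStream instead.
    // All four columns are declared so that none is dropped by projection.
    static Schema userSchema() {
        return Schema.newBuilder()
                .column("name", DataTypes.STRING())
                .column("score", DataTypes.INT())
                .column("event_time", DataTypes.TIMESTAMP(9))
                .column("address", DataTypes.of(Address.class))
                .build();
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        List<User> users = Arrays.asList(
                new User("Alice", 4, LocalDateTime.now(), new Address()),
                new User("Bob", 6, LocalDateTime.now(), new Address("NBC", "204")),
                new User("Alice", 10, LocalDateTime.now(), new Address("ABC", "1033")));

        // Option 1: pass the explicit TypeInformation when creating the stream.
        DataStream<User> dataStream = env.fromCollection(users, userTypeInfo());
        Table table = tableEnv.fromDataStream(dataStream);

        // Option 2 (alternative to the two lines above):
        // Table table = tableEnv.fromDataStream(env.fromCollection(users), userSchema());

        table.printSchema();
        tableEnv.toDataStream(table, User.class).print();
        env.execute();
    }
}
```

With either option, the intent is that event_time reaches the Table API as TIMESTAMP(9) rather than RAW, so the query schema should line up with the sink schema that toDataStream derives from User.class. Option 1 changes how the stream itself is serialized, while option 2 only affects how the Table API interprets the stream.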