首页 > 解决方案 > Flink 将映射的 Row 解释为单个 RAW

问题描述

我能够将静态行下沉到数据库:

DataStream<Row> staticRows = environment.fromElements("value1", "value2")
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment); // convert to table API
Table inputTable = tableEnv.fromDataStream(staticRows);
tableEnv.executeSql(myDDLAndSinkProperties);
inputTable.executeInsert("MYTABLE");

但是像这样将无界流映射到 Row :

DataStream<Row> kafkaRows = kafkaEvents.map(new MyKafkaRecordToRowMapper());

尝试插入数据库时​​,如果输入和接收器架构不匹配,则会引发错误。查询架构:[f0: RAW('org.apache.flink.types.Row', '...')]

相同的代码适用于 POJO 和 Tuple,但我有超过 25 列,并且 POJO 没有任何其他用途 - 所以我希望它可以被通用的字段序列(Row 声称是)替换。如何使用 Row 输入数据库?给出的示例仅显示它用于静态数据流和数据库输出。

标签: apache-flinkflink-streaming

解决方案


我认为如果您将其更改为这样的内容会更好(当然,在调整列名和类型之后):

DataStream<Row> kafkaRows = kafkaEvents
  .map(new MyKafkaRecordToRowMapper())
  .returns(Types.ROW_NAMED(
                new String[] {"id", "quota", "ts", ...},
                Types.STRING,
                Types.LONG,
                TypeInformation.of(Instant.class)),
                ...);

推荐阅读