apache-flink - Flink 将映射的 Row 解释为单个 RAW
问题描述
我能够将静态行下沉到数据库:
DataStream<Row> staticRows = environment.fromElements("value1", "value2")
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment); // convert to table API
Table inputTable = tableEnv.fromDataStream(staticRows);
tableEnv.executeSql(myDDLAndSinkProperties);
inputTable.executeInsert("MYTABLE");
但是像这样将无界流映射到 Row :
DataStream<Row> kafkaRows = kafkaEvents.map(new MyKafkaRecordToRowMapper());
尝试插入数据库时,如果输入和接收器架构不匹配,则会引发错误。查询架构:[f0: RAW('org.apache.flink.types.Row', '...')]
相同的代码适用于 POJO 和 Tuple,但我有超过 25 列,并且 POJO 没有任何其他用途 - 所以我希望它可以被通用的字段序列(Row 声称是)替换。如何使用 Row 输入数据库?给出的示例仅显示它用于静态数据流和数据库输出。
解决方案
我认为如果您将其更改为这样的内容会更好(当然,在调整列名和类型之后):
DataStream<Row> kafkaRows = kafkaEvents
.map(new MyKafkaRecordToRowMapper())
.returns(Types.ROW_NAMED(
new String[] {"id", "quota", "ts", ...},
Types.STRING,
Types.LONG,
TypeInformation.of(Instant.class)),
...);
推荐阅读
- dialogflow-es - dialogflow webhook 中的电报用户电话号码或 ID
- rest - 如何在没有用户干预的情况下在 Web 应用程序后台获取 Google 访问令牌
- swift - UITableView 不符合协议
- c# - Autofac 和 ASP .Net Web API 版本 4.2.0 .NET Framework 4.7.2
- entity-framework - 使用FromSql时如何使用位域运行sql查询
- ios - 对象实例到 PIL 图像
- autohotkey - 如何将大量数据复制到剪贴板
- c# - 如何使用 JSON 在 Unity 中创建通知列表
- sql - 比较两列并计算结果行
- python - itemgetter 是如何工作的?当我们将它初始化为 key 时会发生什么