apache-flink - Flink Table to DataStream:如何访问列名?
问题描述
我想使用 Flink SQL 将 Kafka 主题消费到表中,然后将其转换回 DataStream。
这是SOURCE_DDL
:
CREATE TABLE kafka_source (
user_id BIGINT,
datetime TIMESTAMP(3),
last_5_clicks STRING
) WITH (
'connector' = 'kafka',
'topic' = 'aiinfra.fct.userfeature.0',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'test-group',
'format' = 'json'
)
使用 Flink,我执行 DDL。
val settings = EnvironmentSettings.newInstance.build
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
tableEnv.executeSql(SOURCE_DDL)
val table = tableEnv.from("kafka_source")
然后,我将其转换为DataStream,并在该map(e => ...)
部分中进行下游逻辑。
tableEnv.toRetractStream[(Long, java.sql.Timestamp, String)](table).map(e => ...)
在map(e => ...)
部件内部,我想访问列名,在本例中为last_5_clicks
. 为什么?因为我可能有不同的列名(例如last_10min_page_view
)的不同来源,但我想重用map(e => ...)
.
有没有办法做到这一点?谢谢。
解决方案
从 Flink 1.12 开始,可以通过Table.getSchema.getFieldNames
. 从 1.13 版本开始,可以通过Row.getFieldNames
.
推荐阅读
- historian - Proficy Historian RestAPI - 使用包含“#”的标签检索插值数据
- c - 我应该如何避免与 C 中包含的库发生命名冲突?
- javascript - Ajax 请求返回 '200 OK'
- jquery - 根据按钮的属性值添加/删除 css 类
- python - 如何使帖子标题在我的模板中可用
- transactions - 矿工之间交易的有效性冲突
- hibernate - @Enumerated(EnumType.STRING) 因@AttributeOverride 而丢失
- javascript - JavaScript 中 new Array() 和 Array() 初始化语法的区别
- c# - 有没有办法在瀑布对话框中接受语音输入
- sql-server - 在 SQL Server 上获取电子邮件域名(无 TLD)