首页 > 解决方案 > Flink Table to DataStream:如何访问列名?

问题描述

我想使用 Flink SQL 将 Kafka 主题消费到表中,然后将其转换回 DataStream。

这是SOURCE_DDL

CREATE TABLE kafka_source (
    user_id BIGINT,
    datetime TIMESTAMP(3),
    last_5_clicks STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'aiinfra.fct.userfeature.0',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'test-group',
    'format' = 'json'
)

使用 Flink,我执行 DDL。

val settings = EnvironmentSettings.newInstance.build
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
tableEnv.executeSql(SOURCE_DDL)
val table = tableEnv.from("kafka_source")

然后,我将其转换为DataStream,并在该map(e => ...)部分中进行下游逻辑。

tableEnv.toRetractStream[(Long, java.sql.Timestamp, String)](table).map(e => ...)

map(e => ...)部件内部,我想访问列名,在本例中为last_5_clicks. 为什么?因为我可能有不同的列名(例如last_10min_page_view)的不同来源,但我想重用map(e => ...).

有没有办法做到这一点?谢谢。

标签: apache-flinkflink-streamingflink-sql

解决方案


从 Flink 1.12 开始,可以通过Table.getSchema.getFieldNames. 从 1.13 版本开始,可以通过Row.getFieldNames.


推荐阅读