apache-flink - Flink SQL Client 环境配置读取 CSV 文件作为源流表
问题描述
Match_Recognize
我想从 SQL 客户端试用 Flink SQL 中的运算符。为此,我为源表完成了以下设置
# A typical table source definition looks like:
- name: TaxiRides
type: source
update-mode: append
connector:
type: filesystem
path: "/home/bitnami/Match_Recognize/TaxiRide.csv"
format:
type: csv
fields:
- name: rideId
type: LONG
- name: taxiId
type: LONG
- name: isStart
type: BOOLEAN
- name: lon
type: FLOAT
- name: lat
type: FLOAT
- name: rideTime
type: TIMESTAMP
- name: psgCnt
type: INT
line-delimiter: "\n"
field-delimiter: ","
schema:
- name: rideId
type: LONG
- name: taxiId
type: LONG
- name: isStart
type: BOOLEAN
- name: lon
type: FLOAT
- name: lat
type: FLOAT
- name: rideTime
type: TIMESTAMP
rowtime:
timestamps:
type: "from-field"
from: "eventTime"
watermarks:
type: "periodic-bounded"
delay: "60000"
- name: psgCnt
type: INT
当我开始会话时,我收到以下错误
Exception in thread "main" org.apache.flink.table.client.SqlClientException: The configured environment is invalid. Please check your environment files again.
at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:147)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:194)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:562)
at org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:382)
at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:144)
... 2 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
Reason: No factory supports all properties.
所以,我的问题是:是否可以将源流作为表从文件中读取,还是必须来自 Kafka?
更新:我正在使用 Flink 版本 1.9.1
解决方案
不幸的是,您遇到了 csv 文件系统连接器的限制。此连接器不支持行时间属性。
在 1.10 中,我们开始以稍微不同的方式表达水印和时间属性。请参阅参考:https ://issues.apache.org/jira/browse/FLINK-14320 。
您可以尝试使用 WATERMARK 声明从 DDL 创建表,如下所述:https ://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-表它只适用于blink planner(blink planner 是从1.10 版本开始的sql-client 中的默认实现)。
另一个选择是从 Kafka 读取 CSV 格式。
顺便说一句,这个特殊的异常消息在 FLINK 1.10 中得到了改进。从现在开始,Flink 会告诉有问题的属性。
推荐阅读
- ffmpeg - 使用 FFMPEG 将帧添加到 gif
- regex - 在 vscode 中查找双破折号
- python - 将 pandas 或 keras 中的样本数据自动上调或下调至统一大小的简单方法
- arm - Cortex-A57 是否有 CPU 周期计数功能
- asynchronous - 为什么我的异步请求比同步请求慢?
- django - 在 HTML 模板中输出音频流,django
- python - Serverless GCP Function 部署
- realm - sum(ofProperty: "amount") 在 swift 5 中给出错误
- javascript - 更改一种状态后重新加载整个页面
- itfoxtec-identity-saml2 - 在 NemLog-in 中为私人公司创建 IT 系统