Configuring the Flink SQL Client Environment to Read a CSV File as a Source Stream Table

Problem Description

I want to try out the MATCH_RECOGNIZE operator in Flink SQL from the SQL Client. For this, I have set up the source table as follows:

# A typical table source definition looks like:
 - name: TaxiRides
   type: source
   update-mode: append
   connector: 
       type: filesystem
       path: "/home/bitnami/Match_Recognize/TaxiRide.csv"
   format: 
       type: csv
       fields:
           - name: rideId
             type: LONG
           - name: taxiId
             type: LONG
           - name: isStart
             type: BOOLEAN
           - name: lon
             type: FLOAT
           - name: lat
             type: FLOAT
           - name: rideTime
             type: TIMESTAMP
           - name: psgCnt
             type: INT
       line-delimiter: "\n"
       field-delimiter: ","

   schema:
    - name: rideId
      type: LONG
    - name: taxiId
      type: LONG
    - name: isStart
      type: BOOLEAN
    - name: lon
      type: FLOAT
    - name: lat
      type: FLOAT
    - name: rideTime
      type: TIMESTAMP
      rowtime:
        timestamps:
          type: "from-field"
          from: "eventTime"
        watermarks:
          type: "periodic-bounded"
          delay: "60000"
    - name: psgCnt
      type: INT

When I start the session, I get the following error:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: The configured environment is invalid. Please check your environment files again.
        at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:147)
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:194)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
        at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:562)
        at org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:382)
        at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:144)
        ... 2 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: No factory supports all properties.

So, my question is: is it possible to read the source stream, as a table, from a file, or does it have to come from Kafka?

Update: I am using Flink version 1.9.1.

Tags: apache-flink, flink-sql

Solution


Unfortunately, you have hit a limitation of the CSV filesystem connector: this connector does not support rowtime attributes.

In 1.10 we started expressing watermarks and time attributes in a slightly different way. See https://issues.apache.org/jira/browse/FLINK-14320 for reference.

You can try creating the table from DDL with a WATERMARK declaration, as described here: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table. Note that it works only with the Blink planner (the Blink planner is the default implementation in the sql-client as of version 1.10).
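A minimal sketch of such a DDL statement for the TaxiRides schema above, assuming the legacy connector.*/format.* property keys of Flink 1.10 (verify the exact keys against the documentation linked above before relying on them):

CREATE TABLE TaxiRides (
    rideId BIGINT,
    taxiId BIGINT,
    isStart BOOLEAN,
    lon FLOAT,
    lat FLOAT,
    rideTime TIMESTAMP(3),
    psgCnt INT,
    -- declare rideTime as the rowtime attribute with a 60-second bounded watermark
    WATERMARK FOR rideTime AS rideTime - INTERVAL '60' SECOND
) WITH (
    'connector.type' = 'filesystem',
    'connector.path' = '/home/bitnami/Match_Recognize/TaxiRide.csv',
    'format.type' = 'csv'
);

With the rowtime attribute declared this way, MATCH_RECOGNIZE can ORDER BY rideTime. A toy pattern over the same table (the pattern itself is illustrative, not part of the original question):

SELECT *
FROM TaxiRides
MATCH_RECOGNIZE (
    PARTITION BY taxiId
    ORDER BY rideTime                -- must be a time attribute
    MEASURES
        S.rideId   AS rideId,
        E.rideTime AS rideEndTime
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (S E)
    DEFINE
        S AS S.isStart = TRUE,
        E AS E.isStart = FALSE
) AS T;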

Another option would be to read the CSV format from Kafka; unlike the filesystem connector, the Kafka connector does support rowtime attributes.
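A minimal sketch of the Kafka variant, again assuming the 1.10 legacy property keys; the topic name, broker address, and startup mode are placeholders, and the Kafka connector and flink-csv jars must be on the SQL Client classpath:

CREATE TABLE TaxiRides (
    rideId BIGINT,
    taxiId BIGINT,
    isStart BOOLEAN,
    lon FLOAT,
    lat FLOAT,
    rideTime TIMESTAMP(3),
    psgCnt INT,
    WATERMARK FOR rideTime AS rideTime - INTERVAL '60' SECOND
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'taxi-rides',             -- placeholder topic name
    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- placeholder brokers
    'connector.startup-mode' = 'earliest-offset',
    'format.type' = 'csv'
);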

By the way, this particular exception message was improved in Flink 1.10. Since then, Flink tells you which properties are at fault.

