How to fix the "NoMatchingTableFactoryException" error in Flink when reading JSON data from HDFS

Problem description

I am trying to read a JSON-format file from HDFS with the Flink batch API and register it as a table source (or sink), but it does not work. When I use the filesystem connector with the CSV format instead, both reading and writing work fine.

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.table.descriptors.{FileSystem, Json}

val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = BatchTableEnvironment.create(env)
tableEnv.connect(new FileSystem().path("hdfs:///tmp/test.json"))
  .withFormat(new Json()
    .deriveSchema()
    .failOnMissingField(true))
  .withSchema(……)   // schema definition elided in the original question
  .registerTableSource("test_json")
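
For comparison, here is a minimal sketch of the CSV variant that the question says does work. The test.csv path, the use of the OldCsv descriptor, and the subset of fields are assumptions for illustration only; they are not taken from the original post.

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.table.descriptors.{FileSystem, OldCsv, Schema}

val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = BatchTableEnvironment.create(env)

// Hypothetical CSV file containing a subset of the columns listed in the exception output.
tableEnv.connect(new FileSystem().path("hdfs:///tmp/test.csv"))
  .withFormat(new OldCsv()
    .field("seq_no", Types.INT)
    .field("stock_code", Types.STRING)
    .field("stock_name", Types.STRING))
  .withSchema(new Schema()
    .field("seq_no", Types.INT)
    .field("stock_code", Types.STRING)
    .field("stock_name", Types.STRING))
  .registerTableSource("test_csv")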

The following exception is thrown:

Exception in thread "main" java.lang.Exception: csv2SqlServer running failed.
        at cn.company.main.FlinkBrick$.main(FlinkBrick.scala:43)
        at cn.company.test.FlinkTest$.main(FlinkTest.scala:8)
        at cn.company.test.FlinkTest.main(FlinkTest.scala)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.BatchTableSourceFactory' in
the classpath.


  Reason: No context matches.

  The following properties are requested:
  connector.path=hdfs://cdhNM:8020/tmp/test.json
  connector.property-version=1
  connector.type=filesystem
  format.derive-schema=true
  format.fail-on-missing-field=true
  format.property-version=1
  format.type=json
  schema.0.name=busi_time
  schema.0.type=TIMESTAMP
  schema.1.name=seq_no
  schema.1.type=INT
  schema.2.name=stock_code
  schema.2.type=VARCHAR
  schema.3.name=stock_name
  schema.3.type=VARCHAR
  schema.4.name=trade_time
  schema.4.type=ANY<java.util.Date, xsdgsda……>
  schema.5.name=yes_close_price
  schema.5.type=INT

  The following factories have been considered:
  org.apache.flink.table.sources.CsvBatchTableSourceFactory
  org.apache.flink.table.sources.CsvAppendTableSourceFactory
  org.apache.flink.table.sinks.CsvBatchTableSinkFactory
  org.apache.flink.table.sinks.CsvAppendTableSinkFactory
  org.apache.flink.formats.json.JsonRowFormatFactory
  org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory

Tags: scala, apache-flink

Solution
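
The factory list in the exception hints at the cause: for the filesystem connector, only the CSV batch source/sink factories are available on the classpath, while JsonRowFormatFactory is a format-only factory, and the only connector factory here that can consume a JSON format is the Kafka one. Below is a minimal sketch of one possible workaround, assuming newline-delimited JSON: read the file with readTextFile, parse each record with Jackson into a case class, and register the resulting DataSet as a table. The Trade case class and the field subset are illustrative only.

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import com.fasterxml.jackson.databind.ObjectMapper

// Hypothetical row type covering part of the schema shown in the exception output.
case class Trade(seqNo: Int, stockCode: String, stockName: String)

val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = BatchTableEnvironment.create(env)

// Read the JSON file as plain text, one JSON object per line assumed.
val lines = env.readTextFile("hdfs:///tmp/test.json")

// Parse each line with Jackson; creating the ObjectMapper inside the function keeps it serializable.
val trades = lines.map { line =>
  val node = new ObjectMapper().readTree(line)
  Trade(
    node.get("seq_no").asInt(),
    node.get("stock_code").asText(),
    node.get("stock_name").asText())
}

// Register the parsed DataSet as a table and query it with SQL.
tableEnv.registerDataSet("test_json", trades)
val result = tableEnv.sqlQuery("SELECT stock_code, stock_name FROM test_json")

An alternative along the same lines would be a custom TableSource/BatchTableSourceFactory that wires the JSON format to a file input, but the DataSet detour above needs no extra factory plumbing.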

