scala - 从hdfs读取json数据时如何修复flink中的“NoMatchingTableFactoryException”错误
问题描述
我正在尝试从hdfs读取json格式文件使用flink批处理api,我想注册为表源或接收器,但它不起作用;当我使用文件系统连接器和 csv 格式时,它可以读写。
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = BatchTableEnvironment.create(env)
tableEnv.connect(new FileSystem().path("hdfs:///tmp/test.json"))
.withFormat(new Json()
.deriveSchema()
.failOnMissingField(true)
).withSchema(……)
.registerTableSource("test_json")
抛出异常
Exception in thread "main" java.lang.Exception: csv2SqlServer running failed.
at cn.company.main.FlinkBrick$.main(FlinkBrick.scala:43)
at cn.company.test.FlinkTest$.main(FlinkTest.scala:8)
at cn.company.test.FlinkTest.main(FlinkTest.scala)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.BatchTableSourceFactory' in
the classpath.
Reason: No context matches.
The following properties are requested:
connector.path=hdfs://cdhNM:8020/tmp/test.json
connector.property-version=1
connector.type=filesystem
format.derive-schema=true
format.fail-on-missing-field=true
format.property-version=1
format.type=json
schema.0.name=busi_time
schema.0.type=TIMESTAMP
schema.1.name=seq_no
schema.1.type=INT
schema.2.name=stock_code
schema.2.type=VARCHAR
schema.3.name=stock_name
schema.3.type=VARCHAR
schema.4.name=trade_time
schema.4.type=ANY<java.util.Date, xsdgsda……>
schema.5.name=yes_close_price
schema.5.type=INT
The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
解决方案
推荐阅读
- python - 如何根据输入参数多次运行 Python click 模块?
- javascript - 语法错误:错误:无法加载要扩展的配置“ckeditor5”。将 CKEditor 导入 Vuejs 时
- javascript - 在 setState 中 React Native 设置默认值
- python - 使用 C# BinaryReader 读取 python 二进制文件
- javascript - node.js 错误:连接 ECONNREFUSED;来自本地主机的响应
- javascript - 我应该存储标签中使用的文本吗?
- reactjs - Reactjs:mobx 异步调用的错误边界
- python - 如何修复python云函数中的错误“'NoneType'对象没有属性'download_to_filename'”?
- php - 下新订单时增加计数,确认订单时减少计数
- python - 如何根据新范围舍入值数组?