apache-flink - org.apache.flink.table.sources.CsvBatchTableSourceFactory 不支持的属性键:schema.#.proctime
问题描述
我正在使用 Flink 1.12 来探索该proc time
功能。我有两个测试用例如下。基本上,这两个测试用例做同样的事情,一个是 using connect and schema definition
,另一个是 using DDL
。
第一个(test 1: error one
)失败,第二个test("test 2: correct one")
成功,我想知道为什么第一个失败以及如何使它工作
申请代码:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
import org.apache.flink.types.Row
import org.scalatest.funsuite.AnyFunSuite
class Sql013_ProcessTimeTest2 extends AnyFunSuite {
test("test 1: error one") {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tenv = StreamTableEnvironment.create(env)
val fmt = new Csv().fieldDelimiter(',')
val schema = new Schema()
.field("id", DataTypes.STRING())
.field("price", DataTypes.INT())
.field("pt", DataTypes.TIMESTAMP(3)).proctime() //ERROR here?
val path = "file:///D:/stock_id_price.csv"
tenv.connect(new FileSystem().path(path)).withSchema(schema).withFormat(fmt).createTemporaryTable("sourceTable")
val table = tenv.from("sourceTable")
table.printSchema()
table.toAppendStream[Row].print()
env.execute("Sql013_ProcessTimeTest2")
}
test("test 2: correct one") {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tenv = StreamTableEnvironment.create(env)
val ddl =
"""
create table sourceTable(
id STRING,
price INT,
pt as PROCTIME()
) with (
'connector' = 'filesystem',
'path' = 'D:/stock_id_price.csv',
'format' = 'csv'
)
""".stripMargin(' ')
tenv.executeSql(ddl)
val table = tenv.from("sourceTable")
table.printSchema()
table.toAppendStream[Row].print()
env.execute("Sql013_ProcessTimeTest2")
}
}
查看异常消息中的以下文本(运行时test 1: error one
)告诉原因,但为什么第二个成功。
The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Unsupported property keys:
schema.#.proctime
运行时的完整错误异常test("test 1: error one")
:
root
|-- id: STRING
|-- price: INT
|-- pt: TIMESTAMP(3)
log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.typeutils.TypeExtractor).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
findAndCreateTableSource failed.
org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193)
at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94)
at org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:495)
at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1099)
at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1123)
at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:339)
at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:149)
at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:146)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:186)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:250)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:164)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:164)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:178)
at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.scala:103)
at org.apache.flink.table.api.bridge.scala.TableConversions.toAppendStream(TableConversions.scala:78)
at org.example.sql2.Sql013_ProcessTimeTest2$$anonfun$1.apply(Sql013_ProcessTimeTest2.scala:33)
at org.example.sql2.Sql013_ProcessTimeTest2$$anonfun$1.apply(Sql013_ProcessTimeTest2.scala:13)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:189)
at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1562)
at org.scalatest.funsuite.AnyFunSuiteLike$class.invokeWithFixture$1(AnyFunSuiteLike.scala:186)
at org.scalatest.funsuite.AnyFunSuiteLike$$anonfun$runTest$1.apply(AnyFunSuiteLike.scala:199)
at org.scalatest.funsuite.AnyFunSuiteLike$$anonfun$runTest$1.apply(AnyFunSuiteLike.scala:199)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.funsuite.AnyFunSuiteLike$class.runTest(AnyFunSuiteLike.scala:199)
at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1562)
at org.scalatest.funsuite.AnyFunSuiteLike$$anonfun$runTests$1.apply(AnyFunSuiteLike.scala:232)
at org.scalatest.funsuite.AnyFunSuiteLike$$anonfun$runTests$1.apply(AnyFunSuiteLike.scala:232)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
at org.scalatest.funsuite.AnyFunSuiteLike$class.runTests(AnyFunSuiteLike.scala:232)
at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1562)
at org.scalatest.Suite$class.run(Suite.scala:1112)
at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1562)
at org.scalatest.funsuite.AnyFunSuiteLike$$anonfun$run$1.apply(AnyFunSuiteLike.scala:236)
at org.scalatest.funsuite.AnyFunSuiteLike$$anonfun$run$1.apply(AnyFunSuiteLike.scala:236)
at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
at org.scalatest.funsuite.AnyFunSuiteLike$class.run(AnyFunSuiteLike.scala:236)
at org.scalatest.funsuite.AnyFunSuite.run(AnyFunSuite.scala:1562)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1320)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1314)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1314)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:972)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:971)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971)
at org.scalatest.tools.Runner$.run(Runner.scala:798)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:40)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:27)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
Reason: No factory supports all properties.
The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Unsupported property keys:
schema.#.proctime
The following properties are requested:
connector.path=file:///D:/stock_id_price.csv
connector.property-version=1
connector.type=filesystem
format.field-delimiter=,
format.property-version=1
format.type=csv
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=id
schema.1.data-type=INT
schema.1.name=price
schema.2.data-type=TIMESTAMP(3)
schema.2.name=pt
schema.2.proctime=true
The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory
at org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
... 73 more
Process finished with exit code 0
解决方案
推荐阅读
- javascript - 我想转换 lat 和 lat 以获取网站中地标的名称?
- flutter - 在使用颤振构建的计算器中选择什么输出字段
- javascript - 使用multer将pdf存储在服务器中
- mysql - 尝试连接数据库时 getaddrinfo 失败
- html - 我可以检查 CSS 中的一个属性是否等于另一个属性吗?
- c++ - 找不到 -lfreeglut 代码块
- c# - 多个实例中的相同 IHostedService
- flutter - 无法解析配置“:app:debugRuntimeClasspath”的所有文件。,颤动
- php - cakephp 3 中动态下拉列表的问题
- java - 在连接四中检查获胜者 - Kotlin