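apache-flink - Using JDBCTableSource with StreamTableEnvironment throws ClassCastException
Question
I am trying to implement a streaming application that uses a JDBC source connection to a PostgreSQL database. As a first step I tried a basic query, but it fails at runtime with a cast exception.
This is the exception: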
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:651)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:93)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.time.LocalDate
at org.apache.flink.api.common.typeutils.base.LocalDateSerializer.copy(LocalDateSerializer.java:30)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
This is the code snippet:
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment);
JDBCOptions jdbcOptions = JDBCOptions.builder()
    .setDriverName("org.postgresql.Driver")
    .setDBUrl("jdbc:postgresql://192.168.4.69:5432/sessiondb")
    .setUsername("postgres")
    .setPassword("postgres")
    .setTableName("sessions")
    .build();
TableSchema tableSchema = TableSchema.builder()
    .field("id", DataTypes.STRING())
    .field("ip", DataTypes.STRING())
    .field("port", DataTypes.INT())
    .field("alias", DataTypes.STRING())
    .field("connect_time", DataTypes.DATE())
    .field("disconnect_time", DataTypes.DATE())
    .build();
JDBCTableSource jdbcTableSource = JDBCTableSource.builder()
    .setOptions(jdbcOptions)
    .setSchema(tableSchema)
    .build();
tableEnvironment.registerTableSource("sessions", jdbcTableSource);
Table table = tableEnvironment.sqlQuery("SELECT * FROM sessions");
tableEnvironment.toAppendStream(table, Row.class).print();
streamExecutionEnvironment.execute();
When I change the DataType to TIMESTAMP_WITH_TIME_ZONE (which is the column's actual data type in PostgreSQL), I get the following error instead, and the application does not start at all:
org.apache.flink.table.api.TableException: Unsupported conversion from data type
'TIMESTAMP(6) WITH TIME ZONE' (conversion class: java.time.OffsetDateTime) to type information.
Only data types that originated from type information fully support a reverse conversion.
Any ideas on how to handle this? (Apache Flink API 1.9.0)
Answer
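First, the exception about the TIMESTAMP WITH TIME ZONE data type occurs because neither of the two planners currently supports that data type. I suggest checking the compatibility table.
Have you tried DataTypes.TIMESTAMP(3)?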
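The ClassCastException in the stack trace can be reproduced without Flink, which may help show why declaring the columns as DATE fails. The sketch below uses only the JDK and is an illustration, not Flink's actual serializer code: the class name TimestampCastDemo and its methods are hypothetical. It assumes, as the trace indicates, that the JDBC driver hands back a java.sql.Timestamp for the column while the code handling a DATE field expects a java.time.LocalDate.

```java
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;

// Hypothetical demo class; it only mirrors the cast seen in the stack trace.
final class TimestampCastDemo {

    // Mimics code that handles a field declared as DATE:
    // it assumes the runtime value is a java.time.LocalDate.
    static LocalDate copyAsDate(Object field) {
        return (LocalDate) field; // ClassCastException if field is a java.sql.Timestamp
    }

    // Explicit conversion of the value a JDBC driver typically returns
    // for a timestamp column.
    static LocalDateTime toLocalDateTime(Object field) {
        return ((Timestamp) field).toLocalDateTime();
    }

    // Returns true when treating the value as a DATE field fails.
    static boolean castFails(Object field) {
        try {
            copyAsDate(field);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Stand-in for a value read from connect_time or disconnect_time.
        Object fromJdbc = Timestamp.valueOf("2019-09-01 12:30:00");

        System.out.println("cast to LocalDate fails: " + castFails(fromJdbc));
        System.out.println("as LocalDateTime: " + toLocalDateTime(fromJdbc));
    }
}
```

If that assumption holds, the declared type of connect_time and disconnect_time has to describe a timestamp rather than a date, which is what the DataTypes.TIMESTAMP(3) suggestion above amounts to. Whether that declaration works end to end with JDBCTableSource in Flink 1.9.0 is not confirmed here.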