Using JDBCTableSource with StreamTableEnvironment throws a ClassCastException

Problem description

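I am trying to implement a streaming application that uses a JDBC connection to a PostgreSQL database as its source. As a first step I tried a basic query, but it fails at runtime with a cast exception.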

Here is the exception:

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:651)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:93)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.time.LocalDate
at org.apache.flink.api.common.typeutils.base.LocalDateSerializer.copy(LocalDateSerializer.java:30)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)

Here is the code snippet:

        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment);

        JDBCOptions jdbcOptions = JDBCOptions.builder()
                .setDriverName("org.postgresql.Driver")
                .setDBUrl("jdbc:postgresql://192.168.4.69:5432/sessiondb")
                .setUsername("postgres")
                .setPassword("postgres")
                .setTableName("sessions")
                .build();

        TableSchema tableSchema = TableSchema.builder()
                .field("id", DataTypes.STRING())
                .field("ip", DataTypes.STRING())
                .field("port", DataTypes.INT())
                .field("alias", DataTypes.STRING())
                .field("connect_time", DataTypes.DATE())
                .field("disconnect_time", DataTypes.DATE())
                .build();

        JDBCTableSource jdbcTableSource = JDBCTableSource.builder().setOptions(jdbcOptions).setSchema(tableSchema).build();
        tableEnvironment.registerTableSource("sessions", jdbcTableSource);
        Table table = tableEnvironment.sqlQuery("SELECT * FROM sessions");
        tableEnvironment.toAppendStream(table, Row.class).print();
        streamExecutionEnvironment.execute();

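When I change the DataType to TIMESTAMP_WITH_TIME_ZONE (which is the actual data type of these columns in PostgreSQL), I get the following error instead, and the application does not start at all: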

org.apache.flink.table.api.TableException: Unsupported conversion from data type
'TIMESTAMP(6) WITH TIME ZONE' (conversion class: java.time.OffsetDateTime) to type information. 
Only data types that originated from type information fully support a reverse conversion.

Any ideas on how to handle this? (Apache Flink API 1.9.0)

Tags: apache-flink, flink-streaming, flink-sql

Solution


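First, the exception about the data type TIMESTAMP WITH TIME ZONE occurs because neither of the two planners currently supports that data type. I suggest checking the compatibility table in the Flink data types documentation.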

Have you tried using DataTypes.TIMESTAMP(3) for the connect_time and disconnect_time fields instead of DataTypes.DATE()?
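
The ClassCastException in the stack trace can be reproduced without Flink, which may help explain why DataTypes.DATE() fails here. The trace shows that the value arriving for the field is a java.sql.Timestamp, while the serializer for a DATE field casts it to java.time.LocalDate. The following is a minimal sketch in plain Java; the class name TimestampCastDemo and its method names are made up for illustration and are not part of Flink or the original code. Whether DataTypes.TIMESTAMP(3) alone is enough on Flink 1.9.0 is not confirmed in this thread, so treat this only as an illustration of the type mismatch.

```java
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;

// Hypothetical demo class; it does not use Flink at all.
public class TimestampCastDemo {

    // Mimics a serializer for a DATE field that blindly casts the value to LocalDate.
    // Returns true if that cast throws a ClassCastException.
    static boolean castToLocalDateFails(Object value) {
        try {
            LocalDate ignored = (LocalDate) value;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    // Converts the Timestamp to a LocalDateTime, the java.time class that
    // corresponds to a timestamp without time zone.
    static LocalDateTime toLocalDateTime(Object value) {
        return ((Timestamp) value).toLocalDateTime();
    }

    public static void main(String[] args) {
        // Stand-in for a timestamp column value read through a JDBC driver.
        Object fromJdbc = Timestamp.valueOf("2020-01-02 03:04:05");

        System.out.println("cast to LocalDate fails: " + castToLocalDateFails(fromJdbc));
        System.out.println("as LocalDateTime: " + toLocalDateTime(fromJdbc));
    }
}
```

The point of the sketch is that the declared field type and the Java class actually delivered for the column have to agree. Declaring the two columns as a timestamp type rather than DATE is a step in that direction.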

