No support for the type of the given DataStream: GenericType (Flink + Cassandra)

Problem description

I want to write rows to Cassandra. First, I convert the Avro stream into a stream of Rows. No errors show up at compile time. See the code below (the KafkaConsumer and the CassandraSink each work fine in other jobs):

StreamExecutionEnvironment environment =  StreamExecutionEnvironment.getExecutionEnvironment();

// Initialize KafkaConsumer
FlinkKafkaConsumer010 kafkaConsumer = KafkaConnection.getKafkaConsumer(AvroSchemaClass.class, inTopic, schemaRegistryUrl, properties);

// Set KafkaConsumer as source
DataStream<AvroSchemaClass> avroInputStream = environment.addSource(kafkaConsumer);

// converting avro message to flink's row datatype.
// see https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.html
AvroRowDeserializationSchema avroToRow = new AvroRowDeserializationSchema(AvroSchemaClass.class);
DataStream<Row> rowInputStream = avroInputStream.map(new MapFunction<AvroSchemaClass, Row>() {
                @Override
                public Row map(AvroSchemaClass orders_value) throws Exception {
                    return avroToRow.deserialize(orders_value.toByteBuffer().array());
                }
            });

// Example transformation
DataStream<Row> rowOutputStream = rowInputStream.filter(row -> country.equals(row.getField(7).toString()));
       
CassandraSink streamSink = CassandraConnection.getSink(rowOutputStream,
                    cassandraURL,
                    cassandraPort,
                    cassandraCluster,
                    cassandraUser,
                    cassandraPass,
                    insertQuery);
streamSink.name("Write something to Cassandra");

environment.execute();

But when I run the job on Flink, the following error appears:

java.lang.IllegalArgumentException: No support for the type of the given DataStream: GenericType<org.apache.flink.types.Row>
        at org.apache.flink.streaming.connectors.cassandra.CassandraSink.addSink(CassandraSink.java:255)
        at servingLayer.CassandraConnection.getSink(CassandraConnection.java:24)
        at speedLayer.KafkaToCassandra.main(KafkaToCassandra.java:84)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
java.lang.NullPointerException

Would a specific change to the DataStream's type be the solution? If so, how do I implement it? Let me know if you need more information.

Tags: java, cassandra, apache-flink

Solution


It seems that CassandraSink should support Row out of the box. The problem is that the RowTypeInfo of rowOutputStream is somehow lost, and Flink falls back to GenericType (which is serialized inefficiently via Kryo).
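A quick way to see why this happens: by reflection, Java's type erasure only lets Flink discover that the map function's output is Row, with no information about the number or types of its columns, so Flink has to wrap it in a GenericTypeInfo and serialize with Kryo. The MapFunction, Row, and AvroSchemaClass classes below are hypothetical stand-ins, just to demonstrate the erasure without a Flink dependency:

```java
import java.lang.reflect.ParameterizedType;

class ErasureDemo {
    // Hypothetical stand-ins for Flink's interfaces, for illustration only.
    interface MapFunction<I, O> { O map(I value); }
    static class Row { private Object[] fields; }
    static class AvroSchemaClass { }

    static final MapFunction<AvroSchemaClass, Row> FN =
        new MapFunction<AvroSchemaClass, Row>() {
            @Override
            public Row map(AvroSchemaClass value) { return new Row(); }
        };

    // What type extraction can recover by reflection: the output type is
    // "Row", full stop -- no arity, no column types.
    static String extractedOutputType() {
        ParameterizedType iface =
            (ParameterizedType) FN.getClass().getGenericInterfaces()[0];
        return iface.getActualTypeArguments()[1].getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(extractedOutputType());
    }
}
```

Since the column types are simply not present in the bytecode, they have to be supplied explicitly at the API level, which is exactly what the fix below does.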

AvroRowDeserializationSchema does return the correct type information, but the DataStream API does not pick it up automatically.

So, if all of that holds, the fix is to explicitly set the return type of rowInputStream/rowOutputStream as follows:

DataStream<Row> rowInputStream = avroInputStream.map(new MapFunction<AvroSchemaClass, Row>() {
            @Override
            public Row map(AvroSchemaClass orders_value) throws Exception {
                return avroToRow.deserialize(orders_value.toByteBuffer().array());
            }
        }).returns(avroToRow.getProducedType());
...
DataStream<Row> rowOutputStream = rowInputStream.filter(row -> country.equals(row.getField(7).toString()))
  .returns(avroToRow.getProducedType());

In general, things are easier if you stick to a single API. In this case, I would suggest going with the Table API entirely.
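Staying in the Table API could look roughly like the sketch below. This is a hedged sketch, not a drop-in fix: it assumes a Flink version with the 1.11+ Java Table API bridge, that rowInputStream already carries a RowTypeInfo (e.g. via the `.returns(...)` fix above), and that the Avro schema names field 7 `country`.

```java
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment);

// Bridge the Row stream into a Table; the field names of its RowTypeInfo
// become the table's column names.
Table orders = tableEnv.fromDataStream(rowInputStream);

// The same transformation as the filter above, expressed relationally
// (assumes the Avro schema calls field 7 "country").
Table filtered = orders.filter($("country").isEqual(country));

// Back to a DataStream<Row> that carries a proper RowTypeInfo,
// ready to hand to CassandraConnection.getSink(...).
DataStream<Row> rowOutputStream = tableEnv.toAppendStream(filtered, Row.class);
```

The advantage is that the schema travels with the Table at every step, so there is no point at which Flink can silently degrade to GenericType.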

