java - 不支持给定 DataStream 的类型:GenericType弗林克卡桑德拉
问题描述
我想向 Cassandra 写入一行行。首先,我将 Avro 流转换为行流。编译时没有显示错误。请参阅下面的代码:(KafkaConsumer 和 CassandraSink 在其他工作中分别工作正常)
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize KafkaConsumer
FlinkKafkaConsumer010 kafkaConsumer = KafkaConnection.getKafkaConsumer(AvroSchemaClass.class, inTopic, schemaRegistryUrl, properties);
// Set KafkaConsumer as source
DataStream<AvroSchemaClass> avroInputStream = environment.addSource(kafkaConsumer);
// converting avro message to flink's row datatype.
// see https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.html
AvroRowDeserializationSchema avroToRow = new AvroRowDeserializationSchema(AvroSchemaClass.class);
DataStream<Row> rowInputStream = avroInputStream.map(new MapFunction<Orders_value, Row>() {
@Override
public Row map(AvroSchemaClass orders_value) throws Exception {
return avroToRow.deserialize(orders_value.toByteBuffer().array());
}
});
// Example transformation
DataStream<Row> rowOutputStream = rowInputStream.filter(row -> country.equals(row.getField(7).toString()));
CassandraSink streamSink = CassandraConnection.getSink(rowOutputStream,
cassandraURL,
cassandraPort,
cassandraCluster,
cassandraUser,
cassandraPass,
insertQuery);
streamSink.name("Write something to Cassandra");
environment.execute();
但是当我在 flink 中运行作业时,会出现以下错误:
java.lang.IllegalArgumentException: No support for the type of the given DataStream: GenericType<org.apache.flink.types.Row>
at org.apache.flink.streaming.connectors.cassandra.CassandraSink.addSink(CassandraSink.java:255)
at servingLayer.CassandraConnection.getSink(CassandraConnection.java:24)
at speedLayer.KafkaToCassandra.main(KafkaToCassandra.java:84)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
java.lang.NullPointerException
数据流类型的特定更改会是解决方案吗?如果是,如何实施?如果您需要更多信息,请告诉我。
解决方案
似乎CassandraSink
应该支持Row
开箱即用。问题是RowTypeInfo
ofrowOutputStream
不知何故丢失了,它使用了回退GenericType
(这是低效的序列化 Kryo)。
AvroRowDeserializationSchema
正确返回类型信息,但 DataStream API 不会自动获取。
因此,如果一切都成立,那么修复将是显式设置返回类型rowIn/OutputStream
如下
DataStream<Row> rowInputStream = avroInputStream.map(new MapFunction<Orders_value, Row>() {
@Override
public Row map(AvroSchemaClass orders_value) throws Exception {
return avroToRow.deserialize(orders_value.toByteBuffer().array());
}
}).returns(avroToRow.getProducedType());
...
DataStream<Row> rowOutputStream = rowInputStream.filter(row -> country.equals(row.getField(7).toString()))
.returns(avroToRow.getProducedType())
一般来说,如果你坚持使用一个 API,会更容易。在这种情况下,我建议完全使用 Table API。
推荐阅读
- firebase - 'toLowerCase' 方法在 null 上被调用。?
- flask-wtforms - Flask WTForms 如何返回 Posted FieldList 表单行数据
- javascript - 如何从返回函数中获取值并将其发送到 html 元素
- html - 循环通过时html div没有关闭
- javascript - 在 Vuejs 中调用数据内部的方法
- ms-access - MS ACCESS - 如何在两列的组合框中搜索值
- go - 如何在 Go 中的 $PATH 中获取可执行文件的完整路径
- r - 函数内部的操作顺序与没有的不同吗?
- python - 从字典追加到列表
- machine-learning - BERT 多类情感分析准确率低?