首页 > 解决方案 > 在 Flink 中,如何向 StreamTableEnvironment 注册 ClouderaRegistryCatalog?

问题描述

我正在尝试开发我的第一个 Flink 应用程序。

我的环境是:

Confluent Kafka (with Schema registry) cluster (3x nodes) with some_topic1as source and Flink-java 1.12.0 (Maven on IntelliJ)

我正在尝试使用 Flink 实现的目标(作为第一步):

  1. 消费来自 Kafka 主题的数据some_topic1
  2. 写一些(流)group bysql
  3. 将sql结果写入输出

从此Cloudera 链接中找到的参考示例

我已经做了什么

...
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; 
...     
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment
                .create(env, settings);

        env.getConfig().enableObjectReuse();
        SchemaRegistryClient schema_registry_client = new SchemaRegistryClient(
                ImmutableMap.of(
                        SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(),
                        "http://my_schemaregistry_host:8081/"
                )
        );

        Map<String, String> kafka_connector_props = new Kafka()
                .property(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                        "my_kafka_host_01:9092,my_kafka_host_02:9092,my_kafka_host_03:9092")
                .startFromEarliest()
                .toProperties();

        tableEnvironment.registerCatalog(
                "registry", new ClouderaRegistryCatalog("registry", schema_registry_client, kafka_connector_props)
        );

        tableEnvironment.useCatalog("registry");

我的主要问题:

这部分代码tableEnvironment.registerCatalog("registry", new ClouderaRegistryCatalog...返回以下错误:

    Required type: Catalog
    Provided: ClouderaRegistryCatalog
    org.apache.flink.formats.avro.registry.cloudera.ClouderaRegistryCatalog public ClouderaRegistryCatalog(String name,
                              com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient registryClient,
                               @NotNull java.util.Map<String, String> connectorProperties)

问题:

如何解决这个错误?

ps:我理解错误的意思(Catalogvs ClouderaRegistryCatalog),但我真的不明白为什么 Cloudera 的 Flink 示例会发生这种类型不匹配。

标签: apache-kafkaapache-flinkclouderaconfluent-schema-registryflink-sql

解决方案


推荐阅读