apache-kafka - 在 Flink 中,如何向 StreamTableEnvironment 注册 ClouderaRegistryCatalog?
问题描述
我正在尝试开发我的第一个 Flink 应用程序。
我的环境是:
Confluent Kafka (with Schema registry) cluster (3x nodes) with some_topic1
as source and Flink-java 1.12.0 (Maven on IntelliJ)
我正在尝试使用 Flink 实现的目标(作为第一步):
- 消费来自 Kafka 主题的数据
some_topic1
- 写一些(流)
group by
sql - 将sql结果写入输出
从此Cloudera 链接中找到的参考示例
我已经做了什么:
...
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
...
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment
.create(env, settings);
env.getConfig().enableObjectReuse();
SchemaRegistryClient schema_registry_client = new SchemaRegistryClient(
ImmutableMap.of(
SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(),
"http://my_schemaregistry_host:8081/"
)
);
Map<String, String> kafka_connector_props = new Kafka()
.property(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"my_kafka_host_01:9092,my_kafka_host_02:9092,my_kafka_host_03:9092")
.startFromEarliest()
.toProperties();
tableEnvironment.registerCatalog(
"registry", new ClouderaRegistryCatalog("registry", schema_registry_client, kafka_connector_props)
);
tableEnvironment.useCatalog("registry");
我的主要问题:
这部分代码tableEnvironment.registerCatalog("registry", new ClouderaRegistryCatalog...
返回以下错误:
Required type: Catalog
Provided: ClouderaRegistryCatalog
org.apache.flink.formats.avro.registry.cloudera.ClouderaRegistryCatalog public ClouderaRegistryCatalog(String name,
com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient registryClient,
@NotNull java.util.Map<String, String> connectorProperties)
问题:
如何解决这个错误?
ps:我理解错误的意思(Catalog
vs ClouderaRegistryCatalog
),但我真的不明白为什么 Cloudera 的 Flink 示例会发生这种类型不匹配。
解决方案
推荐阅读
- javascript - 使用 PJAX 和 Three.js 的网站速度问题?
- c++ - 如何为不同链接共享库中定义的相同函数生成链接器警告?
- php - 在 Windows 上安装 PHP 7 的作曲家以获取 PHPmailer 时遇到问题
- c++ - 如何从 Windows 应用程序输出到标准输入?
- javascript - 有没有办法在 Bucklescript 类型注释中表示 JS 模板文字?
- xcode - 从 VS2017 和 xcode 7.3.1 构建时,RemoteBuild 提供部署 404
- javascript - snap.data 不是 onUpdate 中的函数
- c - c语言设置I2C读写
- java - 如何将 Java 连接到 MQL5(最好使用 ZeroMQ)
- java - 有没有办法在 Windows 10 下的 SWT-Browser-View 中使用 EcmaScript 6?