java - 无法在 Kafka Connect 2.4 中成功覆盖我的连接器
问题描述
嗨,我正在寻找通过 java 代码使用 2.3 中发布的新覆盖策略。
我想创建一个这样的例子:
创建一个包含 10 条消息的主题
创建一个消费消息的消费者,然后将它们发送到默认的 FileSink
创建一个不应从消费者那里获取数据的覆盖接收器(它被配置为最早)
产生一条消息,即由两个接收器消耗和接收!
以下是我的 SINK(文件)连接器(默认连接器)的配置:
taskOut = new FileStreamSinkTask();
Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put(FileStreamSinkConnector.TOPICS_CONFIG, new ConstantSettingsBehavior().SINGLE_TOPIC);
sinkProperties.put(FileStreamSinkConnector.FILE_CONFIG, new ConstantSettingsBehavior().FILE_OUT_LATEST);
sinkProperties.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000));
sinkProperties.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
connectorOut.start(sinkProperties);
taskOut.initialize(createMock(SinkTaskContext.class));
taskOut.start(connectorOut.taskConfigs(1).get(0));
这里是最早的(只有正在发生变化的):
sinkProperties.put(FileStreamSinkConnector.TOPICS_CONFIG, new ConstantSettingsBehavior().SINGLE_TOPIC);
sinkProperties.put(FileStreamSinkConnector.FILE_CONFIG, new ConstantSettingsBehavior().FILE_OUT_EARLY);
sinkProperties.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000));
sinkProperties.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
接下来,我将创建一个消费者,将主题中的消息作为
List< SinkRecord >
我将此列表分配给每个连接器的任务:
myLatestOne.getTaskOut().put(data);
myEarlyOne.getTaskOut().put(data);
但看起来我的方法不对!因为所有消息都由每个连接器获取
这里的代码是我正在使用的代码覆盖代码的拉取请求。
如果我错过了什么,请不要犹豫告诉我!(第一个问题)。
谢谢
解决方案
所以我通过 JAVA 来做这件事。我找到了一种非常容易使用终端的方法:
命令执行
我们首先启动我们的服务器 Zookeeper :
bin/zookeeper-server-start.sh config/zokeeper.properties
接下来我们启动我们的服务器 kafka :
bin/kafka-server-start.sh config/server.properties
我们需要创建一个主题:
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
现在我们需要生成消息:
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> [Your message]
现在我们可以启动我们的工人,连接 1 个连接器。您可以在配置文件中拥有它们的属性。
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
connector.client.config.override.policy=All
connector.client.config.override.policy=All
允许通过连接器覆盖客户端。
这是带有选项的连接器earliest
(如果没有保存偏移量,则从第一个条目开始)
name=local-file-earliest-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=/tmp/test.sink.earliest.txt
topics=test
consumer.override.auto.offset.reset=earliest
value.converter=org.apache.kafka.connect.storage.StringConverter
sudo ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink-early.properties
几秒钟后我们停止它(您可以查看tmp/test.sink.earliest.txt
)。
这次我们添加一个新的连接器:
name=local-file-latest-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=/tmp/test.sink.latest.txt
topics=test
consumer.override.auto.offset.reset=latest
value.converter=org.apache.kafka.connect.storage.StringConverter
我们可以同时启动它们:
sudo ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink-early.properties config/connect-file-sink-latest.properties
我们可以添加新消息并检查是否/tmp/test.sink.latest.txt
仅填充这些消息。
解释
这里的主要思想是能够以不同的方式为每个连接器提供默认的可重新配置。为此,我们使用了 add Override Policy
推荐阅读
- java - 无法连接到服务器(Socket)
- permissions - AWS Lex:将 Lambda 执行为 dialogCodeHook 和fulfillmentActivity 挂钩的意图权限
- java - OpenCV 4.0.0 中没有 dll 文件
- javascript - 为什么 CSS3 过渡不适用于使用 JavaScript 设置的元素高度?
- javascript - 如果我单击带有空输入的添加按钮,我的数组列表会继续输出相同的列表项
- angular - RxJS 和 Angular 中的新事件无限滚动
- aurelia - 将焦点集中到 Aurelia 组件的最佳“解耦”方式是什么?
- django - 电子邮件中的内容未呈现为标准 HTML
- airflow - 气流 dags 和 PYTHONPATH
- azure - Web 应用程序还原 - “在还原时忽略冲突的主机名”有什么作用?