首页 > 解决方案 > 无法在 Kafka Connect 2.4 中成功覆盖我的连接器

问题描述

嗨,我正在寻找通过 java 代码使用 2.3 中发布的新覆盖策略。

我想创建一个这样的例子:

        taskOut = new FileStreamSinkTask();
        Map<String, String> sinkProperties = new HashMap<>();
        sinkProperties.put(FileStreamSinkConnector.TOPICS_CONFIG, new ConstantSettingsBehavior().SINGLE_TOPIC);
        sinkProperties.put(FileStreamSinkConnector.FILE_CONFIG, new ConstantSettingsBehavior().FILE_OUT_LATEST);
        sinkProperties.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000));
        sinkProperties.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        connectorOut.start(sinkProperties);
        taskOut.initialize(createMock(SinkTaskContext.class));
        taskOut.start(connectorOut.taskConfigs(1).get(0));

这里是最早的(只有正在发生变化的):

     sinkProperties.put(FileStreamSinkConnector.TOPICS_CONFIG, new ConstantSettingsBehavior().SINGLE_TOPIC);
        sinkProperties.put(FileStreamSinkConnector.FILE_CONFIG, new ConstantSettingsBehavior().FILE_OUT_EARLY);
        sinkProperties.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000));
        sinkProperties.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

接下来,我将创建一个消费者,将主题中的消息作为
List< SinkRecord >

我将此列表分配给每个连接器的任务:

        myLatestOne.getTaskOut().put(data);
        myEarlyOne.getTaskOut().put(data);

但看起来我的方法不对!因为所有消息都由每个连接器获取

这里的代码是我正在使用的代码覆盖代码的拉取请求。

如果我错过了什么,请不要犹豫告诉我!(第一个问题)。

谢谢

标签: javaapache-kafkaapache-kafka-connect

解决方案


所以我通过 JAVA 来做这件事。我找到了一种非常容易使用终端的方法:

命令执行

我们首先启动我们的服务器 Zookeeper :

 bin/zookeeper-server-start.sh config/zokeeper.properties

接下来我们启动我们的服务器 kafka :

bin/kafka-server-start.sh config/server.properties

我们需要创建一个主题:

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

现在我们需要生成消息:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> [Your message]

现在我们可以启动我们的工人,连接 1 个连接器。您可以在配置文件中拥有它们的属性。

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
connector.client.config.override.policy=All 

connector.client.config.override.policy=All允许通过连接器覆盖客户端。

这是带有选项的连接器earliest(如果没有保存偏移量,则从第一个条目开始)

name=local-file-earliest-sink 
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=/tmp/test.sink.earliest.txt
topics=test
consumer.override.auto.offset.reset=earliest
value.converter=org.apache.kafka.connect.storage.StringConverter
sudo ./bin/connect-standalone.sh config/connect-standalone.properties  config/connect-file-sink-early.properties

几秒钟后我们停止它(您可以查看tmp/test.sink.earliest.txt)。

这次我们添加一个新的连接器:

name=local-file-latest-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=/tmp/test.sink.latest.txt
topics=test
consumer.override.auto.offset.reset=latest
value.converter=org.apache.kafka.connect.storage.StringConverter

我们可以同时启动它们:

sudo ./bin/connect-standalone.sh config/connect-standalone.properties  config/connect-file-sink-early.properties config/connect-file-sink-latest.properties 

我们可以添加新消息并检查是否/tmp/test.sink.latest.txt仅填充这些消息。

解释

这里的主要思想是能够以不同的方式为每个连接器提供默认的可重新配置。为此,我们使用了 add Override Policy


推荐阅读