FlinkKafkaConsumer does not work as expected when consuming with group.id set

Problem description

I have been using Flink 1.10.0 for a while and ran into a strange problem.

I submitted the same job twice.

$ flink list -r
28.02.2020 18:04:24 : f9ad14cb86a14c388ed6a146c80988fd : ReadKafkaJob (RUNNING)
28.02.2020 18:07:23 : e05bf26ee986573ffc01af8b1f5d1d59 : ReadKafkaJob (RUNNING)

Both jobs use the same group.id, yet each of them reads the data. The log below shows the same events being consumed twice.

2020-02-28 18:08:29,600 INFO  com.stc.sls.stream_process.examples.ReadKafkaJob              - eventDo={"id":"odd奇数","value":0.1,"eventTs":1582884509239,"seqNo":0}
2020-02-28 18:08:29,601 INFO  com.stc.sls.stream_process.examples.ReadKafkaJob              - eventDo={"id":"odd奇数","value":0.1,"eventTs":1582884509239,"seqNo":0}
2020-02-28 18:08:34,442 INFO  com.stc.sls.stream_process.examples.ReadKafkaJob              - eventDo={"id":"odd奇数","value":0.5,"eventTs":1582884514437,"seqNo":1}
2020-02-28 18:08:34,442 INFO  com.stc.sls.stream_process.examples.ReadKafkaJob              - eventDo={"id":"odd奇数","value":0.5,"eventTs":1582884514437,"seqNo":1}
2020-02-28 18:08:39,448 INFO  com.stc.sls.stream_process.examples.ReadKafkaJob              - eventDo={"id":"odd奇数","value":0.2,"eventTs":1582884519443,"seqNo":2}
2020-02-28 18:08:39,448 INFO  com.stc.sls.stream_process.examples.ReadKafkaJob              - eventDo={"id":"odd奇数","value":0.2,"eventTs":1582884519443,"seqNo":2}

I have set "group.id" in the code:

String kafkaTopic = params.get("kafka-topic", "flink-test");
String brokers = params.get("brokers", "192.168.0.100:9092");
String groupId = "simple.read.kafka.job";

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", brokers);
kafkaProps.setProperty("group.id", groupId);

FlinkKafkaConsumer<EventDo> kafka = new FlinkKafkaConsumer<>(kafkaTopic, new EventDeSerializer(), kafkaProps);

So why do two clients in the same group consume the same records from Kafka twice?

Does FlinkKafkaConsumer have some special implementation?

Update:

I ran some tests, starting two console consumers and one Flink consumer.

If I consume with kafka-console-consumer as below, with client.id=123:

kafka-console-consumer --bootstrap-server 192.168.0.100:9092 --topic flink-test --consumer-property group.id=simple.read.kafka.job --consumer-property client.id=123

and another consumer with client.id=456:

kafka-console-consumer --bootstrap-server 192.168.0.100:9092 --topic flink-test --consumer-property group.id=simple.read.kafka.job --consumer-property client.id=456

then I start the Flink job in IDEA, consuming the topic flink-test with group.id="simple.read.kafka.job":

20:38:17,107 INFO  org.apache.kafka.clients.consumer.KafkaConsumer               - [Consumer clientId=cid0931c3, groupId=simple.read.kafka.job] Subscribed to partition(s): flink-test-0

I can check the connections and find two consumers:

➜  bin descKafkaConsumerGroup simple.read.kafka.job --members          
Java HotSpot(TM) Server VM warning: G1 GC is disabled in this release.

GROUP                 CONSUMER-ID                              HOST            CLIENT-ID       #PARTITIONS     ASSIGNMENT  
simple.read.kafka.job 123-5925201d-a767-4216-acdc-b46f058db0df /192.168.0.100  123             1               flink-test(0)
simple.read.kafka.job 456-01190de7-5d4e-43c1-9cb6-b599c9c69b41 /192.168.0.101  456             0               -

But where is the Flink job's consumer?

The two console consumers behave as one group, while the Flink job's consumer acts as if it were in a different group.

Update 2

I have enabled checkpointing. The complete code is shown below.

import com.stc.sls.stream_process.examples.model.StringDeSerializer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;
import java.util.UUID;


@Slf4j
public class SimpleReadKafkaJob {

    final static String clientId = "cid" + StringUtils.remove(UUID.randomUUID().toString(), '-').substring(0, 6);

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);

        String kafkaTopic = params.get("kafka-topic", "flink-test");
        String brokers = params.get("brokers", "192.168.0.100:9092");
        String groupId = "simple.read.kafka.job";

        System.out.printf("Reading kafka topic %s @ %s\n", kafkaTopic, brokers);
        System.out.println();

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", brokers);
        kafkaProps.setProperty("group.id", groupId);
        kafkaProps.setProperty("client.id", clientId);

        FlinkKafkaConsumer<String> kafka = new FlinkKafkaConsumer<>(kafkaTopic, new StringDeSerializer(), kafkaProps);
        kafka.setStartFromGroupOffsets();          // start from the offsets committed to Kafka for this group
        kafka.setCommitOffsetsOnCheckpoints(true); // commit offsets back to Kafka on each completed checkpoint

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(20000);            // checkpoint every 20 seconds
        env.setStateBackend((StateBackend) new FsStateBackend("file:///Users/ym/tmp/checkpoint"));
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


        DataStream<String> dataStream = env.addSource(kafka);
        dataStream.map((MapFunction<String, String>) s -> {
            log.info("message={}", s);
            return s;
        }).addSink(new DiscardingSink<>());

        env.execute(SimpleReadKafkaJob.class.getSimpleName());
    }


}

And the log is as follows:

23:00:47,643 INFO  org.apache.kafka.clients.consumer.KafkaConsumer               - [Consumer clientId=cidb30879, groupId=simple.read.kafka.job] Subscribed to partition(s): flink-test-0
23:00:47,673 INFO  org.apache.kafka.clients.Metadata                             - Cluster ID: LsItbMw1T_SHYQvJMt_6Fw
23:00:47,677 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=cidb30879, groupId=simple.read.kafka.job] Discovered group coordinator 192.168.0.100:9092 (id: 2147483647 rack: null)
23:01:02,160 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1 @ 1583420462134 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:01:02,572 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 437 ms).
23:01:22,136 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 @ 1583420482135 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:01:22,147 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 2 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 10 ms).
23:01:42,139 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 3 @ 1583420502138 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:01:42,151 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 3 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 12 ms).
23:02:02,138 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 4 @ 1583420522137 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:02:02,147 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 4 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 9 ms).
23:02:22,141 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 5 @ 1583420542139 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:02:22,152 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 5 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 12 ms).
23:02:42,138 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 6 @ 1583420562137 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:02:42,149 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 6 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 11 ms).
23:03:02,140 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 7 @ 1583420582139 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:03:02,148 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 7 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 8 ms).
23:03:18,031 INFO  com.stc.sls.stream_process.examples.SimpleReadKafkaJob        - message={seqNo: 1, eventTs: 1583420589670, id: even偶数, value: 2.76}
23:03:22,141 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 8 @ 1583420602141 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:03:22,152 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 8 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 10 ms).
23:03:25,544 INFO  com.stc.sls.stream_process.examples.SimpleReadKafkaJob        - message={seqNo: 2, eventTs: 1583420598370, id: even偶数, value: 5.18}
23:03:33,181 INFO  com.stc.sls.stream_process.examples.SimpleReadKafkaJob        - message={seqNo: 3, eventTs: 1583420605939, id: odd奇数, value: 0.89}
23:03:40,659 INFO  com.stc.sls.stream_process.examples.SimpleReadKafkaJob        - message={seqNo: 4, eventTs: 1583420613564, id: even偶数, value: 9.29}


But kafka-consumer-groups shows there are no active members:

bin descKafkaConsumerGroup simple.read.kafka.job --members
Java HotSpot(TM) Server VM warning: G1 GC is disabled in this release.

Consumer group 'simple.read.kafka.job' has no active members.

Tags: apache-flink, flink-streaming

Solution


By default, Flink handles Kafka offsets itself through checkpointing: offsets are stored in Flink's checkpoint state and, at each completed checkpoint, committed back to Kafka. This way, when a job crashes, you can safely replay the events that have not been fully processed yet.

Note also that the FlinkKafkaConsumer assigns topic partitions to its parallel instances directly instead of going through Kafka's consumer-group coordination; the group.id is only used for committing and looking up offsets. The Flink consumer therefore never joins the group as a member, which explains both why kafka-consumer-groups reports no active members and why two jobs with the same group.id each receive every record.
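To illustrate the difference, here is a small sketch using the plain Kafka client API (this is not Flink's internal code; the broker address, topic, and group are taken from the question):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignVsSubscribe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.0.100:9092");
        props.setProperty("group.id", "simple.read.kafka.job");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // subscribe(): joins the consumer group; the broker balances partitions
        // among the members, and the consumer shows up in --members output.
        // This is what kafka-console-consumer does.
        try (KafkaConsumer<String, String> subscriber = new KafkaConsumer<>(props)) {
            subscriber.subscribe(Collections.singletonList("flink-test"));
            subscriber.poll(Duration.ofSeconds(1));
        }

        // assign(): static partition assignment, no group membership at all.
        // group.id is then only used when committing offsets, so this consumer
        // never appears as an active member of the group. This is the mode the
        // Flink Kafka connector uses internally.
        try (KafkaConsumer<String, String> assigner = new KafkaConsumer<>(props)) {
            assigner.assign(Collections.singletonList(new TopicPartition("flink-test", 0)));
            assigner.poll(Duration.ofSeconds(1));
        }
    }
}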

So when you deploy the job twice, it is entirely possible, and expected, that both of them process the records: the periodically saved offsets serve recovery, not work sharing between jobs. There are several ways to change the offset-committing behavior, for example enabling Kafka's auto commit at an appropriate interval, as sketched below; however, this may hurt performance or lose data on restart.
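A minimal sketch of the auto-commit option, assuming checkpointing is left disabled (SimpleStringSchema stands in for the custom deserializer used in the question):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class AutoCommitReadKafkaJob {
    public static void main(String[] args) throws Exception {
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "192.168.0.100:9092");
        kafkaProps.setProperty("group.id", "simple.read.kafka.job");
        // With checkpointing disabled, the connector falls back to Kafka's own
        // periodic auto-commit, controlled by these client properties.
        kafkaProps.setProperty("enable.auto.commit", "true");
        kafkaProps.setProperty("auto.commit.interval.ms", "5000");

        FlinkKafkaConsumer<String> kafka =
                new FlinkKafkaConsumer<>("flink-test", new SimpleStringSchema(), kafkaProps);
        kafka.setStartFromGroupOffsets(); // resume from the last committed group offset

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Note: no env.enableCheckpointing(...) here. Offsets are committed by
        // the Kafka client on its own schedule, so a restart may re-read or
        // skip records; there is no exactly-once guarantee in this mode.
        env.addSource(kafka).print();
        env.execute("AutoCommitReadKafkaJob");
    }
}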

You can refer to the Flink documentation on the Kafka connector for more information.

