首页 > 解决方案 > 使用持久键值存储的 Kafka Stream 应用程序的高消费者滞后

问题描述

使用状态存储的这个 Streams 应用程序并没有像我认为的那样获得尽可能多的吞吐量,而且我的消费者延迟正在上升。我想知道是否有任何我可以调整的明显配置或任何可以帮助优化我的吞吐量的配置。

在我的主要属性中,我有这个:

public static void main(String[] args) {
    final Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "reading-stream");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800");
    config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 0);

    final MyStream stream = new MyStream(config);
    stream.start();

    Runtime.getRuntime().addShutdownHook(new Thread(stream::close));
}

在 MyStream.start() 中是这段代码:

final Serde readingSerde = Utils.createSerde(Reading.class);
final Serde groupSerde = Utils.createSerde(Group.class);
final Serde checkStateSerde = Utils.createSerde(CheckState.class);
final StreamsBuilder builder = new StreamsBuilder();

final StoreBuilder storeSupplier =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore(STORE_STATE),
        Serdes.String(),
        checkStateSerde
    ).withLoggingEnabled(new HashMap());
builder.addStateStore(storeSupplier);

final KTable<String, Group> group = builder
    .table("group-topic",
        Consumed.with(Serdes.String(), groupSerde)
    );

final KStream<String, Reading> readings = builder
    .stream("reading-topic",
        Consumed.with(Serdes.String(), readingSerde)
            .withOffsetResetPolicy(Topology.AutoOffsetReset.LATEST));

readings
    .join(group,
        ....
    ).flatMap
        ....
        }).process(() -> new MyProcessor(), STORE_STATE);

final Topology topology = builder.build();
streams = new KafkaStreams(topology, this.config);
streams.start();

还考虑在 AWS 上使用不同类型的硬盘驱动器(现在使用 gp2)......

还有其他有助于优化此代码的想法吗?

标签: javaperformanceoptimizationapache-kafkaapache-kafka-streams

解决方案


推荐阅读