java - 使用持久键值存储的 Kafka Stream 应用程序的高消费者滞后
问题描述
使用状态存储的这个 Streams 应用程序并没有像我认为的那样获得尽可能多的吞吐量,而且我的消费者延迟正在上升。我想知道是否有任何我可以调整的明显配置或任何可以帮助优化我的吞吐量的配置。
在我的主要属性中,我有这个:
public static void main(String[] args) {
final Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "reading-stream");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800");
config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 0);
final MyStream stream = new MyStream(config);
stream.start();
Runtime.getRuntime().addShutdownHook(new Thread(stream::close));
}
在 MyStream.start() 中是这段代码:
final Serde readingSerde = Utils.createSerde(Reading.class);
final Serde groupSerde = Utils.createSerde(Group.class);
final Serde checkStateSerde = Utils.createSerde(CheckState.class);
final StreamsBuilder builder = new StreamsBuilder();
final StoreBuilder storeSupplier =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(STORE_STATE),
Serdes.String(),
checkStateSerde
).withLoggingEnabled(new HashMap());
builder.addStateStore(storeSupplier);
final KTable<String, Group> group = builder
.table("group-topic",
Consumed.with(Serdes.String(), groupSerde)
);
final KStream<String, Reading> readings = builder
.stream("reading-topic",
Consumed.with(Serdes.String(), readingSerde)
.withOffsetResetPolicy(Topology.AutoOffsetReset.LATEST));
readings
.join(group,
....
).flatMap
....
}).process(() -> new MyProcessor(), STORE_STATE);
final Topology topology = builder.build();
streams = new KafkaStreams(topology, this.config);
streams.start();
还考虑在 AWS 上使用不同类型的硬盘驱动器(现在使用 gp2)......
还有其他有助于优化此代码的想法吗?
解决方案
推荐阅读
- reactjs - React Material Design:在类组件中使用带有 redux 的 React Material Design 自定义样式
- php - 显示具有关系 laravel 的图像
- ruby-on-rails - 试图在 rails 中禁用整个 f.select
- node.js - 在更新操作上使用聚合管道时,MongoDB“'$' 本身不是有效的 FieldPath”
- javascript - 单击父元素内的按钮的Node.js Puppeteer问题
- visual-studio - Visual Studio 2019 Publish 删除文件夹的写权限
- python - 我的 django 视图总是抛出“意外的关键字参数错误
- excel - 将列从一个工作簿复制到另一个
- excel - 在符合模式的文件夹中打开文件的最新版本
- javascript - 我无法在 React.js 中访问“this.props.match.params.id”