apache-kafka - Kafka消费者消费数据非常慢,只消费前500条记录
问题描述
我正在尝试集成 MongoDB 和 Storm-Kafka,Kafka Producer 从 MongoDB 生成数据,但无法从消费者端获取所有记录。它仅消耗 100 万条记录中的 500-600 条记录。
日志文件中没有错误,拓扑仍然存在但未处理进一步的记录。
Kafka 版本:0.10.* Storm 版本:1.2.1
我需要在消费者中添加任何配置吗?
conf.put(Config.TOPOLOGY_BACKPRESSURE_ENABLE, false);
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2048);
conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
BrokerHosts hosts = new ZkHosts(zookeeperUrl);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId);
spoutConfig.scheme = new KeyValueSchemeAsMultiScheme(new StringKeyValueScheme());
spoutConfig.fetchSizeBytes = 25000000;
if (startFromBeginning) {
spoutConfig.startOffsetTime = OffsetRequest.EarliestTime();
} else {
spoutConfig.startOffsetTime = OffsetRequest.LatestTime();
}
return new KafkaSpout(spoutConfig);
}
我希望 Kafka spout 应该从生产者生成的 kafka 主题中读取所有记录。
解决方案
推荐阅读
- python - request.values / request.form 问题在表单烧瓶上获取带有空格字符的值
- node.js - Electron/Node 文件路径位置
- javascript - 当窗口获得焦点时,我可以检测到键是否被按住?
- javascript - 有没有办法根据外部每个循环在嵌套的每个循环内进行手风琴?
- android - 使用 LocationSettingsRequest 请求高精度位置而不更新位置设置
- javascript - 如何将 chrome 从 web 应用程序带到前台
- php - phpMailer - 没有错误,但没有发送电子邮件
- android - Android Q 内部崩溃
- python - 如何在 Python 中保持 rpc 处于打开状态
- java - 如何使用按钮返回到前一个数组值