apache-kafka - 卡夫卡流共同分区与交互式查询
问题描述
我有以下拓扑:
topology.addSource(WS_CONNECTION_SOURCE, new StringDeserializer(), new WebSocketConnectionEventDeserializer()
, utilService.getTopicByType(TopicType.CONNECTION_EVENTS_TOPIC))
.addProcessor(SESSION_PROCESSOR, WSUserSessionProcessor::new, WS_CONNECTION_SOURCE)
.addStateStore(sessionStoreBuilder, SESSION_PROCESSOR)
.addSink(WS_STATUS_SINK, utilService.getTopicByType(TopicType.ONLINE_STATUS_TOPIC),
stringSerializer, stringSerializer
, SESSION_PROCESSOR)
//WS session routing
.addSource(WS_NOTIFICATIONS_SOURCE, new StringDeserializer(), new StringDeserializer(),
utilService.getTopicByType(TopicType.NOTIFICATION_TOPIC))
.addProcessor(WS_NOTIFICATIONS_ROUTE_PROCESSOR, SessionRoutingEventGenerator::new,
WS_NOTIFICATIONS_SOURCE)
.addSink(WS_NOTIFICATIONS_DELIVERY_SINK, new NodeTopicNameExtractor(), WS_NOTIFICATIONS_ROUTE_PROCESSOR)
.addStateStore(userConnectedNodesStoreBuilder, WS_NOTIFICATIONS_ROUTE_PROCESSOR, SESSION_PROCESSOR);
如您所见,有 2 个源主题。状态存储是从第一个主题构建的,第二个流程读取状态存储。当我开始拓扑时,我看到这些流线程被分配了两个源主题的相同分区(共同分区)。我认为这是因为状态存储是由第二个主题流访问的。
这在功能上工作正常。但是有一个性能问题。当更新状态存储的第一个源主题的输入数据量激增时,第二个主题的处理会延迟。
对我来说,第二个话题应该尽快处理。处理第一个主题的延迟很好。
我正在考虑以下策略:
Current configuration:
WS_CONNECTION_SOURCE - 30 partitions
WS_NOTIFICATIONS_SOURCE - 30 partitions
streamThreads: 10
appInstances: 3
New configuration:
WS_CONNECTION_SOURCE - 15 partitions
WS_NOTIFICATIONS_SOURCE - 30 partitions
streamThreads: 10
appInstances: 3
Since there is no co-partitioning, tasks has to use interactive query to access store
思路是在10个线程中,5个线程只处理第二个主题,这样可以在第一个主题激增时缓解当前的问题。
以下是我的问题:
1. Is this strategy correct? To avoid co-partitioning and use interactive query
2. Is there a chance that Kafka will assign 10 partitions of WS_CONNECTION_SOURCE
to one instance since there are 10 threads and one instance won't get any?
3. Is there any better approach to solve the performance problem?
解决方案
状态存储和交互式查询是 Kafka Streams 的抽象。要使用交互式查询,您必须定义状态存储(使用 Kafka Streams API)并强制您对输入主题具有相同数量的分区。我认为您的解决方案行不通。交互式查询用于公开在 Kafka 流之外查询状态存储的能力(不适用于处理器 API 中的访问)
也许您可以查看您的SESSION_PROCESSOR源代码并从其他拓扑中提取更多工作到 Process 并将结果发布到中间主题,然后基于该构建该状态存储。
此外:
目前 Kafka Streams 不支持输入主题的优先级。关于 Source topic: KIP-349 的优先级有 KIP 。不幸的是,链接的 Jira 票已关闭,因为不会修复 ( https://issues.apache.org/jira/browse/KAFKA-6690 )
推荐阅读
- eclipse - 在 Jenkins 上运行 gui 测试后获取 java.awt.HeadlessException
- regex - 正则表达式 AND 在由分隔符包围的块内搜索
- java - 如何在安卓中打开gmail
- xero-api - 更新现有联系人
- javascript - 我应该如何设置图像的类型?
- h2 - 为什么 h2 数据库在访问 h2 数据库时出现输入/输出错误?
- ios - 如何防止 ionic 3 应用程序冻结?
- java - 与 ExpandableListViews 有什么关系?
- hadoop - 无法启动配置单元,无法解析 [${clusterHostInfo/webhcat_server_host|append(core-site/hadoop.proxyuser.HTTP.hosts]
- javascript - NodeJs函数无法返回回调响应