首页 > 解决方案 > 卡夫卡流共同分区与交互式查询

问题描述

我有以下拓扑:

topology.addSource(WS_CONNECTION_SOURCE, new StringDeserializer(), new WebSocketConnectionEventDeserializer()
            , utilService.getTopicByType(TopicType.CONNECTION_EVENTS_TOPIC))
            .addProcessor(SESSION_PROCESSOR, WSUserSessionProcessor::new, WS_CONNECTION_SOURCE)
            .addStateStore(sessionStoreBuilder, SESSION_PROCESSOR)
            .addSink(WS_STATUS_SINK, utilService.getTopicByType(TopicType.ONLINE_STATUS_TOPIC),
                    stringSerializer, stringSerializer
                    , SESSION_PROCESSOR)

            //WS session routing
            .addSource(WS_NOTIFICATIONS_SOURCE, new StringDeserializer(), new StringDeserializer(),
                    utilService.getTopicByType(TopicType.NOTIFICATION_TOPIC))
            .addProcessor(WS_NOTIFICATIONS_ROUTE_PROCESSOR, SessionRoutingEventGenerator::new,
                    WS_NOTIFICATIONS_SOURCE)
            .addSink(WS_NOTIFICATIONS_DELIVERY_SINK, new NodeTopicNameExtractor(), WS_NOTIFICATIONS_ROUTE_PROCESSOR)
            .addStateStore(userConnectedNodesStoreBuilder, WS_NOTIFICATIONS_ROUTE_PROCESSOR, SESSION_PROCESSOR);  

如您所见,有 2 个源主题。状态存储是从第一个主题构建的,第二个流程读取状态存储。当我开始拓扑时,我看到这些流线程被分配了两个源主题的相同分区(共同分区)。我认为这是因为状态存储是由第二个主题流访问的。

这在功能上工作正常。但是有一个性能问题。当更新状态存储的第一个源主题的输入数据量激增时,第二个主题的处理会延迟。

对我来说,第二个话题应该尽快处理。处理第一个主题的延迟很好。

我正在考虑以下策略:

Current configuration:
     WS_CONNECTION_SOURCE - 30 partitions
     WS_NOTIFICATIONS_SOURCE - 30 partitions
     streamThreads: 10
     appInstances: 3 

New configuration:
    WS_CONNECTION_SOURCE - 15 partitions
    WS_NOTIFICATIONS_SOURCE - 30 partitions
    streamThreads: 10
    appInstances: 3
    Since there is no co-partitioning, tasks has to use interactive query to access store

思路是在10个线程中,5个线程只处理第二个主题,这样可以在第一个主题激增时缓解当前的问题。

以下是我的问题:

1. Is this strategy correct? To avoid co-partitioning and use interactive query
2. Is there a chance that Kafka will assign 10 partitions of WS_CONNECTION_SOURCE 
   to one instance since there are 10 threads and one instance won't get any?
3. Is there any better approach to solve the performance problem?

标签: apache-kafkaapache-kafka-streamspartitioning

解决方案


状态存储和交互式查询是 Kafka Streams 的抽象。要使用交互式查询,您必须定义状态存储(使用 Kafka Streams API)并强制您对输入主题具有相同数量的分区。我认为您的解决方案行不通。交互式查询用于公开在 Kafka 流之外查询状态存储的能力(不适用于处理器 API 中的访问)

也许您可以查看您的SESSION_PROCESSOR源代码并从其他拓扑中提取更多工作到 Process 并将结果发布到中间主题,然后基于该构建该状态存储。

此外:

目前 Kafka Streams 不支持输入主题的优先级。关于 Source topic: KIP-349 的优先级有 KIP 。不幸的是,链接的 Jira 票已关闭,因为不会修复 ( https://issues.apache.org/jira/browse/KAFKA-6690 )


推荐阅读