首页 > 解决方案 > 如何使用 HiveMQ mqtt 客户端启用订阅主题/消息的并行消费/处理

问题描述

我们目前正在从旧版本的 Ecipe Paho MQTT 客户端切换到 1.2 版的 HiveMQ MQTT 客户端。 https://github.com/hivemq/hivemq-mqtt-client

目前正在使用需要消费者函数作为回调的客户端的 Aync 版本。

我们的 MQTT 客户端应用程序之一必须处理/消费许多不同主题的大量消息,并且一条消息的处理不应等待前一条消息完成。我们不确定仅使用一个客户端实例实现消息并行处理的最佳方法是什么。

在上面的文档中有一个可以定义的可选执行器

client.subscribeWith()
    .topicFilter("test/topic")
    .qos(MqttQos.EXACTLY_ONCE)
    .callback(System.out::println)
    .executor(executor) // optional
    .send();

当没有定义执行器时,AsyncClient 应该如何表现?那么一切都是以阻塞的方式串行处理的吗?不知何故,这似乎违背了用回调定义异步的目的......

在我们的旧实现中,我们使用共享订阅(这是 HiveMQ 3 中的非标准功能,现在是 MQTT 5 的标准功能),客户端的多个实例不断等待相同的主题交替处理它们。

然而,鉴于 HiveMQ 客户端 API(遗憾的是缺少更多解释或示例),我们希望能够提供一种更优雅、更简单的方法来实现与线程池或其他东西的并行处理!

任何帮助表示赞赏!

标签: parallel-processingclientmqtthivemq

解决方案


通常只有在将应用程序扩展到多台机器时才需要共享订阅。如果您可以并行处理消息,那么就没有理由在单台机器上使用共享订阅。如果未来消息负载会增加,您仍然可以选择共享订阅,以便以后横向扩展到多台机器。

由于 MQTT 提供排序保证,HiveMQ MQTT 客户端串行调用相同的回调。不同订阅的多个回调并行执行。对于单个回调,只有您的应用程序可以选择分解排序。为此,您只需将回调中的消息移交给并行工作人员即可。


推荐阅读