parallel-processing - 如何使用 HiveMQ mqtt 客户端启用订阅主题/消息的并行消费/处理
问题描述
我们目前正在从旧版本的 Ecipe Paho MQTT 客户端切换到 1.2 版的 HiveMQ MQTT 客户端。 https://github.com/hivemq/hivemq-mqtt-client
目前正在使用需要消费者函数作为回调的客户端的 Aync 版本。
我们的 MQTT 客户端应用程序之一必须处理/消费许多不同主题的大量消息,并且一条消息的处理不应等待前一条消息完成。我们不确定仅使用一个客户端实例实现消息并行处理的最佳方法是什么。
在上面的文档中有一个可以定义的可选执行器
client.subscribeWith()
.topicFilter("test/topic")
.qos(MqttQos.EXACTLY_ONCE)
.callback(System.out::println)
.executor(executor) // optional
.send();
当没有定义执行器时,AsyncClient 应该如何表现?那么一切都是以阻塞的方式串行处理的吗?不知何故,这似乎违背了用回调定义异步的目的......
在我们的旧实现中,我们使用共享订阅(这是 HiveMQ 3 中的非标准功能,现在是 MQTT 5 的标准功能),客户端的多个实例不断等待相同的主题交替处理它们。
然而,鉴于 HiveMQ 客户端 API(遗憾的是缺少更多解释或示例),我们希望能够提供一种更优雅、更简单的方法来实现与线程池或其他东西的并行处理!
任何帮助表示赞赏!
解决方案
通常只有在将应用程序扩展到多台机器时才需要共享订阅。如果您可以并行处理消息,那么就没有理由在单台机器上使用共享订阅。如果未来消息负载会增加,您仍然可以选择共享订阅,以便以后横向扩展到多台机器。
由于 MQTT 提供排序保证,HiveMQ MQTT 客户端串行调用相同的回调。不同订阅的多个回调并行执行。对于单个回调,只有您的应用程序可以选择分解排序。为此,您只需将回调中的消息移交给并行工作人员即可。
推荐阅读
- java - Java Annotation如何获取特定注解的当前ElemenType
- python - Pandas - 从 Pandas 中的同一字符串中选择几个浮点数来操作它们
- python - 用于动态分组的 Python pandas 小计,如何让总数始终出现在分组的末尾?
- node.js - 使用 Promise.all() 结果执行另一个查询的语法
- .htaccess - 也将非 www 重定向到 www 子页面
- php - php.ini 更新未反映在 phpinfo() 输出中
- apache-kafka - Kafka Streams / 如何获取迭代器正在迭代的分区?
- python - Python 脚本 - 使用插入语句的日期时间戳问题
- windows - 计算批处理运行的脚本/命令的经过时间
- unit-testing - Vue测试工具计算的值不正确