首页 > 解决方案 > 有没有办法使用火花结构化流读取 ActiveMQ 中先前排队的消息?

问题描述

我有一个生产者向 ActiveMQ 的主题和具有以下属性的客户端发送消息:

val optionsMap: Map[String, String] =
      Map[String, String]("brokerUrl" -> brokerUrl,
                          "topic" -> topicName, 
                          "persistence" -> "memory", 
                          "username" -> username, 
                          "password" -> password, 
                          "clientId" -> "something")

现在,当我的客户端应用程序未运行时,我发送一条消息给ActiveMQ topic数量Enqueued Messages增加一并且数量Dequeued Messages保持不变。但是一旦我启动我的客户端,数量就Dequeued Messages等于Enqueued Messages我的客户端应用程序不产生任何输出的数量。如何解决这个问题?我希望我的客户端应用程序输出所有以前排队的消息。

标签: apache-sparkactivemqspark-structured-streaming

解决方案


除非您要发送到具有已注册持久订阅的主题,否则代理将立即丢弃所有发送的消息,因为这是定义主题的目的。只有活跃的 Topic 订阅者或持久订阅者才会收到发送到 Topic 的消息。


推荐阅读