python - 从 python STOMP 创建临时队列
问题描述
我想为 python(stomp) 创建临时队列。在文档中,他们提供了回复标题。但它没有创建任何临时队列。用 python 编写的生产者和消费者文件。python STOMP 真的有办法实现临时队列吗?
我正在发送带有回复标题的消息。在订阅者 on_message 侦听器中,我正在检查是否存在回复标头,如果存在,则将响应发送到回复标头值。
队列生产者
import stomp
import json
class QueueProducer:
def __init__(self):
self.hosts = ['somevalue']
self.ports = ['somevalue']
self.ENCODE_FORMAT = "UTF-8"
self.conn_param = zip(self.hosts,self.ports)
self.conn = stomp.Connection11(self.conn_param, encoding=self.ENCODE_FORMAT)
self.conn.start()
self.conn.connect(wait=True)
except Exception as ex:
raise ex
def send_msg(self,message,queue_name,header={"JMSDeliveryMode":"Persistent",
"JMSPriority":4}):
try:
message = json.dumps(message)
self.conn.send(destination='/queue/{}'.format(queue_name), body=message,
headers=header)
except Exception as ex:
raise ex
if __name__ == "__main__":
obj = QueueProducer()
from datetime import datetime
obj.send_msg(str(datetime.now()),"MessageLife",header={"JMSDeliveryMode":"Persistent",
"JMSPriority":4,'reply-to':"destination", })
队列消费者:
import sys
import stomp
import time
import json
ACK_CLIENT_INDIVIDUAL = "client-individual"
class CustomListener(stomp.ConnectionListener):
def __init__(self,conn):
self.conn = conn
def on_error(self, headers, message):
print('received an error::%s' % message)
def on_message(self, headers, message):
try:
message = json.loads(message)
response = [1,2,3]
self.conn.ack(headers.get("message-id"), int(headers.get("subscription")))
if 'reply-to' in headers:
self.conn.send(destination='/queue/{}'.format(headers.get("reply-to")),
body=json.dumps(response))
except Exception as ex:
print("exception nack")
class QueueConsumer:
def __init__(self):
try:
self.hosts = ["some value",]
self.ports = ["some value",]
self.ENCODE_FORMAT = "utf-8"
self.conn_param = zip(self.hosts,self.ports)
self.conn = stomp.Connection11(self.conn_param, encoding=self.ENCODE_FORMAT)
self.conn.start()
self.conn.connect(wait=True,)
self.conn.set_listener('', CustomListener(self.conn))
except Exception as ex:
raise ex
def consume_msg(self):
try:
while True:
self.conn.subscribe('/queue/{}'.format(self.queue_name),1, ack=ACK_CLIENT_INDIVIDUAL,
headers={"activemq.prefetchSize":1}
)
time.sleep(500)
self.conn.unsubscribe(1)
except Exception as ex:
raise ex
def execute(self):
try:
self.queue_name = "sample_queue"
self.consume_msg()
self.close_connection()
except Exception as ex:
raise ex
if __name__ == "__main__":
obj = QueueConsumer()
obj.execute()
我真的很想知道回复标头是否创建一个临时队列,如 java.(createTemporaryQueue 方法),或者我们必须手动创建队列(在发送者或消费者中)。如果有人在 python STOMP.py 中有代码示例,它将非常有用。
解决方案
reply-to
STOMP 中命名的消息头没有什么特别之处。STOMP 协议规范没有提到任何关于名为 的标头,reply-to
也没有提到任何关于出于任何原因创建临时队列的内容。
也就是说,ActiveMQ 实现了对 STOMP 的一些扩展,因此如果您使用destination
开头的标头发送消息或创建消费者,/temp-queue/
则会创建一个临时队列。
推荐阅读
- selenium - How to date in the datpicker using selenium where calender defaults to Year selection first?
- javascript - JavaScript - 视频自动播放
- php - 无法使用 MySQL 查询检查两列中的值
- amazon-web-services - 如何通过 AWS EKS 在 ECR 中使用 Docker 映像
- swift - 如何修复 Swift 枚举 switch case 问题
- javascript - Django:一些字符串会自动添加到我的代码中
- javascript - 如何合并间距数组
- sass - 在不同的视图中在引导程序中获取不同的主题
- regex - 如何使用螺栓编译器验证正则表达式
- python - 按第一个列表对列表进行排序