python-3.x - 尝试从 RabbitMQ 消费时出现线程问题
问题描述
我有消费者代码:
class Consumer(threading.Thread):
def __init__(self,rabbitMQUrl,dgraphUrl):
super(JaqlConsumer, self).__init__()
self.parameters = pika.URLParameters(rabbitMQUrl)
def run(self):
self.connection = pika.BlockingConnection(self.parameters)
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange='publish', exchange_type='topic')
result = self.channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
self.channel.queue_bind(exchange='publish', queue=queue_name, routing_key='#')
self.channel.basic_qos(prefetch_count=LIMIT)
def process(values):
print ("Process:" + str(len(values)))
def on_message_callback(chan, method_frame, _header_frame, body, userdata=None):
data = json.loads(body)
self.values.append(data)
if (len(self.values) >= LIMIT):
process(self.values)
self.values = []
chan.basic_ack(delivery_tag=method_frame.delivery_tag,multiple=True)
self.consumer_tag = self.channel.basic_consume(
queue=queue_name, on_message_callback=on_message_callback)
self.channel.start_consuming()
def close(self):
if hasattr(self, 'channel'):
self.channel.basic_cancel(self.consumer_tag)
if hasattr(self, 'connection'):
if not self.connection.is_closed:
self.connection.close()
现在这是我的主要.py。我正在尝试收听 ZK 节点,当值从 false 变为 true 时,我想从 RabbitMQ 使用,从 True 变为 false 我不想连接到 RabbitMQ:
consumer = Consumer(brokerUrl)
consumer.setDaemon(True)
def toggleEnabled():
# Get the enabled value from ZK and watch the next change
isEnabled = config.get("enabled",enable_watch)
print (isEnabled)
if isEnabled:
consumer = Consumer(brokerUrl,dgraphUrl)
consumer.setDaemon(True)
consumer.run()
else:
consumer.close()
def enable_watch(event):
toggleEnabled()
toggleEnabled()
while True:
time.sleep(1)
主要问题是,在一次切换之后,切换代码不会运行,我认为这是因为当前线程是 RabbitMQ 的消耗(这是我在暂停脚本时看到的)。从主线程切换到另一个线程的正确设计是什么?
解决方案
以下代码应该是您的run()
方法的一部分:
self.consumer_tag = self.channel.basic_consume(
queue=queue_name, on_message_callback=on_message_callback)
self.channel.start_consuming()
将代码粘贴到问题中时是否出错?
我建议将您的代码添加到 GitHub 存储库或 gist。然后,在pika-python
邮件列表中提出您的问题,我将继续在那里提供帮助。Stack Overflow 不是来回协助的好地方。
注意: RabbitMQ 团队会监控rabbitmq-users
邮件列表,有时只会在 StackOverflow 上回答问题。
推荐阅读
- analytics - Weka - loading an arff file, keep getting error @relation expected, but the format is correct
- bash - How to write Vault LDAP Auth Configuration from a json config file?
- python - KeyError and AtrributeError when trying to retrieve a shelved class - Python 3.6
- jetty - 调用类 org.jboss.resteasy.cdi.CdiInjectorFactory 被放置在多个 bean 档案中
- wordpress - 使用 Wordpress 作为 CMS 并通过 REST API 访问内容
- postgresql - PostgreSQL select query on table that is being updated
- ms-access - 使用 2 个字段的 MS Access 完全外部联接?
- c# - 如何在 MVC 5 中使用 DropdownListFor 在多个 DropDownList 中选择值
- javascript - JavaScript 基于浏览器语言重定向用户(Chrome 扩展)
- xpath - 如何在 XPath 中使用多个 OR 运算符