python - 如何在 python 中使用 pika (RabbitMQ) 向消费者添加多处理
问题描述
我在 python 中使用 pika 框架编写了非常基本的生产者-消费者代码。问题是 - 消费者端在队列中的消息上运行太慢。我进行了一些测试,发现我可以通过多处理将工作流程加快 27 倍。问题是 - 我不知道向我的代码添加多处理功能的正确方法是什么。
import pika
import json
from datetime import datetime
from functions import download_xmls
def callback(ch, method, properties, body):
print('Got something')
body = json.loads(body)
type = body[-1]['Type']
print('Object type in work currently ' + type)
cnums = [x['cadnum'] for x in body[:-1]]
print('Got {} cnums to work with'.format(len(cnums)))
date_start = datetime.now()
download_xmls(type,cnums)
date_end = datetime.now()
ch.basic_ack(delivery_tag=method.delivery_tag)
print('Download complete in {} seconds'.format((date_end-date_start).total_seconds()))
def consume(queue_name = 'bot-test'):
parameters = pika.URLParameters('server@address')
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=queue_name, durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='bot-test')
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
如何从这里开始添加多处理功能?
解决方案
推荐阅读
- html - 静态角度选择选项
- r - 为数据寻找合适的转换
- qz-tray - QZ 托盘打印顺序未按顺序排列
- jmeter - 我有一个动态值作为响应,但它不断改变位置。我怎样才能捕捉到它
- hadoop - 如何在结构类型列中创建带有“@”的配置单元表?
- angular - Angular 6 Ngx typeahead 选项渲染
- google-cloud-platform - 如何仅从我的帐户内部 IP 地址授予对我的 GCP 实例之一的访问权限
- javascript - 如何将 spring.bind 变量从 freemaker 传递给 javascript 函数
- java - 维护链表插入的顺序
- google-chrome - 我可以使用 SVG 工具栏图标吗?