首页 > 技术文章 > python:RabbitMQ 消息队列 学习笔记(3) RPC

gtq7512 2019-08-30 16:52 原文

RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。

代码区:

#7512



import pika,uuid,time

class Rpc_client(object):

    def __init__(self,host = "localhost"):
        self.client = pika.BlockingConnection(pika.ConnectionParameters(host))
        self.channel = self.client.channel()
        self.channel.queue_declare(queue='com_queue')
        self.result = self.channel.queue_declare(queue='',exclusive=True)
        self.replay_queue = self.result.method.queue
        self.uuid = str(uuid.uuid4())
        self.respond = None
        self.channel.basic_consume(queue=self.replay_queue,
                                   on_message_callback=self.reply_callbak,
                                   auto_ack=True)


    def com_call(self,msg):
        self.channel.basic_publish(exchange = '',
                                   routing_key = 'com_queue',
                                   body = str(msg).encode() ,
                                   properties = pika.BasicProperties(
                                       reply_to = self.replay_queue,
                                       correlation_id= self.uuid
                                   ))

        while self.respond is None:
            self.client.process_data_events()
            # print("no msg ......")
            # time.sleep(2)
        return  int(self.respond)

    # def aa(self):
    #     self.client.process_data_events()
    def reply_callbak(self,ch , method ,property , body):

        if property.correlation_id == self.uuid:
            self.respond = body


if __name__ == "__main__":
    rpc = Rpc_client()
    while True:

        msg = input(">>:")
        if len(msg) == 0:
            continue
        print("[rpc]  recv the resulet is [%d]"%rpc.com_call(int(msg)))
        rpc.respond = None
RPC client
 1 #7512
 2 
 3 import pika
 4 
 5 class Rpc_server(object):
 6 
 7     def __init__(self,host = "localhost"):
 8         self.server  = pika.BlockingConnection(pika.ConnectionParameters(host=host))
 9         self.channel = self.server.channel()
10         self.channel.queue_declare(queue = 'com_queue')
11         self.channel.basic_consume(queue = 'com_queue',
12                                    on_message_callback= self.com_calback ,
13                                    auto_ack = False)
14         self.channel.start_consuming()
15 
16     def fibonacci(self,n):
17         a, b, c = 0, 0, 1
18         for i in range(n - 1):
19             a = b
20             b = c
21             c = a + b
22         return c
23 
24 
25     def com_calback(self,ch , method ,property ,msg):
26 
27         self.result = str(self.fibonacci(int(msg)))
28 
29         ch.basic_publish(exchange='',
30                                    routing_key = property.reply_to,
31                                    body = self.result.encode(),
32                                    properties=pika.BasicProperties(
33                                        correlation_id=property.correlation_id
34                                    )
35                                    )
36         ch.basic_ack(delivery_tag=method.delivery_tag)
37 
38 if __name__ =="__main__":
39     rpc = Rpc_server()
RPC server

 

推荐阅读