首页 > 解决方案 > 从类运行时出现python nats连接错误

问题描述

我正在尝试运行一个在继续之前等待特定 nats 消息的应用程序。所以我创建了以下发送消息并监听消息的类:

#!/usr/bin/python3
from nats.aio.client import Client as NATS
import asyncio


class NatsService:

    def __init__(self):
        self.NATSERVER = "XXXXX"
        self.CHANNEL = "msgchannel"
        self.CONNECTTIMEOUT = 1
        self.MAXRECONNECTATTEMPTS = 1
        self.RECONNECTTIMEWAIT = 1
        self.nc = NATS()

    async def send_message(self, message, channel=None):
        if not channel:
            channel = self.CHANNEL

        print("connecting to nats server")
        await self.nc.connect(self.NATSERVER, self.CONNECTTIMEOUT,
                              max_reconnect_attempts=self.MAXRECONNECTATTEMPTS,
                              reconnect_time_wait=self.RECONNECTTIMEWAIT)

        print(f"Publishing message: '{message}' to channel: {channel}")
        await self.nc.publish(channel, message.encode('utf-8'))
        print("message sent, closing connection")
        await self.nc.close()
        print("nats server connection closed")

    def start_listening(self):
        loop = asyncio.get_event_loop()
        try:
            loop.create_task(self.listener_loop(loop))
            loop.run_forever()
        finally:
            loop.close()

    async def listener_loop(self, loop):
        print("connecting to nats listener loop")
        await self.nc.connect(self.NATSERVER, loop=loop)

        async def message_handler(msg):
            subject = msg.subject
            data = msg.data.decode()
            print('Received a message on {}: {}'.format(subject, data))

            if eval(data.split(":::")[1]):
                print("message received, closing")
                await nc.drain()    # timeout occurs for some reason
                print("stopping loop")
                loop.stop()
            
        await self.nc.subscribe(self.CHANNEL, cb=msg_handler)

我在两个应用程序中导入这个类,一个应该发送消息,一个应该监听这些消息,直到收到正确的消息。

我的主应用程序侦听消息,并且仅在收到正确消息后才继续

from nats_service import NatsService

try:
    print("starting nats service instance")
    ns = NatsService()
    print("listening for approved message")
    start_listening()
except Exception as e:
    print(f"Error: {e}")

print(f"Contiuing with application...")

另一个应用程序用于发送消息:

from nats_service import NatsService
import asyncio

async def main():
    ns = NatsService()
    message = "test"
    await ns.send_message(message)

if __name__=="__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print(f"Completed sender function.")

我能够在我将它们放在一起之前创建的独立函数上发送和接收消息。但是从上面的类中导入它们时,我似乎无法运行它们,尤其是遇到了 asyncio 的问题。

经过一些试验和错误,当我运行发件人时,它似乎终于开始了,但立即失败,出现一个我不明白的错误,找不到太多关于:

connecting to nats server
nats: encountered error
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/nats/aio/client.py", line 1185, in _select_next_server
    connection_future, self.options['connect_timeout']
  File "/usr/lib/python3.7/asyncio/tasks.py", line 416, in wait_for
    return fut.result()
  File "/usr/lib/python3.7/asyncio/streams.py", line 75, in open_connection
    protocol = StreamReaderProtocol(reader, loop=loop)
  File "/usr/lib/python3.7/asyncio/streams.py", line 227, in __init__
    self._closed = self._loop.create_future()
AttributeError: 'int' object has no attribute 'create_future'
Traceback (most recent call last):
  File "sender_side.py", line 13, in <module>
    loop.run_until_complete(main())
  File "/usr/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "sender_side.py", line 9, in main
    await ns.send_message(message)
  File "/home/bot10-sigma/nats_tests/nats_service.py", line 23, in send_message
    reconnect_time_wait=self.RECONNECTTIMEWAIT)
  File "/usr/local/lib/python3.7/dist-packages/nats/aio/client.py", line 317, in connect
    await self._select_next_server()
  File "/usr/local/lib/python3.7/dist-packages/nats/aio/client.py", line 1174, in _select_next_server
    self.options["reconnect_time_wait"], loop=self._loop
  File "/usr/lib/python3.7/asyncio/tasks.py", line 563, in sleep
    future = loop.create_future()
AttributeError: 'int' object has no attribute 'create_future'
Exception ignored in: <function StreamReaderProtocol.__del__ at 0x761cacd8>
Traceback (most recent call last):
  File "/usr/lib/python3.7/asyncio/streams.py", line 271, in __del__
AttributeError: 'StreamReaderProtocol' object has no attribute '_closed'

标签: pythonpython-asyncionats.io

解决方案


您将self.CONNECTTIMEOUT作为位置参数传递到Client.connect它期望其io_loop参数引用的位置。这就是为什么你得到一个AttributeError: anint没有create_future属性。通过超时connect_timeout=self.CONNECTTIMEOUT,这个问题应该会消失。


推荐阅读