首页 > 解决方案 > 我如何解决断管问题 MQTT 客户端

问题描述

当我尝试向代理发布一些消息时,出现以下错误:

Exception in thread Thread-4:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.8/site-packages/opcua/client/client.py", line 66, in run
    self.client.open_secure_channel(renew=True)
  File "/usr/local/lib/python3.8/site-packages/opcua/client/client.py", line 325, in open_secure_channel
    result = self.uaclient.open_secure_channel(params)
  File "/usr/local/lib/python3.8/site-packages/opcua/client/ua_client.py", line 265, in open_secure_channel
    return self._uasocket.open_secure_channel(params)
  File "/usr/local/lib/python3.8/site-packages/opcua/client/ua_client.py", line 197, in open_secure_channel
    future = self._send_request(request, message_type=ua.MessageType.SecureOpen)
  File "/usr/local/lib/python3.8/site-packages/opcua/client/ua_client.py", line 72, in _send_request
    self._socket.write(msg)
  File "/usr/local/lib/python3.8/site-packages/opcua/common/utils.py", line 118, in write
    self.socket.sendall(data)
BrokenPipeError: [Errno 32] Broken pipe

我在客户端连接后使用网络循环,我试图处理错误但是,它不起作用我总是有同样的问题,我的函数每秒被调用多次,每次我收到来自的数据更改通知时服务器。有人可以帮忙吗?下面是导致错误的代码:

def clientPub(self,top,msg):
    try: 
        cl=self.client
        if(self.connectionFlag !=1):
            cl.connect(self.broker,1883)
            cl.loop_start()            
        cl.publish(top,msg,2)
    except IOError as e: 
        if e.errno == errno.EPIPE:
            print("Trying again to publish the message ...")
            cl.publish(top,msg,2) 

对于来自服务器的每个通知,此函数都会调用上述函数:

def datachange_notification(self, node, val, data):
    print("data recieved :",data.monitored_item.Value.SourceTimestamp,val,machine_number+".G3_"+node.nodeid.Identifier[3:])        
    valuesDict={"name":machine_number+".G3_"+node.nodeid.Identifier[3:],"datapoint":[[str(data.monitored_item.Value.SourceTimestamp),val,3]],"attributes":{"machine-type":"opcua"}}   
    finalvalue={"messageId":str(round(time.time()*1000)),"body":valuesDict}
    sentval=json.dumps(finalvalue)
    self.clientPub(self.topic,sentval)

等你回复,谢谢。

标签: pythonunixmqttopc-uabroken-pipe

解决方案


这是一个经典的生产者-消费者问题。

您在sentval中生成和存储数据 ,同时尝试通过调用来使用sentval: self.clientPub(self.topic,sentval)

我也有类似的问题,我使用 Producer-Consumer Threading Using Lock解决了

它对我来说非常好!


推荐阅读