Publishing to MQTT in a Django serializer's save() method

Problem description

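I have a very simple API that uses Django and Django REST framework as an endpoint for IoT devices.

IoT devices --HTTP POST--> REST API (Django), which validates and saves the data.

No rendering and no GET/PATCH/DELETE are needed at all.

The only twist is that I don't save to a database; instead I want to push to an MQTT channel (other processes listening for the messages will save/post-process them).

Since I'm not a Django expert at all, my idea was to override the serializer's save() method so that it publishes instead of actually saving.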

Model

class Meas(models.Model):
    SENSOR_TYPES = [
        ('temperature','temperature'),
        ('humidity','humidity')
        ]
    sensorType = models.CharField(max_length=100, default='UNKNOWN', choices = SENSOR_TYPES )
    sensorId = models.CharField(max_length=100)
    homeId = models.CharField(max_length=100)
    roomId = models.CharField(max_length=150)
    hubId = models.CharField(max_length=100)
    value = models.CharField(max_length=100)
    last_seen = models.DateTimeField()
    elapsed = models.IntegerField()

    objects = MeasManager()


Serializer



class MeasMQTTSerializer(serializers.ModelSerializer):


    client = RMQPublisher(ch_name=rmq_chname,routing_key=rmq_routingkey,host=rmq_host,user=rmq_user,password=rmq_pass,port=rmq_port)
    class Meta:
        model = Meas
        fields = '__all__'     
    def save(self):

        logging.debug("Saving measurement")

        measurement = self.validated_data['sensorType']
        mytags = { 
            'sensorId' : self.validated_data['sensorId'],
            'roomId' : self.validated_data['roomId'],
            'hubId' : self.validated_data['hubId'],
            'homeId' : self.validated_data['homeId'],
            'elapsed' : self.validated_data['elapsed'],
            'last_seen' : self.validated_data['last_seen']
        }
        for a, x in mytags.items():
            mytags[a]=str(x)
        value = float(self.validated_data['value'])

        rmq_msg = mytags
        rmq_msg['value']=value
        rmq_msg['measurement'] = measurement
        MeasMQTTSerializer.client.pushObject(rmq_msg)
        logging.debug("Pushed to RMQ")  
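
The payload that save() builds does not depend on Django, so it can be pulled out into a plain function and checked on its own. The following is only a sketch: `build_message` and `TAG_FIELDS` are hypothetical names that are not in the original code, and the field list is copied from the serializer above.

```python
from datetime import datetime

# Fields that save() converts to strings before publishing.
TAG_FIELDS = ('sensorId', 'roomId', 'hubId', 'homeId', 'elapsed', 'last_seen')


def build_message(validated_data):
    """Return the dict that save() publishes: string tags plus a float value."""
    message = {name: str(validated_data[name]) for name in TAG_FIELDS}
    message['value'] = float(validated_data['value'])
    message['measurement'] = validated_data['sensorType']
    return message


# Example input shaped like serializer.validated_data (values are made up).
sample = {
    'sensorType': 'temperature',
    'sensorId': 's-1',
    'roomId': 'kitchen',
    'hubId': 'hub-1',
    'homeId': 'home-1',
    'value': '21.5',
    'elapsed': 12,
    'last_seen': datetime(2020, 1, 1, 12, 0),
}
print(build_message(sample))
```

With a helper like this, save() would only need to call `build_message(self.validated_data)` and hand the result to the publisher.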

And RMQPublisher just does a simple push to MQTT:

import json

import pika


class RMQPublisher(object):
    """Connects, publishes one message, and disconnects on every push."""

    def __init__(self,ch_name,routing_key,host,user,password,port):
        self.routing_key = routing_key
        self.ch_name     = ch_name
        self.rmq_host    = host
        self.rmq_user    = user
        self.rmq_pass    = password
        self.rmq_port    = port
        pass


    def pushObject(self,object):
        self.connect()
        self.sendMessage(object)
        self.disconnect()


    def connect(self):
        credentials = pika.PlainCredentials(self.rmq_user, self.rmq_pass)
        parameters = pika.ConnectionParameters(self.rmq_host,self.rmq_port,'/',credentials)
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange=self.ch_name, exchange_type='direct')

    def sendMessage(self,pushObj):   
        message = {}
        message["content"]  = pushObj

        self.channel.basic_publish(exchange=self.ch_name,
                            routing_key=self.routing_key,
                            body=json.dumps(message))

    def disconnect(self):
        self.connection.close()


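This approach seems to half work: some measurements get pushed, but for others I get an error whose root cause I can't really track down.

My guess is that, as the number of messages grows, there is a problem with the serializer's lifecycle, which I may not understand correctly.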

safeh-api      |     body=json.dumps(message))
safeh-api      |   File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2248, in basic_publish
safeh-api      |     self._flush_output()
safeh-api      |   File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 1336, in _flush_output
safeh-api      |     self._connection._flush_output(lambda: self.is_closed, *waiters)
safeh-api      |   File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 513, in _flush_output
safeh-api      |     self._impl.ioloop.poll()
safeh-api      |   File "/usr/local/lib/python3.6/site-packages/pika/adapters/select_connection.py", line 576, in poll
safeh-api      |     self._poller.poll()
safeh-api      |   File "/usr/local/lib/python3.6/site-packages/pika/adapters/select_connection.py", line 1200, in poll
safeh-api      |     self._dispatch_fd_events(fd_event_map)
safeh-api      |   File "/usr/local/lib/python3.6/site-packages/pika/adapters/select_connection.py", line 904, in _dispatch_fd_events
safeh-api      |     handler(fileno, events)
safeh-api      |   File "/usr/local/lib/python3.6/site-packages/pika/adapters/utils/selector_ioloop_adapter.py", line 391, in _on_reader_writer_fd_events
safeh-api      |     callbacks.writer()
safeh-api      |   File "/usr/local/lib/python3.6/site-packages/pika/diagnostic_utils.py", line 53, in log_exception_func_wrap
safeh-api      |     return func(*args, **kwargs)
safeh-api      |   File "/usr/local/lib/python3.6/site-packages/pika/adapters/utils/io_services_utils.py", line 1108, in _on_socket_writable
safeh-api      |     self._initiate_abort(error)
safeh-api      |   File "/usr/local/lib/python3.6/site-packages/pika/diagnostic_utils.py", line 53, in log_exception_func_wrap
safeh-api      |     return func(*args, **kwargs)
safeh-api      |   File "/usr/local/lib/python3.6/site-packages/pika/adapters/utils/io_services_utils.py", line 911, in _initiate_abort
safeh-api      |     'non-_STATE_COMPLETED', self._state)
safeh-api      | AssertionError: ('_AsyncTransportBase._initate_abort() expected non-_STATE_COMPLETED', 4)
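
One lifecycle detail that is visible in the code above: `client` is a class attribute, so it is created once when the class body runs and is shared by every serializer instance, while `connect()` stores the connection and channel on that shared object. The sketch below uses stand-in classes (`FakePublisher` and `FakeSerializer` are hypothetical, not the real ones) to show the sharing. Whether this is what triggers the AssertionError is an assumption; it would only matter if the server handles requests concurrently, which the question does not say.

```python
class FakePublisher:
    """Stand-in for RMQPublisher: keeps per-call state on the instance."""

    def __init__(self):
        self.connection = None

    def connect(self, label):
        # The real connect() stores a pika connection here.
        self.connection = label


class FakeSerializer:
    # Evaluated once, when the class body runs (at import time).
    client = FakePublisher()


first = FakeSerializer()
second = FakeSerializer()

first.client.connect('connection opened for request A')
second.client.connect('connection opened for request B')

# Both serializer instances see the same publisher object...
shared = first.client is second.client
# ...so the state set for request A has been replaced by request B's.
seen_by_first = first.client.connection
print(shared, seen_by_first)
```

If two requests interleave in this way, one request can publish on, or close, a connection that another request just opened.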

Any hints for a better design or a fix for the existing one would be much appreciated! Thanks!

Tags: python, django, rabbitmq, mqtt, pika

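Solution


I know this sounds like a paranoid solution, but I ended up rewriting this simple API as a 3-file flask-restplus application. The same MQTT module works like a charm there, so the problem was probably related to my misunderstanding of the Django object lifecycle.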
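
For anyone who would rather stay on Django, one possible alternative is to keep the connection and channel in local variables so that no state is shared between calls. This is a minimal sketch under assumptions, not the fix used in the answer above: `PerCallPublisher` and `make_pika_connection` are hypothetical names, and the pika calls mirror the ones already used in the question.

```python
import json


def make_pika_connection(host, port, user, password):
    """Open a new pika BlockingConnection (pika is imported lazily)."""
    import pika  # imported here so this module loads without pika installed

    credentials = pika.PlainCredentials(user, password)
    parameters = pika.ConnectionParameters(host, port, '/', credentials)
    return pika.BlockingConnection(parameters)


class PerCallPublisher:
    """Publishes each message on its own short-lived connection."""

    def __init__(self, exchange, routing_key, connection_factory):
        self.exchange = exchange
        self.routing_key = routing_key
        # Zero-argument callable that returns a fresh connection.
        self._connection_factory = connection_factory

    def push_object(self, obj):
        # Local variables only: nothing is stored on self between calls.
        connection = self._connection_factory()
        try:
            channel = connection.channel()
            channel.exchange_declare(exchange=self.exchange,
                                     exchange_type='direct')
            channel.basic_publish(exchange=self.exchange,
                                  routing_key=self.routing_key,
                                  body=json.dumps({'content': obj}))
        finally:
            # Close the connection even if publishing raised.
            connection.close()


# Hypothetical wiring, using the settings names from the question:
# from functools import partial
# client = PerCallPublisher(rmq_chname, rmq_routingkey,
#     partial(make_pika_connection, rmq_host, rmq_port, rmq_user, rmq_pass))
```

Passing the connection factory in as an argument also makes the publisher testable with a fake connection, without a running broker.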

