python - 在 django Serializer save() 方法中发布到 MQTT
问题描述
我有一个非常简单的 API,它使用 django 和 restframework 作为物联网设备的端点。
iot Devices --HTTP POST--> REST API (django) 验证并保存数据。
根本不需要任何渲染或 GET/PATCH/DELETE。
唯一的事情是我没有保存到数据库,但我想推送到MQTT通道(侦听消息的其他进程将保存/后处理)
因为我根本不是 django 专家,所以我的想法是覆盖save()
它实际上不保存但发布的 Serializer 方法。
模型
class Meas(models.Model):
SENSOR_TYPES = [
('temperature','temperature'),
('humidity','humidity')
]
sensorType = models.CharField(max_length=100, default='UNKNOWN', choices = SENSOR_TYPES )
sensorId = models.CharField(max_length=100)
homeId = models.CharField(max_length=100)
roomId = models.CharField(max_length=150)
hubId = models.CharField(max_length=100)
value = models.CharField(max_length=100)
last_seen = models.DateTimeField()
elapsed = models.IntegerField()
objects = MeasManager()
序列化器
class MeasMQTTSerializer(serializers.ModelSerializer):
client = RMQPublisher(ch_name=rmq_chname,routing_key=rmq_routingkey,host=rmq_host,user=rmq_user,password=rmq_pass,port=rmq_port)
class Meta:
model = Meas
fields = '__all__'
def save(self):
logging.debug("Saving measurement")
measurement = self.validated_data['sensorType']
mytags = {
'sensorId' : self.validated_data['sensorId'],
'roomId' : self.validated_data['roomId'],
'hubId' : self.validated_data['hubId'],
'homeId' : self.validated_data['homeId'],
'elapsed' : self.validated_data['elapsed'],
'last_seen' : self.validated_data['last_seen']
}
for a, x in mytags.items():
mytags[a]=str(x)
value = float(self.validated_data['value'])
rmq_msg = mytags
rmq_msg['value']=value
rmq_msg['measurement'] = measurement
MeasMQTTSerializer.client.pushObject(rmq_msg)
logging.debug("Pushed to RMQ")
和 RMQPublisher 只使用简单的推送到 mqtt:
class RMQPublisher(object):
def __init__(self,ch_name,routing_key,host,user,password,port):
self.routing_key = routing_key
self.ch_name = ch_name
self.rmq_host = host
self.rmq_user = user
self.rmq_pass = password
self.rmq_port = port
pass
def pushObject(self,object):
self.connect()
self.sendMessage(object)
self.disconnect()
def connect(self):
credentials = pika.PlainCredentials(self.rmq_user, self.rmq_pass)
parameters = pika.ConnectionParameters(self.rmq_host,self.rmq_port,'/',credentials)
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange=self.ch_name, exchange_type='direct')
def sendMessage(self,pushObj):
message = {}
message["content"] = pushObj
self.channel.basic_publish(exchange=self.ch_name,
routing_key=self.routing_key,
body=json.dumps(message))
def disconnect(self):
self.connection.close()
方法似乎是一半的工作,因为一些测量被推动,但对于一些测量,我得到了一个我无法真正找到根本原因的错误。
我的猜测是,随着消息数量的增加,序列化程序的生命周期会出现问题,我可能无法正确理解
safeh-api | body=json.dumps(message))
safeh-api | File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2248, in basic_publish
safeh-api | self._flush_output()
safeh-api | File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 1336, in _flush_output
safeh-api | self._connection._flush_output(lambda: self.is_closed, *waiters)
safeh-api | File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 513, in _flush_output
safeh-api | self._impl.ioloop.poll()
safeh-api | File "/usr/local/lib/python3.6/site-packages/pika/adapters/select_connection.py", line 576, in poll
safeh-api | self._poller.poll()
safeh-api | File "/usr/local/lib/python3.6/site-packages/pika/adapters/select_connection.py", line 1200, in poll
safeh-api | self._dispatch_fd_events(fd_event_map)
safeh-api | File "/usr/local/lib/python3.6/site-packages/pika/adapters/select_connection.py", line 904, in _dispatch_fd_events
safeh-api | handler(fileno, events)
safeh-api | File "/usr/local/lib/python3.6/site-packages/pika/adapters/utils/selector_ioloop_adapter.py", line 391, in _on_reader_writer_fd_events
safeh-api | callbacks.writer()
safeh-api | File "/usr/local/lib/python3.6/site-packages/pika/diagnostic_utils.py", line 53, in log_exception_func_wrap
safeh-api | return func(*args, **kwargs)
safeh-api | File "/usr/local/lib/python3.6/site-packages/pika/adapters/utils/io_services_utils.py", line 1108, in _on_socket_writable
safeh-api | self._initiate_abort(error)
safeh-api | File "/usr/local/lib/python3.6/site-packages/pika/diagnostic_utils.py", line 53, in log_exception_func_wrap
safeh-api | return func(*args, **kwargs)
safeh-api | File "/usr/local/lib/python3.6/site-packages/pika/adapters/utils/io_services_utils.py", line 911, in _initiate_abort
safeh-api | 'non-_STATE_COMPLETED', self._state)
safeh-api | AssertionError: ('_AsyncTransportBase._initate_abort() expected non-_STATE_COMPLETED', 4)
任何更好的设计或修复现有的提示将不胜感激!谢谢!
解决方案
我知道这听起来像是一个偏执的解决方案,但我最终将这个简单的 API 重写为 3 个文件 flask-restplus 应用程序。MQTT 的相同模块具有魅力,因此可能与我对 django 对象生命周期的误解有关。
推荐阅读
- mysql - 如何在 MYSQL 中仅使用 mysqldump 特定的数据库和表
- sql-server - 将数据从 BigQuery 导入 SQL Server
- vue.js - 比较道具和数据的测试错误
- apache-spark - 使用在 spark 2.3.0 上创建的 hive 上下文查询 hive 数据库
- python - 为什么要使用虚拟插槽?
- c - 无法使预编译的标头与 arm-none-eabi-gcc 一起使用
- matlab - 使用 trapz 求曲线下面积
- python - NameError:名称“排序”未定义
- javascript - javascript原型由不同类型编写
- dart - 当我第一次渲染大图像时,颤动会等待很长时间。