首页 > 解决方案 > 在有互联网之前,您如何在本地排队消息?

问题描述

我有一个物联网设备,它通过亚马逊 SQS(boto3 库)向我们的服务器发送消息。我们的设备通过 3G USB 加密狗连接到互联网。

在我的 Python 代码中,如果无法发送 SQS 消息,则有一个try/块来捕获它。except我们的加密狗有 99% 的正常运行时间,但我需要找到解决 1% 停机时间的解决方案。

如何在互联网再次可用时以 python 方式创建本地队列以发送消息?我是否需要实现rabbitmq,或者只是创建一个5秒循环的线程,在消息发送时结束?

def send_message(sqs):
    try:
        msg_json = json.dumps({"function": hello, "userid": userid})
        sqs.send_message(
            QueueUrl=que,
            DelaySeconds=1,
            MessageAttributes={},
            MessageBody=(msg_json)
        )
    except Exception as e:
        print("Could not send")
        print(e)

标签: pythonrabbitmqiotamazon-sqs

解决方案


您必须在连接恢复后立即发送消息还是可以等到下一条消息?

在调用的同一线程上重试send_message可能会停止整个应用程序。另一方面,等待另一个呼叫send_message可能会使前一个呼叫延迟太久。

我赞成尽快发送消息,这就是为什么我宁愿异步发送所有消息,如下所示:

import time
import threading

class Messenger:
  def __init__(self, queue_url) -> None:
      self.msg_queue = []
      self.queue_url = queue_url
      self.running = True
      self.thread = threading.Thread(self.send_worker)

  def stop(self):
    self.running = False
    self.thread.join()

  def send_message(self, msg):
    if self.running:
      self.msg_queue.append(msg)

  def send_worker(self):
    while self.running or len(self.queue) > 0:

      msg = self.queue.pop()

      # retry forever      
      while not self.send_message_impl(msg):
        time.sleep(1)

      # sleep a little to prevent high cpu usage
      # you can use condition variables if you want
      if self.running and len(self.queue) == 0:
        time.sleep(1)

  def send_message_impl(sqs):
      try:
          msg_json = json.dumps(msg)
          sqs.send_message(
              QueueUrl=self.queue_url,
              DelaySeconds=1,
              MessageAttributes={},
              MessageBody=(msg_json)
          )
          return True
      except Exception as e:
          print("Could not send")
          print(e)
          return False

推荐阅读