首页 > 解决方案 > ZMQ pub/sub 用于传输 base64 图像

问题描述

我面临一个任务:使用 zmq 套接字发送和接收 base64 字符串(从 800x600 图像生成)。目前,我正在使用 pub/sub 连接来执行此任务。但是看起来消息很大以至于套接字无法立即传输它,并且后​​面的消息卡在网络缓冲区中。虽然我不想丢失这么多消息,但我必须限制 HWM 值,以便套接字正常工作。所以我有一些问题:

  1. 是否有另一种有效的库/方式来执行我的任务?或者我应该使用 zmq 提供的其他连接类型(路由器/经销商和请求/回复)?
  2. 要传输图像(由 OpenCV 处理的图像),除了转换为 base64 格式外,我是否可以使用一种方法来最小化发送图像的大小?
  3. 如果我必须继续使用 zmq pub/sub 连接,我该如何限制存储旧消息的时间,而不是它们的数量,比如 3 分钟?

这是我的套接字python代码:

出版商

import numpy as np
import zmq
import base64
context = zmq.Context()
footage_socket = context.socket(zmq.PUB)
footage_socket.setsockopt(zmq.SNDHWM, 1)
footage_socket.connect(<tcp address>)

def send_func(frame, camera_link):
   height, width, _ = frame.shape
   frame = np.ascontiguousarray(frame)
   base64_img = base64.b64encode(frame)
   data = {"camera_link":camera_link,'base64_img':base64_img, "img_width":width, "img_height":height}
   footage_socket.send_json(data)

订阅者

footage_socket = context.socket(zmq.SUB)
footage_socket.bind(<tcp address>)
footage_socket.setsockopt(zmq.RCVHWM, 1)
def rcv_func(): 
    while True:
        print("run socket")
        try:
            framex = footage_socket.recv_string()
            data = json.loads(framex)
            frame = data['base64_img']
            img = np.frombuffer(base64.b64decode(frame), np.uint8)
            img = img.reshape(int(frame_height), int(frame_width), 3)

        except Exception as e:
            print(e)

标签: pythonsocketsimage-processingbase64zeromq

解决方案


在开始之前,让我做一些说明: -如果只是为了便于编码,请

避免将数据重新打包到 JSON中。JSON-re-serialised 数据“增长”的大小,没有为您提供超快速和资源高效的流处理的单一增值。只有当专业系统有足够的时间和几乎无限的备用 CPU 处理能力时,他们才会“求助” JSON 格式,他们浪费在将有价值的数据重新打包到另一个盒子内的另一个数据盒中 -数据的。在可行的情况下,他们可以支付所有成本和效率低下 - 在这里,您将得不到任何东西来换取所用的 CPU 时钟,超过一倍的 RAM - 需要重新打包自己并且还必须传输更大的数据

-审查,如果相机确实提供了“值得”成为8 字节/64 位“深度”的图像数据,如果没有,您将获得第一个显着的免费图像数据缩减

使用sys.getsizeof()可能会让您感到惊讶:

>>> aa = np.ones( 1000 )
>>> sys.getsizeof(  aa )
8096 <---------------------------- 8096 [B] object here "contains"
>>> (lambda array2TEST: array2TEST.itemsize * array2TEST.size )( aa )
8000 <---------------------------- 8000 [B] of data
>>> bb = aa.view()     # a similar effect happen in smart VECTORISED computing
>>> sys.getsizeof( bb )
96 <------------------------------   96 [B] object here "contains"
>>> (lambda array2TEST: array2TEST.itemsize * array2TEST.size )( bb )
8000 <---------------------------- 8000 [B] of data
>>> bb.flags
  C_CONTIGUOUS    : True
  F_CONTIGUOUS    : True
  OWNDATA         : False <-------------------------------||||||||||
  WRITEABLE       : True
  ALIGNED         : True
  WRITEBACKIFCOPY : False
  UPDATEIFCOPY    : False
  >>> bb.dtype
  dtype('float64') <--------------    8 [B] per image-pixel even for {1|0} B/W

我可以使用一种方法来最小化发送图像的大小......?

是的,已经花费了数百万的[man * years]研发资金,致力于解决这个问题,并且仍在不断发展最好的方法来解决这个问题。

最好的结果,正如任何人自己可能已经预料到的那样,对于极端极端的情况是需要的——从遥远的深空传输卫星图像,回到家——就像 JAXA 执行第二次小行星会合任务时一样,这次访问的是龙宫小行星

您的原样代码以迄今为止未指定的 fps 速率和颜色深度生成 800x600 图像帧。一个简短的视图显示,如果处理过程没有得到更多的关注和应有​​的注意,那么在-3 分钟内可以轻松生成多少数据:

>>> (lambda a2T: a2T.itemsize * a2T.size )( np.ones( ( 800, 600, 3 ) ) ) / 1E6
11.52 <---- each 800x600-RGB-FRAME handled/processed this way takes ~ 11.5 [MB]
                    @~30 fps                                        ~345.6 [MB/s]
                                                                    ~ 62.2 [GB/3min]

解决方案?从一流的专业知识中汲取灵感:

在那里,您的功率有限(无论是在能源方面还是在处理方面 - 不要忘记,这颗卫星“内部”的 CPU-s 已经在大约 5 到 7 年前制造,在项目启动之前 - 没有人认真将敢于用明亮而炙手可热的新但未经证实的 COTS 芯片发送任务),有限的 RAM(再次,功率加上重量限制,因为起飞和“那里”飞行所需的燃料量随着每一克的增加而增加“有用的有效载荷”)和最后但并非最不重要的 - 最大的限制因素 - 你的 R/F-COMM 手段非常有限- 如此“loooooooong”-wire(如果您尝试从“这里”回答任何远程请求或检测到错误后请求重新发送)。当前的 DSN 有效遥测数据传输速度约为 6.4 ~ 9.6 kbps(是的,不超过约 7000 位/秒)

在这里,最聪明的头脑已经将人类智慧的所有艺术投入到实现这一目标中:

  • 图像压缩的终极手段——除非它确实是至关重要和必要的,否则永远不要发送一点
  • 图像转码数据 错误自纠的终极手段添加 - 如果有任何值得添加的内容,则不需要进行错误检测(您将不得不等待将近一天,才能再次“重新传输”它,希望那里没有另一个错误)。这里我们需要一种(有限的——见上面发送单个比特的成本,所以这必须是非常经济的附加)自我纠正的方法,它确实可以修复一些有限范围的信号/数据传输错误,在从深空返回家园的 R/F-COMM 信号期间可能会出现并且确实会出现。对于较大的错误,您必须等待几天才能通过另一次尝试发送更大的数据包来解决重新安排的图像数据错误恢复,这是无法通过内置功能从“损坏”数据中恢复的- 错误自我纠正。

从哪里开始?

如果您的用例没有大量可用的“备用” CPU 能力(确实需要有足够的“空闲” CPU+RAM 资源来执行任何此类高级图像数据转码和错误- 恢复再处理,无论是在规模上(用于转码和再处理的额外数据量 - 两者都是大尺寸 - 比单个图像帧的大小大几个数量级)和时间(额外 CPU 处理的速度 ) )获得最终的图像数据压缩并没有什么神奇的技巧,您的故事到此结束。

如果您的用例可以提高 CPU 能力,那么您的下一个敌人就是时间。设计一个足够聪明的图像处理的时间和处理每个图像帧的时间,使用您设计的图像数据转码,在相当短的时间内,然后发送到接收端。前者可由您的项目资源管理(通过财务- 让合适的熟练工程师加入,以及由执行(执行)实际设计和工程阶段的人员)后者是不可管理的,它取决于您的项目的需求 - 您的项目仍然可以存活以执行预期功能的速度(fps)和承受的延迟(多晚,累积 [ms] 延迟)。

  • python是一个简单的原型生态系统,一旦你需要提高吞吐量(参考上面),这很可能(30 多年的经验让我非常有信心这么说 - 即使你加入附加类固醇,就像进入 cython + C-extensions 来完成整个马戏团确实有点,但只是快一点,必须付出巨大的附加成本(如果还没有掌握新技能 - 学习曲线持续时间昂贵,并且那些技术娴熟的人的薪水会增加)重新设计和重构您迄今为止原型良好的代码库)将成为正在进行的节目的第一个障碍

  • OpenCV可以并且将为您提供一些基本的图像处理工具

  • 图像数据转码和普通或终极数据压缩必须遵循,以减少数据大小

  • ZeroMQ是问题最少的部分——性能方面的可扩展性和独特的低延迟吞吐量能力。没有任何细节,人们可能会忘记PUB/SUB,除非你一直阻止和避免任何订阅列表处理(这样做的成本会对 { central-node | network-dataflows + all remote-nodes 造成巨大的副作用}-重载,对预期的快速和正确大小的图像数据管道处理没有实际影响。


Q :如果我必须继续使用 zmq pub/sub 连接,我怎样才能限制存储旧消息的时间,而不是它们的数量,比如 3 分钟?

ZeroMQ 是一个智能工具,但必须了解它的强大功能 - ZeroCopy 将帮助您在生产中保持低 RAM 配置,但如果您计划存储 -3 分钟的图像数据流,您将需要两个巨大的负载RAM 和 CPU 能力,这一切也很大程度上取决于.recv()-ing 对等点的实际数量。

ZeroMQ 是一个无代理系统,因此您实际上并没有“存储”消息,但.send()- 方法只是告诉 ZeroMQ 基础设施,只要 ZeroMQ 基础设施有机会发送,所提供的数据是免费发送的将它们发送给指定的对等接收者(无论是在本地还是在大西洋上或通过卫星连接)。这意味着,正确的 ZeroMQ 配置是必须的,如果您计划让发送/接收端准备好入队/传输/接收/出队约 3 分钟的压缩图像数据流,可能会提供倍数,如果 1:多方通信出现在生产中。

正确的分析和合理的设计决策是您的项目能够满足所有这些要求的唯一机会,因为 CPU、RAM 和传输装置先验已知是有限的。


推荐阅读