首页 > 解决方案 > GStreamer:为解复用器动态添加元素链

问题描述

我们有多个摄像头将混合的 RTP 和 RTCP 发送到视频处理器的同一端口。在这个例子中,我只是使用原始视频帧来简化它,稍后我希望在 GPU 上解码 H.264。

使用 gst-launch 时,我已经让它正常工作:

# Sender: payload a 10 fps "ball" test pattern as raw-video RTP (ssrc=10) and
# funnel rtpbin's RTP and RTCP outputs into one UDP stream to 127.0.0.1:5000.
gst-launch-1.0 \
    rtpbin name=rtpbin \
    funnel name=frtp \
    videotestsrc pattern=ball is-live=true \
        ! "video/x-raw,framerate=10/1" \
        ! rtpvrawpay ssrc=10 \
        ! rtpbin.send_rtp_sink_0 \
    rtpbin.send_rtp_src_0 ! frtp.sink_0 \
    rtpbin.send_rtcp_src_0 ! frtp.sink_1 \
    frtp.src ! udpsink host=127.0.0.1 port=5000

# Receiver: read the mixed stream from UDP port 5000, split it by SSRC with
# rtpssrcdemux, and depayload/convert the src_10 pad into autovideosink.
gst-launch-1.0 -v \
    rtpssrcdemux name=rtpdemux \
    udpsrc name=udpsrc port=5000 ! rtpdemux.sink \
    rtpdemux.src_10 \
        ! "application/x-rtp, media=(string)video, clock-rate=(int)90000, encoding-name=(string)RAW, sampling=(string)RGBA, depth=(string)10, width=(string)320, height=(string)240, colorimetry=(string)BT601-5, payload=(int)96" \
        ! rtpvrawdepay \
        ! videoconvert \
        ! autovideosink

同样,如果我在 Python 代码中使用相同的字符串,并用 Gst.parse_launch 解析它,也可以正常工作。在这个例子中,我已经在发送方固定设置了 ssrc,因为它是解复用器上 pad 名称的一部分。

但是当我尝试在 Python 中动态构建这条链时,它失败了。对于如何解决这个问题,有什么建议吗?这是我的测试代码:

import gi
import time
gi.require_version('Gst', '1.0')
from gi.repository import Gst


class Video():
    """Receive a UDP stream and fan it out per pad of an rtpssrcdemux.

    The pipeline built from a launch string is just ``udpsrc ! rtpssrcdemux``.
    Everything downstream of the demuxer is created at runtime in
    `_demuxer_new_pad`, one appsink (plus depayloader/converter for non-RTCP
    pads) per pad the demuxer adds.
    """

    def __init__(self, port=5000):
        """Initialise GStreamer, build the launch description and start it.

        Args:
            port: UDP port passed to ``udpsrc`` in the launch string.
        """
        Gst.init(None)

        self.port = port

        # UDP video mux stream (:5000)
        # Only the udpsrc -> rtpdemux.sink part is active; the commented-out
        # entries below are the static tail of the chain that is built
        # dynamically in _demuxer_new_pad instead.
        self.launch_pipline = [ 'rtpssrcdemux name=rtpdemux',
                               f'udpsrc name=udpsrc port={port}',
                                '! rtpdemux.sink', 
                            #     'rtpdemux.src_10'
                            #     '! application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)RAW,sampling=(string)RGBA,depth=(string)10,width=(string)320,height=(string)240,colorimetry=(string)BT601-5,payload=(int)96', 
                            #     '! rtpvrawdepay name=depay ! videoconvert', 
                            #    f'! appsink name=appsink{port} emit-signals=true sync=false max-buffers=0 drop=true'
                            ]
        self.start_gst(self.launch_pipline)

    def start_gst(self, config):
        """Parse the launch description, start the pipeline and hook up signals.

        Args:
            config: List of launch-string fragments; they are joined with a
                single space before being handed to ``Gst.parse_launch``.
        """
        command = ' '.join(config)
        print(command)
        self.video_pipe = Gst.parse_launch(command)
        self.video_pipe.set_state(Gst.State.PLAYING)
        # NOTE(review): the "pad-added" handler is connected only after the
        # pipeline has been asked to go to PLAYING; presumably a pad exposed
        # before this line runs would be missed -- confirm.
        self.demuxer = self.video_pipe.get_by_name('rtpdemux')
        self.demuxer.connect("pad-added", self._demuxer_new_pad)
        bus = self.video_pipe.get_bus()
        bus.add_signal_watch()
        bus.connect("message::error", self._on_error)
        bus.connect("message::eos", self._on_eos)
        # Maps demuxer pad name -> the appsink created for that pad.
        self.app_sink = {}
        #------
        # app_sink = self.video_pipe.get_by_name(f'appsink{self.port}')
        # app_sink.connect('new-sample', self._new_frame)

    def _on_error(self, _, message):
        """Bus handler for ``message::error``: print the error and debug text."""
        err, debug = message.parse_error()
        print("Error: %s" % err, debug)

    def _on_eos(self, _, message):
        """Bus handler for ``message::eos``: print a marker line."""
        print("EOF!")

    def _demuxer_new_pad(self, demuxer, pad):
        """Handle ``pad-added`` from the demuxer by attaching an appsink chain.

        Pads whose name starts with "rtcp" are linked straight to a new
        appsink. Every other pad is linked to
        ``rtpvrawdepay -> videoconvert -> appsink`` and the appsink's
        ``new-sample`` signal is routed to `_new_frame`.

        Args:
            demuxer: The element that emitted the signal.
            pad: The newly added pad.
        """
        name = pad.get_name()
        print(f"---------\n{demuxer}\n{pad}\n{name}\n-------")
        is_rtcp = name.startswith("rtcp")
        sink = Gst.ElementFactory.make("appsink", f"appsink_{name}")
        sink.set_property("emit-signals", True)
        sink.set_property("sync", False)
        sink.set_property("max-buffers", 0)
        sink.set_property("drop", True)
        if not is_rtcp:
            sink.connect('new-sample', self._new_frame)
        self.video_pipe.add(sink)
        self.app_sink[name] = sink
        if is_rtcp:
            chain_pad = sink.get_static_pad("sink")
        else:
            # caps = Gst.caps_from_string("application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)RAW,sampling=(string)RGBA,depth=(string)10,width=(string)320,height=(string)240,colorimetry=(string)BT601-5,payload=(int)96")
            depay = Gst.ElementFactory.make("rtpvrawdepay")
            # depay_pad = depay.get_static_pad("sink")
            # depay_pad.set_caps(caps)
            # pad.set_caps(caps)
            convert = Gst.ElementFactory.make("videoconvert")
            self.video_pipe.add(depay)
            self.video_pipe.add(convert)
            chain_pad = depay.get_static_pad("sink")
            depay.link(convert)
            convert.link(sink)
        # NOTE(review): none of the elements added above receives a state
        # change in this method, and the link results are not checked; this
        # looks like the reason the dynamically built chain does not run --
        # confirm.
        pad.link(chain_pad)

    def _new_frame(self, sink):
        """``new-sample`` handler: pull the sample and print its caps.

        Args:
            sink: The appsink that emitted the signal.

        Returns:
            ``Gst.FlowReturn.OK`` unconditionally.
        """
        print("in video_udp callback")
        sample = sink.emit('pull-sample')
        caps = sample.get_caps()
        name = sink.get_name()
        print(f"caps: {caps}, name: {name}")
        # height = caps.get_structure(0).get_value('height')
        # width = caps.get_structure(0).get_value('width')
        # got_buf = sample.get_buffer() is not None
        # print(f"{height}x{width} {got_buf}")
        return Gst.FlowReturn.OK

if __name__ == '__main__':
    # Imported locally so this entry point is self-contained; the module
    # itself only needs Gst.
    from gi.repository import GLib

    # Create the video object
    # Add port= if is necessary to use a different one
    video = Video(port=5000)

    # start_gst() registers its error/EOS handlers through
    # bus.add_signal_watch(), which dispatches bus messages from a GLib main
    # loop. Sleeping in the main thread never runs that loop, so those
    # handlers would never be called; run a main loop instead.
    loop = GLib.MainLoop()
    try:
        loop.run()
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop this test program.
        pass
    finally:
        # Release the UDP socket and all element resources before exiting.
        video.video_pipe.set_state(Gst.State.NULL)

标签: python gstreamer

解决方案


事实证明,我忽略了一点:任何新添加到管道中的元素都需要被设置为 PLAYING 状态。我之前漏看了文档中说明管道内所有元素必须处于相同状态的部分。以下是使其正常工作的改动:

    def _demuxer_new_pad(self, demuxer, pad):
        """Attach and start a processing chain for a pad added by the demuxer.

        Pads whose name starts with "rtcp" are linked directly to a new
        appsink. Every other pad gets
        ``rtpvrawdepay -> videoconvert -> appsink`` with the appsink's
        ``new-sample`` signal routed to ``self._new_frame``.

        Elements added to a pipeline that is already running start in the
        NULL state, so each new element is brought to the pipeline's state
        with ``sync_state_with_parent()``; without that no data flows through
        the new chain.

        Args:
            demuxer: The element that emitted ``pad-added``.
            pad: The newly added source pad.
        """
        name = pad.get_name()
        print(f"---------\n{demuxer}\n{pad}\n{name}\n-------")
        is_rtcp = name.startswith("rtcp")

        sink = Gst.ElementFactory.make("appsink", f"appsink_{name}")
        if sink is None:
            # ElementFactory.make returns None when the element is unavailable.
            print(f"Error: could not create appsink for pad {name}")
            return
        sink.set_property("emit-signals", True)
        sink.set_property("sync", False)
        sink.set_property("max-buffers", 0)
        sink.set_property("drop", True)
        if not is_rtcp:
            sink.connect('new-sample', self._new_frame)
        self.video_pipe.add(sink)
        self.app_sink[name] = sink

        if is_rtcp:
            chain = [sink]
        else:
            caps = Gst.caps_from_string("application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)RAW,sampling=(string)RGBA,depth=(string)10,width=(string)320,height=(string)240,colorimetry=(string)BT601-5,payload=(int)96")
            depay = Gst.ElementFactory.make("rtpvrawdepay")
            convert = Gst.ElementFactory.make("videoconvert")
            if depay is None or convert is None:
                print(f"Error: could not create depay/convert for pad {name}")
                return
            # The demuxer pad carries no caps of its own, so the depayloader
            # is told the stream format here.
            pad.set_caps(caps) # gives warning, but ok
            self.video_pipe.add(depay)
            self.video_pipe.add(convert)
            if not depay.link(convert) or not convert.link(sink):
                print(f"Error: could not link depay chain for pad {name}")
                return
            chain = [depay, convert, sink]

        chain_pad = chain[0].get_static_pad("sink")
        link_result = pad.link(chain_pad)
        if link_result != Gst.PadLinkReturn.OK:
            print(f"Error: could not link pad {name}: {link_result}")
            return

        # Follow the pipeline's current state rather than forcing PLAYING, and
        # start downstream elements first so each one is ready for data.
        for element in reversed(chain):
            if not element.sync_state_with_parent():
                print(f"Error: {element.get_name()} could not sync state")

推荐阅读