python - Gstreamer 添加动态解复用器元素链
问题描述
我们有多个摄像头将混合的 RTP 和 RTCP 发送到视频处理器的同一端口。在这个例子中,我只是使用原始视频帧来简化它,稍后我希望在 GPU 上解码 H.264。
使用 gst-launch 我让它工作:
gst-launch-1.0 rtpbin name=rtpbin funnel name=frtp videotestsrc pattern=ball is-live=true ! "video/x-raw,framerate=10/1" ! rtpvrawpay ssrc=10 ! rtpbin.send_rtp_sink_0 rtpbin.send_rtp_src_0 ! frtp.sink_0 rtpbin.send_rtcp_src_0 ! frtp.sink_1 frtp.src ! udpsink host=127.0.0.1 port=5000
gst-launch-1.0 -v rtpssrcdemux name=rtpdemux udpsrc name=udpsrc port=5000 ! rtpdemux.sink rtpdemux.src_10 ! "application/x-rtp, media=(string)video, clock-rate=(int)90000, encoding-name=(string)RAW, sampling=(string)RGBA, depth=(string)10, width=(string)320, height=(string)240, colorimetry=(string)BT601-5, payload=(int)96" ! rtpvrawdepay ! videoconvert ! autovideosink
同样,如果我在 Python 代码中使用相同的字符串,并用 Gst.parse_launch 解析它,也可以正常工作。在这种情况下,我已经在发送方设置了 ssrc,因为它是解复用器上 pad 名称的一部分。
但是当我尝试在 python 中动态构建链时,它失败了。关于如何解决这个问题的任何建议?这是我的测试代码:
import gi
import time
gi.require_version('Gst', '1.0')
from gi.repository import Gst
class Video():
    """Receive a muxed RTP/RTCP stream on one UDP port and split it by SSRC.

    The pipeline is ``udpsrc ! rtpssrcdemux``. For every pad the demuxer
    exposes at runtime, ``_demuxer_new_pad`` attaches a chain that ends in an
    ``appsink``; the sinks are kept in ``self.app_sink`` keyed by pad name.
    """

    def __init__(self, port=5000):
        """Describe the pipeline and start it.

        Args:
            port: UDP port the ``udpsrc`` element listens on.
        """
        Gst.init(None)

        self.port = port

        # UDP video mux stream (:5000)
        self.launch_pipline = [
            'rtpssrcdemux name=rtpdemux',
            f'udpsrc name=udpsrc port={port}',
            '! rtpdemux.sink',
            # 'rtpdemux.src_10'
            # '! application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)RAW,sampling=(string)RGBA,depth=(string)10,width=(string)320,height=(string)240,colorimetry=(string)BT601-5,payload=(int)96',
            # '! rtpvrawdepay name=depay ! videoconvert',
            # f'! appsink name=appsink{port} emit-signals=true sync=false max-buffers=0 drop=true'
        ]

        self.start_gst(self.launch_pipline)

    def start_gst(self, config):
        """Parse the launch description, connect all callbacks, then play.

        Args:
            config: Sequence of gst-launch fragments; they are joined with a
                single space to form the launch string.
        """
        command = ' '.join(config)
        print(command)
        self.video_pipe = Gst.parse_launch(command)

        # Registry of dynamically created appsinks, keyed by demuxer pad name.
        # It has to exist before the pipeline runs because _demuxer_new_pad
        # writes to it from the "pad-added" callback.
        self.app_sink = {}

        self.demuxer = self.video_pipe.get_by_name('rtpdemux')
        self.demuxer.connect("pad-added", self._demuxer_new_pad)

        bus = self.video_pipe.get_bus()
        bus.add_signal_watch()
        bus.connect("message::error", self._on_error)
        bus.connect("message::eos", self._on_eos)

        # Start the pipeline only after every handler is connected; starting
        # first would let an early packet trigger "pad-added" before anyone
        # is listening (or before self.app_sink exists).
        self.video_pipe.set_state(Gst.State.PLAYING)

        #------
        # app_sink = self.video_pipe.get_by_name(f'appsink{self.port}')
        # app_sink.connect('new-sample', self._new_frame)

    def _on_error(self, _, message):
        """Print the error and debug details carried by a bus error message."""
        err, debug = message.parse_error()
        print("Error: %s" % err, debug)

    def _on_eos(self, _, message):
        """Report that the pipeline posted end-of-stream on the bus."""
        print("EOF!")

    def _demuxer_new_pad(self, demuxer, pad):
        """Attach a chain ending in an appsink to a newly exposed demuxer pad.

        Pads whose name starts with "rtcp" are linked straight to the appsink.
        Any other pad is linked through ``rtpvrawdepay ! videoconvert`` and its
        appsink reports samples to ``_new_frame``.

        NOTE: this is the version the question is about. The elements created
        here are never brought to the pipeline's state and no caps are set on
        ``pad``; the solution section of this page changes exactly that.

        Args:
            demuxer: Element that emitted "pad-added".
            pad: The new source pad.
        """
        name = pad.get_name()
        print(f"---------\n{demuxer}\n{pad}\n{name}\n-------")
        is_rtcp = name.startswith("rtcp")

        sink = Gst.ElementFactory.make("appsink", f"appsink_{name}")
        sink.set_property("emit-signals", True)
        sink.set_property("sync", False)
        sink.set_property("max-buffers", 0)
        sink.set_property("drop", True)
        if not is_rtcp:
            # Only video pads deliver frames worth reporting.
            sink.connect('new-sample', self._new_frame)
        self.video_pipe.add(sink)
        self.app_sink[name] = sink

        if is_rtcp:
            chain_pad = sink.get_static_pad("sink")
        else:
            # caps = Gst.caps_from_string("application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)RAW,sampling=(string)RGBA,depth=(string)10,width=(string)320,height=(string)240,colorimetry=(string)BT601-5,payload=(int)96")
            depay = Gst.ElementFactory.make("rtpvrawdepay")
            # depay_pad = depay.get_static_pad("sink")
            # depay_pad.set_caps(caps)
            # pad.set_caps(caps)
            convert = Gst.ElementFactory.make("videoconvert")
            self.video_pipe.add(depay)
            self.video_pipe.add(convert)
            chain_pad = depay.get_static_pad("sink")
            depay.link(convert)
            convert.link(sink)

        pad.link(chain_pad)

    def _new_frame(self, sink):
        """Pull one sample from ``sink`` and print its caps and the sink name.

        Args:
            sink: The appsink that emitted "new-sample".

        Returns:
            ``Gst.FlowReturn.OK``.
        """
        print("in video_udp callback")
        sample = sink.emit('pull-sample')
        caps = sample.get_caps()
        name = sink.get_name()
        print(f"caps: {caps}, name: {name}")
        # height = caps.get_structure(0).get_value('height')
        # width = caps.get_structure(0).get_value('width')
        # got_buf = sample.get_buffer() is not None
        # print(f"{height}x{width} {got_buf}")

        return Gst.FlowReturn.OK
if __name__ == '__main__':
    # GLib is needed only by the script entry point, so it is imported here.
    from gi.repository import GLib

    # Create the video object
    # Add port= if is necessary to use a different one
    video = Video(port=5000)

    # The bus handlers are registered through bus.add_signal_watch(), which
    # dispatches messages from a GLib main context. A plain time.sleep()
    # never iterates that context, so the error/EOS callbacks would stay
    # silent; run a main loop instead.
    loop = GLib.MainLoop()
    try:
        loop.run()
    except KeyboardInterrupt:
        pass
    finally:
        # Shut the pipeline down so its socket and threads are released.
        video.video_pipe.set_state(Gst.State.NULL)
解决方案
事实证明,我忽略了一点:添加到管道中的任何新元素都需要被设置为 PLAYING(播放)状态。我之前漏看了文档中说明管道内所有元素都应处于相同状态的部分。以下是使其工作的更改:
def _demuxer_new_pad(self, demuxer, pad):
    """Attach a running chain ending in an appsink to a new demuxer pad.

    Pads whose name starts with "rtcp" are linked straight to the appsink.
    Any other pad gets caps describing the raw-video RTP payload and is linked
    through ``rtpvrawdepay ! videoconvert`` to an appsink that reports samples
    to ``self._new_frame``. Every element added here is then brought to the
    state of the pipeline, since elements added to an already running
    pipeline do not follow its state on their own.

    Args:
        demuxer: Element that emitted "pad-added".
        pad: The new source pad.
    """
    name = pad.get_name()
    print(f"---------\n{demuxer}\n{pad}\n{name}\n-------")
    is_rtcp = name.startswith("rtcp")

    sink = Gst.ElementFactory.make("appsink", f"appsink_{name}")
    sink.set_property("emit-signals", True)
    sink.set_property("sync", False)
    sink.set_property("max-buffers", 0)
    sink.set_property("drop", True)
    if not is_rtcp:
        sink.connect('new-sample', self._new_frame)
    self.video_pipe.add(sink)
    self.app_sink[name] = sink

    if is_rtcp:
        # RTCP packets need no depayloading; the appsink is the whole chain.
        chain = [sink]
    else:
        # The caps are hard-coded to match the sender used in this example.
        caps = Gst.caps_from_string("application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)RAW,sampling=(string)RGBA,depth=(string)10,width=(string)320,height=(string)240,colorimetry=(string)BT601-5,payload=(int)96")
        depay = Gst.ElementFactory.make("rtpvrawdepay")
        pad.set_caps(caps)  # gives warning, but ok
        convert = Gst.ElementFactory.make("videoconvert")
        self.video_pipe.add(depay)
        self.video_pipe.add(convert)
        chain = [depay, convert, sink]

    # Link the elements of the chain to each other, reporting any failure
    # instead of silently ending up with a dead branch.
    for upstream, downstream in zip(chain, chain[1:]):
        if not upstream.link(downstream):
            print(f"Failed to link {upstream.get_name()} to {downstream.get_name()}")

    # Connect the demuxer pad to the head of the chain.
    link_result = pad.link(chain[0].get_static_pad("sink"))
    if link_result != Gst.PadLinkReturn.OK:
        print(f"Failed to link pad {name}: {link_result}")

    # Follow whatever state the pipeline is in (PLAYING in this example)
    # rather than forcing PLAYING unconditionally.
    for element in chain:
        element.sync_state_with_parent()
推荐阅读
- java - JNLP 没有自动下载
- mysql - 是否有一个 MySQL 函数可以将多个单元格中的数据拉到一个单元格中
- python - 保存在 excel 中的 Python 浮点值 4.999999999999999 显示为 5.0
- oracle - 在 LOV Oracle 表单中调用 GET_LIST
- sql - BigQuery 重复的 rank() 数字
- java - Java - 从文本文件中读取并将项目(字符串,数组列表)添加到哈希图中
- android - kotlin - 以编程方式为视图设置 aplha
- javascript - Javascript API GET 请求失败,代码为 404。服务器响应被截断
- python - 多维度上的 Tensorflow embedding_lookup
- python-3.x - 删除烛台_ohlc 图表中两个日期之间的空格