python - 将来自无人机摄像机的 h264 视频输入保存为计算机上的视频文件
问题描述
我有一架 Tello ryze 无人机,上面装有摄像头。我正在连接无人机以通过 wifi 接收视频。我的系统是 Windows 10,我使用的是 python 2.7。
我收到一个 h264 字节码,我使用 Tello 的 libh264 解码器来获取视频的帧,我也显示在我的 UI 中。我需要做的是将此视频作为文件保存在我的计算机上,但我遇到了问题。我可以使用 opencv 制作快照并将其保存为图像,这不是问题。但是由于某种原因制作视频不起作用。我在这里读了很多这样的帖子,但它们不适合我。我收到错误或我收到一个非常小的视频文件,无法打开。我的帧是具有 RGB 值的列表列表,例如:
[[255,200,100][55,200,100][25,20,100]]
这是我的代码以便更好地理解以帮助我
这是 UI 部分(我只是在这里复制所需的代码):
def videoLoop(self):
try:
# start the thread that get GUI image and draw skeleton
time.sleep(0.5)
self.sending_command_thread.start()
while not self.stopEvent.is_set():
system = platform.system()
# read the frame for GUI show
self.frame = self.drone.read()
if self.frame is None or self.frame.size == 0:
continue
# transfer the format from frame to image
image = Image.fromarray(self.frame)
# we found compatibility problem between Tkinter,PIL and Macos,and it will
# sometimes result the very long preriod of the "ImageTk.PhotoImage" function,
# so for Macos,we start a new thread to execute the _updateGUIImage function.
if system == "Windows" or system == "Linux":
self.refreshUI(image)
else:
thread_tmp = threading.Thread(target=self.refreshUI, args=(image,))
thread_tmp.start()
time.sleep(0.03)
except RuntimeError as e:
print("[INFO] caught a RuntimeError")
def refreshUI(self, image):
image = ImageTk.PhotoImage(image)
# if the imagePanel none ,we need to initial it
if self.imagePanel is None:
self.imagePanel = tki.Label(image=image)
self.imagePanel.image = image
self.imagePanel.pack(side="left", fill="both",
expand="yes", padx=10, pady=10)
# otherwise, simply update the imagePanel
else:
self.imagePanel.configure(image=image)
self.imagePanel.image = image
def takeSnapshot(self):
# grab the current timestamp and use it to construct the filename
ts = datetime.datetime.now()
filename = "{}.jpg".format(ts.strftime("%d-%m-%Y_%H-%M-%S"))
p = os.path.sep.join((self.screenShotPath, filename))
# save the file
cv2.imwrite(p, cv2.cvtColor(self.frame, cv2.COLOR_RGB2BGR))
print("[INFO] saved {}".format(filename))
您可以在下面找到 Tello 代码(也只有需要的部分):
def read(self):
"""Return the last frame from camera."""
if self.is_freeze:
return self.last_frame
else:
return self.frame
def video_freeze(self, is_freeze=True):
"""Pause video output -- set is_freeze to True"""
self.is_freeze = is_freeze
if is_freeze:
self.last_frame = self.frame
def _receive_video_thread(self):
"""
Listens for video streaming (raw h264) from the Tello.
Runs as a thread, sets self.frame to the most recent frame Tello captured.
"""
packet_data = b''
while True:
try:
res_string, ip = self.socket_video.recvfrom(2048)
packet_data += res_string
# end of frame
if len(res_string) != 1460:
for frame in self._h264_decod(packet_data):
self.frame = frame
packet_data = b''
except socket.error as exc:
print(("Caught exception sock.error : %s" % exc))
def _h264_decod(self, packet_data):
"""
decode raw h264 format data from Tello
:param packet_data: raw h264 data array
:return: a list of decoded frame
"""
res_frame_list = []
frames = self.decoder.decode(packet_data)
for framedata in frames:
(frame, w, h, ls) = framedata
if frame is not None:
# print ('frame size %i bytes, w %i, h %i, linesize %i' % (len(frame), w, h, ls))
frame = np.frombuffer(frame, dtype=np.ubyte, count=len(frame))
frame = (frame.reshape((h, ls // 3, 3)))
frame = frame[:, :w, :]
res_frame_list.append(frame)
return res_frame_list
如果有人能帮我写一个像这样的伪代码这样的方法,你会非常好心:
def saveVideo(self, frame_or_whatever_i_need_here):
out = cv2.VideoWriter('output.avi_or_other_format', -1, 20.0, (640,480))
out.write(frame_or_whatever_i_need_here)
out.release()
编辑 1:我找到了一个从我的快照中制作视频的选项,这意味着我可以通过线程制作快照,然后将它们保存到视频中。那将是一个选择。问题是,它会占用磁盘中的太多空间。解决方法链接在这里
解决方案
我找到了解决方案,所以如果有人需要同样的东西,我会在这里发布。我使用了以下博客并修改了代码来完成我的工作,你可以在这里找到帖子
self.frame = None
self.frame_array = []
def videoLoop(self):
try:
# start the thread that get GUI image and draw skeleton
time.sleep(0.5)
self.sending_command_thread.start()
while not self.stopEvent.is_set():
system = platform.system()
# read the frame for GUI show
self.frame = self.drone.read()
self.frame_array.append(self.frame)
if self.frame is None or self.frame.size == 0:
continue
# transfer the format from frame to image
image = Image.fromarray(self.frame)
if system == "Windows" or system == "Linux":
self.refreshUI(image)
else:
thread_tmp = threading.Thread(target=self.refreshUI, args=(image,))
thread_tmp.start()
time.sleep(0.03)
except RuntimeError as e:
print("[INFO] caught a RuntimeError")
def convert_frames_to_video(self, pathOut, fps):
size = (self.videoWidth, self.videoHeight)
out = cv2.VideoWriter(pathOut, cv2.VideoWriter_fourcc(*'DIVX'), fps, size)
for i in range(len(self.frame_array)):
# writing to a image array
out.write(self.frame_array[i])
out.release()
def onClose(self):
print("[INFO] closing...")
self.convert_frames_to_video('video.avi', 25.0)
self.stopEvent.set()
del self.drone
self.root.quit()
推荐阅读
- python - 如何获取具有一些标签的画布项目?
- python - 如何使用 Pushbutton 刷新 ProxyModel 和 tableview?
- elasticsearch - Kubernetes 在 kubernetes 集群中运行 packetbeat 时出现 CrashLoopBackOff 错误
- c# - 表格,基本计算器按键触发问题
- flutter - 我无法解决错误''类型''不是类型''''的子类型
- python - 如何合并 itertools 生成的数据框和熊猫中的普通数据框?
- elasticsearch - Logstash:配置聚合+经过的过滤器
- kubernetes - 如何查看hostPath中的文件
- python - 更改 window.columnconfigure() 后 Tkinter 窗口未更新
- flutter - 列表和列表有什么区别
> 和列表 >