python - 是否可以将流保存在变量中?
问题描述
我正在尝试使用 Python 保存 10 秒的缓冲视频,特别是 '.h264' 格式。
为此,我一直在使用连接到 Raspberry Pi 的 PiCamera 和下面显示的脚本。我现在面临的主要障碍是,我不想将文件直接保存到一个位置 [stream.copy_to(str(time)+'.h264')] 我想将它保存到一个变量中以执行某些在最终保存之前进行操作(例如更改视频分辨率)。知道如何实现吗?
提前致谢!
import time
import io
import os
import picamera
import datetime as dt
from PIL import Image
import cv2
#obtain current time
def return_currentTime():
return dt.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
#trigger event declaration
def motion_detected():
while True:
print ("Trigger event(y)?")
trigger = input ()
if trigger =="y":
time = return_currentTime()
print ("Buffering...")
stream.copy_to(str(time)+'.h264')
else:
camera.stop_recording()
break
#countdown timer
def countdown (t):
while t:
mins, secs = divmod (t,60)
timer = '{:02d}:{:02d}'.format(mins, secs)
print(timer, end="\r")
time.sleep(1)
t-=1
print('Buffer available!')
camera = picamera.PiCamera()
camera.resolution = (640, 480)
stream = picamera.PiCameraCircularIO(camera, seconds = 5)
#code will work using h264 as format
camera.start_recording (stream, format = 'h264')
countdown(5)
motion_detected()
解决方案
我没有 Raspberry Pi,但我知道如何做到这一点。
-
- 我使用
VideoStream
了它也支持PiCamera
,所以你可以使用下面的代码。
-
stream = VideoStream(usePiCamera=False, resolution=(640, 480), framerate=32).start() # Wait for a two-second for warming the webcam. time.sleep(2.0)
- 我使用
-
- 开始获取帧
-
while True: frame = stream.read() countdown(5) motion_detected()
-
修改
motion_detected()
以保存帧。while True: frame = stream.read() countdown(5) motion_detected(frame)
-
- 现在我们需要使用
array
或来存储帧dictionary
。
字典比数组快。(来源)
我们需要在项目文件之上初始化一个全局字典。
import time import datetime as dt from imutils.video import VideoStream dictionary = {} count = 0
我们需要修改
motion_detected
方法,我们先初始化入参# trigger event declaration def motion_detected(input_frame):
二、我们在里面定义全局变量
motion_detected
# trigger event declaration def motion_detected(input_frame): global dictionary global count
不幸的是, VideoStream 对象没有
copy_to
属性,因此我必须直接将帧分配给字典:def motion_detected(input_frame): global dictionary global count while True: print("Trigger event(y)?") trigger = input() if trigger == "y": current_time = return_current_time() print("Buffering...") # stream.copy_to(str(current_time) + '.h264') dictionary[count] = input_frame count += 1 if count == 10: print("\n10 frames are stored\n") else: stream.stop() break
- 现在我们需要使用
-
- 现在我们可以执行某些操作,例如检测边缘。
-
while True: frame = stream.read() countdown(5) motion_detected(frame) for stored_frame in dictionary.values(): result = cv2.Canny(image=stored_frame, threshold1=50, threshold2=100)
-
- 输出:
-
- 保存帧
要保存帧,您需要枚举存储的帧。
-
for count, stored_frame in enumerate(dictionary.values()):
-
然后,应用您的操作:
-
for count, stored_frame in enumerate(dictionary.values()): result = cv2.Canny(image=stored_frame, threshold1=50, threshold2=100)
-
将其保存到文件夹中。
-
for count, stored_frame in enumerate(dictionary.values()): result = cv2.Canny(image=stored_frame, threshold1=50, threshold2=100) cv2.imwrite("output/frame_{}.png".format(count), result)
-
如果你想循环多次,上面的代码是行不通的。在这种情况下,您需要在循环之上初始化
while
循环。-
counter = 0 while True: frame = stream.read() countdown(5) motion_detected(frame) for stored_frame in dictionary.values(): result = cv2.Canny(image=stored_frame, threshold1=50, threshold2=100) cv2.imwrite("output/frame_{}.png".format(counter), result) counter += 1
-
代码:
import cv2
import time
import datetime as dt
from imutils.video import VideoStream
dictionary = {}
count = 0
# obtain current time
def return_current_time():
return dt.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# trigger event declaration
def motion_detected(input_frame):
global dictionary
global count
while True:
print("Trigger event(y)?")
trigger = input()
if trigger == "y":
current_time = return_current_time()
print("Buffering...")
# stream.copy_to(str(current_time) + '.h264')
dictionary[count] = input_frame
count += 1
if count == 10:
print("\n10 frames are stored\n")
else:
stream.stop()
break
# countdown timer
def countdown(t):
while t:
mins, secs = divmod(t, 60)
timer = '{:02d}:{:02d}'.format(mins, secs)
print(timer, end="\r")
time.sleep(1)
t -= 1
print('Buffer available!')
stream = VideoStream(usePiCamera=False,
resolution=(640, 480),
framerate=32).start()
time.sleep(2.0)
counter = 0
while True:
frame = stream.read()
countdown(5)
motion_detected(frame)
for stored_frame in dictionary.values():
result = cv2.Canny(image=stored_frame,
threshold1=50,
threshold2=100)
cv2.imwrite("output/frame_{}.png".format(counter), result)
counter += 1
推荐阅读
- javascript - 访问Angular 2模板之外的DOM元素的“正确”方式?
- git - 如何撤消 git 命令以更改差异检查器
- reactjs - 在流星中启动反应包时出现问题
- java - 如何通过 Java SDK 正确启动使用 Spark 的 EMR 集群(command-runner.jar 与直接引用 JAR 路径)
- regex - 我如何找到一个角色并向前和向后看?
- node.js - 在 vscode 扩展中执行一个电子应用程序
- php - 在 php 中缓存包含的远程文件
- java - 如何在android中获取通话记录项目的sim卡名称?
- php - 我们如何自定义 Wordpress 编码标准并在 VSCode 中使用它们?
- apache-spark - 如何在不加载整个文件的情况下读取前 n 行?