python - multiprocessing Python program on Raspberry PI - async function is not called
问题描述
In my current project, I am trying to stop a video stream (the program is running on Raspberry PI), when I send a stop signal ( the program is running on my computer).
To achieve this objective, I have written a multiprocessing python code, which is running two processes.
Process 1 is reading data from a camera attached with the raspberry PI and send it to amazon Kinesis. It is using
OpenCV
library for raspberry PI to send images to Kinesis.Process 2 is a subscriber (through AWS IoT), which waits for the stop signal from AWS IoT. When it receives the stop signal, it stops the streaming.
I have used commandFlag
as a Flag. It is initialized with 0
. Whenever, thread 2 receives the stop signal, the commandFlag
is turned to 1
.
In the current program, def on_message(client, userdata, msg):
is not called. Thus, it is not stopping the video stream ( because commandFlag
flag is not set to 1
)
Moreover, it does not releases the attached camera resource.
Could you please suggest any other methods to resolve this issue? Do I need to make any other changes in the program to work it correctly?
import sys
import cPickle
import datetime
import cv2
import boto3
import time
import cPickle
from multiprocessing import Pool
import pytz
import threading
import multiprocessing
#Subscriber
import paho.mqtt.client as paho
import os
import socket
import ssl
commandFlag = 0
# The unique hostname that &IoT; generated for this device.
awshost = "AAAAAA-ats.iot.us-east-1.amazonaws.com"
awsport = 8883
# A programmatic shadow handler name prefix.
clientId = "IoTDevice"
thingName = "IoTDevice"
# The relative path to the correct root CA file for &IoT;, which you have already saved onto this device.
caPath = "AmazonRootCA1.pem"
# The relative path to your certificate file that
# &IoT; generated for this device, which you
# have already saved onto this device.
certPath = "AAAAA-certificate.pem.crt"
# The relative path to your private key file that
# &IoT; generated for this device, which you
# have already saved onto this device.
keyPath = "AAAAAA-private.pem.key"
kinesis_client = boto3.client("kinesis", region_name='us-east-1')
rekog_client = boto3.client("rekognition",region_name='us-east-1')
camera_index = 0 # 0 is usually the built-in webcam
capture_rate = 30 # Frame capture rate.. every X frames. Positive integer.
rekog_max_labels = 123
rekog_min_conf = 50.0
def on_connect(client, userdata, flags, rc):
global commandFlag
print("I am ready to receive control message...." )
client.subscribe("#" , 1 )
def on_message(client, userdata, msg):
global commandFlag
print("Received Switch Off message from AWS IoT....")
commandFlag = 1
def subscriber():
global commandFlag
mqttc = paho.Client()
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.tls_set(caPath, certfile=certPath, keyfile=keyPath, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
mqttc.connect(awshost, awsport, keepalive=60)
mqttc.loop_forever()
#Send frame to Kinesis stream
def encode_and_send_frame(frame, frame_count, enable_kinesis=True, enable_rekog=False, write_file=False):
try:
#convert opencv Mat to jpg image
#print "----FRAME---"
retval, buff = cv2.imencode(".jpg", frame)
img_bytes = bytearray(buff)
utc_dt = pytz.utc.localize(datetime.datetime.now())
now_ts_utc = (utc_dt - datetime.datetime(1970, 1, 1, tzinfo=pytz.utc)).total_seconds()
frame_package = {
'ApproximateCaptureTime' : now_ts_utc,
'FrameCount' : frame_count,
'ImageBytes' : img_bytes
}
if write_file:
print("Writing file img_{}.jpg".format(frame_count))
target = open("img_{}.jpg".format(frame_count), 'w')
target.write(img_bytes)
target.close()
#put encoded image in kinesis stream
if enable_kinesis:
print "Sending image to Kinesis"
response = kinesis_client.put_record(
StreamName="FrameStream",
Data=cPickle.dumps(frame_package),
PartitionKey="partitionkey"
)
print response
if enable_rekog:
response = rekog_client.detect_labels(
Image={
'Bytes': img_bytes
},
MaxLabels=rekog_max_labels,
MinConfidence=rekog_min_conf
)
print response
except Exception as e:
print e
def main():
global commandFlag
#capture_rate
argv_len = len(sys.argv)
if argv_len > 1 and sys.argv[1].isdigit():
capture_rate = int(sys.argv[1])
cap = cv2.VideoCapture(0) #Use 0 for built-in camera. Use 1, 2, etc. for attached cameras.
pool = Pool(processes=3)
frame_count = 0
while True:
# Capture frame-by-frame
ret, frame = cap.read()
#cv2.resize(frame, (640, 360));
if ret is False:
break
if frame_count % 30 == 0:
result = pool.apply_async(encode_and_send_frame, (frame, frame_count, True, False, False,))
frame_count += 1
# Display the resulting frame
cv2.imshow('frame', frame)
#if cv2.waitKey(1) & 0xFF == ord('q'):
if commandFlag == 1:
break;
# When everything done, release the capture
cap.release()
cv2.destroyAllWindows()
return
if __name__ == '__main__':
t1 = multiprocessing.Process(target=main)
t1.start()
t2 = multiprocessing.Process(target=subscriber)
t2.start()
while True:
if commandFlag == 1:
t1.terminate()
t2.terminate()
sys.exit(1)
解决方案
推荐阅读
- r - 使用 data.table `:=` 进行动态分配
- python - NameError - 如何修复?
- reactjs - 无法将 google 的 Material Web 组件 SASS 导入 React 项目
- sql - SQL 中引用表的给定键没有唯一约束匹配
- sql-server - 如何使用参数化 SQL 插入地理列
- debugging - 如何调试 App Script 中的 OnEdit 代码?
- ionic-framework - 带有 .p8 键的 iOS 推送通知在 Firebase 中不起作用
- c++ - 为什么在将 libcurl 与 CURLOPT_PROXY 一起使用时会出现 CURLE_NOT_BUILT_IN?
- javascript - 单击元素后Javascript不会隐藏文本
- c# - C#解析泛型类型参数