首页 > 解决方案 > multiprocessing Python program on Raspberry PI - async function is not called

问题描述

In my current project, I am trying to stop a video stream (the program is running on Raspberry PI), when I send a stop signal ( the program is running on my computer).

To achieve this objective, I have written a multiprocessing python code, which is running two processes.

I have used commandFlag as a Flag. It is initialized with 0. Whenever, thread 2 receives the stop signal, the commandFlag is turned to 1.

In the current program, def on_message(client, userdata, msg): is not called. Thus, it is not stopping the video stream ( because commandFlag flag is not set to 1)

Moreover, it does not releases the attached camera resource.

Could you please suggest any other methods to resolve this issue? Do I need to make any other changes in the program to work it correctly?

  import sys
import cPickle
import datetime
import cv2
import boto3
import time
import cPickle
from multiprocessing import Pool
import pytz
import threading
import multiprocessing

#Subscriber
import paho.mqtt.client as paho
import os
import socket
import ssl

commandFlag = 0



# The unique hostname that &IoT; generated for this device.
awshost = "AAAAAA-ats.iot.us-east-1.amazonaws.com"
awsport = 8883

# A programmatic shadow handler name prefix.
clientId = "IoTDevice"
thingName = "IoTDevice"

# The relative path to the correct root CA file for &IoT;, which you have already saved onto this device.
caPath = "AmazonRootCA1.pem"

# The relative path to your certificate file that 
# &IoT; generated for this device, which you 
# have already saved onto this device.
certPath = "AAAAA-certificate.pem.crt"


# The relative path to your private key file that 
# &IoT; generated for this device, which you 
# have already saved onto this device.
keyPath = "AAAAAA-private.pem.key"


kinesis_client = boto3.client("kinesis", region_name='us-east-1')
rekog_client = boto3.client("rekognition",region_name='us-east-1')

camera_index = 0 # 0 is usually the built-in webcam
capture_rate = 30 # Frame capture rate.. every X frames. Positive integer.
rekog_max_labels = 123
rekog_min_conf = 50.0

def on_connect(client, userdata, flags, rc):
    global commandFlag
    print("I am ready to receive control message...." )
    client.subscribe("#" , 1 )

def on_message(client, userdata, msg):
    global commandFlag
    print("Received Switch Off message from AWS IoT....")
    commandFlag = 1


def subscriber():
    global commandFlag
    mqttc = paho.Client()
    mqttc.on_connect = on_connect
    mqttc.on_message = on_message
    mqttc.tls_set(caPath, certfile=certPath, keyfile=keyPath, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
    mqttc.connect(awshost, awsport, keepalive=60)
    mqttc.loop_forever()

#Send frame to Kinesis stream
def encode_and_send_frame(frame, frame_count, enable_kinesis=True, enable_rekog=False, write_file=False):
    try:
        #convert opencv Mat to jpg image
        #print "----FRAME---"
        retval, buff = cv2.imencode(".jpg", frame)

        img_bytes = bytearray(buff)

        utc_dt = pytz.utc.localize(datetime.datetime.now())
        now_ts_utc = (utc_dt - datetime.datetime(1970, 1, 1, tzinfo=pytz.utc)).total_seconds()

        frame_package = {
            'ApproximateCaptureTime' : now_ts_utc,
        'FrameCount' : frame_count,
            'ImageBytes' : img_bytes
        }

        if write_file:
            print("Writing file img_{}.jpg".format(frame_count))
            target = open("img_{}.jpg".format(frame_count), 'w')
            target.write(img_bytes)
            target.close()

        #put encoded image in kinesis stream
        if enable_kinesis:
            print "Sending image to Kinesis"
            response = kinesis_client.put_record(
                StreamName="FrameStream",
                Data=cPickle.dumps(frame_package),
                PartitionKey="partitionkey"
            )
            print response

        if enable_rekog:
            response = rekog_client.detect_labels(
                Image={
                    'Bytes': img_bytes
                },
                MaxLabels=rekog_max_labels,
                MinConfidence=rekog_min_conf
            )
            print response

    except Exception as e:
        print e


def main():
    global commandFlag
    #capture_rate   
    argv_len = len(sys.argv)

    if argv_len > 1 and sys.argv[1].isdigit():
        capture_rate = int(sys.argv[1])

    cap = cv2.VideoCapture(0) #Use 0 for built-in camera. Use 1, 2, etc. for attached cameras.
    pool = Pool(processes=3)

    frame_count = 0
    while True:
        # Capture frame-by-frame
        ret, frame = cap.read()
        #cv2.resize(frame, (640, 360));

        if ret is False:
            break

        if frame_count % 30 == 0:
        result = pool.apply_async(encode_and_send_frame, (frame, frame_count, True, False, False,))

        frame_count += 1

        # Display the resulting frame
        cv2.imshow('frame', frame)

        #if cv2.waitKey(1) & 0xFF == ord('q'):
        if commandFlag == 1:
          break;

    # When everything done, release the capture
    cap.release()
    cv2.destroyAllWindows()
    return


if __name__ == '__main__':

    t1 = multiprocessing.Process(target=main)
    t1.start()
    t2 = multiprocessing.Process(target=subscriber)
    t2.start()

    while True:
        if commandFlag == 1:
           t1.terminate()
           t2.terminate()
           sys.exit(1)

标签: pythonmultithreadingopencvraspberry-piaws-iot

解决方案


推荐阅读