首页 > 解决方案 > 使用 Python 进行线程化

问题描述

完整的python新手......我正在使用Arduino pyfirmata包,我正在尝试做一些非常简单的事情。

根据用户对 python 的输入,我希望 LED 是否闪烁。

我的问题是 python 程序只要求用户输入一次,但我希望它总是要求输入,以便用户可以随时更改功能。

我曾尝试使用 threading 包但没有成功......也许有一种更简单的方法,但我对编码完全陌生,所以我不知道其他任何方法。开放的建议!

这是我的代码,

import pyfirmata
import threading
import time

board = pyfirmata.Arduino('/dev/cu.usbmodem14101')

def flash():
    for i in range(1000):
        board.digital[13].write(1)
        time.sleep(1)
        board.digital[13].write(0)
        time.sleep(1)

def stop():
    board.digital[13].write(0)


while True:
    runMode = input("Run or Stop? ")

    if runMode == "Run":
        x = threading.Thread(target=flash(), args=(1,))
        x.start()
        # x.join()

    elif runMode == "Stop":
        x = threading.Thread(target=stop(), args=(1,))
        x.start()
        #x.join()

标签: pythonpython-3.xconcurrencypython-multithreading

解决方案


如果您只想杀死线程,您可以使用 multiprocessing.Process 可以 p.terminate()

p = Process(target=flash, args=(,))
while True:
    runMode = input("Run or Stop? ")
    if runMode == "Run":
        p.start()
    elif runMode == "Stop":
        p.terminate()

但是,不建议只杀死线程,因为如果进程正在处理关键资源或依赖于其他线程,这可能会导致错误,请参阅此处以获得更好的解释。有什么方法可以杀死线程吗?

这里描述的一个更好的选择是使用标志来处理你的闪烁,它们允许线程之间的简单通信

from threading import Event

e = event()

def check_for_stop(e):
    while not e.isSet():
         flash()
    print("Flashing Ended")

while True:
    runMode = input("Run or Stop? ")

    if runMode == "Run":
        x = threading.Thread(target=check_for_stop, args=(e,))
        x.start()
        # x.join()

    elif runMode == "Stop":
        e.set() #set flag true
        e.clear() #reset flag

这是有关事件对象的更多信息的文档https://docs.python.org/2.0/lib/event-objects.html

我还没有测试过这段代码只是一个例子,如果它不能立即工作,我们深表歉意

编辑:只需再次查看您的功能,您就想在闪烁期间检查标志,这是我的错误道歉,因此您的闪光灯功能看起来像

def flash():
    while e.isSet():
        board.digital[13].write(1)
        time.sleep(1)
        board.digital[13].write(0)
        time.sleep(1)

你会像以前一样将它传递到线程中

x = threading.Thread(target=flash(), args=(1,))
x.start()

推荐阅读