首页 > 解决方案 > Kivy 应用程序的多线程问题

问题描述

我正在尝试让应用程序 UI 继续运行,即使在异步 BLE 扫描功能运行时也是如此。我尝试创建另一个线程,但是当我使用激活按钮时 UI 仍然冻结。有问题的屏幕是 Screen4,其余的都不重要。我已经尝试了几种不同的方法来让它工作,但是我在实现一些示例时遇到了麻烦,它要么产生错误,要么仍然导致 UI 冻结几秒钟。这是代码:

import sys
import json
import googlemaps
import pygatt
import time
import asyncio
from bleak import discover
from urllib.request import urlopen
from twilio.rest import Client
from threading import Thread

#UI import libraries
from kivy.app import App
from  kivy.properties import ObjectProperty
from kivy.properties import StringProperty
from kivy.uix.widget import Widget
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.textinput import TextInput
from kivy.uix.button import Button
from kivy.uix.screenmanager import ScreenManager, Screen
from kivy.uix.anchorlayout import AnchorLayout
from kivy.uix.label import Label
from kivy.uix.image import Image
from kivy.event import EventDispatcher
from kivy.clock import Clock

accSID = '**********'
authtkn = '*********'
twilcli = Client(accSID, authtkn)
sendNum = '********'

key = '*********'
url = 'http://ipinfo.io/json'
i = 0


sm = ScreenManager()

class Screen1(Screen):
    def pair_pressed(self):

        def BLEdetect (self):
            async def run():
                devices = await discover()
                for d in devices:
                    print(d)
                    k = 0
                    if (str(d).find('DSD TECH') != -1):
                        global address
                        address = str(d)[0:18]
                        print(address)
                        sm.current = 's2'
                        k = 1
                        break
                    if (k != 1):
                        k = 0
                        sm.current = 's3'

            loop = asyncio.get_event_loop()
            loop.run_until_complete(run())

        Clock.schedule_once(BLEdetect, 0.5)

class Screen2(Screen):
    def submit_pressed(self):

        contact1_inp = ObjectProperty(None)
        contact1 = StringProperty('')
        self.contact1 = self.contact1_inp.text

        phone1_inp = ObjectProperty(None)
        phone1 = StringProperty('')
        self.phone1 = self.phone1_inp.text
    #----------------------------------------------------------------
        contact2_inp = ObjectProperty(None)
        contact2 = StringProperty('')
        self.contact2 = self.contact2_inp.text

        phone2_inp = ObjectProperty(None)
        phone2 = StringProperty('')
        self.phone2 = self.phone2_inp.text
    #---------------------------------------------------------------
        contact3_inp = ObjectProperty(None)
        contact3 = StringProperty('')
        self.contact3 = self.contact3_inp.text

        phone3_inp = ObjectProperty(None)
        phone3 = StringProperty('')
        self.phone3 = self.phone3_inp.text
    #--------------------------------------------------------------
        contact4_inp = ObjectProperty(None)
        contact4 = StringProperty('')
        self.contact4 = self.contact4_inp.text

        phone4_inp = ObjectProperty(None)
        phone4 = StringProperty('')
        self.phone4 = self.phone4_inp.text
    #-------------------------------------------------------------
        global contactlst
        contactlst = [[self.contact1, self.phone1], [self.contact2, self.phone2],
                      [self.contact3, self.phone3],[self.contact4, self.phone4]]

        #if contaclst is None:


        j = 0
        while (j < 4):
            print("Contact " + str(j + 1) + ": " + contactlst[j][0])
            print("Phone " + str(j + 1) + ": " + contactlst[j][1])
            j += 1

        sm.current = 's4' 

class Screen3(Screen):
    def tryagain_pressed(self):
        sm.current = 's1'

    def cancel_pressed(self):
        exit()

class Screen4(Screen):

    stop = threading.Event()

    global BLEscan, i

    def BLEscan(self):
        while (i == 0):
            async def run():
                devices = await discover()
                for d in devices:
                    print(d)
                    k = 0
                    if (str(d).find(address) != -1):
                        response = urlopen(url)
                        data = json.load(response)


                        city = data['city']
                        loc = data['loc']
                        country = data['country']
                        region = data['region']
                        gmapurl = 'http://maps.googleapis.com/maps/api/staticmap?center='
                        gmapurl = gmapurl + loc + '&size=600x600&zoom=14&sensor=true&markers=color:red%7Clabel:A%7C' + loc + '&key=' + key

                        def waste_time(dt):
                            pass

                        while ((k < 4) and (contactlst[k][0] != '')):
                            mes = 'Please Send Help ' + contactlst[k][0]
                            info = '{4} \nRegion : {1} \nCountry : {2} \nCity : {3} \nLoc: {0}'.format(loc,region,country,city,mes)
                            message = twilcli.messages.create(body=info, from_=sendNum, to=contactlst[k][1], media_url= gmapurl)

                            Clock.schedule_once(waste_time, 3)

                            k += 1
                        i = 1
                        break

            loop = asyncio.get_event_loop()
            loop.run_until_complete(run())

    def second_thread(self):
        Clock.schedule_interval(BLEscan, 5)

    def activate_pressed (self):
        i = 0
        threading.Thread(target=self.second_thread).start()

    def deactivate_pressed(self):
        i = 1




class esosApp(App):
    def on_stop(self):
        self.root.stop.set()

    def build(self):

        screenone = Screen1(name='s1')
        screentwo = Screen2(name='s2')
        screenthree = Screen3(name='s3')
        screenfour = Screen4(name='s4')

        sm.add_widget(screenone)
        sm.add_widget(screentwo)
        sm.add_widget(screenthree)
        sm.add_widget(screenfour)

        return sm    


if __name__ == '__main__':
    esosApp().run()```

标签: pythonmultithreadingasynchronouskivy

解决方案


推荐阅读