python - Kivy 应用程序的多线程问题
问题描述
我正在尝试让应用程序 UI 继续运行,即使在异步 BLE 扫描功能运行时也是如此。我尝试创建另一个线程,但是当我使用激活按钮时 UI 仍然冻结。有问题的屏幕是 Screen4,其余的都不重要。我已经尝试了几种不同的方法来让它工作,但是我在实现一些示例时遇到了麻烦,它要么产生错误,要么仍然导致 UI 冻结几秒钟。这是代码:
import sys
import json
import googlemaps
import pygatt
import time
import asyncio
from bleak import discover
from urllib.request import urlopen
from twilio.rest import Client
from threading import Thread
#UI import libraries
from kivy.app import App
from kivy.properties import ObjectProperty
from kivy.properties import StringProperty
from kivy.uix.widget import Widget
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.textinput import TextInput
from kivy.uix.button import Button
from kivy.uix.screenmanager import ScreenManager, Screen
from kivy.uix.anchorlayout import AnchorLayout
from kivy.uix.label import Label
from kivy.uix.image import Image
from kivy.event import EventDispatcher
from kivy.clock import Clock
accSID = '**********'
authtkn = '*********'
twilcli = Client(accSID, authtkn)
sendNum = '********'
key = '*********'
url = 'http://ipinfo.io/json'
i = 0
sm = ScreenManager()
class Screen1(Screen):
def pair_pressed(self):
def BLEdetect (self):
async def run():
devices = await discover()
for d in devices:
print(d)
k = 0
if (str(d).find('DSD TECH') != -1):
global address
address = str(d)[0:18]
print(address)
sm.current = 's2'
k = 1
break
if (k != 1):
k = 0
sm.current = 's3'
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
Clock.schedule_once(BLEdetect, 0.5)
class Screen2(Screen):
def submit_pressed(self):
contact1_inp = ObjectProperty(None)
contact1 = StringProperty('')
self.contact1 = self.contact1_inp.text
phone1_inp = ObjectProperty(None)
phone1 = StringProperty('')
self.phone1 = self.phone1_inp.text
#----------------------------------------------------------------
contact2_inp = ObjectProperty(None)
contact2 = StringProperty('')
self.contact2 = self.contact2_inp.text
phone2_inp = ObjectProperty(None)
phone2 = StringProperty('')
self.phone2 = self.phone2_inp.text
#---------------------------------------------------------------
contact3_inp = ObjectProperty(None)
contact3 = StringProperty('')
self.contact3 = self.contact3_inp.text
phone3_inp = ObjectProperty(None)
phone3 = StringProperty('')
self.phone3 = self.phone3_inp.text
#--------------------------------------------------------------
contact4_inp = ObjectProperty(None)
contact4 = StringProperty('')
self.contact4 = self.contact4_inp.text
phone4_inp = ObjectProperty(None)
phone4 = StringProperty('')
self.phone4 = self.phone4_inp.text
#-------------------------------------------------------------
global contactlst
contactlst = [[self.contact1, self.phone1], [self.contact2, self.phone2],
[self.contact3, self.phone3],[self.contact4, self.phone4]]
#if contaclst is None:
j = 0
while (j < 4):
print("Contact " + str(j + 1) + ": " + contactlst[j][0])
print("Phone " + str(j + 1) + ": " + contactlst[j][1])
j += 1
sm.current = 's4'
class Screen3(Screen):
def tryagain_pressed(self):
sm.current = 's1'
def cancel_pressed(self):
exit()
class Screen4(Screen):
stop = threading.Event()
global BLEscan, i
def BLEscan(self):
while (i == 0):
async def run():
devices = await discover()
for d in devices:
print(d)
k = 0
if (str(d).find(address) != -1):
response = urlopen(url)
data = json.load(response)
city = data['city']
loc = data['loc']
country = data['country']
region = data['region']
gmapurl = 'http://maps.googleapis.com/maps/api/staticmap?center='
gmapurl = gmapurl + loc + '&size=600x600&zoom=14&sensor=true&markers=color:red%7Clabel:A%7C' + loc + '&key=' + key
def waste_time(dt):
pass
while ((k < 4) and (contactlst[k][0] != '')):
mes = 'Please Send Help ' + contactlst[k][0]
info = '{4} \nRegion : {1} \nCountry : {2} \nCity : {3} \nLoc: {0}'.format(loc,region,country,city,mes)
message = twilcli.messages.create(body=info, from_=sendNum, to=contactlst[k][1], media_url= gmapurl)
Clock.schedule_once(waste_time, 3)
k += 1
i = 1
break
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
def second_thread(self):
Clock.schedule_interval(BLEscan, 5)
def activate_pressed (self):
i = 0
threading.Thread(target=self.second_thread).start()
def deactivate_pressed(self):
i = 1
class esosApp(App):
def on_stop(self):
self.root.stop.set()
def build(self):
screenone = Screen1(name='s1')
screentwo = Screen2(name='s2')
screenthree = Screen3(name='s3')
screenfour = Screen4(name='s4')
sm.add_widget(screenone)
sm.add_widget(screentwo)
sm.add_widget(screenthree)
sm.add_widget(screenfour)
return sm
if __name__ == '__main__':
esosApp().run()```
解决方案
推荐阅读
- c# - 如何在C#中打开没有窗口的bat文件
- heroku - 为什么我的免费测功时间没有减少?(包括图片)
- c - 如何使用 malloc 在 C 中正确创建矩阵?
- flutter - 通过 VSCode 运行 Flutter 项目时找不到名为“web-server-debug-protocol”的选项
- javascript - 粘性部分滚动效果 HTML
- reactjs - 试图在反应 redux 中包装调度功能
- r - 使用特定语言的 rtweet 抓取推文会导致空数据框
- drake - 如何在多体工厂中使用 MathematicalProgram 中的决策变量 (TypeError)
- macos - LaTeX Macbook Package minted 错误:您必须使用 -shell-escape 标志调用 LaTeX。\usemintedstyle
- c - 我可以在命令中获得与 EUID 和 GID 不同的 UID 与 EGID 吗?