python - 奇怪的python异步行为
问题描述
上下文:我想查询内存中的 dict 结构并使用相同的查询函数来触发一些后处理任务(例如获取附加到 dict 的扩展信息,因此名称为exinfo),因为我不想让那些在“关键处理路径”。Asyncio 表现得很奇怪,可能是因为我在这里做错了一些事情。task_resolve_names和task_save没有被执行,只有当我在运行 run_forever 线程之前取消注释“hello world”测试任务时。我很感激帮助。
exinfo.py
import os, sys
from os import path
import pickle
import asyncio
import threading
import sys
print (sys.version)
print (sys.version_info)
exinfopath = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data")
extpkl = os.path.join(exinfopath, "exinfo.pkl")
exinfo = None
class ExInfo:
instance = None
def __new__(cls, exinfopath, extpkl):
if not ExInfo.instance:
ExInfo.instance = ExInfo.__ExInfo(exinfopath, extpkl)
return ExInfo.instance
class __ExInfo:
client = None
ext = None
count = 0
lock = asyncio.Lock()
loop = None
extpkl = None
def __init__(self, exinfopath, extpkl): #arg TBD
self.loop = asyncio.get_event_loop()
#self.loop.create_task(self.hello()) <--- weird behavior, UNCOMMENT this to mk stuff happen
threading.Thread(target=self.thread_main_loop,
args=[self.lock, self.loop],
daemon=True).start()
def query(self, host, dnsquery): #host can be ipaddr or not
r = {}
r['hits'] = 1
r['query'] = dnsquery
print("query")
if 'post_process1' not in r.keys():
print("self.loop.create_task")
self.loop.create_task(self.task_resolve_names(r, self.lock))
if self.count % 5 == 0: #save pickle every 5 executions
print("self.loop.create_task")
self.loop.create_task(self.task_save(r, self.lock))
return r
def thread_main_loop(self, lock, loop):
print("thread_main_loop / loop.run_forever()")
loop.run_forever()
loop.close()
async def hello(self):
print("WORLD")
async def task_resolve_names(self, ext, lock):
print("task_resolve_names")
async with lock:
ext['post_process1'] = 'OK'
async def task_save(self, ext, lock):
print("task_save")
async with lock:
ext['post_process2'] = 'OK'
#with open(self.extpkl, 'wb') as f:
# pickle.dump(self.ext, f, protocol=pickle.HIGHEST_PROTOCOL)
def init():
global exinfo
exinfo = ExInfo(exinfopath, extpkl)
def test_query(info, query):
global exinfo
print("{}".format(exinfo.query(info, query)))
test_exinfo.py
#!/usr/bin/env python3
import os
import exinfo
import time
exinfopath = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data")
extpkl = os.path.join(exinfopath, "exinfo.pkl")
exinfo.init()
exinfo.test_query("someinfo", "someinfo")
time.sleep(10)
实际结果:
3.7.6 (default, Jan 8 2020, 13:42:34)
[Clang 4.0.1 (tags/RELEASE_401/final)]
sys.version_info(major=3, minor=7, micro=6, releaselevel='final', serial=0)
thread_main_loop / loop.run_forever()
query
self.loop.create_task
self.loop.create_task
{'hits': 1, 'query': ''}
预期结果:
3.7.6 (default, Jan 8 2020, 13:42:34)
[Clang 4.0.1 (tags/RELEASE_401/final)]
sys.version_info(major=3, minor=7, micro=6, releaselevel='final', serial=0)
thread_main_loop / loop.run_forever()
query
self.loop.create_task
self.loop.create_task
{'hits': 1, 'query': ''}
task_resolve_names
task_save
解决方案
该query
方法用于loop.create_task
在没有适当同步的情况下将任务添加到在不同线程中运行的事件循环。要解决此问题,请asyncio.run_coroutine_threadsafe
改用。
推荐阅读
- powershell - 如何选择“”之间的所有内容并将其保存为变量?
- html - Html:将标题设置为中心
- elasticsearch - 如何使用 Elasticsearch 按数组检索类型数组字段的数组并检索是否至少有一个匹配
- ruby-on-rails - 一个活动记录表中的层次结构?
- spring-boot - Spring Boot 应用程序通过 JOSS Swift Client 使用 Ceph Swift API
- cakephp - cakephp 3 插件本地化
- r - 如何更改函数中的变量。(右)
- java - 什么是柔软可触及的物体?
- c# - 文件“/ajax/namespace.class.GetSessionValue.ashx”不存在
- android - Android 'Firebase' 成员使用名称关闭节点进入组,所有者收到消息以允许