python-3.x - asyncio 似乎在 HTTP 请求完成之前完成
问题描述
我正在尝试使用对 Web 服务(由 GET 查询字符串格式化)运行许多并行调用,asyncio
并且所有调用都立即返回值 0。Web 服务是一个物理模拟,它返回一个整数值以及它的执行情况。但是,我希望该服务运行大约 2 分钟,然后返回该值,但是从asyncio
显示立即 0 值中打印出这些值。
这段代码是遗传算法(运行 DEAP)的一部分,所以我想做的是有一个运行的外部(世代)循环,每个个体(构造的 URL)并行运行并执行评估。我不确定这是否有任何影响,但它是谷歌云功能。最大执行时间设置在预期评估时间的范围内,最大内存也是有效的。
这是我的asyncio
函数,如果响应返回 OK,我希望看到一些有效的整数值,或者如果生成错误,则返回 -999:
# AsyncIO functions for interacting with Google Cloud Functions
async def fetchCF(indv: str, session: ClientSession, **kwargs) -> str:
resp = await session.request(method="GET", url=indv, **kwargs)
resp.raise_for_status()
html = await resp.text()
return html
# Dispatch the CFs and return fitness
async def callCF(indv: str, session: ClientSession, **kwargs) -> int:#float:
try:
html = await fetchCF(indv=indv, session=session, **kwargs)
except (
aiohttp.ClientError,
aiohttp.http_exceptions.HttpProcessingError,
) as e:
print(indv,e)
return -9999,
else:
return int(html),
# Called via GA
async def evalAsync(pop: set, **kwargs) -> None:
async with ClientSession(read_timeout=None) as session:
tasks = []
for p in pop:
#print(p)
indv_url = '%s?seed=%d&x0=%d&y0=%d&x1=%d&y1=%d&x2=%d&y2=%d&x3=%d&y3=%d&emitX=%d&emitY=%d' % \
(url,args.seed,int(p[0]),int(p[1]),int(p[2]),int(p[3]),int(p[4]),int(p[5]),int(p[6]),int(p[7]),int(p[8]),int(p[9]))
tasks.append(
callCF(indv=indv_url,session=session,**kwargs)
)
return await asyncio.gather(*tasks)
以下是我在世代循环中如何称呼它们:
for g in generations:
...
fitnesses = asyncio.run(evalAsync(population))
作为参考,该代码在本地物理模拟中运行良好,我将调用替换为对本地物理驱动程序asyncio.run
的调用。pool.map
解决方案
看起来您显示的代码工作正常。
为了验证,我只使用了您的代码的最小修改变体,其中包含一些固定的 URL,这些 URL 只返回常量,并且有一些延迟。然后它按预期工作。
因此,我假设 Web 服务可能无法为您的 URL 提供您期望的结果。例如,您可以使用带有所有参数的 URL 之一进行验证,并从浏览器或 wget 命令行程序调用它。也许它没有返回预期的数字?
对于独立测试用例,调用了三个不同的 URL,它们返回数字 40、41 和 42。在服务器端,有 1-3 秒的延迟。
只做了一些小的改动:
- 检查
int(html)
语句是否有 `ValueError,例如,如果查询不返回 int - read_timeout 已弃用,因此
aiohttp.ClientTimeout(total=5 * 60)
改为使用 - callCF 被声明为返回一个 int,因此删除逗号以不返回元组
独立测试用例
import asyncio
import aiohttp
from aiohttp import ClientSession
async def fetchCF(indv: str, session: ClientSession, **kwargs) -> str:
resp = await session.request(method="GET", url=indv, **kwargs)
resp.raise_for_status()
html = await resp.text()
return html
# Dispatch the CFs and return fitness
async def callCF(indv: str, session: ClientSession, **kwargs) -> int:
try:
html = await fetchCF(indv=indv, session=session, **kwargs)
result = int(html)
except (
aiohttp.ClientError,
ValueError
) as e:
print(indv, e)
return -9999
else:
return result
# Called via GA
async def evalAsync(pop: set, **kwargs) -> None:
timeout = aiohttp.ClientTimeout(total=5 * 60)
async with ClientSession(timeout=timeout) as session:
tasks = []
for indv_url in pop:
tasks.append(
callCF(indv=indv_url, session=session, **kwargs)
)
return await asyncio.gather(*tasks)
if __name__ == "__main__":
population = {"https://www.software7.biz/tst/number.php",
"https://www.software7.biz/tst/number1.php",
"https://www.software7.biz/tst/number2.php"}
fitnesses = asyncio.run(evalAsync(pop=population))
for fit in fitnesses:
print(fit)
结果
谷歌云功能
由于它适用于传统的 Web 服务器,我们现在可以尝试使用 Google Cloud Functions 进行简单测试。上面的代码无需身份验证即可工作。但是添加一个简单的基本身份验证当然没有什么坏处。然后,服务器端 Google Cloud Function 将如下所示:
import time
from flask import Flask, request
from flask_httpauth import HTTPBasicAuth
auth = HTTPBasicAuth()
users = {
"someUser": "someSecret",
}
@auth.get_password
def get_pw(username):
if username in users:
return users.get(username)
return None
app = Flask(__name__)
@app.route('/', methods=['GET'])
@auth.login_required
def compute42(request):
op1 = request.args.get('op1')
op2 = request.args.get('op2')
time.sleep(25)
return str(int(op1) + int(op2))
它需要两个操作数,休眠 25 秒,然后返回它们的总和。
Python程序的适配
对于此 Cloud Function,调用 Python 程序必须稍作修改:
async def evalAsync(pop: set, **kwargs) -> None:
timeout = aiohttp.ClientTimeout(total=5 * 60)
auth = aiohttp.BasicAuth(login='someUser', password='someSecret')
async with ClientSession(timeout=timeout, auth=auth) as session:
tasks = []
for indv_url in pop:
tasks.append(
callCF(indv=indv_url, session=session, **kwargs)
)
return await asyncio.gather(*tasks)
if __name__ == "__main__":
population = {"https://someserver.cloudfunctions.net/computation42?op1=17&op2=4",
"https://someserver.cloudfunctions.net/computation42?op1=11&op2=4700",
"https://someserver.cloudfunctions.net/computation42?op1=40&op2=2"}
fitnesses = asyncio.run(evalAsync(pop=population))
for fit in fitnesses:
print(fit)
基本上只有aiohttp.BasicAuth
一些 GET 参数被添加到 URL。
然后 Python 程序到控制台的输出是:
21
4711
42
推荐阅读
- python - 使用 gensim 从 fasttext 库中有效加载预训练词嵌入的内存
- python - 如何在函数中使用for循环将列表作为参数传递?
- ruby-on-rails - Rails Webpacker:停止记录“无事可做”
- firefox - 编程和编译:Chromium vs Firefox
- c# - 在 XML 中查找特定父标签的子标签
- amazon-web-services - 无法给 nlb 静态 ip
- tfs - WinRM - IIS Web 应用管理 Azure Pipelines 错误
- python - 可视化 tkinter 中两个字符串的差异
- pdf - 使用 ngf-thumbnail 将 PDF 页面放入容器中
- stata - 如何导出仅包含具有 >N 个观察值的单元格的表格?