首页 > 解决方案 > asyncio 似乎在 HTTP 请求完成之前完成

问题描述

我正在尝试使用对 Web 服务(由 GET 查询字符串格式化)运行许多并行调用,asyncio并且所有调用都立即返回值 0。Web 服务是一个物理模拟,它返回一个整数值以及它的执行情况。但是,我希望该服务运行大约 2 分钟,然后返回该值,但是从asyncio显示立即 0 值中打印出这些值。

这段代码是遗传算法(运行 DEAP)的一部分,所以我想做的是有一个运行的外部(世代)循环,每个个体(构造的 URL)并行运行并执行评估。我不确定这是否有任何影响,但它是谷歌云功能。最大执行时间设置在预期评估时间的范围内,最大内存也是有效的。

这是我的asyncio函数,如果响应返回 OK,我希望看到一些有效的整数值,或者如果生成错误,则返回 -999:

# AsyncIO functions for interacting with Google Cloud Functions
async def fetchCF(indv: str, session: ClientSession, **kwargs) -> str:
  resp = await session.request(method="GET", url=indv, **kwargs)
  resp.raise_for_status()
  html = await resp.text()
  return html

# Dispatch the CFs and return fitness
async def callCF(indv: str, session: ClientSession, **kwargs) -> int:#float:
  try:
    html = await fetchCF(indv=indv, session=session, **kwargs)
  except (
    aiohttp.ClientError,
    aiohttp.http_exceptions.HttpProcessingError,
  ) as e:
    print(indv,e)
    return -9999,
  else:
    return int(html),

# Called via GA
async def evalAsync(pop: set, **kwargs) -> None:
  async with ClientSession(read_timeout=None) as session:
    tasks = []
    for p in pop:
      #print(p)
      indv_url = '%s?seed=%d&x0=%d&y0=%d&x1=%d&y1=%d&x2=%d&y2=%d&x3=%d&y3=%d&emitX=%d&emitY=%d' % \
                 (url,args.seed,int(p[0]),int(p[1]),int(p[2]),int(p[3]),int(p[4]),int(p[5]),int(p[6]),int(p[7]),int(p[8]),int(p[9]))
      tasks.append(
        callCF(indv=indv_url,session=session,**kwargs)
      )
return await asyncio.gather(*tasks)

以下是我在世代循环中如何称呼它们:

for g in generations:
  ...
  fitnesses = asyncio.run(evalAsync(population))

作为参考,该代码在本地物理模拟中运行良好,我将调用替换为对本地物理驱动程序asyncio.run的调用。pool.map

标签: python-3.xgoogle-cloud-functionspython-asyncio

解决方案


看起来您显示的代码工作正常。

为了验证,我只使用了您的代码的最小修改变体,其中包含一些固定的 URL,这些 URL 只返回常量,并且有一些延迟。然后它按预期工作。

因此,我假设 Web 服务可能无法为您的 URL 提供您期望的结果。例如,您可以使用带有所有参数的 URL 之一进行验证,并从浏览器或 wget 命令行程序调用它。也许它没有返回预期的数字?

对于独立测试用例,调用了三个不同的 URL,它们返回数字 40、41 和 42。在服务器端,有 1-3 秒的延迟。

只做了一些小的改动:

  • 检查int(html)语句是否有 `ValueError,例如,如果查询不返回 int
  • read_timeout 已弃用,因此aiohttp.ClientTimeout(total=5 * 60)改为使用
  • callCF 被声明为返回一个 int,因此删除逗号以不返回元组

独立测试用例

import asyncio
import aiohttp
from aiohttp import ClientSession


async def fetchCF(indv: str, session: ClientSession, **kwargs) -> str:
    resp = await session.request(method="GET", url=indv, **kwargs)
    resp.raise_for_status()
    html = await resp.text()
    return html


# Dispatch the CFs and return fitness
async def callCF(indv: str, session: ClientSession, **kwargs) -> int:
    try:
        html = await fetchCF(indv=indv, session=session, **kwargs)
        result = int(html)
    except (
            aiohttp.ClientError,
            ValueError
    ) as e:
        print(indv, e)
        return -9999
    else:
        return result


# Called via GA
async def evalAsync(pop: set, **kwargs) -> None:
    timeout = aiohttp.ClientTimeout(total=5 * 60)
    async with ClientSession(timeout=timeout) as session:
        tasks = []
        for indv_url in pop:
            tasks.append(
                callCF(indv=indv_url, session=session, **kwargs)
            )
        return await asyncio.gather(*tasks)


if __name__ == "__main__":
    population = {"https://www.software7.biz/tst/number.php",
                  "https://www.software7.biz/tst/number1.php",
                  "https://www.software7.biz/tst/number2.php"}

    fitnesses = asyncio.run(evalAsync(pop=population))
    for fit in fitnesses:
        print(fit)

结果

结果

谷歌云功能

由于它适用于传统的 Web 服务器,我们现在可以尝试使用 Google Cloud Functions 进行简单测试。上面的代码无需身份验证即可工作。但是添加一个简单的基本身份验证当然没有什么坏处。然后,服务器端 Google Cloud Function 将如下所示:

import time
from flask import Flask, request
from flask_httpauth import HTTPBasicAuth

auth = HTTPBasicAuth()

users = {
    "someUser": "someSecret",
}

@auth.get_password
def get_pw(username):
    if username in users:
        return users.get(username)
    return None

app = Flask(__name__)

@app.route('/', methods=['GET'])
@auth.login_required
def compute42(request):
    op1 = request.args.get('op1')
    op2 = request.args.get('op2')
    time.sleep(25) 
    return str(int(op1) + int(op2))

它需要两个操作数,休眠 25 秒,然后返回它们的总和。

Python程序的适配

对于此 Cloud Function,调用 Python 程序必须稍作修改:

async def evalAsync(pop: set, **kwargs) -> None:
    timeout = aiohttp.ClientTimeout(total=5 * 60)
    auth = aiohttp.BasicAuth(login='someUser', password='someSecret')
    async with ClientSession(timeout=timeout, auth=auth) as session:
        tasks = []
        for indv_url in pop:
            tasks.append(
                callCF(indv=indv_url, session=session, **kwargs)
            )
        return await asyncio.gather(*tasks)


if __name__ == "__main__":
    population = {"https://someserver.cloudfunctions.net/computation42?op1=17&op2=4",
                  "https://someserver.cloudfunctions.net/computation42?op1=11&op2=4700",
                  "https://someserver.cloudfunctions.net/computation42?op1=40&op2=2"}

    fitnesses = asyncio.run(evalAsync(pop=population))
    for fit in fitnesses:
        print(fit)

基本上只有aiohttp.BasicAuth一些 GET 参数被添加到 URL。

然后 Python 程序到控制台的输出是:

21
4711
42

推荐阅读