首页 > 解决方案 > 异步 asyncio Django 命令按顺序运行

问题描述

我编写了一个简单的命令来遍历所有Result对象并检查其www字段(表示已发表科学结果的 URL,例如https://doi.org/10.1109/5.771073

我们的数据库中有 1M 个结果,我想检查该www字段,如果链接已损坏,我会通过将实际附加doihttps://doi.org/来猜测它并保存(在该www字段中)

这是我第一次使用 asyncio,但我认为我的代码的准系统是正确的,我不知道为什么代码会同步运行。

主要命令:

# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import asyncio
import time

from django.core.management.base import BaseCommand

from models import Result


def run_statistics(array_of_results, num_of_results):
    num_of_correct_urls = 0
    sum_check_time = 0
    max_check_time = 0
    for res in array_of_results:
        if res[0]:
            num_of_correct_urls += 1
        if res[1] > max_check_time:
            max_check_time = res[1]

        sum_check_time += res[1]

    return f"""ran statistics on {num_of_results} results \n
            ----------------------------------------------------------------------------
            correct/corrupted link ratio: {num_of_correct_urls} / {num_of_results - num_of_correct_urls}\n
            Mean time to check URL: {sum_check_time / num_of_results}\n
            """


class Command(BaseCommand):
    help = 'checks url in www field of result, if the link is unresponsive, tries to generate new hyperlink ' \
           '(using DOI) and saves it in www_processed field'

    async def run_check(self, obj):
        """
        Takes care of checking Result www filed.
         `await obj.get_www()` passes function control back to the event loop.
         :returns
         True on unchanged url
         False otherwise
         """
        print('STARTING run_check', file=self.stdout)
        start_time = time.perf_counter()
        final_url = await obj.get_www_coroutine()

        if final_url == obj.www:
            print('STOPPING run_check', file=self.stdout)
            return True, time.perf_counter() - start_time
        else:
            print('STOPPING run_check', file=self.stdout)
            return False, time.perf_counter() - start_time

    async def main(self, objs):
        await asyncio.gather(self.run_check(objs[0]), self.run_check(objs[1]))

    def handle(self, *args, **kwargs):
        start_time = time.perf_counter()
        print('started the process', file=self.stdout)
        objs = Result.objects.all().only('www', 'www_processed', 'www_last_checked').order_by('?')[:2]
        num_of_results = 10 # Result.objects.all().count()
        print('running main', file=self.stdout)

        async def _main_routine():
            array_of_responses = await asyncio.gather(*(self.run_check(_) for _ in objs))

            print(f'retrieved {num_of_results} results, running command', file=self.stdout)

            # print(res_array, file=self.stdout)
            print(run_statistics(array_of_responses, 10) + f'total time: {time.perf_counter() - start_time}\n',
                  file=self.stdout)

        asyncio.run(_main_routine())

检查www字段并保存猜测链接的方法,如果需要的话

 async def get_www_coroutine(self):
        if not self.www_last_checked or datetime.date.today() - self.www_last_checked > datetime.timedelta(days=365):
            if not self.www or not await check_url_returns_200_in_time_coroutine(self.www):  # www is corrupted
                if self.doi:
                    self.www_processed = self.get_doi_url()
                else:
                    self.www_processed = None
                self.www_last_checked = datetime.date.today()
            else:  # www looks alright
                self.www_processed = self.www
            self.save()

        return self.www_processed or False

检查链接是否返回 200 的方法

async def check_url_returns_200_in_time_coroutine(url, timeout=1):
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return response.status == 200

    except aiohttp.client_exceptions.InvalidURL:
        return False

实际输出:

started the process

running main

STARTING run_check

STOPPING run_check

STARTING run_check

STOPPING run_check

retrieved 10 results, running command

ran statistics on 10 results 

            ----------------------------------------------------------------------------
            correct/corrupted link ratio: 1 / 9

            Mean time to check URL: 0.17720807899999896

            total time: 73.279784077

如您所见,代码是按顺序执行的,并且需要很长时间才能完成。我希望首先看到STARTING run_check所有对象,然后STOPPING run_check

标签: pythondjangodjango-modelspython-asynciodjango-commands

解决方案


我终于解决了这个问题!!!

代码异步运行(我只测试了两个结果,因此从输出中不清楚)

瓶颈实际上是db查询,objs = Result.objects.all().only('www', 'www_processed', 'www_last_checked').order_by('?')[:2]因为有1M个对象,所以需要很多时间,并且order_by(?)需要先做一些逻辑。更多信息:如何使用 Django 的 ORM 提取随机记录?


推荐阅读