首页 > 技术文章 > 自定义异步IO模块

longyunfeigu 2018-08-20 11:23 原文

高性能爬虫

假设有3个url需要发请求。

串行

import requests

urls_list = [
    'https://www.cnblogs.com/longyunfeigu/p/9491496.html',
    'https://dig.chouti.com/',
    'https://dig.chouti.com/r/pic/hot/1'
]

for url in urls_list:
    requests.get(url)

串行肯定是最慢的,怎么改进?第一反应:开多个线程。好吧,lowB 第一反应都是这个方法

多线程

import time
from concurrent.futures import ThreadPoolExecutor
import requests

urls_list = [
    'https://www.cnblogs.com/longyunfeigu/p/9491496.html',
    'https://dig.chouti.com/',
    'https://dig.chouti.com/r/pic/hot/1'
]

start_time = time.time()
pool = ThreadPoolExecutor(3)
def task(url):
    requests.get(url)
    print('ending')

for i in urls_list:
    pool.submit(task, i)
pool.shutdown()
end_time = time.time()
print(end_time - start_time)

开多个线程虽然可以实现并发,但是开线程总归是要耗费资源的,那能不能利用一个线程帮忙提高效率呢,这就需要异步非阻塞登场了

异步非阻塞

什么是异步非阻塞?异步非阻塞就是 异步+非阻塞。异步就是回调,非阻塞指的是单个任务不等待。
在网络IO中,有两个阶段会出现"浪费时间"的情况,一个阶段是connect的时候,请求发出去等待信息回来通知这个客户端socket就绪可以发http报文信息。
另一个阶段是recv接受数据的时候(发送数据不需要等待,直接发就行)需要等待,因为服务端需通过网络把数据发送过来。非阻塞就是不再等待,connect原先需要等待是吧,ok现在我不等了,
recv需要等待是吧,ok我也不等了。不等报错咋办?报错就报错呗,大不了异常捕获就行。如果是非阻塞socket,那么3个url的请求在connect的时候都发出去了,注意,即使报错,请求也是如弦上之箭射出去了。
那么等connect成功后就应该发送数据了呀,这时候就体现出回调了,成功就回来调用一段代码。所有的抽象的话语都可以结合大致的代码来理解。以后如果遇到一些抽象话语不好理解,那么就应该用代码去理解这个抽象话语。
有了异步非阻塞的概念是,那么我们就可以利用一个线程把所有的连接都发送出去,等连接都发送出去而且任意一个connect的信息都没返回的时候,线程就只好等待了,
等有connect的信息返回表示这个客户端socket准备就绪可以发http报文了,这时候再去执行发送数据的代码。这个过程说起来简单,但有一个问题还不清晰:程序怎么知道哪个socket是就绪的?这个大致有2种解决方式:要么是通知(叫醒服务,这个更偏向底层,我们写的应用程序代码可以利用底层已经实现好的技术);要么是用while死循环不断去检测 。python中有一些模块已经帮我们实现了异步非阻塞的功能。

twisted

from twisted.web.client import getPage, defer
from twisted.internet import reactor


def all_done(arg):
    reactor.stop()


def callback(contents):
    print(contents)


deferred_list = []

url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
for url in url_list:
    deferred = getPage(bytes(url, encoding='utf8'))
    # 次数使用回调函数,猜测用到了异步功能
    deferred.addCallback(callback)
    deferred_list.append(deferred)

dlist = defer.DeferredList(deferred_list)
dlist.addBoth(all_done)
# 死循环,不断地处理deferred_list里的socket对象,按理说处理完成就应该自动停止了,但可能是这个框架架构设计问题,不能做到自动停止,需要手动停
reactor.run()

gevent

gevent 是基于greenlet做的,greenlet实现了协程。协程和线程不一样,协程不是真实存在的,是程序员伪造出来的具有类似于线程的切换效果。单纯的协程不能完成提高效率的方式,如greenlet需要手动切换,所以这出现了gevent。
gevent 可以在遇到IO阻塞的时候自动切换协程的运行,这样就可以提升效率。而协程切来切去的功能正好可以利用过来实现异步非阻塞

import gevent
from gevent import monkey

monkey.patch_all()
import requests

def fetch_async(method, url, req_kwargs):
    print(method, url, req_kwargs)
    response = requests.request(method=method, url=url, **req_kwargs)
    print(response.url)

# ##### 发送请求 #####
gevent.joinall([
    gevent.spawn(fetch_async, method='get', url='https://www.cnblogs.com/longyunfeigu/', req_kwargs={}),
    gevent.spawn(fetch_async, method='get', url='https://www.baidu.com/', req_kwargs={}),
    gevent.spawn(fetch_async, method='get', url='https://github.com/', req_kwargs={}),
])

requests模块发请求,内部也是用socket,先去connect,然后发送请求,最后收到响应

asyncio

asyncio默认只支持发tcp层的报文,想要发http请求,就需要自己封装http报文

import asyncio

@asyncio.coroutine
def fetch_async(host, url='/'):
    print(host, url)
    reader, writer = yield from asyncio.open_connection(host, 80)

    request_header_content = """GET %s HTTP/1.0\r\nHost: %s\r\n\r\n""" % (url, host,)
    request_header_content = bytes(request_header_content, encoding='utf-8')

    writer.write(request_header_content)
    yield from writer.drain()
    text = yield from reader.read()
    print(host, url, text)
    writer.close()

tasks = [
    fetch_async('www.cnblogs.com', '/longyunfeigu/'),
    fetch_async('baidu.com', '/')
]

loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

自定义IO模块

运用知识:非阻塞socket + IO多路复用

from socket import *
import select
import time

class HttpContext(object):
    def __init__(self, sock, url_dict):
        import time
        self._start_time = time.time()
        self.sock = sock
        self.port = ''
        self.host = ''
        self.method = ''
        self.data = ''
        self.path = ''
        self.content = b''
        self.text = ''
        self.timeout = 0
        self.callback = None
        self.initial(url_dict)

    def initial(self, url_dict):
        self.port = url_dict.get('port')
        self.host = url_dict.get('host')
        self.method = url_dict.get('method')
        self.data = url_dict.get('data')
        self.path = url_dict.get('path')
        self.timeout = url_dict.get('timeout', 5)
        self.callback = url_dict.get('callback')

    def connect(self):
        self.sock.connect((self.host, self.port))

    def fileno(self):
        return self.sock.fileno()

    def send_get(self):
        content = """%s %s HTTP/1.0\r\nHost: %s\r\n\r\n"""%(
            self.method.upper(), self.path, self.host)
        self.sock.sendall(bytes(content, encoding='utf8'))

    def sendall(self):
        if self.method.upper() == 'GET':
            self.send_get()
        elif self.method.upper() == 'POST':
            self.send_post()
        else:
            pass

    def send_post(self):
        content = """%s %s HTTP/1.0\r\nHost: %s\r\n\r\n%s"""%(
            self.method.upper(), self.path, self.host, self.data)
        self.sock.sendall(bytes(content, encoding='utf8'))

    def recv(self):
        import time
        while 1:
            data = self.sock.recv(8096)
            if not data:
                break
            self.content += data
            self.text += str(data, encoding='utf8')
            time.sleep(0.1)
        self.finish()

    def finish(self, msg=''):
        if msg:
            self.text = msg
            self.content = bytes(msg, encoding='utf8')
        if self.callback:
            self.callback(self.text)


class AsynchRequest(object):
    def __init__(self):
        self.conn_socket_list = []
        self.recv_socket_list = []

    def add_request(self, **url_dict):
        # 立即发起connect连接
        soc = socket(AF_INET,SOCK_STREAM)
        soc.setblocking(0)
        ctx = HttpContext(soc, url_dict)
        self.conn_socket_list.append(ctx)
        self.recv_socket_list.append(ctx)
        try:
            ctx.connect()
        except BlockingIOError as e:
            pass
            # print('request is sended')

    def check_timeout(self):
        # 检验是否超时
        ctime = time.time()
        for ctx in self.recv_socket_list:
            if ctx._start_time + ctx.timeout <= ctime:
                self.recv_socket_list.remove(ctx)
                self.conn_socket_list.remove(ctx)
                ctx.finish('connect超时')

    def run(self):
        while 1:
            #  r_list 代表socket对象是否有数据可以读, w_list代表socket是否可以写,也就是是否可以发送数据,写服务端程序一般只需要用到 r_list
            # IO多路复用监听的对象不一定是socket对象,只要对象有fileno方法都能监听,内部也是拿对象的fileno的返回值来监听
            r_list, w_list, e_list = select.select(self.conn_socket_list, self.recv_socket_list, [], 0.05)
            for w in w_list:
                w.sendall()
                self.recv_socket_list.remove(w)
            for r in r_list:
                r.recv()
                self.conn_socket_list.remove(r)
            if not self.conn_socket_list:
                break
            self.check_timeout()

def callback(response):
    print(response)

url_list = [
    {'host': 'www.baidu.com','port': 80, 'path':'/', 'name':'baidu', 'method':'GET', 'callback': callback},
    {'host': 'cn.bing.com','port': 80,'path':'/', 'name':'chouti', 'method':'GET'},
]
if __name__ == '__main__':    
    obj = AsynchRequest()
    for i in url_list:
        obj.add_request(**i)
    obj.run()

这里的自定义IO模块是站在客户端的角度来定义的,借助底层的IO多路复用来帮我们检测socket是否已经准备好(不用我们手动写死循环去检测socket对象是否已经准备好)

推荐阅读