首页 > 技术文章 > 进程,线程池 ,同步异步

strawberry-1 2019-08-29 17:09 原文

进程池,线程池

以时间换空间,控制进程,线程开启的数量

进程池与cpu一一对应是并行

线程池是并发:一个容器,这个容器限制住你开启线程(进程)的数量,比如4个,第一次肯定只能变更发的处理4个任务,只要有任务完成,线程马上就会接下一个人任务

以时间换空间,控制进程,线程开启的数量

进程池与cpu一一对应是(并行)或并行加并发

线程池是并发:一个容器,这个容器限制住你开启线程(进程)的数量,比如4个,第一次肯定只能变更发的处理4个任务,只要有任务完成,线程马上就会接下一个人任务

以时间换空间

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
def fun():
    print(1)
t=ThreadPoolExecutor()#实例化一个进程池对象
t.submit(fun)#用submit开启一个进程池

基于线程池的服务端

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import socket
import os
import time
import random


def communicate(conn, addr):

    while 1:
        #
        # try:

            from_client_data = conn.recv(1024)

            print(f'来自客户端{addr[1]}的消息: {from_client_data.decode("utf-8")}')

            to_client_data = input('>>>').strip()

            conn.send(to_client_data.encode('utf-8'))
if __name__ == '__main__':
    server = socket.socket()
    server.bind(('127.0.0.1', 8848))
    server.listen(5)
    print(1)
    while 1:
        con, addr = server.accept()
        t=ThreadPoolExecutor()
        t.submit(communicate,con,addr)

基于进程的服务端

import socket
from threading import Thread
import time
import random
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from multiprocessing import Process
def comniate(conn,addr):
    while 1:
        time.sleep(random.randint(1,3))
        for_client_data=conn.recv(1024)
        print(f'客户端{addr[1]}消息{for_client_data.decode("utf-8")}')
        time.sleep(random.randint(3,4))
        # to_client_data=input('>>>').strip()#不能使用input
        conn.send('121212'.encode('utf-8'))
    conn.close()


def func():
    server=socket.socket()
    server.bind(('127.0.0.1',8848))
    server.listen(5)
    print('开启')
    while 1:
        conn,addr=server.accept()

        t.submit(comniate,conn,addr)


if __name__ == '__main__':
    t = ProcessPoolExecutor(3)
    func()

阻塞非阻塞

阻塞,非阻塞程序两种运行状态

三种运行状态 运行,就绪,阻塞

遇到io操作就会发生阻塞,程序一旦遇到阻塞操作就会停在原地,并且会立刻释放cpu资源(cpu切换到其他进程中),结束进入就绪态

非阻塞:没有遇到io操作,但是我通过某种手段,让cpu强行运行我的程序

(或者用某种手段让程序即使遇到io操作也不会停在原地,执行其他操作,力求尽可能多的占有cpu)

同步与异步

obj=pool.submit()#obj是动态对象

打印obj出现状态

obj.result()等待这个任务结束,返回结果,在执行下一个任务(加上这个result会变成同步)#需要等到拿到返回结果

提交任务的两种方式

同步调用:

提交完任务后(任务执行不完会被cpu会切走),(不管有计算还是遇到io)就在原地等待,直到任务运行完毕后,拿到任务的返回值(不一定有),才继续执行下一行代码同步调用和阻塞没有关系

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
import random
import os

def task(i):
    print(f'{os.getpid()}开始任务')
    time.sleep(random.randint(1,3))
    print(f'{os.getpid()}任务结束')
    return i
if __name__ == '__main__':
    pool=ProcessPoolExecutor()
    for i in  range(20):
        obj=pool.submit(task,i)#开启子进程
        print(obj.result())#等待打印返回值#return 的返回值返回给了 result
        pool.shutdown(wait=True)
    print('===主')

多个任务,同步调用效率低

异步调用:

一次提交多个任务,然后直接执行下一行代码,

如何接收返回值

1.统一回收结果

假如异步发出10个任务,并发的执行,但是统一的接收所有的任务的返回值(效率低,不能实时的获取结果)

缺点:我不能马上收到任何一个已经完成的任务的返回值

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
import random
import os
def task(i):
    print(f'{os.getpid()}开始任务')
    time.sleep(random.randint(1,3))
    print(f'{os.getpid()}任务结束')
    return i
if __name__ == '__main__':
    pool=ProcessPoolExecutor()
    li=[]
    for i in  range(20):
        obj=pool.submit(task,i)
        li.append(obj)
        # print(obj.result())  # 打印返回值
    pool.shutdown(wait=True)
    for i in li:
        print(i.result())

    print('===主')

2.完成一个任务接收一个返回结果(有问题)


import requests
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
def task(url):
    ret=requests.get(url)
    if ret.status_code==200:
        return parse(ret.text)
    #parse(ret.text)#一个线程除了取数据还需要清洗数据 时间长
def parse(obj):
    return(len(obj))


#def task(url):
    #ret=requests.get(url)
    #if ret.status_code==200:
        #return parse(ret.text)
#def parse(obj):
    #return(len(obj))


if __name__ == '__main__':
    pool=ThreadPoolExecutor(4)
    url_list=[
         'https://www.luffycity.com/',
        'https://www.baidu.com/',
        'https://www.luffycity.com/',
        'https://www.luffycity.com/',
        'https://www.luffycity.com/',
        'https://www.luffycity.com/',
        'https://www.luffycity.com/',
    ]
    li=[]
    for url in url_list:
        obj=pool.submit(task,url)
        li.append(obj)
    pool.shutdown(wait=True)
    for i in li:
    	print(i.result())#统一接收结果
#两个函数耦合性太高

shutdown:

只有线程 进程池里面有

1.让我的主进程池中所有的子进程都结束任务之后,类似join,一个任务是通过一个函数实现的,任务完成了的返回值是函数的返回值

2.执行时不允许别人添加执行任务,接受返回值就证明任务结束了

浏览器的工作原理

向服务端发送一个请求,服务端验证你的请求,如果正确,给你的浏览器返回一个文件,将文件里面的代码渲染成你看到的漂亮美丽的模样.

爬虫

1.利用代码模拟一个浏览器,浏览器的工作流程

2.数据清洗(处理源代码)

对源代码进程数据 清晰得到我想要的数据

import requests
response=requests.get('http://www.baidu.com')#拿到一个网址url
if response.status_code==200:   #200可以访问  
    print(response.text)#进行网页数据获取

异步+回调

运行完不等待返回值,返回值需统一回收,想要不统一,需要用到回调机制

进行

异步是io密集型,回调是计算密集型

异步统一回收,实时接收,回调接收处理结果

什么是回调函数?

按顺序接收每个任务的结果,进行下一步 处理.

import requests
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
def task(url):
    '''模拟的就是爬取多个源代码 一定有IO操作'''
    ret=requests.get(url)
    if ret.status_code==200:
        return ret.text
def parse(obj):
    '''模拟对数据进行分析 一般没有IO'''
    print(len(obj.result()))


if __name__ == '__main__':
    pool=ThreadPoolExecutor(4)
    url_list=[
         'https://www.luffycity.com/',
        'https://www.baidu.com/',
        'https://www.luffycity.com/',
        'https://www.luffycity.com/',
        'https://www.luffycity.com/',
        'https://www.luffycity.com/',
        'https://www.luffycity.com/',
    ]
    for url in url_list:
        obj=pool.submit(task,url)
        obj.add_done_callback(parse)#回调函数
   #线程池设置4个线程, 异步发起10个任务,每个任务是通过网页获取源码, 并发执行,
   # 当一个任务完成之后,将parse这个分析代码的任务交由剩余的空闲的线程去执行,你这个线程继续去处理其他任务.
 #进程池+回调是全部交由主进程进行运行
#线程池+回调是全部交由空闲线程运行

线程queue

十四 线程队列

  线程之间的通信我们列表行不行呢,当然行,那么队列和列表有什么区别呢

  queue队列 :使用import queue,用法与进程Queue一样

  • class queue.Queue(maxsize=0) #先进先出

img

[复制代码](javascript:void(0)

推荐阅读