进程池,线程池
以时间换空间,控制进程,线程开启的数量
进程池与cpu一一对应是并行
线程池是并发:一个容器,这个容器限制住你开启线程(进程)的数量,比如4个,第一次肯定只能变更发的处理4个任务,只要有任务完成,线程马上就会接下一个人任务
以时间换空间,控制进程,线程开启的数量
进程池与cpu一一对应是(并行)或并行加并发
线程池是并发:一个容器,这个容器限制住你开启线程(进程)的数量,比如4个,第一次肯定只能变更发的处理4个任务,只要有任务完成,线程马上就会接下一个人任务
以时间换空间
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
def fun():
print(1)
t=ThreadPoolExecutor()#实例化一个进程池对象
t.submit(fun)#用submit开启一个进程池
基于线程池的服务端
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import socket
import os
import time
import random
def communicate(conn, addr):
while 1:
#
# try:
from_client_data = conn.recv(1024)
print(f'来自客户端{addr[1]}的消息: {from_client_data.decode("utf-8")}')
to_client_data = input('>>>').strip()
conn.send(to_client_data.encode('utf-8'))
if __name__ == '__main__':
server = socket.socket()
server.bind(('127.0.0.1', 8848))
server.listen(5)
print(1)
while 1:
con, addr = server.accept()
t=ThreadPoolExecutor()
t.submit(communicate,con,addr)
基于进程的服务端
import socket
from threading import Thread
import time
import random
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from multiprocessing import Process
def comniate(conn,addr):
while 1:
time.sleep(random.randint(1,3))
for_client_data=conn.recv(1024)
print(f'客户端{addr[1]}消息{for_client_data.decode("utf-8")}')
time.sleep(random.randint(3,4))
# to_client_data=input('>>>').strip()#不能使用input
conn.send('121212'.encode('utf-8'))
conn.close()
def func():
server=socket.socket()
server.bind(('127.0.0.1',8848))
server.listen(5)
print('开启')
while 1:
conn,addr=server.accept()
t.submit(comniate,conn,addr)
if __name__ == '__main__':
t = ProcessPoolExecutor(3)
func()
阻塞非阻塞
阻塞,非阻塞程序两种运行状态
三种运行状态 运行,就绪,阻塞
遇到io操作就会发生阻塞,程序一旦遇到阻塞操作就会停在原地,并且会立刻释放cpu资源(cpu切换到其他进程中),结束进入就绪态
非阻塞:没有遇到io操作,但是我通过某种手段,让cpu强行运行我的程序
(或者用某种手段让程序即使遇到io操作也不会停在原地,执行其他操作,力求尽可能多的占有cpu)
同步与异步
obj=pool.submit()#obj是动态对象
打印obj出现状态
obj.result()等待这个任务结束,返回结果,在执行下一个任务(加上这个result会变成同步)#需要等到拿到返回结果
提交任务的两种方式
同步调用:
提交完任务后(任务执行不完会被cpu会切走),(不管有计算还是遇到io)就在原地等待,直到任务运行完毕后,拿到任务的返回值(不一定有),才继续执行下一行代码(同步调用和阻塞没有关系)
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
import random
import os
def task(i):
print(f'{os.getpid()}开始任务')
time.sleep(random.randint(1,3))
print(f'{os.getpid()}任务结束')
return i
if __name__ == '__main__':
pool=ProcessPoolExecutor()
for i in range(20):
obj=pool.submit(task,i)#开启子进程
print(obj.result())#等待打印返回值#return 的返回值返回给了 result
pool.shutdown(wait=True)
print('===主')
多个任务,同步调用效率低
异步调用:
一次提交多个任务,然后直接执行下一行代码,
如何接收返回值
1.统一回收结果
假如异步发出10个任务,并发的执行,但是统一的接收所有的任务的返回值(效率低,不能实时的获取结果)
缺点:我不能马上收到任何一个已经完成的任务的返回值
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
import random
import os
def task(i):
print(f'{os.getpid()}开始任务')
time.sleep(random.randint(1,3))
print(f'{os.getpid()}任务结束')
return i
if __name__ == '__main__':
pool=ProcessPoolExecutor()
li=[]
for i in range(20):
obj=pool.submit(task,i)
li.append(obj)
# print(obj.result()) # 打印返回值
pool.shutdown(wait=True)
for i in li:
print(i.result())
print('===主')
2.完成一个任务接收一个返回结果(有问题)
import requests
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
def task(url):
ret=requests.get(url)
if ret.status_code==200:
return parse(ret.text)
#parse(ret.text)#一个线程除了取数据还需要清洗数据 时间长
def parse(obj):
return(len(obj))
#def task(url):
#ret=requests.get(url)
#if ret.status_code==200:
#return parse(ret.text)
#def parse(obj):
#return(len(obj))
if __name__ == '__main__':
pool=ThreadPoolExecutor(4)
url_list=[
'https://www.luffycity.com/',
'https://www.baidu.com/',
'https://www.luffycity.com/',
'https://www.luffycity.com/',
'https://www.luffycity.com/',
'https://www.luffycity.com/',
'https://www.luffycity.com/',
]
li=[]
for url in url_list:
obj=pool.submit(task,url)
li.append(obj)
pool.shutdown(wait=True)
for i in li:
print(i.result())#统一接收结果
#两个函数耦合性太高
shutdown:
只有线程 进程池里面有
1.让我的主进程池中所有的子进程都结束任务之后,类似join,一个任务是通过一个函数实现的,任务完成了的返回值是函数的返回值
2.执行时不允许别人添加执行任务,接受返回值就证明任务结束了
浏览器的工作原理
向服务端发送一个请求,服务端验证你的请求,如果正确,给你的浏览器返回一个文件,将文件里面的代码渲染成你看到的漂亮美丽的模样.
爬虫
1.利用代码模拟一个浏览器,浏览器的工作流程
2.数据清洗(处理源代码)
对源代码进程数据 清晰得到我想要的数据
import requests
response=requests.get('http://www.baidu.com')#拿到一个网址url
if response.status_code==200: #200可以访问
print(response.text)#进行网页数据获取
异步+回调
运行完不等待返回值,返回值需统一回收,想要不统一,需要用到回调机制
进行
异步是io密集型,回调是计算密集型
异步统一回收,实时接收,回调接收处理结果
什么是回调函数?
按顺序接收每个任务的结果,进行下一步 处理.
import requests
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
def task(url):
'''模拟的就是爬取多个源代码 一定有IO操作'''
ret=requests.get(url)
if ret.status_code==200:
return ret.text
def parse(obj):
'''模拟对数据进行分析 一般没有IO'''
print(len(obj.result()))
if __name__ == '__main__':
pool=ThreadPoolExecutor(4)
url_list=[
'https://www.luffycity.com/',
'https://www.baidu.com/',
'https://www.luffycity.com/',
'https://www.luffycity.com/',
'https://www.luffycity.com/',
'https://www.luffycity.com/',
'https://www.luffycity.com/',
]
for url in url_list:
obj=pool.submit(task,url)
obj.add_done_callback(parse)#回调函数
#线程池设置4个线程, 异步发起10个任务,每个任务是通过网页获取源码, 并发执行,
# 当一个任务完成之后,将parse这个分析代码的任务交由剩余的空闲的线程去执行,你这个线程继续去处理其他任务.
#进程池+回调是全部交由主进程进行运行
#线程池+回调是全部交由空闲线程运行
线程queue
十四 线程队列
线程之间的通信我们列表行不行呢,当然行,那么队列和列表有什么区别呢
queue队列 :使用import queue,用法与进程Queue一样
- class queue.Queue(maxsize=0) #先进先出
[](javascript:void(0)