首页 > 技术文章 > PYTHON之路(九)

joey251744647 2016-04-02 14:56 原文



queue队列
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

class queue.Queue(maxsize=0) #先入先出
class queue.LifoQueue(maxsize=0) #last in fisrt out
class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列
Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).

exception queue.Empty
Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.

exception queue.Full
Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.

Queue.qsize()
Queue.empty() #return True if empty
Queue.full() # return True if full
Queue.put(item, block=True, timeout=None)
Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

Queue.put_nowait(item)
Equivalent to put(item, False).

Queue.get(block=True, timeout=None)
Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

Queue.get_nowait()
Equivalent to get(False).

Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.

Queue.task_done()
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

Queue.join() block直到queue被消费完毕



生产者消费者模型

import time,random
import queue,threading
q = queue.Queue()
def Producer(name):
count = 0
while count <20:
time.sleep(random.randrange(3))
q.put(count)
print('Producer %s has produced %s baozi..' %(name, count))
count +=1
def Consumer(name):
count = 0
while count <20:
time.sleep(random.randrange(4))
if not q.empty():
data = q.get()
print(data)
print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
else:
print("-----no baozi anymore----")
count +=1
p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
p1.start()
c1.start()




#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading,queue
import time

def consumer(n):
while True:
print("\033[32;1mconsumer [%s]\033[0m get task: %s" % (n,q.get()))
time.sleep(1)
q.task_done()
def producer(n):
count = 1
while True:
#time.sleep(1)
#if q.qsize() <3:
print("prodcer [%s] produced a new task : %s" %(n,count))
q.put(count)
count +=1
q.join() #queue is emtpy
print("all taks has been cosumed by consumers...")

q = queue.Queue()
c1 = threading.Thread(target=consumer,args=[1,])
c2 = threading.Thread(target=consumer,args=[2,])
c3 = threading.Thread(target=consumer,args=[3,])
p = threading.Thread(target=producer,args=["XiaoYu",])
p2 = threading.Thread(target=producer,args=["LiuYao",])
c1.start()
c2.start()
c3.start()
p.start()
p2.start()




#################################################################################

http://www.cnblogs.com/alex3714/articles/5248247.html

协程
协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程。

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。



协程的好处:

无需线程上下文切换的开销
无需原子操作锁定及同步的开销
方便切换控制流,简化编程模型
高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。
缺点:

无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序
使用yield实现协程操作例子 



import time
import queue
def consumer(name):
print("--->starting eating baozi...")
while True:
new_baozi = yield
print("[%s] is eating baozi %s" % (name,new_baozi))
#time.sleep(1)

def producer():

r = con.__next__()
r = con2.__next__()
n = 0
while n < 5:
n +=1
con.send(n)
con2.send(n)
print("\033[32;1m[producer]\033[0m is making baozi %s" %n )


if __name__ == '__main__':
con = consumer("c1")
con2 = consumer("c2")
p = producer()




Gevent
Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
pip3 install gevent

import gevent

def foo():
print('Running in foo')
gevent.sleep(0) #block
print('Explicit context switch to foo again')

def bar():
print('Explicit context to bar')
gevent.sleep(0) #block
print('Implicit context switch back to bar')

gevent.joinall([
gevent.spawn(foo), #gevent.spawn(fun,param) -- > 启动协程,类似让单线程下的任务异步进行
gevent.spawn(bar),
])
输出:

Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar


if change to foo() gevent.sleep(0) change to gevent.sleep(1) , result would be
Running in foo
Explicit context to bar
Implicit context switch back to bar #--- > sleep 1 second, then print the later sentence
Explicit context switch to foo again



同步与异步的性能区别


import gevent

def task(pid):
"""
Some non-deterministic task
"""
print('Task %s start' %pid)
gevent.sleep(0.5)
print('Task %s done' % pid)

def synchronous():
for i in range(1,10):
task(i)

def asynchronous():
threads = [gevent.spawn(task, i) for i in range(10)]
gevent.joinall(threads)

print('Synchronous:')
synchronous()

print('Asynchronous:')
asynchronous()
上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,后者阻塞当前流程,并执行所有给定的greenlet。执行流程只会在 所有greenlet执行完后才会继续向下走。


遇到IO阻塞时会自动切换任务

from gevent import monkey; monkey.patch_all() #monkey.patch_all()有点类似黑语法,使得程序变成非阻塞 gevent.spawn() + monkey.patch_all()即是异步非阻塞
import gevent
from urllib.request import urlopen

def f(url):
print('GET: %s' % url)
resp = urlopen(url)
data = resp.read()
print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([
gevent.spawn(f, 'https://www.python.org/'),
gevent.spawn(f, 'https://www.yahoo.com/'),
gevent.spawn(f, 'https://github.com/'),
])

输出
GET: https://www.python.org/
GET: https://www.yahoo.com/
GET: https://github.com/
46971 bytes received from https://www.python.org/.
25179 bytes received from https://github.com/.
488123 bytes received from https://www.yahoo.com/.


如果没有money.patch_all(),输出结果
GET: https://www.python.org/
46971 bytes received from https://www.python.org/.
GET: https://www.yahoo.com/
474672 bytes received from https://www.yahoo.com/.
GET: https://github.com/
25179 bytes received from https://git


通过gevent实现单线程下的多socket并发

server side

import sys
import socket
import time
import gevent

from gevent import socket,monkey
monkey.patch_all()
def server(port):
s = socket.socket()
s.bind(('0.0.0.0', port))
s.listen(500)
while True:
cli, addr = s.accept()
gevent.spawn(handle_request, cli)
def handle_request(s):
try:
while True:
data = s.recv(1024)
print("recv:", data)
s.send(data)
if not data:
s.shutdown(socket.SHUT_WR)

except Exception as ex:
print(ex)
finally:

s.close()
if __name__ == '__main__':
server(8001)



client side   


import socket

HOST = 'localhost' # The remote host
PORT = 8001 # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
msg = bytes(input(">>:"),encoding="utf8")
s.sendall(msg)
data = s.recv(1024)
#print(data)

print('Received', repr(data))
s.close()





###########################################################################################################


首先列一下,sellect、poll、epoll三者的区别
select
select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。

select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。

另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。

poll
poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。

poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。

epoll
直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。

epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。

epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。

另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。



其实主要就是两个部分,内核如何通知就绪的文件描述符,用户空间如何获取就绪的文件描述符。




####################################################################################################

http://www.cnblogs.com/wupeiqi/articles/5095821.html


paramiko
MySQLdb



基于用户名密码连接:

import paramiko

# 创建SSH对象
ssh = paramiko.SSHClient()
# 允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', password='123')

# 执行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 获取命令结果
result = stdout.read()

# 关闭连接
ssh.close()




SSHClient 封装 Transport

import paramiko

transport = paramiko.Transport(('hostname', 22))
transport.connect(username='wupeiqi', password='123')

ssh = paramiko.SSHClient()
ssh._transport = transport

stdin, stdout, stderr = ssh.exec_command('df')
print stdout.read()

transport.close()




基于密钥

import paramiko

private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')

transport = paramiko.Transport(('hostname', 22))
transport.connect(username='wupeiqi', pkey=private_key)

ssh = paramiko.SSHClient()
ssh._transport = transport

stdin, stdout, stderr = ssh.exec_command('df')

transport.close()



SFTPClient

用于连接远程服务器并执行上传下载

基于用户名密码上传下载

import paramiko

transport = paramiko.Transport(('hostname',22))
transport.connect(username='wupeiqi',password='123')

sftp = paramiko.SFTPClient.from_transport(transport)
# 将location.py 上传至服务器 /tmp/test.py
sftp.put('/tmp/location.py', '/tmp/test.py')
# 将remove_path 下载到本地 local_path
sftp.get('remove_path', 'local_path')

transport.close()


基于公钥密钥上传下载

import paramiko

private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')

transport = paramiko.Transport(('hostname', 22))
transport.connect(username='wupeiqi', pkey=private_key )

sftp = paramiko.SFTPClient.from_transport(transport)
# 将location.py 上传至服务器 /tmp/test.py
sftp.put('/tmp/location.py', '/tmp/test.py')
# 将remove_path 下载到本地 local_path
sftp.get('remove_path', 'local_path')

transport.close()





################################################################################################



MySQLdb

python2.7
apt-get install python-mysqldb


python3.4
pip3 install pymysql





delete

import pymysql
#import MySQLdb

conn = pymysql.connect(host='127.0.0.1',user='root',passwd='123456',db='newdb')

cur = conn.cursor()
b=(2,4)
#reCount = cur.execute('create table new (id int(10),name varchar(20))')
for a in b:
reCount1 = cur.execute('delete from new where id in (%s)',a)
conn.commit()
cur.close()
conn.close()
#print(reCount)
print(reCount1)


insert many

import pymysql
#import MySQLdb

conn = pymysql.connect(host='127.0.0.1',user='root',passwd='123456',db='newdb')

cur = conn.cursor()
li = [(3,'rose'),(4,'lize')]
#reCount = cur.execute('create table new (id int(10),name varchar(20))')
reCount1 = cur.executemany('insert into new(id,name) values(%s,%s)',li)
conn.commit()
cur.close()
conn.close()
#print(reCount)
print(reCount1)




insert

import pymysql
#import MySQLdb

conn = pymysql.connect(host='127.0.0.1',user='root',passwd='123456',db='newdb')

cur = conn.cursor()

#reCount = cur.execute('create table new (id int(10),name varchar(20))')
reCount1 = cur.execute('insert into new(id,name) values(%s,%s)',(1,'jack'))
reCount2 = cur.execute('insert into new values(%(id)s,%(name)s)',{'id':2,'name':'alex'})
conn.commit()
cur.close()
conn.close()
#print(reCount)
print(reCount1)
print(reCount2)





general

import pymysql
#import MySQLdb

conn = pymysql.connect(host='127.0.0.1',user='root',passwd='123456')

cur = conn.cursor()

reCount = cur.execute('create database aa')
cur.execute('use aa;create table aaa (id int(1))')
conn.commit()
cur.close()
conn.close()
print(reCount)






select

import pymysql
#import MySQLdb

conn = pymysql.connect(host='127.0.0.1',user='root',passwd='123456',db='mysql')

cur = conn.cursor()

reCount = cur.execute('select user,host from user')
print(cur.fetchone())
print(cur.fetchmany(2))
cur.scroll(-1,mode='relative')
print(cur.fetchone())
cur.scroll(0,mode='absolute')
print(cur.fetchone())
result = cur.fetchall()
cur.close()
conn.close()
print(reCount)
for i in result:
print(i[0],i[1])



update

import pymysql
#import MySQLdb

conn = pymysql.connect(host='127.0.0.1',user='root',passwd='123456',db='newdb')

cur = conn.cursor()
b=(2,4)
#reCount = cur.execute('create table new (id int(10),name varchar(20))')
reCount1 = cur.execute('update new set id = %s',(b[0],))
conn.commit()
cur.close()
conn.close()
#print(reCount)
print(reCount1)





推荐阅读