首页 > 技术文章 > python作业Select版本FTP(第十周)

sean-yao 2017-11-25 01:01 原文

SELECT版FTP:

使用SELECT或SELECTORS模块实现并发简单版FTP

允许多用户并发上传下载文件

 

思路解析:

1. 使用IO多路复用的知识使用SELECTORS封装好的SELECTORS模块编写程序

2. 是用IO多路复用的SELECT编写程序

3 .最后编写多并发程序模拟用户并发上传下载文件,在并发的时候为避免重复写,使用random随机生成新文件名

 

程序核心代码

README

作者:yaobin
版本: Selectors Ftp 示例版本 v0.1
开发环境: python3.6

程序介绍
1. 使用SELECT或SELECTORS模块实现并发简单版FTP
2. 允许多用户并发上传下载文件


文件目录结构
├─bin
│      __init__.py
│      client.py  #客户端主程序
│      server.py  #服务端主程序
│
├─conf
│      setting.py   #配置文件__init__.py
│
│
├─core
│  │  client_main.py        #客户端交互程序
│  │  selectors_client.py   #selectors客户端主程序
│  │  selectors_server.py   #selectors服务端主程序
│  │  select_client.py      #select客户端主程序
│  │  select_server.py      #select服务端主程序
│  │  server_main.py        #server端主程序
│  │  __init__.py
│  │
│  └─__pycache__          #pyc文件目录
│          client_main.cpython-36.pyc
│
├─db
│  │  __init__.py
│  │
│  ├─Client_DownLoad
│  ├─Server_DownLoad
│  ├─Server_Upload
│  └─test
│          test.log
│          test.py
│          __init__.py
│
└─logs
        __init__.py
View Code

conf
setting.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
'''主配置文件'''
import os
import sys
import platform
BASE_DIR  = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)

if platform == 'Windows':#添加上传下载目录变量
    download_path =  (BASE_DIR+'\\'+"\db"+"\Server_DownLoad")
    upload_path = (BASE_DIR+'\\'+"\db"+"\\Server_Upload")
    client_download_path = (BASE_DIR+'\\'+"\db"+"\Client_DownLoad")
else:
    download_path =  (BASE_DIR+'/'+"/db"+"/Server_DownLoad")
    upload_path = (BASE_DIR+'/'+"/db"+"/Server_Upload")
    client_download_path = (BASE_DIR + '/' + "/db" + "/Client_DownLoad")
View Code

core
client_main.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
'''客户端交互程序'''
import os
import sys
BASE_DIR  = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
from core import selectors_client
from core import select_client

class client_ftp(object):
    '''client_ftp交互类'''
    def start(self):
        '''
        启动函数
        :return:
        '''
        print('欢迎进入Select Ftp')
        msg = '''
        1.selectors模块客户端上传下载测试
        2.select客户端上传下载测试
        3.exit
        '''
        while True:
            print(msg)
            user_choice = input('请选择操作>>>:')
            if user_choice == '1':
                client = selectors_client.selectors_client()
                client.connect("localhost", 10000)
                client.start()
            elif user_choice == '2':
                client = select_client.select_client()
                client.connect("localhost", 10000)
                client.start()
            elif user_choice == '3' or user_choice == 'q' or user_choice == 'exit':
                sys.exit('程序退出')
            else:
                print('非法操作,请重新输入')
View Code

selectors_client.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
'''selectclient交互程序'''
import sys
import os
import time
import platform
import random
import socket
import json

BASE_DIR  = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
from conf import setting

class select_client(object):
    """FTP 客户端"""
    def __init__(self):
        '''
        构造函数
        :return:
        '''
        self.client = socket.socket()


    def start(self):
        '''
        启动函数
        :return:
        '''
        print('login time : %s' % (time.strftime("%Y-%m-%d %X", time.localtime())))
        while True:
            try:
                self.sending_msg_list = []
                self.sending_msg = input('[root@select_ftp_client]# ')
                self.sending_msg_list = self.sending_msg.split()
                self.action = self.sending_msg_list[0]
                if len(self.sending_msg_list) == 0:
                    continue
                elif len(self.sending_msg_list) == 1:
                    if self.sending_msg_list[0] == "exit":
                        print('logout')
                        break
                    else:
                        print(time.strftime("%Y-%m-%d %X", time.localtime()),
                              '-bash : %s command not found' % self.sending_msg_list[0])
                else:
                    try:
                        if platform.system() == 'Windows':
                            self.file_path = self.sending_msg_list[1]
                            self.file_list = self.sending_msg_list[1].strip().split('\\')
                            self.file_name = self.file_list[-1]
                        elif platform.system() == 'Linux':
                            self.file_path = self.sending_msg_list[1]
                            self.file_list = self.sending_msg_list[1].strip().split('/')
                            self.file_name = self.file_list[-1]
                    except IndexError:
                        pass
                    if self.action == "put":
                        self.put(self.action,self.file_name)

                    elif self.action == "get":
                        self.get(self.action,self.file_name)
                    else:
                        print(time.strftime("%Y-%m-%d %X", time.localtime()), '[+]client:-bash: %s:'
                              % self.sending_msg_list[0], 'command not found')
            except ConnectionResetError and ConnectionRefusedError and OSError and IndexError as e:
                print(time.strftime("%Y-%m-%d %X", time.localtime()), '[+]client: -bash :', e, 'Restart client')


    def put(self, action,file_name):
        '''
        客户端上传函数
        :param cmd: 上传命令
        :return:
        '''
        if os.path.exists(self.file_path) and os.path.isfile(self.file_path):
            cmd = self.action + " " + self.file_name
            self.client.send(cmd.encode())
            self.client.recv(1024).decode()
            trans_size = 0
            file_size = os.stat(self.file_path).st_size
            if file_size == 0 :
                print(time.strftime("%Y-%m-%d %X", time.localtime()), '[+]client:-bash: %s:'
                      % self.file_name, 'file not allow null')
            else:
                n = 0
                with open(self.file_path, 'rb') as f:
                    for line in f:
                        self.client.send(line)
                        trans_size += len(line)
                    else:
                        time.sleep(0.5)
                        print("\n文件上传完成。 文件大小:[%s]字节" %trans_size)
                        self.client.send(b'put done(status:200)')
        else :
            print(time.strftime("%Y-%m-%d %X", time.localtime()), '[+]client:-bash: %s:'
                  % self.file_name, 'file not found')


    def get(self,action,file_name):
        '''
        客户端下载函数
        :param cmd: 下载命令
        :return:
        '''
        cmd = self.action + " " + self.file_name
        os.chdir(setting.client_download_path)#切换到客户端下载目录
        self.client.send(cmd.encode())
        data = self.client.recv(1024)
        file_msg = json.loads(data.decode())
        file_status = file_msg['status']
        file_name = file_msg['filename']
        if file_status == 550:
            print(time.strftime("%Y-%m-%d %X", time.localtime()), '[+]client:-bash: %s:'
                  % self.file_name, 'file not found')
        elif file_status == 200:
            receive_size = 0
            file_size = file_msg['size']
            new = random.randint(1, 100000)
            n = 0
            with open(file_name+ '.'+ (str(new)), 'wb') as file_object:
                while receive_size < file_size:
                    data = self.client.recv(1024)
                    file_object.write(data)
                    receive_size += len(data)
                    file_object.flush()
                else:
                    file_object.close()
                    print(time.strftime("%Y-%m-%d %X", time.localtime()),
                          "[+]client: -bash :File get done File size is :", file_size)

    def connect(self,ip,port):
        '''
        connect ip,port
        :param ip:IP地址
        :param port:端口
        :return:
        '''
        self.client.connect((ip, port))
View Code

selectors_server.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao

import os
import json
import sys
import random
import time
import select
import socket
import queue

BASE_DIR  = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
from conf import setting

class select_ftp(object):
    """Ftp server"""
    def __init__(self, ip, port):
        '''
        构造函数
        :param ip: 监听IP
        :param port: 监听端口
        :return:
        '''
        self.server = socket.socket()
        self.host = ip
        self.port = port
        self.msg_dic = {}
        self.inputs = [self.server,]
        self.outputs = []
        self.file_flag = {}
        self.file_up_flag = {}


    def start(self):
        '''
        主启动函数
        :return:
        '''
        self.server.bind((self.host,self.port))
        self.server.listen(1000)
        self.server.setblocking(False)
        while True:
            readable, writeable, exceptional = select.select(self.inputs, self.outputs, self.inputs)  # 定义检测
            for r in readable:
                self.readable(r)
            for w in writeable:
                self.writeable(w)
            for e in exceptional:
                self.clean(e)


    def readable(self, ser):
        '''
        处理活动的从客户端传来的数据连接
        :param ser: socket server自己
        :return:
        '''
        if ser is self.server:
            conn, addr = self.server.accept()
            print(time.strftime("%Y-%m-%d %X", time.localtime()), ': echoing: newlink ',addr)
            self.inputs.append(conn)
            self.msg_dic[conn] = queue.Queue()
        else:
            try :
                data = ser.recv(1024)
                cmd = data.decode()
                cmd_str = cmd.split()[0]
                if len(cmd.split()) == 2 and hasattr(self, cmd_str):
                    print(time.strftime("%Y-%m-%d %X", time.localtime()), ': echoing: newlink ', cmd)
                    filename = cmd.split()[1]
                    func = getattr(self, cmd_str)
                    func(ser, filename)
                else:
                    self.upload(ser, data)
            except ConnectionResetError as e:
                print(time.strftime("%Y-%m-%d %X", time.localtime()), ": client lost",ser)
                self.clean(ser)
            except UnicodeDecodeError as e :
                self.upload(ser, data)


    def writeable(self, conn):
        '''
        处理活动的传回客户端的数据连接
        :param conn: 客户端连接
        :return:
        '''
        try :
            data_to_client = self.msg_dic[conn].get()
            conn.send(data_to_client)
        except Exception as e :
            print(time.strftime("%Y-%m-%d %X", time.localtime()), ': error client lost')
            self.clean(conn)
            del self.file_flag[conn]
        else:
            self.outputs.remove(conn)
            filename = self.file_flag[conn][2]
            size = self.file_flag[conn][0]
            trans_size = self.file_flag[conn][1]
            if trans_size < size :
                self.load(conn, filename, size)
            else:
                del self.file_flag[conn]


    def clean(self, conn):
        '''
        连接完成,收尾处理
        :param conn: 客户端连接
        :return:
        '''
        if conn in self.outputs:
            self.outputs.remove(conn)
        if conn in self.inputs:
            self.inputs.remove(conn)
        if conn in self.msg_dic:
            del self.msg_dic[conn]


    def put(self, conn, filename):
        '''
        客户端上传函数
        :param conn:
        :param filename:
        :return:
        '''
        os.chdir(setting.upload_path)
        if filename == "done(status:200)":
            del self.file_up_flag[conn]
        else :
            if os.path.isfile(filename):
                try:
                   new = random.randint(1, 100000)
                   self.rename(filename, (filename + '.' + str(new)))
                except FileExistsError:
                    os.remove(filename)

            print(time.strftime("%Y-%m-%d %X", time.localtime()), ': server recv download data')
            conn.send(b'200')
            self.file_up_flag[conn] = filename


    def upload(self, conn, data):
        '''
        客户端上传,数据接收函数
        :param conn: 客户端连接
        :param data: 客户端上传数据
        :return:
        '''
        os.chdir(setting.upload_path)
        if conn in self.file_up_flag:
            filename = self.file_up_flag[conn]
            with open(filename, 'ab') as file_object:
                file_object.write(data)



    def get(self, conn, filename):
        '''
        客户端下载函数
        :param conn:
        :param filename:
        :return:
        '''
        os.chdir(setting.download_path)
        msg_dic = {  # 下载文件信息
            "action" : "get",
            "filename" : filename,
            "size" : None,
            "status" : 550
        }
        if os.path.isfile(filename):
            size = os.stat(filename).st_size
            msg_dic['size'] = size
            msg_dic['status'] = 200
        conn.send(json.dumps(msg_dic).encode())
        if msg_dic['status'] == 200:
            self.load(conn, filename, size)


    def load(self, conn, filename, size):
        '''
        客户端下载,数据传输函数
        :param conn:
        :param filename:
        :param size:
        :return:
        '''
        if conn in self.file_flag:
            trans_size = self.file_flag[conn][1]
        else:
            trans_size = 0
        with open(filename, "rb") as f:
            f.seek(trans_size)
            data = f.readline()
            self.msg_dic[conn].put(data)
            self.outputs.append(conn)
            trans_size += len(data)
            self.file_flag[conn] = [size, trans_size, filename]

    def rename(self, old_name, new_name):
        '''
        重命名函数
        :param old_name:
        :param new_name:
        :return:
        '''
        if os.path.exists(new_name):
            os.remove(new_name)
        os.rename(old_name, new_name)
View Code

select_client.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao

import os
import sys
import json
import time
import random
import socket
import platform
BASE_DIR  = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
from conf import setting


class selectors_client(object):
    """FTP 客户端"""
    def __init__(self):
        '''
        构造函数
        :param
        :return
        '''
        self.client = socket.socket()

    def start(self):
        '''
        启动函数
        :param:
        :return:
        '''
        print('login time : %s' % (time.strftime("%Y-%m-%d %X", time.localtime())))
        while True:
            try:
                self.sending_msg_list = []
                self.sending_msg = input('[root@select_ftp_client]# ')
                self.sending_msg_list = self.sending_msg.split()
                self.action = self.sending_msg_list[0]
                if len(self.sending_msg_list) == 0:
                    continue
                elif len(self.sending_msg_list) == 1:
                    if self.sending_msg_list[0] == "exit":
                        print('logout')
                        break
                    else:
                        print(time.strftime("%Y-%m-%d %X", time.localtime()),
                              '-bash : %s command not found' % self.sending_msg_list[0])
                else:
                    try:
                        if platform.system() == 'Windows':
                            self.file_path = self.sending_msg_list[1]
                            self.file_list = self.sending_msg_list[1].strip().split('\\')
                            self.file_name = self.file_list[-1]
                        elif platform.system() == 'Linux':
                            self.file_path = self.sending_msg_list[1]
                            self.file_list = self.sending_msg_list[1].strip().split('/')
                            self.file_name = self.file_list[-1]
                    except IndexError:
                        pass
                    if self.action == "put":
                        self.put()
                    elif self.action == "get":
                        self.get()
                    else:
                        print(time.strftime("%Y-%m-%d %X", time.localtime()),'[+]client:-bash: %s:'
                              %self.sending_msg_list[0], 'command not found')
            except ConnectionResetError and ConnectionRefusedError and OSError and IndexError as e:
                print(time.strftime("%Y-%m-%d %X", time.localtime()), '[+]client: -bash :', e,'Restart client')
                selectors_client().start()

    def put(self):
         '''
         上传函数
         :param:cmd:上传命令
         :return:
         '''
         if os.path.exists(self.file_path) and os.path.isfile(self.file_path):
             self.file_size = os.path.getsize(self.file_path)
             data_header = {"client": {
                 "action": "put",
                 "file_name": self.file_name,
                 "size": self.file_size}}
             self.client.send(json.dumps(data_header).encode())
             print(time.strftime("%Y-%m-%d %X", time.localtime()), '[+]server: -bash : %s '
                   % self.client.recv(1024).decode())
             with open(self.file_path, 'rb') as file_object:
                 for line in file_object:
                     self.client.send(line)
                 file_object.close()
             print(self.client.recv(1024).decode())
         else:
             print(time.strftime("%Y-%m-%d %X", time.localtime()),'[+]client: -bash :%s : No such file'
                   %self.file_name)

    def get(self):
        '''
        下载函数
        :param:cmd 下载命令
        :return:
        '''
        os.chdir(setting.client_download_path)
        data_header = {"client": {
            "action": "get",
            "file_name": self.file_name,
            "size": 0}}
        self.client.send(json.dumps(data_header).encode())
        self.data = self.client.recv(1024)
        if self.data.decode() == '404':
            print(time.strftime("%Y-%m-%d %X", time.localtime()),
                  '[+]server: -bash : %s : No such file' % (self.file_path))
        else:
            print(time.strftime("%Y-%m-%d %X", time.localtime()),
                  "[+]server: -bash : File ready to get File size is :", self.data.decode())
            new = random.randint(1, 100000)
            file_object = open((self.file_name + '.' + (str(new))), 'wb')
            received_size = 0
            file_size = int(self.data.decode())
            while received_size < file_size:
                if file_size - received_size > 1024:
                    size = 1024
                elif file_size < 1024:
                    size = file_size
                else:
                    size = file_size - received_size
                recv_data = self.client.recv(size)
                received_size += len(recv_data)
                file_object.write(recv_data)
            else:
                file_object.flush()
                file_object.close()
                time.sleep(0.1)
                print(time.strftime("%Y-%m-%d %X", time.localtime()),
                      "[+]client: -bash :File get done File size is :", file_size)

    def connect(self, ip, port):
        '''
        链接函数
        :param ip:
        :param port:
        :return:
        '''
        self.client.connect((ip, port))
View Code

select_server.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao

import os
import sys
import json
import selectors
import socket
import time
import errno
import random

BASE_DIR  = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
from conf import setting
sel = selectors.DefaultSelector()

class selectors_ftp(object):
    '''selectors_ftp服务端'''
    def __init__(self):
        '''
        构造函数
        '''
        self.sock = socket.socket()

    def upload(self,conn,mask):
        '''
        服务器upload函数
        :param conn:
        :param mask
        :return:
        '''
        os.chdir(setting.upload_path)
        self.conn.send(b'Server receive upload %s request'%self.file_name.encode())
        new = random.randint(1, 100000) #并发测试使用random生成新文件名
        file_object = open((self.file_name+'.'+(str(new))), 'wb')
        received_size = 0
        while received_size < self.file_size:
            try:
                if self.file_size - received_size > 1024:
                    size = 1024
                elif self.file_size < 1024:
                    size = self.file_size
                else:
                    size = self.file_size - received_size
                recv_data = conn.recv(size)
                received_size += len(recv_data)
                file_object.write(recv_data)
            except BlockingIOError as e:
                if e.errno != errno.EAGAIN:
                    raise
            else:
              time.sleep(0.00001)
            #   #print(received_size, file_size)
        else:
            file_object.close()

    def download(self,conn,mask):
        '''
        服务器下载函数
        :param conn:
        :param mask:
        :return:
        '''
        while True:
            os.chdir(setting.download_path)
            if os.path.isfile(self.file_name) and os.path.exists(self.file_name):
                try:
                    file_size = os.path.getsize(self.file_name)
                    self.conn.send(str(file_size).encode())
                    client_file_size = 0
                    with open(self.file_name, "rb") as file_obj:
                        for line in file_obj:
                            client_file_size += len(line)  # 记录已经传送的文件大小
                            self.conn.sendall(line)
                    file_obj.close()
                    if client_file_size >= int(file_size):  # 文件传送完毕
                        break
                except BlockingIOError as e:
                    if e.errno != errno.EAGAIN:  # errno.EAGAIN 缓冲区满 等待下
                        raise
                else:
                    time.sleep(0.00001)  # 等待0.1s进行下一次读取
            else:
                conn.send(b'404')
                break

    def accept(self,sock,mask):
        '''
        服务器监听函数
        :param sock:
        :param mask:
        :return:
        '''
        self.conn, self.addr = sock.accept()
        print(time.strftime("%Y-%m-%d %X", time.localtime()), ': accepted',self.conn,'from', self.addr, mask)
        self.conn.setblocking(False)
        sel.register(self.conn, selectors.EVENT_READ, self.read)

    def read(self,conn,mask):
        '''
        服务器读取命令信息函数
        :param conn:
        :param mask:
        :return:
        '''
        self.data = conn.recv(1024)
        if self.data:
            self.data_receive = json.loads(self.data.decode())
            self.action = self.data_receive['client']['action']
            self.file_name = self.data_receive['client']['file_name']
            self.file_size = self.data_receive['client']['size']
            print(time.strftime("%Y-%m-%d %X", time.localtime()), ': echoing', repr(self.data), 'to', self.conn, mask)
            if self.action == 'put':
                self.upload(self.conn, mask)
                conn.send(b'[+]server: -bash : Server receive upload %s done ' % self.file_name.encode())
                print(time.strftime("%Y-%m-%d %X", time.localtime()), ': client :', self.addr,
                      ': upload %s done' % self.file_name)
            elif self.action == 'get':
                self.download(self.conn, mask)
                print(time.strftime("%Y-%m-%d %X", time.localtime()), ': client :', self.addr,
                      ': download %s done' % self.file_name)
        else:
            print(time.strftime("%Y-%m-%d %X", time.localtime()), ': closing:', self.conn, mask)
            sel.unregister(conn)
            conn.close()

    def register(self,sock):
        '''
        注册函数
        :return:
        '''
        sel.register(self.sock, selectors.EVENT_READ, self.accept)
        while True:
            events = sel.select()
            for key, mask in events:
                callback = key.data
                callback(key.fileobj,mask)


    def start(self,ip,port):
        '''
        启动函数
        :return:
        '''
        self.sock.bind((ip,port))
        self.sock.listen(500)
        self.sock.setblocking(False)
        self.register(self.sock)
View Code

server_main.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
'''server端交互程序'''
import os
import sys
import time

BASE_DIR  = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
from core import selectors_server
from core import select_server

class server_ftp(object):
    '''ftp_server交互程序'''
    def start(self):
        '''
        启动函数
        :return:
        '''
        print('欢迎进入Select Ftp')
        msg = '''
        1.selectors  服务端
        2.select     客户端
        3.exit       退  出
        '''
        while True:
            print(msg)
            user_choice = input('请选择操作>>>:')
            if user_choice == '1':
                server = selectors_server.selectors_ftp()
                print(time.strftime("%Y-%m-%d %X", time.localtime()), '[+]selectors server ftp already work ')
                server.start("localhost", 10000)
            elif user_choice == '2':
                server = select_server.select_ftp("localhost",10000)
                print(time.strftime("%Y-%m-%d %X", time.localtime()), '[+]select server ftp already work ')
                server.start()
            elif user_choice == '3' or user_choice== 'q'or user_choice == 'exit':
                sys.exit('程序退出')
            else:
                print('非法的操作,请重新输入')
View Code

 

程序测试样图

Windows 有没有类似ulimit的文件不太清楚默认是有限制链接的,Linux是可以修改的

Windows : Win10

500链接测试命令返回效果

Linux  (VMware) Centos6.7

ulimit文件

1W链接测试命令返回效果

5W链接测试命令返回效果

推荐阅读