首页 > 技术文章 > 四、Python-Redis、加密和解密、mock接口开发、网络请求

krystal-xiao 2020-09-19 00:07 原文

(一)Redis

1、数据库分为关系型数据库和非关系型数据库:

(1)关系型数据库分为:MySQL、Oracle、SQL Server、SQLite...

  数据库

  表

  SQL语句

(2)非关系型数据库分为:NoSQL(Redis、MongoDB...)

  key-value

  Redis:存在内存里面,做缓存用。

  MongoDB:放在磁盘里面。

 

2、Redis(Remote Dictionary Server):(摘自:http://www.redis.cn/)

  Redis 是一个开源(BSD许可)的,内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件。 它支持多种类型的数据结构,如 字符串(strings), 散列(hashes), 列表(lists), 集合(sets), 有序集合(sorted sets) 与范围查询, bitmaps, hyperloglogs 和 地理空间(geospatial) 索引半径查询。 Redis 内置了 复制(replication),LUA脚本(Lua scripting), LRU驱动事件(LRU eviction),事务(transactions) 和不同级别的 磁盘持久化(persistence), 并通过 Redis哨兵(Sentinel)和自动 分区(Cluster)提供高可用性(high availability)。

 

3、Redis与其他key-value缓存产品的特点:

(1)Redis支持数据的持久化,将内存中的数据保存在磁盘中,重启后再次可以加载使用;

(2)Redis不仅支持简单的key-value类型的数据,还支持list、set、zset、hash数据结构存储;

(3)Redis支持数据备份,master-slave模式的数据备份。

 

4、根据自己的需求到官网下载安装:https://redis.io/download

 

5、Redis 命令参考:http://doc.redisfans.com/

 

6、数据结构:

(1)String:字符串,最基本的数据类型,最大能存储512MB。

(2)Hash:散列,是一个string类型的field和value的映射表,特别适合用于存储对象。(存储、读取、修改用户属性)

(3)List:列表,是简单的字符串类别,按照插入顺序排序。

(4)Set:集合,是string类型的无序集合,通过hash表实现的。

(5)Sorted Set:有序集合,是string类型元素的集合,且不允许重复的成员。

 

7、连接Redis:如图所示,填写自己的配置:

(1)配置信息填写:

 

 

(2)测试连接:

 

 

(3)查看当前库里的信息,都是key-value 结构:

 

 

8、使用Python来操作Redis-String类型,相当于字典类型

(1)Python中使用connection连接Redis

import redis
r = redis.Redis(host='118.24.3.40',
                password='HK139bc&*',
                port=6379,
                db=2,
                decode_responses=True
                )

 

(2)Python中利用跳板机ssh远程连接redis

from sshtunnel import SSHTunnelForwarder # ssh连接库
import redis # redis模块
server = SSHTunnelForwarder(
        ssh_address_or_host= , # ssh地址
        ssh_username=     , # ssh连接的用户名                    
        ssh_password=  ,  # ssh连接的用户名
        remote_bind_address=('远程机器地址', 端口号))

server.start()
r=redis.Redis(host='redis地址', port=server.local_bind_port, decode_responses=True)
server.close() #关闭连接

 

(3)string类型:添加数据

import redis
r = redis.Redis(host='118.24.3.40',
                password='HK139bc&*',
                port=6379,
                db=2,
                decode_responses=True
                )
#string
r.set('iphone','{"price":6999,"count":50}')

执行这个程序后,去Redis客户端查看是否有该数据存在:

 

(4)string类型:获取数据

r.set('students','{"name":John,"age":15,"hobby":"playing Games"}')
print(r.get('students'))

  执行结果为:

 

(5)string类型:删除数据

import redis
r = redis.Redis(host='118.24.3.40',
                password='HK139bc&*',
                port=6379,
                db=2,
                decode_responses=True
                )
r.set('矿泉水','{"price":10.5,"count":20}')
data = r.get('矿泉水')
r.delete('矿泉水')

 

(6)string类型:修改数据

import redis
r = redis.Redis(host='118.24.3.40',
                password='HK139bc&*',
                port=6379,
                db=2,
                decode_responses=True
                )
r.set('矿泉水','{"price":10.5,"count":20}')
r.set('矿泉水','xxxx')

 

(7)bytes:字节类型(上传一个图片或者下载一个图片会遇到这个字节类型)

# bytes类型通过wb写入
f = open('a.jpg','wb')
f.write('xxx')

         bytes类型和字符串类型相互转换

data = r.get('矿泉水')
data.decode()  # bytes类型转换成字符串
s='hello'
s.encode()     # 字符串转换成bytes类型

 

(8)Python中设置过期时间:(指定某个key的过期时间)

expire_time = 60 * 60 * 24
r.set('Bob_session','sdgx312vsdrq',expire_time)

Redis客户端过期时间显示:

 

 

 9、使用Python来操作Redis-hash类型,相当于二维字典

(1)Hash添加数据

r.hset('salary','Ann','{"money":2500}')
r.hset('salary','Sarah','{"money":5500}')
r.hset('salary','Sun','{"money":3860}')
r.hset('salary','Beans','{"money":7800}')

  Redis客户端查看新增的数据:

 

(2)Hash获取数据

# 获取单个数据
print(r.hget('salary','Ann'))

# 获取所有的数据
int(r.hgetall('salary'))

 

(3)Hash-当含有bytes类型,需要变成正常的字典类型

d = r.hgetall('students')
new_d  = {}
for k,v in d.items():
     new_d[k.decode()] = v.decode()
print(new_d)

  从根源上解决Bytes类型问题:

r = redis.Redis(host='118.24.3.40',
                password='HK139bc&*',
                port=6379,
                db=2,
                decode_responses=True     # 解决Bytes类型问题
                )

 

(4)Hash-修改数据

(5)Hash-删除数据

r.hdel('salary','Bob')

 

(6)设置过期时间

# 指定某个key的过期时间
r.expire('salary',1000) 

 

10、String和Hush(Redis客户端展示)

(1)String

 

 (2)Hash

(3)从Redis取数据:Key是什么,Key的类型是什么

(4)删掉大Key值:

r.delete('students')

 

11、Redis其他命令:

# 获取当前数据库有多少key
print(r.keys())

# 获取具体的某个开头的key进行匹配
print(r.keys('s*'))

#查看key的类型
print(r.type('students'))

# 清空当前数据库里面所有的key
print(r.flushdb()) 

# 清空所有数据库里面所有的key
print(r.flushall())

#判断Key存不存在
print(r.exists('students'))    

 

12、Redis-管道:批量操作

(1)Python中建立管道,批量执行操作

p= r.pipeline()  #建立管道

p.hset('salary','Ann','{"money":2500}')
p.hset('salary','Sarah','{"money":5500}')
p.hset('salary','Sun','{"money":3860}')
p.hset('salary','Beans','{"money":7800}')

# 执行,返回一个list,这个list里面是每个命令执行的结果
p.execute() 

 

(2)使用管道和不用管道对比时间:

import redis
import time

r = redis.Redis(host='118.24.3.40',
                password='HK139bc&*',
                port=6379,
                db=2,
                decode_responses=True
                )
# 不用管道:
start_time =time.time()
for i in range(100):
    r.set('key%s'%i,'%s'%i)
print('不用管道的时间',time.time() - start_time)

# 使用管道:
start_time =time.time()
p = r.pipeline()
for i in range(100):
    p.set('pipeline_key%s'%i,'%s'%i)
p.execute()
print('用管道的时间',time.time() - start_time)

执行最后的结果为:

 

13、Python里新增数据带有文件夹,key里有冒号:就会有文件夹

r.set('product:water','{"count":1,"price":5}')
r.set('product:apple','{"count":1,"price":15}')
r.set('product:melon','{"count":1,"price":35}')

  在Redis客户端中可以查看到添加的文件夹里数据信息

 

 

14、当前服务器需要向另外一个服务器进行Redis数据迁移

编程分析:

 

r = redis.Redis(host='118.24.3.40',
                 password='HK139bc&*',
                 port=6379,
                 db=14,
                 decode_responses=True
                 )

r2 = redis.Redis(host='118.24.3.40',
                 password='HK139bc&*',
                 port=6378,
                 db=14,
                 decode_responses=True
                 )
p = r2.pipeline()

for k in r.keys():
     key_type = r.type(k)
     if key_type == 'string':
         value = r.get(k)
         p.set(k,value)
     elif key_type =='hash':
         hash_data = r.hgetall(k) # {'XX':XXX}
         for field,data in hash_data.items():
             p.hset(k,field,data)
p.execute()

 

15、若需要多个数据库操作,需要循环数据库:(一般用不到)

for i in range(17):
    r = redis.Redis(host='118.24.3.40',
                    password='HK139bc&*',
                    port=6379,
                    db=i,
                    decode_responses=True
                    )
    r2 = redis.Redis(host='118.24.3.40',
                    password='HK139bc&*',
                    port=6379,
                    db=i,
                    decode_responses=True
                    )

 

(二)加密和解密

1、加密,md5是一种不可逆,同时也是常见的加密方式,需要导入hashlib模块:

(1)实例1代码:

import hashlib

s = '123456'

# 对字符串进行加密
m = hashlib.md5(s) 

 # 加密后的结果用二进制表示 
result = m.hexdigest() 
print(result)

 

(2)执行结果报错,这是因为MD5必须传Bytes类型的:hashlib.md5()

 

 (3)解决方法:将字符串转换成bytes类型

#导入hashlib模块
import hashlib

# 定义字符串
s = '123456'

# 将字符串转换成bytes类型
s =s.encode()

# 对字符串进行加密
m = hashlib.md5(s) 

# 加密后的结果用二进制表示 
result = m.hexdigest()  
print(result)

执行结果为:

 

(4)同样的字符串,MD5出来的结果都一样

 

(5)加盐,例如设置用户表

import hashlib

# 密码+随机字符串(随机字符串就叫盐值)
s = '123=123' + 'djsjks51@#$$' 

# 转换成bytes
s = s.encode()   
m = hashlib.md5(s)  # bytes,不可逆
result = m.hexdigest()
print(result)

 

(6)定义函数

def my_md5(s):
    s = str(s)
    s = s.encode()
    m = hashlib.md5(s)
    result = m.hexdigest()
    return result

 

2、加密主要有两种方式:对称加密和非对称加密

(1)对称加密:对称加密算法在加密和解密时使用的是同一个秘钥。

对称加密的模式是:甲方选择某一种加密规则,对信息进行加密;乙方使用同一种规则,对信息进行解密;

客户端和服务端进行通信,采用对称加密,若使用同一个秘钥很容易破解,若使用不同的秘钥,秘钥的管理和传输成本又较高;

(2)非对称加密:非对称加密算法需要两个秘钥来进行加密和解密,分别为:公开密码(公钥)和私有秘钥(私钥)。

乙方生成两把密钥(公钥和私钥)。公钥是公开的,任何人都可以获得,私钥是保密的。

甲方获取乙方的公钥,然后用它对信息加密。

(3)http和https

常规的http请求,明文传播;

https可以认为是:http + TLS ,TLS是传输层加密协议,前身是SSL协议

 

3、解密(撞库解密)

(1)拖库与撞库:

 拖库是指黑客盗取了网站的数据库。撞库是指黑客用拖库获得的用户名和密码在其它网站批量尝试登陆,进而盗取更有价值的东西。由于一些用户在多个网站用相同的用户名和密码,所以撞库是有一定成功率的。要想撞库,必须得知道密码的明文,也就是用户真正输入的密码。

(2)MD5实际上只能破解比较简单的密码

 

 4、Base64加密:是可以破解的

(1)例如百度搜索“动画片”url中里面是转译密文

https://www.baidu.com/s?wd=%E5%8A%A8%E7%94%BB%E7%89%87&rsv_spt=1&rsv_iqid=0xcc0d32ae00192d4f&issp=1&f=8&rsv_bp=1&rsv_idx=2&ie=utf-8&rqlang=cn&tn=baiduhome_pg&rsv_enter=0&rsv_dl=tb&oq=%25E5%258A%25A0%25E5%25AF%2586%25E6%2592%259E%25E5%25BA%2593&rsv_t=298fJqJmONwDQA1vi18GP009wC%2Fh67nlOij54DtqzQ7X6efhzWNd7AcKIyB6FO0SBNyM&rsv_btype=t&inputT=10055&rsv_pq=b07987030011a206&rsv_sug3=80&rsv_sug1=32&rsv_sug7=101&bs=%E5%8A%A0%E5%AF%86%E6%92%9E%E5%BA%93

(2)Base64使用:

import base64
# 一般用于传输数据过程中
# 加密
s = 'sjfkjdkjxh1145646你好'
r = base64.b64encode(s.encode())
result =r.decode()
print(result)

# 解密
r = base64.b64decode('c2pma2pka2p4aDExNDU2NDbkvaDlpb0=')
print(r.decode)

 

(三)mock接口开发:开发接口,模拟数据

1、Python中可安装flash模块或fastapi模块:

# 建议安装FastAPI接口
pip install fastapi

# 若安装了FastAPI,还需要安装一个ASGI服务器,用于生产如Uvicorn或Hypercorn
pip install uvicorn

# 或者安装Flask
pip install Flask

 

2、FastAPI (摘自:https://fastapi.tiangolo.com/)

FastAPI是一种现代、快速(高性能)的Web框架,快速编码,更少的错误,直观简易,短而健壮,基于API的开放标准:OpenAPI(旧称Swagger)和JSON Schema。

 

3、如何mock接口开发:

(1)例如银行姓名需要查询产品、开户接口

# Product 
{
"code":0,
"data":[
{"id":1,"product_name":"XXX"},
{"id":2,"product_name":"XXX"},
{"id":3,"product_name":"XXX"},
{"id":4,"product_name":"XXX"},
{"id":5,"product_name":"XXX"}
]
}

 

(2)使用fastapi编写接口

import fastapi   # 自动生成一个API接口文档
import uvicorn

# 启动一个服务
server = fastapi.FastAPI()

@server.get('/login')
def login(username:str,password:str):
    return {'username':username,'password':password}
uvicorn.run(server,port=8800,debug=True)

执行结果如下:

 

 (3)复制url在浏览器打开,添加参数:http://127.0.0.1:8800/login

 

 (4)在http://127.0.0.1:8800/login,再次补充其他参数:http://127.0.0.1:8800/login?username=Ann&password=123456

 

 (5)定义一个不加参数的接口:

@server.get('/test')
def test():
    return {'msg':'hello world!'}
uvicorn.run(server,port=8800,debug=True)

  在浏览器打开输入url:http://127.0.0.1:8800/test,就可以看到无参数的接口信息

 

 (6)查看接口文档:http://127.0.0.1:8800/docs(根据设置的端口进行查看)

 

 (7)在接口文档中修改参数进行执行

 

 执行结果如下:

 

(8)定义一个product接口:

import fastapi
import uvicorn
server = fastapi.FastAPI()

@server.get('/product')
def test():
    return {
        'code':0,
        'data':[
            {'product_name': '基金1号', 'status': 0},
            {'product_name': '基金2号', 'status': 2},
            {'product_name': '基金3号', 'status': 1},
            {'product_name': '基金4号', 'status': 0},
            {'product_name': '基金5号', 'status': 1}
        ]
    }

 

执行结果是:

   

注: http://127.0.0.1指的是本机的IP地址

若想别人访问你的地址需要加上host:,例如:

# 自己访问自己,无需加host

uvicorn.run(server,port=8800,debug=True)

# 若别人访问你的地址,需要加上host

uvicorn.run(server,port=8800,debug=True,host='0.0.0.0')

# http://127.0.0.1:8800/product
# http://192.168.1.xx:8800/product

 

 (9)定义一个支付pay的接口

import fastapi
import uvicorn
server = fastapi.FastAPI()

@server.get('/pay')
def pay(money:float,status:str):
    if status == '0':
        return {'code':1, 'status':'fail'}
    elif status=='1':
        return {'code':0, 'status':'success','balance':money}

uvicorn.run(server,port=8800,debug=True)

 

打开浏览器,url中输入:http://127.0.0.1:8800/pay?money=1&status=0,如图所示:

 

  

pay接口中,status加上默认值1:

@server.get('/pay')

# status加上默认值1
def pay(money:float,status='1'):
    if status == '0':
        return {'code':1, 'status':'fail'}
    elif status=='1':
        return {'code':0, 'status':'success','balance':money}

uvicorn.run(server,port=8800,debug=True)

 

接口文档中显示默认值1:

 

 

  (10)定义一个注册接口

@server.post('/register')
def register(username:str,password:str,cpassword:str):
    if username.strip() and password.strip() and cpassword.strip():
        if password.strip() != cpassword.strip():
            return {'code':-1, 'msg':'两次输入的密码不一致!'}
        else:
            sql = 'select * from app_myuser where username="%s";'%username
            if tools.execute_sql(sql):
                return {'code':-1, 'msg':'用户已经存在!'}
            else:
                p = tools.my_md5(password)
                insert_sql = 'insert into app_myuser (username,password) values ("%s","%s");'%(username,p)
                tools.execute_sql(insert_sql)
                return {'code':0, 'msg':'注册成功!'}
    else:
        return {'code':-1, 'msg':'必填参数不能为空!'}

 

前往postman应用中,进行接口测试:

 

 (11)定义一个登录接口:

@server.post('/login')
def login(username:str=Form(...),password:str=Form(...)):
    if username.strip() and password.strip():
        p = tools.my_md5(password)
        sql = 'select * from app_myuser where username="%s" and password="%s";' % (username,p)
        if tools.execute_sql(sql):
            return {'code': 0, 'msg': '登录成功!'}
        else:
            return {'code':-1, 'msg':'输入的账号/密码不存在!'}
    else:
        return {'code':-1, 'msg':'必填参数不能为空!'}

 

 4、flask:开发接口

(1)安装flask模块:

pip install flask

 

(2)用flask定义一个登录接口:

# flask 是一个轻量级web开发框架
import flask

import tools

# 因为flask返回的是一个字符串,故需要导入JSON模块
import json
server = flask.Flask(__name__)


@server.route('/login',methods=['post','get'])
def login():
    username = flask.request.values.get('username')
    password = flask.request.values.get('password')

    # flask.json.get('xxxx')           # 如果入参是json类型的话,使用这样的方式
    # flask.request.cookies.get('xxx') # 获取cookie里面的数据
    # flask.request.headers.get('xx')
    # flask.request.files.get("xxx")   # 文件

    if username.strip() and password.strip():
        p = tools.my_md5(password)
        query_sql = 'select * from app_myuser where username= "%s" and passwd="%s";' % (username, p)
        print(query_sql)
        if tools.execute_sql(query_sql):
            return json.dumps({'code': '0', 'msg': '登录成功'},ensure_ascii=False)
        else:
            return json.dumps({'code': '-1', 'msg': '输入的用户名/密码错误'})
    else:
        return json.dumps({'code': '-1', 'msg': '不能为空'})


server.run(host='0.0.0.0',port=8999,debug=True)

 

(3)定义一个注册接口:

import flask

import tools

# 因为flask返回的是一个字符串,故需要导入JSON模块
import json
server = flask.Flask(__name__)

@server.route('/reg',methods=['post','get'])
def reg():
    username = flask.request.values.get('username')
    password = flask.request.values.get('password')
    cpassword = flask.request.values.get('cpassword')
    if username.strip() and password.strip() and cpassword.strip():
        if password.strip() != cpassword.strip():
            return json.dumps({'code': -1, 'msg': '两次输入的密码不一样'})
        else:
            sql='select * from app_myuser where username="%s";'%username
            if tools.execute_sql(sql):
                return json.dumps({'code':-1,'msg':'用户已经存在'})
            else:
                p = tools.my_md5(password)
                insert_sql = 'insert into app_myuser (username,passwd) value ("%s","%s");'%(username,p)
                tools.execute_sql(insert_sql)
                return json.dumps({'code':0,'msg':'注册成功!'},ensure_ascii=False)

    else:
        return json.dumps({'code':-1,'msg':'必填参数不能为空'})


server.run(host='0.0.0.0',port=8999,debug=True)

 

(四)网络请求:做接口自动化

1、Python自带的模块发送网络请求:urllib

(1)安装urllib模块

import urllib

 

(2)urllib模块:get请求

from urllib import request
import json

url = 'http://127.0.0.1:8999/login?username=Bob&password=123456'

# get请求
req = request.urlopen(url)

dic = json.loads(req.read().decode())

print(dic)

 

(3)urllib模块:post请求

# post请求
from urllib import request
from urllib.parse import urlencodeimport json

url = 'http://127.0.0.1:8999/login'
data ={'username':'Ann01','password':'123456'}
req = request.urlopen(url,urlencode(data).encode())
dic = json.loads(req.read().decode())
print(dic)

 

2、requests模块

(1)接口测试的请求方式:get、post、传cookie、传文件、传json、传headers

(2)安装requests模块

pip install requests

 

(3)使用requests查看请求

import requests
import json

url = 'http://127.0.0.1:8999/login'
data ={'username':'Ann01','password':'123456'}

r = requests.get(url,data)

# 字典类型
print(r.json)

# 字符串格式
print(r.text)

# bytes类型的
print(r.content)

#返回的状态码:200、404、500、502
print(r.status_code)

 

(4)get请求方式(根据使用情况选择不同的方法):

# 字典格式,为了获取某个字段的值:d.get('XXX')
r.json

# 字符串格式:接口返回结果不处理,直接存入库里或者json文件里面
r.text

# bytes类型,例如下载一个图片
r.content

#返回的状态码
r.status_code

 

(5)post请求方式:

r = requests.post(url,data)

print(r.json())

print(r.text)

print(r.content)

print(r.status_code)

 

(6)post请求url中,有时会遇到url带有参数,这时候需要:

r = requests.post(url,data,params={"version":1.0)
# params是把参数传到url后头的

 

(7)传cookie:

# url=http://www.nnzhp.cn/
# cookie:PHPSESSID=8a97d6ac860319bf067a674b8b5a8e34

cookie = {'PHPSESSID':'8a97d6ac860319bf067a674b8b5a8e34'}
r = requests.post(url,data=data,params={"version":1.0},cookies=cookie)

 

(8)传headers:

headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.102 Safari/537.36','cookie':'PHPSESSID':'8a97d6ac860319bf067a674b8b5a8e34'}
r = requests.post(url,data=data,params={"version":1.0},headers=headers)

 

(9)传json和传文件

# 传json

r = requests.post(url,json=data)
print(r.json())

# 传文件

url = 'http://api.nnzhp.cn/api/file/file_upload'
data = {'file':open('XXX.xls','rb')} 
r = requests.post(url,files=data)

 

(10)下载文件

#下载文件

r = requests.get('https://ss0.bdstatic.com/70cFuHSh_Q1YnxGkpoWK1HF6hhy/it/u=3086278442,1750390944&fm=26&gp=0.jpg')

f = open('dog.jpg','wb')
print(r.content)
f.close()

 

注:有时会遇到https的报错,这时需要:

r = requests.get('https://ss0.bdstatic.com/70cFuHSh_Q1YnxGkpoWK1HF6hhy/it/u=3086278442,1750390944&fm=26&gp=0.jpg',verify=False)

 

推荐阅读