首页 > 技术文章 > flask扩展

xiongying4 2019-11-12 21:14 原文

一、严格模式

二、重定向

三、自定义URL正则表达式

 添加到flask中
 我们要用自定义的路由,用正则的话

1.导入from werkzeug.routing import BaseConverter
2.我先要写一个类,然后继承BaseConverter,然后实现__inti__, def to_python(self, value):to_url(self, value)
3.app.url_map.converters['谁便'] = RegexConverter
4.我们在路由里面@app.route('/index/<regex1("\d+"):nid>'),regex1='谁便,regex1("正则表达式")
5.regex1("正则表达式")匹配出来的结果,返回to_python,一定要return
6.当我们做反向解析的解析的时候,我们的参数,会传递给to_url,return的结果才是我们拼接到我们路由上
from flask import Flask, views, url_for
from werkzeug.routing import BaseConverter

app = Flask(import_name=__name__)

class RegexConverter(BaseConverter):
    """
    自定义URL匹配正则表达式
    """
    def __init__(self, map, regex):
        super(RegexConverter, self).__init__(map)
        self.regex = regex

    def to_python(self, value):
        """
        路由匹配时,匹配成功后传递给视图函数中参数的值
        """
        #value就正则匹配出来的结果
        print('value',value,type(value))
        return "asdasdasd"

    def to_url(self, value):
        """
        使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数
        """
        val = super(RegexConverter, self).to_url(value)
        print(val)
        return val

app.url_map.converters['regex1'] = RegexConverter
@app.route('/index/<regex1("\d+"):nid>',endpoint="sb")
def index(nid):
    print("nid",nid,type(nid))
    print(url_for('sb', nid='888'))
    # /index/666
    return 'Index'

if __name__ == '__main__':
    app.run()

四、请求与响应

from flask import Flask
    from flask import request
    from flask import render_template
    from flask import redirect
    from flask import make_response

    app = Flask(__name__)


    @app.route('/login.html', methods=['GET', "POST"])
    def login():

        # 请求相关信息
        # request.method  提交的方法
        # request.args  get请求提及的数据
        # request.form   post请求提交的数据
        # request.values  post和get提交的数据总和
        # request.cookies  客户端所带的cookie
        # request.headers  请求头
        # request.path     不带域名,请求路径
        # request.full_path  不带域名,带参数的请求路径
        # request.script_root  
        # request.url           带域名带参数的请求路径
        # request.base_url      带域名请求路径
        # request.url_root      域名
        # request.host_url      域名
        # request.host          127.0.0.1:500
        # request.files
        # obj = request.files['the_file_name']
        # obj.save('/var/www/uploads/' + secure_filename(f.filename))

        # 响应相关信息
        # return "字符串"
        # return render_template('html模板路径',**{})
        # return redirect('/index.html')
        #return jsonify({'k1':'v1'})

        # response = make_response(render_template('index.html'))
        # response是flask.wrappers.Response类型
        # response.delete_cookie('key')
        # response.set_cookie('key', 'value')
        # response.headers['X-Something'] = 'A value'
        # return response
        return "内容"

    if __name__ == '__main__':
        app.run()

五、session

cookie:存放在客户端的键值对
session:存放在服务端的键值对
token:存放在客户端,通过算法来校验
'''
app.session_interface这里面看
存session,
1 调用save_session,将我们的session加密的val,读取配置文件['SESSION_COOKIE_NAME']得到key
2 将1种的key,val存储到cookies

取session
1 获取request里面的cookies,获取里面key,这个key就是['SESSION_COOKIE_NAME'],值就是加密的值
2 对该值进行解密

'''

六、闪现

什么是闪现

a 产生信息,传给 c 页面
但是用户访问a 页面以后,不是直接跳转到c,而是到b,或则是其他页面,但是用户访问c页面的时候,我希望把a给我的信息拿到

-设置:flash('aaa')
-取值:get_flashed_message()
-
-假设在a页面操作出错,跳转到b页面,在b页面显示a页面的错误信息
from flask import Flask,flash,get_flashed_messages,request

app = Flask(__name__)
app.secret_key = 'asdfasdf'
#1 如果要用flash就必须设置app.secret_key = 'asdfasdf'
#2 只能取一次,在取就没有了
#3 可以通过  flash('普通信息',category="info"),对信息做分类
#4 get_flashed_messages(with_categories=True,category_filter=("error",)),with_categories以键值对的形式获取
#  设置闪现,category_filter=("error",)进行分类信息的过滤
@app.route('/index1')
def index():
    #(category="message", message))
    flash('超时错误',category="error")
    flash('普通信息',category="info")
    return "ssdsdsdfsd"
    # return redirect('/error')

@app.route('/error1')
def error1():
    return "ok"

@app.route('/error')
def error():
    data = get_flashed_messages(with_categories=True,category_filter=("error","info"))
    data1 = get_flashed_messages(with_categories=True, category_filter=("error", "info"))
    print("data1",data1)
    print("data",data)
    return "错误信息"

if __name__ == '__main__':
    app.run()
代码

 

 

 

 

#1 如果要用flash就必须设置app.secret_key = 'asdfasdf'
#2 只能取一次,在取就没有了
#3 可以通过  flash('普通信息',category="info"),对信息做分类
#4 get_flashed_messages(with_categories=True,category_filter=("error",)),with_categories以键值对的形式获取
#  设置闪现,category_filter=("error",)进行分类信息的过滤

七、请求扩展

1 before_request

类比django中间件中的process_request,在请求收到之前绑定一个函数做一些事情

#基于它做用户登录认证
@app.before_request
def process_request(*args,**kwargs):
    if request.path == '/login':
        return None
    user = session.get('user_info')
    if user:
        return None
    return redirect('/login')

2 after_request

类比django中间件中的process_response,每一个请求之后绑定一个函数,如果请求没有异常

@app.after_request
def process_response1(response):
    print('process_response1 走了')
    return response

3 before_first_request

第一次请求时,跟浏览器无关(接收的第一个请求)

@app.before_first_request
def first():
    pass

4 teardown_request

如论有没有异常都会执行,如果没有异常这个参数就是None,有就记录这个异常

@app.teardown_request 
def ter(e):
    pass

5 errorhandler

捕获异常,如果出现异常,而且状态就是@app.errorhandler(404),服务器内部错误500

@app.errorhandler(404)
def error_404(arg):
    return "404错误了"

6 template_global

标签

@app.template_global()
def sb(a1, a2):
    return a1 + a2
#{{sb(1,2)}}

7 template_filter

过滤器

@app.template_filter()
def db(a1, a2, a3):
    return a1 + a2 + a3
#{{ 1|db(2,3)}}

总结:

1 重点掌握before_request和after_request,

2 注意有多个的情况,执行顺序

3 before_request请求拦截后(也就是有return值),response所有都执行

八、中间件

 

from flask import Flask

app = Flask(__name__)

@app.route('/')
def index():
    return 'Hello World!'
# 模拟中间件
class Md(object):
    def __init__(self,old_wsgi_app):
        self.old_wsgi_app = old_wsgi_app

    def __call__(self,  environ, start_response):
        print('开始之前')
        ret = self.old_wsgi_app(environ, start_response)
        print('结束之后')
        return ret

if __name__ == '__main__':
    #1我们发现当执行app.run方法的时候,最终执行run_simple,最后执行app(),也就是在执行app.__call__方法 
    #2 在__call__里面,执行的是self.wsgi_app().那我们希望在执行他本身的wsgi之前做点事情。
    #3 所以我们先用Md类中__init__,保存之前的wsgi,然后我们用将app.wsgi转化成Md的对象。
    #4 那执行新的的app.wsgi_app,就是执行Md的__call__方法。
    #把原来的wsgi_app替换为自定义的,
    
    app.wsgi_app = Md(app.wsgi_app)
    app.run()

请求所有的流程

ctx = self.request_context(environ)
        error = None
        try:
            try:
                ctx.push()
                #根据路径去执行视图函数,视图类
                response = self.full_dispatch_request()
            except Exception as e:
                error = e
                response = self.handle_exception(e)
            except:  # noqa: B001
                error = sys.exc_info()[1]
                raise
            return response(environ, start_response)
        finally:
            #不管出不出异常,都会走这里
            if self.should_ignore_error(error):
                error = None
            ctx.auto_pop(error)

九、请求上下文源码分析

 

 

 

 

'''
1     app.__call__
2     wsgi_app(environ, start_response) 
2.1 ctx = self.request_context(environ)
    2.1.1 return RequestContext(self, environ)
        这里的self是app,environ请求相关
    2.1.2 return RequestContext(self, environ)
    得到了RequestContext的对象,而且有request属性
2.2  2.1中的ctx就是RequestContext的对象  

2.3  ctx.push()执行这个,就是RequestContext的对象的push方法
     2.3.1  #执行这个,self-->ctx
        _request_ctx_stack.push(self) 
        2.3.1.1 我们发现_request_ctx_stack = LocalStack()
        他的push方法的源码:
                def push(self, obj):
                    rv = getattr(self._local, "stack", None)
                    if rv is None:     
                        # self._local=>stack-->storage['线程id']['stack']=[ctx,]
                        self._local.stack = rv = []
                    rv.append(obj)
                    return rv


3在请求中获取request.form 3.1 request是LocalProxy的对象,当获取属性的时候会走__getattr__ def __getattr__(self, name): if name == "__members__": return dir(self._get_current_object()) #name-->form, #self._get_current_object()===>ctx.request,form #_get_current_object()---》self.__local() return getattr(self._get_current_object(), name) 3.1.1 self._get_current_object():源码:最终:partial(_lookup_req_object, "request") def _get_current_object(self): if not hasattr(self.__local, "__release_local__"): #local==>partial(_lookup_req_object, "request") #def __init__(self, local, name=None): # object.__setattr__(self, "_LocalProxy__local", local) #self.__local()===>local() return self.__local() try: return getattr(self.__local, self.__name__) except AttributeError: raise RuntimeError("no object bound to %s" % self.__name__)


4 partial(_lookup_req_object, "request")偏函数的源码 def _lookup_req_object(name): #name是request #ctx top = _request_ctx_stack.top if top is None: raise RuntimeError(_request_ctx_err_msg) #ctx-->request return getattr(top, name) 4.1中_request_ctx_stack.top @property def top(self): try: return self._local.stack[-1] except (AttributeError, IndexError): return None

十、蓝图

对程序进行目录结构划分

不使用蓝图,自己分文件

目录结构:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<h1>随便写点东西</h1>
</body>
</html>
index.html
from flask import Flask
app = Flask(__name__)
from pro import views

app.register_blueprint(views.us)  # 注册
__init__.py
from flask import Blueprint,render_template
us = Blueprint("user",__name__)
@us.route('/')
def index():

    return render_template("index.html")
views.py
from pro import app

if __name__ == '__main__':
    app.run()
manage.py

十一、g对象

专门用来存储用户信息的g对象,g的全称的为global

g对象在一次请求中的所有的代码的地方,都是可以使用的

g对象和session的区别

session对象是可以跨request的,只要session还未失效,不同的request的请求会获取到同一个session,但是g对象不是,g对象不需要管过期时间,请求一次就g对象就改变了一次,或者重新赋值了一次

g对象的特性

当前请求内你设置就可以取,必须先设置,后取,当前请求可以取无限次
就算你当前请求,设置了,如果不取,其他请求过来,也取不到

十二、信号

Flask框架中的信号基于blinker,其主要就是让开发者可是在flask请求过程中定制一些用户行为

安装:

pip3 install blinker

内置信号:

request_started = _signals.signal('request-started')                # 请求到来前执行
request_finished = _signals.signal('request-finished')              # 请求结束后执行
 
before_render_template = _signals.signal('before-render-template')  # 模板渲染前执行
template_rendered = _signals.signal('template-rendered')            # 模板渲染后执行
 
got_request_exception = _signals.signal('got-request-exception')    # 请求执行出现异常时执行
 
request_tearing_down = _signals.signal('request-tearing-down')      # 请求执行完毕后自动执行(无论成功与否)
appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否)
 
appcontext_pushed = _signals.signal('appcontext-pushed')            # 应用上下文push时执行
appcontext_popped = _signals.signal('appcontext-popped')            # 应用上下文pop时执行
message_flashed = _signals.signal('message-flashed')                # 调用flask在其中添加数据时,自动触发

使用信号:

 

 

from flask import Flask,signals
app = Flask(__name__)
def func(*args,**kwargs):
    print('触发信号',args,kwargs)
# 与该信号进行绑定
signals.request_started.connect(func)
# 触发信号:signals.request_started.send()
@app.before_first_request
def before_first1(*args,**kwargs):
    print('before_first_request')
@app.before_request
def before_first2(*args,**kwargs):
    print('before_request')

@app.route('/',methods=['GET','POST'])
def index():
    print('视图')
    return '视图'
if __name__ == '__main__':
    app.run()
代码

自定义信号:

十三、flask_session

作用:将默认保存的签名cookie中的值 保存到 redis/memcached/file/Mongodb/SQLAlchemy

安装:pip3 install flask-session

第一种使用方法:

from flask import Flask,session
from flask_session import RedisSessionInterface
import redis
app = Flask(__name__)
app.secret_key="ajksda"
conn=redis.Redis(host='127.0.0.1',port=6379)
app.session_interface=RedisSessionInterface(conn,key_prefix='jason',use_signer=True,permanent=False)
@app.route('/')
def hello_world():
    session['nb']='jason'
    return 'Hello World!'

@app.route("/index")
def index():
    print(session['nb'])
    return "ok"

if __name__ == '__main__':
    app.run()

第二种使用方法:

from flask import Flask,session
import  redis
from flask_session import Session
app = Flask(__name__)
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_REDIS'] =redis.Redis(host='127.0.0.1',port='6379')
app.config['SESSION_KEY_PREFIX']="jason"
Session(app)

@app.route('/')
def hello_world():
    session['sb']='jason'
    return 'Hello World!'

@app.route("/index")
def index():
    print(session['sb'])
    return "ok"

if __name__ == '__main__':
    app.run()

十四、多APP应用

from werkzeug.wsgi import DispatcherMiddleware
from werkzeug.serving import run_simple
from flask import Flask
app1 = Flask('app01')
app2 = Flask('app02')

@app1.route('/index')
def index():
    return "app01"

@app2.route('/index')
def index2():
    return "app2"
dm = DispatcherMiddleware(app1, {
    '/sec12': app2,
})
if __name__ == "__main__":

    run_simple('localhost', 5000, dm)

十五、flask-script

用于实现类似于django中 python3 manage.py runserver ...类似的命令

安装:pip3 install flask-script

from flask import Flask
from flask_script import Manager
app = Flask(__name__)
manager = Manager(app)

@app.route('/')
def index():
    return 'ok'

if __name__ == '__main__':
    manager.run()

 

 

 

 自定义命令

#第一步安装:pip3 install flask-script
from flask import Flask
from flask_script import Manager
app = Flask(__name__)
manager=Manager(app)

@app.route("/")
def index():
    return "ok"

@manager.command
def custom1(arg,a):
    """
    自定义命令
    python manage.py custom 123
    :param arg:
    :return:
    """
    print(arg,a)


@manager.option('-n', '--name', dest='name')
@manager.option('-u', '--url', dest='url')
def cmd(name, url):
    """
    自定义命令(-n也可以写成--name)
    执行: python manage.py  cmd -n lqz -u http://www.oldboyedu.com
    执行: python manage.py  cmd --name lqz --url http://www.oldboyedu.com
    :param name:
    :param url:
    :return:
    """
    print(name, url)

if __name__ == '__main__':
    manager.run()

 

 

十六、wtforms

安装:pip3 install wtforms

使用1:表单验证

from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets

app = Flask(__name__)

app.debug = True


class LoginForm(Form):
    # 字段(内部包含正则表达式)
    name = simple.StringField(
        label='用户名',
        validators=[
            validators.DataRequired(message='用户名不能为空.'),
            validators.Length(min=2, max=6, message='用户名长度必须大于%(min)d且小于%(max)d')
        ],
        widget=widgets.TextInput(), # 页面上显示的插件
        render_kw={'class': 'form-control'}
    )
    # 字段(内部包含正则表达式)
    pwd = simple.PasswordField(
        label='密码',
        validators=[
            validators.DataRequired(message='密码不能为空.'),
            validators.Length(min=8, message='密码长度必须大于%(min)d'),
            # validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
            #                   message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')

        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )



@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'GET':
        form = LoginForm()
        return render_template('login.html', form=form)
    else:
        form = LoginForm(formdata=request.form)
        if form.validate():
            print('用户提交数据通过格式验证,提交的值为:', form.data)
        else:
            print(form.errors)
        return render_template('login.html', form=form)

if __name__ == '__main__':
    app.run()

login.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<h1>登录</h1>
<form method="post">
    <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p>

    <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p>
    <input type="submit" value="提交">
</form>
</body>
</html>

使用2:all表单验证

from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets

app = Flask(__name__, template_folder='templates')
app.debug = True



class RegisterForm(Form):

    def validate_pwd_confirm(self, field):
        """
        自定义pwd_confirm字段规则,例:与pwd字段是否一致
        :param field:
        :return:
        """
        # 最开始初始化时,self.data中已经有所有的值
        print(field.data)
        if field.data !="sb":
            #raise validators.ValidationError("sb")  # 继续后续验证
            raise validators.StopValidation("SB")  # 不再继续后续验证

        # if field.data != self.data['pwd']:
        #     raise validators.ValidationError("密码不一致") # 继续后续验证
            #raise validators.StopValidation("密码不一致")  # 不再继续后续验证

    name = simple.StringField(
        label='用户名',
        validators=[
            validators.DataRequired()
        ],
        widget=widgets.TextInput(),
        render_kw={'class': 'form-control'},
        default='tank'
    )

    pwd = simple.PasswordField(
        label='密码',
        validators=[
            validators.DataRequired(message='密码不能为空.')
        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )

    pwd_confirm = simple.PasswordField(
        label='重复密码',
        validators=[
            validate_pwd_confirm,
            validators.DataRequired(message='重复密码不能为空.'),
            #validators.EqualTo('pwd', message="两次密码输入不一致")
        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )

    email = html5.EmailField(
        label='邮箱',
        validators=[
            validators.DataRequired(message='邮箱不能为空.'),
            validators.Email(message='邮箱格式错误')
        ],
        widget=widgets.TextInput(input_type='email'),
        render_kw={'class': 'form-control'}
    )

    gender = core.RadioField(
        label='性别',
        choices=(
            (1, ''),
            (2, ''),
        ),
        coerce=int # “1” “2”
     )
    city = core.SelectField(
        label='城市',
        choices=(
            ('bj', '北京'),
            ('sh', '上海'),
        )
    )

    hobby = core.SelectMultipleField(
        label='爱好',
        choices=(
            (1, '篮球'),
            (2, '足球'),
        ),
        coerce=int
    )

    favor = core.SelectMultipleField(
        label='喜好',
        choices=(
            (1, '篮球'),
            (2, '足球'),
        ),
        widget=widgets.ListWidget(prefix_label=False),
        option_widget=widgets.CheckboxInput(),
        coerce=int,
        default=[1, 2]
    )

    def __init__(self, *args, **kwargs):
        super(RegisterForm, self).__init__(*args, **kwargs)
        self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球'))
        self.favor.data=[1,]





@app.route('/register', methods=['GET', 'POST'])
def register():
    if request.method == 'GET':
        form = RegisterForm(data={'gender': 2,'hobby':[1,]}) # initial
        return render_template('register.html', form=form)
    else:
        form = RegisterForm(formdata=request.form)
        if form.validate():
            print('用户提交数据通过格式验证,提交的值为:', form.data)
        else:
            print(form.errors)
        return render_template('register.html', form=form)



if __name__ == '__main__':
    app.run()

login.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<h1>用户注册</h1>
<form method="post" novalidate style="padding:0  50px">
    {% for field in form %}
    <p>{{field.label}}: {{field}} {{field.errors[0] }}</p>
    {% endfor %}
    <input type="submit" value="提交">
</form>
</body>
</html>

 

推荐阅读