首页 > 技术文章 > Flask 学习 七 用户认证

Erick-L 2017-05-21 18:01 原文

使用werkzeug 实现密码散列

from werkzeug.security import generate_password_hash,check_password_hash

class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True,index=True)
    role_id=db.Column(db.Integer,db.ForeignKey('roles.id'))
    password_hash=db.Column(db.String(128))
    @property
    def password(self):
        raise AttributeError('密码不是一个可读属性') #只写属性
    @password.setter
    def password(self,password):
        self.password_hash = generate_password_hash(password)

    def verify_password(self,password):
        return check_password_hash(self.password_hash,password)

    def __repr__(self):
        return '<User %r>' % self.username

密码散列化测试tests/test_url_model.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import unittest
from app.models import User
class UserModelTestCase(unittest.TestCase):
    def test_password_setter(self):
        u = User(password='cat')
        self.assertTrue(u.password_hash is not None)
    def test_no_password_getter(self):
        u = User(password='cat')
        with self.assertRaises(AttributeError):
            u.password
    def test_password_verification(self):
        u = User(password='cat')
        self.assertTrue(u.verify_password('cat'))
        self.assertFalse(u.verify_password('dog'))
    def test_password_salts_are_random(self):
        u =User(password='cat')
        u2=User(password='cat')
        self.assertTrue(u.password_hash != u2.password_hash)
View Code

创建用户认证蓝本

app/auth/__init__.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Blueprint
auth = Blueprint('auth',__name__)
from . import views

app/auth/views.py 蓝本路由和视图函数

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from flask import render_template
from . import auth

@auth.route('/login')
def login():
    return render_template('auth/login.html')

app/__init__.py 注册蓝本

from .auth import auth as auth_blueprint
app.register_blueprint(auth_blueprint,url_prefix='/auth')

安装flask-login插件

pip install flask-login

修改User模型,支持用户登陆 app/models.py

from flask_login import UserMixin

class User(UserMixin,db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    email=db.Column(db.String(64),unique=True,index=True)
    username = db.Column(db.String(64), unique=True,index=True)
    role_id=db.Column(db.Integer,db.ForeignKey('roles.id'))
    password_hash=db.Column(db.String(128))

app/__init__.py 初始化Flask_Login

from flask_login import LoginManager
#初始化Flask-Login
login_manager=LoginManager()
login_manager.session_protection='strong' # 记录客户端ip,用户代理信息,发现异动,登出用户,可以设置不同等级None,'basic','strong'
login_manager.login_view='auth.login' # 设置登录页面端点,蓝本的名字也要加到前面

def create_app(config_name):
    #....
    login_manager.init_app(app) #初始化登陆
    #....

app/models.py 加载用户的回掉函数

from . import login_manager

# 加载用户的回调函数
@login_manager.user_loader
def load_user(user_id):
    return User.query.get(int(user_id))
# 加载用户回掉函数,接收以Unicode字符串表示的用户标识符,如果能找到这个用户必须返回用户对象,否则返回None

保护路由

为了只让认证的用户访问,未认证的用户会拦截请求,发往登陆页面

#保护路由
@app.route('/secret')
@login_required
def secret():
    return '只有认证后的用户才能登陆'

添加登陆表单

app/auth/forms.py

from flask_wtf import FlaskForm
from wtforms import StringField,PasswordField,BooleanField,SubmitField
from wtforms.validators import DataRequired,Length,Email

class LoginForm(FlaskForm):
    email=StringField('邮箱',validators=[DataRequired(),Length(1,64),Email()])
    password=PasswordField('密码',validators=[DataRequired()])
    remember_me=BooleanField('保持登陆')
    submit = SubmitField('登陆')

auth/login.html  用wtf.quick_form()渲染表单

{% extends 'base.html' %}
{% import'bootstrap/wtf.html' as wtf %}
{% block title %}Flask-登陆{% endblock %}
{% block page_content %}
    <div class="page-header">
    <h1>登陆</h1>
    </div>
    <div class="col-md-4">
        {{ wtf.quick_form(form) }}
    </div>
{% endblock %}

base.html  # current_user.is_authenticated 如果是匿名用户登陆is_authenticated返回False,这个方法可以判断当前用户是否登陆

<ul class="nav navbar-nav navbar-right">
            {% if current_user.is_authenticated %}
             <li><a href="{{ url_for('auth.logout') }}">退出登陆</a></li>
            {% else %}
            <li><a href="{{ url_for('auth.login') }}">立即登陆</a></li>
            {% endif %}
</ul>

登入用户

app/auth/views.py #登陆路由

from flask import render_template,redirect,request,url_for,flash
from flask_login import login_user,login_required,logout_user,current_user
from . import auth
from ..models import User
from .forms import LoginForm

@auth.route('/login',methods=['get','post'])
def login():
    form = LoginForm()
    if form.validate_on_submit(): # 验证表单数据
        user = User.query.filter_by(email=form.email.data).first()
        if user is not None and user.verify_password(form.password.data): # verify_password会验证表单数据
            login_user(user,form.remember_me.data) # 如果为True则为用户生成长期有效的cookies
            return redirect(request.args.get('next') or url_for('main.index'))# next保存原地址(从request.args字典中读取)
        flash('用户名或密码无效')
    return render_template('auth/login.html',form=form)

更新登陆模板auth/login.html

{% extends 'base.html' %}
{% import'bootstrap/wtf.html' as wtf %}
{% block title %}Flask-登陆{% endblock %}
{% block page_content %}
    <div class="page-header">
    <h1>登陆</h1>
    </div>
    <div class="col-md-4">
        {{ wtf.quick_form(form) }}
    </div>
{% endblock %}

登出用户

auth/views.py

from flask_login import login_user,login_required,logout_user,current_user

@auth.route('/logout')
@login_required
def logout():
    logout_user()
    flash('你已经退出')
    return redirect(url_for('main.index'))

测试登陆 index.html

<h1>Hello,
        {% if current_user.is_authenticated %}
            {{ current_user.username }}
        {% else %}
            访客
        {% endif %}!
</h1>

注册新用户

添加用户注册表单

from flask_wtf import FlaskForm
from wtforms import StringField,PasswordField,BooleanField,SubmitField
from wtforms.validators import DataRequired,Length,Email,Regexp,EqualTo
from ..models import User

class RegistrationForm(FlaskForm):
    email = StringField('邮箱', validators=[DataRequired(), Length(1, 64), Email()])
    username= StringField('用户名', validators=[DataRequired(), Length(1, 64), Regexp('^[A-Za-z][A-Za-z0-9_.]*$',0,'用户名必须是字母,数字,点号,下划线')])
    password = PasswordField('密码', validators=[DataRequired(),EqualTo('password2',message='密码不一致')])
    password2 =  PasswordField('再次输入密码', validators=[DataRequired()])
    submit = SubmitField('注册')
    def validate_email(self,filed):
        if User.query.filter_by(email=filed.data).first():
            raise ValidationError('邮箱已被注册')
    def validate_username(self,filed):
        if User.query.filter_by(username=filed.data).first():
            raise ValidationError('用户名已被使用')

auth/register.html

{% extends 'base.html' %}
{% import'bootstrap/wtf.html' as wtf %}
{% block title %}Flask-注册{% endblock %}
{% block page_content %}
    <div class="page-header">
    <h1>注册</h1>
    </div>
    <div class="col-md-4">
        {{ wtf.quick_form(form) }}
    </div>
{% endblock %}

auth/login.html 添加链接到注册页面

 <p>新用户?<a href="{{ url_for('auth.register') }}">点击这里注册</a></p>

app/auth/views.py

@auth.route('/register',methods=['get','post'])
def register():
    form = RegistrationForm()
    if form.validate_on_submit():
        user=User(email = form.email.data,
                  username=form.username.data,
                  password=form.password.data)
        db.session.add(user)
        flash('你现在已经登陆了')
        return redirect(url_for('auth.login'))
    return render_template('auth/register.html',form=form)

确认账户

使用isdangerous生成确认令牌

app/models.py 确认用户账户

from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from flask import current_app
from . import db
class User(UserMixin,db.Model):
    # ...
    confirmed = db.Column(db.Boolean,default=False)
    def generate_confirmation_token(self,expiration=3600):  #生成令牌,有效期1小时
        s = Serializer(current_app.config['SECRET_KEY'],expiration)
        return s.dumps({'confirm':self.id})
    def confirm(self,token):  # 检验令牌 如果通过把confirmed字段设为True
        s=Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except:
            return False
        if data.get('confirm') !=self.id: # 检查令牌id是否存储是已登录用户进行匹配
            return False
        self.confirmed=True
        db.session.add(self)
        return True

发送确认邮件

auth/views.py

from ..email import send_mail

@auth.route('/register',methods=['get','post'])
def register():
    form = RegistrationForm()
    if form.validate_on_submit():
        user=User(email = form.email.data,
                  username=form.username.data,
                  password=form.password.data)
        db.session.add(user)
        db.session.commit()
        token = user.generate_confirmation_token()
        send_mail(user.email,'确认你的账户','auth/email/confirm',user=user,token=token)
        flash('我们已经给你发送了一封确认邮件')
        return redirect(url_for('main.index'))
    return render_template('auth/register.html',form=form)

auth/email/cinfirm.txt

亲爱的,{{ user.username }}
欢迎来到 Flasky
为了确认您的账户,请点击以下链接:

{{ url_for('auth.confirm',token=token,_external=True) }} #返回绝对路径

Flasky 团队

auth/views.py

from flask_login import current_user
@auth.route('/confirm/<token>')
@login_required # 保护这个路由,用户需要先登录
def confirm(token):
    if current_user.confirmed:
        return redirect(url_for('main.index'))
    if current_user.confirm(token):
        flash('你已经确认了你的账户,谢谢')
    else:
        flash('确认链接失效或过期')
    return redirect(url_for('main.index'))

app/auth/views.py 使用before_app_request来全局使用处理程序中未确认的用户

@auth.before_app_request # 如果返回响应或重定向,会直接发送至客户端,不会调用请求视图函数
def before_request():
    if current_user.is_authenticated\
            and not current_user.confirmed \
            and request.endpoint[:5] != 'auth.'\   #访问路由获取权限
            and request.endpoint != 'static':
        return redirect(url_for('auth.unconfirmed'))

@auth.route('/unconfirmed')
def unconfirmed():
    if current_user.is_anonymous or current_user.confirmed:
        return redirect(url_for('main.index'))
    return render_template('auth/unconfirmed.html')

auth/views.py 重新发送确认邮件

@auth.route('/confirm')
@login_required
def resend_confirmation():
    token=current_user.generate_confirmation_token()
    send_mail(current_user.email,'确认你的账户','auth/email/confirm',user=current_user,token=token)
    flash('一封新的确认邮件已经发送到您的邮箱')
    return redirect(url_for('main.index'))

管理账户:

修改密码  重设密码 修改电子邮件地址

https://github.com/Erick-LONG/flask_blog/commit/3748e70d9f986b066328185bcf417d8b8205ed8c

https://github.com/Erick-LONG/flask_blog/commit/ce4a67d31f0d7d5f5c6719eebb193b2949c77b0c

https://github.com/Erick-LONG/flask_blog/commit/f89ef3dac3819129165be4d9b2a67a2bb092cf34

 

推荐阅读