首页 > 技术文章 > Django上自建一个token校验机制

liaojiafa 2020-03-22 18:38 原文

场景介绍

自己开发的接口有些需要实名认证,不能直接让匿名用户访问,那如果使用django的login_require装饰器设置在接口方法前面 会直接返回登录页面,所以在这个时候,可以考虑自己开发一个验证机制了,防止匿名用户调用。

验证流程

  1. 先让用户提交用户密码到获取token的接口
  2. 拿到用户名密码后去后台验证是否有效,如果有效就判断这个用户是否有token在数据库中,有的话就更新这个token,否则就新建一条记录在数据库,然后返回这个新建好的token的给请求者。
  3. 用户拿到刚才请求到的token后,在请求头或者请求参数里面携带这个token。然后再去请求对应的接口。
  4. 请求过来以后,首先自建的验证机制会对token进行校验,去请求参数里面找,参数也没有就返回校验失败,找到token以后就去判断这个token是否存在和过期,token无效、过期的话,返回校验失败,只有校验成功以后,才会放行请求到接口上,此时接口才会真正处理用户的请求。

需要做的事

  1. 新建一个表
  2. 新建一个创建token的方法
  3. 新建一个校验token的装饰器
  4. 接口上使用装饰器即可

以上在你的项目中一个models和views编写即可,编写完以后,哪里需要使用,导入这个装饰器即可。

实操步骤

以下都运行在python3.5.6 和 django 1.11.13 上,无问题。

创建数据库
from django.db import models

# Create your models here.
from pisces import settings
from datetime import timedelta,datetime
from django.contrib.auth.models import User
import pytz
import random
#tzutc_8=pytz.FixedOffset(480)
#tzutc_8=pytz.timezone("UTC")
tzutc_8=pytz.timezone('Asia/Shanghai')

class user_token(models.Model):
    user = models.OneToOneField(User,related_name='user_profile')
    utype = models.CharField(max_length=15, default="guest") # ?????? guest access admin
    utoken = models.CharField(max_length=256, null=True,blank=True)
    utokenCreateTime = models.DateTimeField(null=True,blank=True)
    utokenExpire = models.DateTimeField(null=True,blank=True)

    def getRandomChar(self,start=None,end=None,length=30):
        '''
        get a random int and then translate to char
        :param start:
        :param end:
        :param length:
        :return:  return a token
        '''
        if ( start and not start.isdigit() ) or not start:
            start = 65
        if ( end and not end.isdigit() ) or not end:
            end = 122
        randomChar = ""
        for i in range(length):
            x = random.randint(start,end)
            # >>> chr(91)
            # '['
            # >>> chr(92)
            # '\\'
            # >>> chr(93)
            # ']'
            # >>> chr(94)
            # '^'
            # >>> chr(95)
            # '_'
            # >>> chr(96)
            # '`'

            if x == 91 or x == 92 or x == 93 or x == 94 or x == 95 or x == 96:
                continue
            randomChar = randomChar + chr(x)
        return randomChar

    def updateToken(self):
        '''
        update the user's token ,if the token is exist,it will be update ,else the token is not exist,it will be create
        :return:
        '''
        utokenDuration = settings.UTOKEN_DURATION
        if self.utokenCreateTime:  # means it has created,
            if not  self.utokenExpire:
                self.utokenExpire = datetime.now() + timedelta(seconds=utokenDuration)
                self.save()
            differenceTime = self.utokenExpire - self.utokenCreateTime
            if differenceTime.seconds - utokenDuration <= 60: # update the token before 1 min
                randomChar = self.getRandomChar()
                self.utoken = randomChar
                self.utokenExpire = datetime.now()   + timedelta(seconds=utokenDuration)
                self.save()
                return {"code":200,"msg":"update utoken successfully!"}
            else:
                return {"code":301,"msg":"the utoken is not expire!so won't update! "}
        else:
            randomChar = self.getRandomChar()
            self.utoken = randomChar
            self.utokenCreateTime = datetime.now()
            self.utokenExpire = datetime.now()  + timedelta(seconds=utokenDuration)
            self.save()
            return {"code":200,"msg":"create utoken successfully!"}

    def checkUtokenIsValid(self,utoken):
        '''
        check the utoken weather is valid or not!
        :return:
        '''
        if self.utokenCreateTime and self.utokenExpire:  # means it has created,
            #print(self.utokenCreateTime.astimezone(tzutc_8),  self.utokenCreateTime, datetime.now())
            if self.utokenExpire.astimezone(tzutc_8) <=  pytz.utc.localize(datetime.now()): # update the token before 1 min
                return {"code":400,"msg":"the utoken was expire! please update it! exipre time:%s , system time:%s"%(self.utokenExpire.astimezone(tzutc_8).strftime("%Y-%m-%d %H:%M:%S"),pytz.utc.localize(datetime.now()))}
            else:
                if utoken == self.utoken:
                    return {"code":200,"msg":"correct"}
                else:
                    return {"code":401,"msg":"not correct"}
        else:
            return {"code":400,"msg":"the utoken is not create! please authenticate! "}


    def __str__(self):
        return "%s-%s"%(self.user.username,self.utype)

编写装饰器与创建token的方法
创建token的方法
from django.shortcuts import render,redirect
from django.contrib.auth import authenticate
from django.contrib.auth.models import User
from center.models import user_token
from django.http import JsonResponse
from django.contrib.auth import login as auth_login, logout as auth_logout
from django.views.decorators.csrf import csrf_exempt
from pisces.mylog import writeDebug,writeInfo,writeErr

@csrf_exempt
def get_utoken(request):
    '''
    get the user's token
    :param request:
    :return:
    '''
    if request.method == 'POST':
        username = request.POST.get('username', '')
        password = request.POST.get('password', '')
        user = authenticate(username=username, password=password)
        if user:
            writeDebug("center/views","get_utoken","the user[%s] login successfully"%username)
            auth_login(request, user, )
            userObj = User.objects.filter(username=username)
            if not userObj:
                writeDebug("center/views","get_utoken","no userObj[%s],begin to create the user"%username)
                ux = User.objects.create_user(username,"%s@chanjet.com"%username,password)
                utk = user_token(user=ux)
                utk.save()
                utkobj = user_token.objects.filter(user=ux).first()
                updateTokenMsg = utkobj.updateToken()
                utoken = utkobj.utoken
            else:
                utkobj = user_token.objects.filter(user=userObj)
                if not utkobj:
                    utkobj_save = user_token(user=userObj.first())
                    utkobj_save.save()
                    #utkObj = user_token.objects.filter(user=userObj)
                updateTokenMsg = utkobj.first().updateToken()
                utkobj.first().save()

                utoken = utkobj.first().utoken
            writeInfo("frame/views","login","the username[ {username} ] login and create token successfully!".format(username=username))
            return JsonResponse({"utoken":utoken,"code":200,"msg":updateTokenMsg.get("msg")})
        writeInfo("frame/views","login","the username[ {username} ] or password is invalied,can't login!".format(username=username))
        return JsonResponse({"code":401,"msg":"the username or password is not correct!"})
    else:
        return JsonResponse({"code":402,"msg":"only post method is support"})
校验token的装饰器
from django.shortcuts import render,redirect
from django.contrib.auth import authenticate
from django.contrib.auth.models import User
from center.models import user_token
from django.http import JsonResponse
from django.contrib.auth import login as auth_login, logout as auth_logout
from django.views.decorators.csrf import csrf_exempt
from pisces.mylog import writeDebug,writeInfo,writeErr
def checkUserToken(func):
    '''
    check if the user has the token
    :param func:
    :return:
    '''
    def warper(*args,**kwargs):
        try:
            utoken = args[0].POST.get("utoken")
            if utoken is None:
                utoken = args[0].GET.get("utoken")
        except BaseException as e:
            try:
                utoken = args[0].GET.get("utoken")
            except BaseException as e:
                return JsonResponse({"code":400,"msg":"can't get utoken! please check params"})

        if not utoken:
            return JsonResponse({"code":400,"msg":"can't get utoken! please check params"})
            
        userObj = user_token.objects.filter(utoken=utoken)
        if userObj:
            ux = userObj.first()
            result = ux.checkUtokenIsValid(utoken)
            if result.get("code") == 200:
                kwargs['username'] = ux.user.username
                return func(*args,**kwargs)
            else:
                return JsonResponse(result)
        else:
            return JsonResponse({"code":404,"msg":"can't find the user by utoken--> %s"%(utoken)})
    return warper

使用装饰器


from center.views import checkUserToken
from django.http.response import JsonResponse

@checkUserToken
@csrf_exempt
def create_zk_with_zkjob(request):
    '''
    
    :param request:
    :return:
    '''
    // 请求参数该怎么获取就怎么获取,并没有改变参数的获取方式
    // 以下替换成你的逻辑代码即可。
    post_env = request.POST.get("env")
    post_cxxx = request.POST.get("cxxx")
    post_project_name = request.POST.get("project_enname")
    zkop = core.zkopreation(post_env)
    zknodename = "%s%s_%s/foo"%(zkop.zkRootPath,post_cxxx,post_project_name)
    zkop_result = zkop.createPath(zknodename)
    return JsonResponse(zkop_result)

推荐阅读