首页 > 其他 > 详细

rest_framework组件之认证,权限,访问频率

时间:2018-10-22 22:01:17      阅读:36      评论:0      收藏:0      [点我收藏+]

标签:from   use   ==   msg   ren   内置   取ip   rem   meta   

共用的models
技术分享图片
 1 from django.db import models
 2 
 3 
 4 # Create your models here.
 5 
 6 
 7 class User(models.Model):
 8     username = models.CharField(max_length=32)
 9     password = models.CharField(max_length=32)
10     user_type = models.IntegerField(choices=((1, 超级用户), (2, 普通用户), (3, 第三方用户)))
11 
12 
13 class UserToken(models.Model):
14     user = models.OneToOneField(to=User)
15     token = models.CharField(max_length=64)
View Code

urls

技术分享图片
1 from django.conf.urls import url
2 from django.contrib import admin
3 from app01 import views
4 urlpatterns = [
5     url(r^admin/, admin.site.urls),
6     url(r^course/,views.Course.as_view()),
7     url(r^login/,views.Login.as_view()),
8     url(r^get_ip/,views.GetIp.as_view()),
9 ]
View Code

views

技术分享图片
 1 from django.shortcuts import render, HttpResponse
 2 
 3 # Create your views here.
 4 
 5 import json
 6 from django.views import View
 7 from rest_framework.views import APIView
 8 from app01 import models
 9 from utils.common import *
10 from rest_framework.response import Response
11 
12 
13 class Login(APIView):
14     def post(self, request, *args, **kwargs):
15         response = MyResponse()
16         name = request.data.get(username)
17         pwd = request.data.get(password)
18         user = models.User.objects.filter(username=name, password=pwd).first()
19         if user:
20             token = get_token(name)
21             ret = models.UserToken.objects.update_or_create(user=user, defaults={token: token})
22             response.status = 100
23             response.msg = 登陆成功
24             response.token = token
25 
26         else:
27             response.msg = 用户名或密码错误
28 
29         return Response(response.get_dic())
30 
31 
32 class Course(APIView):
33     authentication_classes = [MyAuth, ]
34     permission_classes = [MyPermission, ]
35     throttle_classes = [VisitThrottle, ]
36 
37     def get(self, request):
38         print(request.user)
39         print(request.auth)
40         return HttpResponse(json.dumps({name: Python}))
View Code

 



认证组件

1 写一个认证类
from rest_framework.authentication import BaseAuthentication
class MyAuth(BaseAuthentication):
def authenticate(self,request):
# request 是封装后的
token = request.query_params.get(‘token‘)
ret = models.UserToken.objects.filter(token=token).first()
if ret:
# 认证通过
return
else:
raise AuthenticationFailed(‘认证失败‘)
#可以不写了
def authenticate_header(self,ss):
pass
2 局部使用
authentication_classes=[MyAuth,MyAuth2]
3 全局使用
查找顺序:自定义的APIView里找---》项目settings里找---》内置默认的
REST_FRAMEWORK={
‘DEFAULT_AUTHENTICATION_CLASSES‘:[‘utils.common.MyAuth‘,]

}

认证:
技术分享图片
 1 import hashlib
 2 import time
 3 
 4 
 5 class MyResponse():
 6     def __init__(self):
 7         self.status = 1001
 8         self.msg = None
 9 
10     def get_dic(self):
11         return self.__dict__
12 
13 
14 # res = MyResponse()
15 #
16 # print(res.get_dic())
17 
18 def get_token(name):
19     md = hashlib.md5()
20     md.update(name.encode(utf-8))
21     md.update(str(time.time()).encode(utf-8))
22 
23     return md.hexdigest()  # 返回的是包含name和time的hash值
24 
25 
26 from rest_framework.authentication import BaseAuthentication
27 from app01 import models
28 from rest_framework.exceptions import APIException, AuthenticationFailed
29 
30 
31 class MyAuth(BaseAuthentication):
32     def authenticate(self, request):
33         token = request.query_params.get(token)
34         ret = models.UserToken.objects.filter(token=token).first()
35         if ret:
36 
37             return ret.user,ret
38         else:
39 
40             raise AuthenticationFailed("认证失败")
View Code

 


权限组件
1 写一个类
class MyPermission():
def has_permission(self,request,view):
token=request.query_params.get(‘token‘)
ret=models.UserToken.objects.filter(token=token).first()
if ret.user.type==2:
# 超级用户可以访问
return True
else:
return False
2 局部使用:
permission_classes=[MyPermission,]
3 全局使用:
REST_FRAMEWORK={
‘DEFAULT_PERMISSION_CLASSES‘:[‘utils.common.MyPermission‘,]
}

权限组件:
技术分享图片
 1 from rest_framework.permissions import BasePermission
 2 
 3 class MyPermission(BasePermission):
 4     message = "不是超级用户,查看不了"
 5     def has_permission(self, request, view):
 6         token = request.query_params.get("token")
 7         ret=models.UserToken.objects.filter(token=token).first()
 8         print(ret.user.get_user_type_display())
 9         if ret.user.user_type == 1:
10             return True
11         else:
12             return False
View Code



频率组件
1 写一个类:
from rest_framework.throttling import SimpleRateThrottle
class VisitThrottle(SimpleRateThrottle):
scope = ‘xxx‘
def get_cache_key(self, request, view):
return self.get_ident(request)
2 在setting里配置:
‘DEFAULT_THROTTLE_RATES‘:{
‘xxx‘:‘5/h‘,
}
3 局部使用
throttle_classes=[VisitThrottle,]
4 全局使用
REST_FRAMEWORK={
‘DEFAULT_THROTTLE_CLASSES‘:[‘utils.common.MyPermission‘,]
}
访问频率组件:
技术分享图片
1 from rest_framework.throttling import SimpleRateThrottle
2 
3 
4 class VisitThrottle(SimpleRateThrottle):
5     scope = da peng ya
6 
7     def get_cache_key(self, request, view):
8         return self.get_ident(request)
View Code



作业:
1 认证类,不存数据库的token验证
token=‘idfjasfsadfas|userid‘
技术分享图片
 1 def get_token(id,salt=123):
 2     import hashlib
 3     md=hashlib.md5()
 4     md.update(bytes(str(id),encoding=utf-8))
 5     md.update(bytes(salt,encoding=utf-8))
 6 
 7     return md.hexdigest()+|+str(id)
 8 
 9 def check_token(token,salt=123):
10     ll=token.split(|)
11     import hashlib
12     md=hashlib.md5()
13     md.update(bytes(ll[-1],encoding=utf-8))
14     md.update(bytes(salt,encoding=utf-8))
15     if ll[0]==md.hexdigest():
16         return True
17     else:
18         return False
19 
20 class TokenAuth():
21     def authenticate(self, request):
22         token = request.GET.get(token)
23         success=check_token(token)
24         if success:
25             return
26         else:
27             raise AuthenticationFailed(认证失败)
28     def authenticate_header(self,request):
29         pass
30 class Login(APIView):
31     def post(self,reuquest):
32         back_msg={status:1001,msg:None}
33         try:
34             name=reuquest.data.get(name)
35             pwd=reuquest.data.get(pwd)
36             user=models.User.objects.filter(username=name,password=pwd).first()
37             if user:
38                 token=get_token(user.pk)
39                 # models.UserToken.objects.update_or_create(user=user,defaults={‘token‘:token})
40                 back_msg[status]=1000
41                 back_msg[msg]=登录成功
42                 back_msg[token]=token
43             else:
44                 back_msg[msg] = 用户名或密码错误
45         except Exception as e:
46             back_msg[msg]=str(e)
47         return Response(back_msg)
48 from rest_framework.authentication import BaseAuthentication
49 class TokenAuth():
50     def authenticate(self, request):
51         token = request.GET.get(token)
52         token_obj = models.UserToken.objects.filter(token=token).first()
53         if token_obj:
54             return
55         else:
56             raise AuthenticationFailed(认证失败)
57     def authenticate_header(self,request):
58         pass
59 
60 class Course(APIView):
61     authentication_classes = [TokenAuth, ]
62 
63     def get(self, request):
64         return HttpResponse(get)
65 
66     def post(self, request):
67         return HttpResponse(post)
View Code


2 自己写一个每分钟限制三次访问的频率类
ip:拿到ip
全局变量:{ip:[时间1,时间2,时间3]}

获取IP
1     def get(self, request):
2         x_forwarded_for = request.META.get(HTTP_X_FORWARDED_FOR)
3         if x_forwarded_for:
4             ip = x_forwarded_for.split(,)[0]  # 所以这里是真实的ip
5         else:
6             ip = request.META.get(REMOTE_ADDR)  # 这里获得代理ip
7         return ip

 

待补充。。








rest_framework组件之认证,权限,访问频率

标签:from   use   ==   msg   ren   内置   取ip   rem   meta   

原文:https://www.cnblogs.com/Roc-Atlantis/p/9833238.html

(0)
(0)
   
举报
评论 一句话评论(0
0条  
登录后才能评论!
© 2014 bubuko.com 版权所有 鲁ICP备09046678号-4
打开技术之扣,分享程序人生!
             

鲁公网安备 37021202000002号