-drf-認證許可權頻率
阿新 • • 發佈:2020-11-18
一 認證Authentication
1 自定義認證方案
1.1 編寫models
class User(models.Model): username = models.CharField(max_length=32) password = models.CharField(max_length=32) class UserToken(models.Model): user = models.OneToOneField(to='User', on_delete=models.CASCADE) token = models.CharField(max_length=64)
1.2 編寫登入認證類
class TokenAuth(): def authenticate(self, request): token = request.data.get('token') # token生成後再資料庫儲存一份,再發送到客戶端,存到cookie中,如果是APP則獨立存在 # 然後每次登陸都會帶著這個token前來校驗 token_obj = models.UserToken.objects.filter(token=token) # 使用者登入時該使用者自帶的token校驗是否存在,因為token唯一所以有則成功不會匹配到重複的 if token_obj: return # 可以是一個元組 else: raise AuthenticationFailed('該使用者不存在') def authenticate_header(self, request): pass # 報相關錯誤資訊 ''' 注意: 1.認證類,認證通過可以返回一個元組,有兩個值,第一個值會給request.user,第二個值會給request.auth 2.認證類可以配置多個,按照從前向後的順序執行,如果前面有返回值則結束 '''
1.3 編寫檢視
def get_random(name): import hashlib import time md = hashlib.md5() md.update(bytes(str(time.time()), encoding='utf-8')) md.update(bytes(name, encoding='utf-8')) # 通過唯一時間戳以及唯一username加密得到唯一的token值 return md.hexdigest() class Register(APIView): def post(self, request): res = {'status': 100, 'msg': '儲存成功'} # 反序列化稱為一個物件 # print(request.data['publish_id']) ser = serializer.UserModelSerializer(data=request.data) if ser.is_valid(): ser.save() res['result'] = ser.data # 序列化輸出 else: res['code'] = 109 res['msg'] = str(ser.errors) return Response(res) class Login(APIView): def post(self, request): res = {'status': 100, 'msg': '登入成功'} username = request.data.get('username') password = request.data.get('password') user = models.User.objects.filter(username=username, password=password) if user: user = user.first() token = get_random(username) models.UserToken.objects.update_or_create(user=user, defaults={'token':token}) # 當前登入使用者有token,則是更新token的操作,如果沒有token,則是建立token並且與相應 使用者進行一對一關聯的操作 res['token'] = token else: res['status'] = 101 res['msg'] = '登入失敗' return Response(res) class Course(APIView): authentication_classes = [TokenAuth, ] # 登入認證,並且校驗了token,可以放多個認證 def get(self, request): return Response({'msg': '檢視課程成功'}) def post(self, request): return Response({'msg': '建立課程成功'})
建立登入成功
1.4 全域性使用
REST_FRAMEWORK = {
"DEFAULT_AUTHENTICATION_CLASSES": ["app10.myauth.TokenAuth", ]
}
# 區域性禁用即區域性不用校驗
# 在檢視類下定義一個
authentication_classes = []
# 相當於重寫了校驗,然後設定為空不需要校驗
# 注意:全域性時TokenAuth不要寫在views,寫入另外一個檔案
1.5 區域性使用
#區域性使用,只需要在檢視類里加入:
authentication_classes = [TokenAuth, ]
2 內建認證方案(需要配合許可權使用)
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': (
'rest_framework.authentication.SessionAuthentication', # session認證
'rest_framework.authentication.BasicAuthentication', # 基本認證
)
}
也可以在每個檢視中通過設定authentication_classess屬性來設定
from rest_framework.authentication import SessionAuthentication, BasicAuthentication
from rest_framework.views import APIView
class ExampleView(APIView):
# 類屬性
authentication_classes = [SessionAuthentication, BasicAuthentication]
...
認證失敗會有兩種可能的返回值:
- 401 Unauthorized 未認證
- 403 Permission Denied 許可權被禁止
二 許可權Permissions
許可權控制可以限制使用者對於檢視的訪問和對於具體資料物件的訪問
- 在執行檢視的dispatch()方法前(例項化request物件),會進行檢視訪問許可權的判斷
- 在通過get_object()獲取具體物件時,會進行模型物件訪問許可權的判斷
登入成功以後,超級使用者可以幹某些事,普通使用者不能幹。超級使用者可以檢視某些介面,普通使用者不能
注意:
許可權校驗是在登入校驗的基礎之上,不然會因為在一個'AnonymousUser' object物件中找不到校驗許可權的欄位而報錯
如下:
AttributeError: 'AnonymousUser' object has no attribute 'user_type'
1 自定義許可權
1.1 編寫許可權類
class SuperPermission(BaseAuthentication):
message = '不是超級使用者,沒有許可權訪問' # 重寫message,本來是英文重寫成中文提示
def has_permission(self, request, view):
# 許可權在認證之後所以能取到request.user
if request.user.user_type == '1':
return True
else:
return False
1.2 全域性使用
REST_FRAMEWORK = {
"DEFAULT_PERMISSION_CLASSES": ["app10.MyAuth.SuperPermission", ]
}
-區域性經用,部分檢視類沒有許可權限制
permission_classes = [] # 重寫為空即可
1.3 區域性使用
# 區域性使用只需要在檢視類里加入:
permission_classes = [MyAuth.SuperPermission]
1.4 說明
如需自定義許可權,需繼承rest_framework.permissions.BasePermission父類,並實現以下兩個任何一個方法或全部
- `.has_permission(self, request, view)`
是否可以訪問檢視, view表示當前檢視物件
- `.has_object_permission(self, request, view, obj)`
是否可以訪問資料物件, view表示當前檢視, obj為資料物件
2 內建許可權
2.1 內建許可權類
from rest_framework.permissions import AllowAny,IsAuthenticated,IsAdminUser,IsAuthenticatedOrReadOnly
- AllowAny 允許所有使用者
- IsAuthenticated 僅通過認證的使用者
- IsAdminUser 僅管理員使用者
- IsAuthenticatedOrReadOnly 已經登陸認證的使用者可以對資料進行增刪改操作,沒有登陸認證的只能檢視資料。
2.2 全域性使用
可以在配置檔案中全域性設定預設的許可權管理類,如
REST_FRAMEWORK = {
....
'DEFAULT_PERMISSION_CLASSES': (
'rest_framework.permissions.IsAuthenticated',
)
}
如果未指明,則採用如下預設配置
REST_FRAMEWORK = {
....
'DEFAULT_PERMISSION_CLASSES': (
'rest_framework.permissions.AllowAny',
)
}
2.3 區域性使用
也可以在具體的檢視中通過permission_classes屬性來設定,如
from rest_framework.permissions import IsAuthenticated
from rest_framework.views import APIView
class ExampleView(APIView):
permission_classes = (IsAuthenticated,)
...
三 限流Throttling
可以對介面訪問的頻次進行限制,以減輕伺服器壓力。
一般用於付費購買次數,投票以及反爬等場景使用
1 自定義頻率類
1.1 編寫頻率類
# 自定義的邏輯
#(1)取出訪問者ip
#(2)判斷當前ip不在訪問字典裡,新增進去,並且直接返回True,表示第一次訪問,在字典裡,繼續往下走
#(3)迴圈判斷當前ip的列表,有值,並且當前時間減去列表的最後一個時間大於60s,把這種資料pop掉,這樣列表中只有60s以內的訪問時間,
#(4)判斷,當列表小於3,說明一分鐘以內訪問不足三次,把當前時間插入到列表第一個位置,返回True,順利通過
#(5)當大於等於3,說明一分鐘內訪問超過三次,返回False驗證失敗
class MyThrottles():
VISIT_RECORD = {}
def __init__(self):
self.history=None
def allow_request(self,request, view):
#(1)取出訪問者ip
# print(request.META)
ip=request.META.get('REMOTE_ADDR')
import time
ctime=time.time()
# (2)判斷當前ip不在訪問字典裡,新增進去,並且直接返回True,表示第一次訪問
if ip not in self.VISIT_RECORD:
self.VISIT_RECORD[ip]=[ctime,]
return True
self.history=self.VISIT_RECORD.get(ip)
# (3)迴圈判斷當前ip的列表,有值,並且當前時間減去列表的最後一個時間大於60s,把這種資料pop掉,這樣列表中只有60s以內的訪問時間,在原始碼中這個並沒有寫死,而是通過
'''
if rate is None:
return (None, None)
num, period = rate.split('/')
num_requests = int(num)
duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
'''
# 從配置中獲得了限制時間
while self.history and ctime-self.history[-1]>60: # duration
self.history.pop()
# (4)判斷,當列表小於3,說明一分鐘以內訪問不足三次,把當前時間插入到列表第一個位置,返回True,順利通過
# (5)當大於等於3,說明一分鐘內訪問超過三次,返回False驗證失敗
# 這個限制訪問次數也是通過
'''
if len(self.history) >= self.num_requests:
return self.throttle_failure()
return self.throttle_success()
'''
if len(self.history)<3: # 相當於配置的request_num
self.history.insert(0,ctime)
return True
else:
return False
def wait(self):
import time
ctime=time.time()
return 60-(ctime-self.history[-1])
# 所以在原始碼中相當於是把訪問時間以及訪問次數寫活了,
# 然後直接在setting中配置即可
1.2 全域性使用
REST_FRAMEWORK = {
'DEFAULT_THROTTLE_CLASSES':['app01.utils.VisitThrottles',],
}
1.3 區域性使用
#在檢視類裡使用
throttle_classes = [MyThrottles,]
2 內建頻率類
2.1 根據使用者ip限制
#寫一個類,繼承自SimpleRateThrottle,(根據ip限制)
class VisitThrottle(SimpleRateThrottle):
scope = 'luffy'
def get_cache_key(self, request, view):
return self.get_ident(request) # 返回xxf或是addr
# 在setting裡配置:(一分鐘訪問三次)
REST_FRAMEWORK = {
'DEFAULT_THROTTLE_RATES':{ # 全域性使用繼續在此字典中寫即可
'luffy':'3/m' # key要跟類中的scop對應
}
}
# 可以全域性使用,也可以區域性使用
# 在setting.py中配置,全域性使用
'DEFAULT_THROTTLE_CLASSES':['app10.myauth.VisitThrottles',]
# 區域性禁用,為空即可
throttle_classes = []
# 區域性使用
# 在檢視下定義
throttle_classes = [myauth.VisitThrottle, ]
2.2 返回中文資訊
class BookInfo(ModelViewSet):
queryset = models.Book.objects.all()
serializer_class = serializer.BookModelSerializer
authentication_classes = [myauth.TokenAuth, ]
throttle_classes = [myauth.VisitThrottle, ]
def throttled(self, request, wait):
from rest_framework.exceptions import Throttled
class VisitThrottled(Throttled):
class MyThrottled(Throttled):
default_detail = '你被限制了,寶貝'
extra_detail_singular = f'還有 {int(wait)} second.'
# 限制之後剩餘的時間
extra_detail_plural = f'已經用了 {int(wait)} seconds.'
# 使用了多長時間被限制
raise MyThrottled(wait)
2.3 限制匿名使用者(根據ip限制)
REST_FRAMEWORK = {
'DEFAULT_THROTTLE_CLASSES': ( # 全域性使用,可以註釋然後區域性使用
'rest_framework.throttling.AnonRateThrottle',
),
'DEFAULT_THROTTLE_RATES': { # 配置的匿名使用者,訪問時間與次數的限制
'anon': '3/m',
}
}
# 使用 `second`, `minute`, `hour` 或`day`來指明週期。
# 可以全域性使用,區域性使用
2.4 限制登入使用者(根據id限制)
REST_FRAMEWORK = {
'DEFAULT_THROTTLE_CLASSES': ( # 全域性使用,可以註釋然後區域性使用
'rest_framework.throttling.UserRateThrottle'
),
'DEFAULT_THROTTLE_RATES': { # 配置的登入使用者,訪問時間與次數的限制
'user': '10/m'
}
}
# 可以全域性使用,區域性使用
2.5 其他
1) AnonRateThrottle
限制所有匿名未認證使用者,使用IP區分使用者。
使用DEFAULT_THROTTLE_RATES['anon']
來設定頻次
2)UserRateThrottle
限制認證使用者,使用User id 來區分。
使用DEFAULT_THROTTLE_RATES['user']
來設定頻次
3)ScopedRateThrottle
限制使用者對於每個檢視的訪問頻次,使用ip或user id。
例如:
class ContactListView(APIView):
throttle_scope = 'contacts'
...
class ContactDetailView(APIView):
throttle_scope = 'contacts'
...
class UploadView(APIView):
throttle_scope = 'uploads'
...
REST_FRAMEWORK = {
'DEFAULT_THROTTLE_CLASSES': (
'rest_framework.throttling.ScopedRateThrottle',
),
'DEFAULT_THROTTLE_RATES': {
'contacts': '1000/day',
'uploads': '20/day'
}
}
模型層choice欄位使用
# 在模型類
class User(models.Model):
username = models.CharField(max_length=32)
password = models.CharField(max_length=32)
user_type = models.CharField(max_length=1, default='0')
gender = models.SmallIntegerField(choices=((1, '男'), (2, '女'), (3, '未知')), default=3)
# 在檢視類中,在序列化類中
-get_欄位名_dispaly()的方法,該方法獲得choice欄位對應的資料
class UserModelSerializer(serializers.ModelSerializer):
gender = serializers.CharField(source='get_gender_display')
# 重寫gender欄位,然後用source執行此函式
class Meta:
model = models.User
fields = '__all__'
練習:在一個介面中實現登入使用者與匿名使用者不同的限制
需要處理登入認證才能檢視介面然後匿名使用者也能檢視介面的矛盾,那就是認證校驗如果不通過自定義不要報錯,return None,如果校驗通過則request.user有值,一但有值,則不經過ip的限制,而是經過user.id的限制,即不走匿名使用者限制的介面,走登入使用者限制的介面。
# auth.py
from rest_framework.authentication import BaseAuthentication
from rest_framework.throttling import SimpleRateThrottle
class TokenAuth(BaseAuthentication):
def authenticate(self, request):
token = request.data.get('token')
token_obj = models.UserToken.objects.filter(token=token)
# 使用者登入時該使用者自帶的token校驗是否存在
if token_obj:
token_obj = token_obj.first()
return (token_obj.user, token_obj) # 可以是一個元組,user與auth接收,形成了request.user方便進行許可權校驗
else:
return None # 不報錯,只起了賦值給request.user的作用
class VisitThrottle(SimpleRateThrottle):
scope = 'luffy'
def get_cache_key(self, request, view):
if not request.user.id:
return self.get_ident(request) # 如果沒有登入使用者就是以Ip限制
else:
return None #
class LoginThrottle(SimpleRateThrottle):
scope = 'login'
def get_cache_key(self, request, view):
return request.user.id
# 直接以user_id為限制,如果沒有登入則是返回None,代表沒有進行此操作
# setting.py
# 進行相關配置
REST_FRAMEWORK = {
'DEFAULT_THROTTLE_RATES': {
'luffy': '3/m', # key要跟類中的scop對應,自定義限制訪問
'login': '10/m'
}
}
# views.py
# 實現一個介面能夠識別登入與匿名的狀態,從而產生不同的限制
class PublishInfo(ModelViewSet):
queryset = models.Publish.objects.all()
serializer_class = serializer.PublishModelSerializer
authentication_classes = [myauth.TokenAuth, ]
# 先進行登入認證,但是不會報錯,方便繼續進行user_id的限制訪問
throttle_classes = [myauth.VisitThrottle, myauth.LoginThrottle]