1. 程式人生 > 實用技巧 >認證,許可權,頻率,分頁,路由

認證,許可權,頻率,分頁,路由

rest-framework之認證元件

一 認證簡介

只有認證通過的使用者才能訪問指定的url地址,比如:查詢課程資訊,需要登入之後才能檢視,沒有登入,就不能檢視,這時候需要用到認證元件

二 區域性使用

(1)models層:

class User(models.Model):
    username=models.CharField(max_length=32)
    password=models.CharField(max_length=32)
    user_type=models.IntegerField(choices=((1,'超級使用者'),(2,'普通使用者'),(3,'
二筆使用者'))) class UserToken(models.Model): user=models.OneToOneField(to='User') token=models.CharField(max_length=64)
View Code

(2)新建認證類(驗證通過return兩個引數)

from rest_framework.authentication import BaseAuthentication
class TokenAuth():
    def authenticate(self, request):
        token = request.GET.get('
token') token_obj = models.UserToken.objects.filter(token=token).first() if token_obj: return else: raise AuthenticationFailed('認證失敗') def authenticate_header(self,request): pass
View Code

(3)view層

def get_random(name):
    import hashlib
    
import time md=hashlib.md5() md.update(bytes(str(time.time()),encoding='utf-8')) md.update(bytes(name,encoding='utf-8')) return md.hexdigest() class Login(APIView): def post(self,reuquest): back_msg={'status':1001,'msg':None} try: name=reuquest.data.get('name') pwd=reuquest.data.get('pwd') user=models.User.objects.filter(username=name,password=pwd).first() if user: token=get_random(name) models.UserToken.objects.update_or_create(user=user,defaults={'token':token}) back_msg['status']='1000' back_msg['msg']='登入成功' back_msg['token']=token else: back_msg['msg'] = '使用者名稱或密碼錯誤' except Exception as e: back_msg['msg']=str(e) return Response(back_msg) class Course(APIView): authentication_classes = [TokenAuth, ] def get(self, request): return HttpResponse('get') def post(self, request): return HttpResponse('post')
View Code

附:不存資料庫的token驗證

def get_token(id,salt='123'):
    import hashlib
    md=hashlib.md5()
    md.update(bytes(str(id),encoding='utf-8'))
    md.update(bytes(salt,encoding='utf-8'))

    return md.hexdigest()+'|'+str(id)

def check_token(token,salt='123'):
    ll=token.split('|')
    import hashlib
    md=hashlib.md5()
    md.update(bytes(ll[-1],encoding='utf-8'))
    md.update(bytes(salt,encoding='utf-8'))
    if ll[0]==md.hexdigest():
        return True
    else:
        return False

class TokenAuth():
    def authenticate(self, request):
        token = request.GET.get('token')
        success=check_token(token)
        if success:
            return
        else:
            raise AuthenticationFailed('認證失敗')
    def authenticate_header(self,request):
        pass
class Login(APIView):
    def post(self,reuquest):
        back_msg={'status':1001,'msg':None}
        try:
            name=reuquest.data.get('name')
            pwd=reuquest.data.get('pwd')
            user=models.User.objects.filter(username=name,password=pwd).first()
            if user:
                token=get_token(user.pk)
                # models.UserToken.objects.update_or_create(user=user,defaults={'token':token})
                back_msg['status']='1000'
                back_msg['msg']='登入成功'
                back_msg['token']=token
            else:
                back_msg['msg'] = '使用者名稱或密碼錯誤'
        except Exception as e:
            back_msg['msg']=str(e)
        return Response(back_msg)
from rest_framework.authentication import BaseAuthentication
class TokenAuth():
    def authenticate(self, request):
        token = request.GET.get('token')
        token_obj = models.UserToken.objects.filter(token=token).first()
        if token_obj:
            return
        else:
            raise AuthenticationFailed('認證失敗')
    def authenticate_header(self,request):
        pass

class Course(APIView):
    authentication_classes = [TokenAuth, ]

    def get(self, request):
        return HttpResponse('get')

    def post(self, request):
        return HttpResponse('post')
View Code

總結:區域性使用,只需要在檢視類里加入:

authentication_classes = [TokenAuth, ]

三 全域性使用

REST_FRAMEWORK={
    "DEFAULT_AUTHENTICATION_CLASSES":["app01.service.auth.Authentication",]
}

四 原始碼分析

#Request物件的user方法
@property
def user(self):
the authentication classes provided to the request.
        if not hasattr(self, '_user'):
            with wrap_attributeerrors():
                self._authenticate()
        return self._user

def _authenticate(self):
        for authenticator in self.authenticators:
            try:
                user_auth_tuple = authenticator.authenticate(self)
            except exceptions.APIException:
                self._not_authenticated()
                raise
            #認證成功,可以返回一個元組,但必須是最後一個驗證類才能返回
            if user_auth_tuple is not None:
                self._authenticator = authenticator
                self.user, self.auth = user_auth_tuple
                return

        self._not_authenticated()
View Code

self.authenticators

    def get_authenticators(self):
        return [auth() for auth in self.authentication_classes]
View Code

認證類使用順序:先用檢視類中的驗證類,再用settings裡配置的驗證類,最後用預設的驗證類

rest-framework之許可權元件

一 許可權簡介

只用超級使用者才能訪問指定的資料,普通使用者不能訪問,所以就要有許可權元件對其限制

二 區域性使用

from rest_framework.permissions import BasePermission
class UserPermission(BasePermission):
    message = '不是超級使用者,檢視不了'
    def has_permission(self, request, view):
        # user_type = request.user.get_user_type_display()
        # if user_type == '超級使用者':
        user_type = request.user.user_type
        print(user_type)
        if user_type == 1:
            return True
        else:
            return False
class Course(APIView):
    authentication_classes = [TokenAuth, ]
    permission_classes = [UserPermission,]

    def get(self, request):
        return HttpResponse('get')

    def post(self, request):
        return HttpResponse('post')
View Code

區域性使用只需要在檢視類里加入:

permission_classes = [UserPermission,]

三 全域性使用

REST_FRAMEWORK={
    "DEFAULT_AUTHENTICATION_CLASSES":["app01.service.auth.Authentication",],
    "DEFAULT_PERMISSION_CLASSES":["app01.service.permissions.SVIPPermission",]
}

四 原始碼分析

def check_permissions(self, request):
    for permission in self.get_permissions():
        if not permission.has_permission(request, self):
            self.permission_denied(
                request, message=getattr(permission, 'message', None)
                )
View Code

self.get_permissions()

def get_permissions(self):
     return [permission() for permission in self.permission_classes]
View Code

許可權類使用順序:先用檢視類中的許可權類,再用settings裡配置的許可權類,最後用預設的許可權類

rest-framework之頻率控制

一 頻率簡介

為了控制使用者對某個url請求的頻率,比如,一分鐘以內,只能訪問三次

二 自定義頻率類,自定義頻率規則

自定義的邏輯

#(1)取出訪問者ip
# (2)判斷當前ip不在訪問字典裡,新增進去,並且直接返回True,表示第一次訪問,在字典裡,繼續往下走
# (3)迴圈判斷當前ip的列表,有值,並且當前時間減去列表的最後一個時間大於60s,把這種資料pop掉,這樣列表中只有60s以內的訪問時間,
# (4)判斷,當列表小於3,說明一分鐘以內訪問不足三次,把當前時間插入到列表第一個位置,返回True,順利通過
# (5)當大於等於3,說明一分鐘內訪問超過三次,返回False驗證失敗

程式碼實現:

class MyThrottles():
    VISIT_RECORD = {}
    def __init__(self):
        self.history=None
    def allow_request(self,request, view):
        #(1)取出訪問者ip
        # print(request.META)
        ip=request.META.get('REMOTE_ADDR')
        import time
        ctime=time.time()
        # (2)判斷當前ip不在訪問字典裡,新增進去,並且直接返回True,表示第一次訪問
        if ip not in self.VISIT_RECORD:
            self.VISIT_RECORD[ip]=[ctime,]
            return True
        self.history=self.VISIT_RECORD.get(ip)
        # (3)迴圈判斷當前ip的列表,有值,並且當前時間減去列表的最後一個時間大於60s,把這種資料pop掉,這樣列表中只有60s以內的訪問時間,
        while self.history and ctime-self.history[-1]>60:
            self.history.pop()
        # (4)判斷,當列表小於3,說明一分鐘以內訪問不足三次,把當前時間插入到列表第一個位置,返回True,順利通過
        # (5)當大於等於3,說明一分鐘內訪問超過三次,返回False驗證失敗
        if len(self.history)<3:
            self.history.insert(0,ctime)
            return True
        else:
            return False
    def wait(self):
        import time
        ctime=time.time()
        return 60-(ctime-self.history[-1])
View Code

三 內建頻率類及區域性使用

寫一個類,繼承自SimpleRateThrottle,(根據ip限制)問:要根據使用者現在怎麼寫

from rest_framework.throttling import SimpleRateThrottle
class VisitThrottle(SimpleRateThrottle):
    scope = 'luffy'
    def get_cache_key(self, request, view):
        return self.get_ident(request)

在setting裡配置:(一分鐘訪問三次)

REST_FRAMEWORK = {
    'DEFAULT_THROTTLE_RATES':{
        'luffy':'3/m'
    }
}

在檢視類裡使用

throttle_classes = [MyThrottles,]

錯誤資訊的中文提示:

class Course(APIView):
    authentication_classes = [TokenAuth, ]
    permission_classes = [UserPermission, ]
    throttle_classes = [MyThrottles,]

    def get(self, request):
        return HttpResponse('get')

    def post(self, request):
        return HttpResponse('post')
    def throttled(self, request, wait):
        from rest_framework.exceptions import Throttled
        class MyThrottled(Throttled):
            default_detail = '傻逼啊'
            extra_detail_singular = '還有 {wait} second.'
            extra_detail_plural = '出了 {wait} seconds.'
        raise MyThrottled(wait)
View Code

內建頻率限制類:

BaseThrottle是所有類的基類:方法:def get_ident(self, request)獲取標識,其實就是獲取ip,自定義的需要繼承它

AnonRateThrottle:未登入使用者ip限制,需要配合auth模組用

SimpleRateThrottle:重寫此方法,可以實現頻率現在,不需要咱們手寫上面自定義的邏輯

UserRateThrottle:登入使用者頻率限制,這個得配合auth模組來用

ScopedRateThrottle:應用在區域性檢視上的(忽略)

四 內建頻率類及全域性使用

REST_FRAMEWORK = {
    'DEFAULT_THROTTLE_CLASSES':['app01.utils.VisitThrottle',],
    'DEFAULT_THROTTLE_RATES':{
        'luffy':'3/m'
    }
}

五 原始碼分析

    def check_throttles(self, request):
        for throttle in self.get_throttles():
            if not throttle.allow_request(request, self):
                self.throttled(request, throttle.wait())
    def throttled(self, request, wait):
        #拋異常,可以自定義異常,實現錯誤資訊的中文顯示
        raise exceptions.Throttled(wait)
View Code
class SimpleRateThrottle(BaseThrottle):
    # 咱自己寫的放在了全域性變數,他的在django的快取中
    cache = default_cache
    # 獲取當前時間,跟咱寫的一樣
    timer = time.time
    # 做了一個字串格式化,
    cache_format = 'throttle_%(scope)s_%(ident)s'
    scope = None
    # 從配置檔案中取DEFAULT_THROTTLE_RATES,所以咱配置檔案中應該配置,否則報錯
    THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES

    def __init__(self):
        if not getattr(self, 'rate', None):
            # 從配置檔案中找出scope配置的名字對應的值,比如咱寫的‘3/m’,他取出來
            self.rate = self.get_rate()
        #     解析'3/m',解析成 3      m
        self.num_requests, self.duration = self.parse_rate(self.rate)
    # 這個方法需要重寫
    def get_cache_key(self, request, view):
        """
        Should return a unique cache-key which can be used for throttling.
        Must be overridden.

        May return `None` if the request should not be throttled.
        """
        raise NotImplementedError('.get_cache_key() must be overridden')
    
    def get_rate(self):
        if not getattr(self, 'scope', None):
            msg = ("You must set either `.scope` or `.rate` for '%s' throttle" %
                   self.__class__.__name__)
            raise ImproperlyConfigured(msg)

        try:
            # 獲取在setting裡配置的字典中的之,self.scope是 咱寫的luffy
            return self.THROTTLE_RATES[self.scope]
        except KeyError:
            msg = "No default throttle rate set for '%s' scope" % self.scope
            raise ImproperlyConfigured(msg)
    # 解析 3/m這種傳參
    def parse_rate(self, rate):
        """
        Given the request rate string, return a two tuple of:
        <allowed number of requests>, <period of time in seconds>
        """
        if rate is None:
            return (None, None)
        num, period = rate.split('/')
        num_requests = int(num)
        # 只取了第一位,也就是 3/mimmmmmmm也是代表一分鐘
        duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
        return (num_requests, duration)
    # 邏輯跟咱自定義的相同
    def allow_request(self, request, view):
        """
        Implement the check to see if the request should be throttled.

        On success calls `throttle_success`.
        On failure calls `throttle_failure`.
        """
        if self.rate is None:
            return True

        self.key = self.get_cache_key(request, view)
        if self.key is None:
            return True

        self.history = self.cache.get(self.key, [])
        self.now = self.timer()

        # Drop any requests from the history which have now passed the
        # throttle duration
        while self.history and self.history[-1] <= self.now - self.duration:
            self.history.pop()
        if len(self.history) >= self.num_requests:
            return self.throttle_failure()
        return self.throttle_success()
    # 成功返回true,並且插入到快取中
    def throttle_success(self):
        """
        Inserts the current request's timestamp along with the key
        into the cache.
        """
        self.history.insert(0, self.now)
        self.cache.set(self.key, self.history, self.duration)
        return True
    # 失敗返回false
    def throttle_failure(self):
        """
        Called when a request to the API has failed due to throttling.
        """
        return False

    def wait(self):
        """
        Returns the recommended next request time in seconds.
        """
        if self.history:
            remaining_duration = self.duration - (self.now - self.history[-1])
        else:
            remaining_duration = self.duration

        available_requests = self.num_requests - len(self.history) + 1
        if available_requests <= 0:
            return None

        return remaining_duration / float(available_requests)
SimpleRateThrottle原始碼分析

rest-framework之分頁器

一 簡單分頁(檢視第n頁,每頁顯示n條)

from rest_framework.pagination import PageNumberPagination
# 一 基本使用:url=url=http://127.0.0.1:8000/pager/?page=2&size=3,size無效
class  Pager(APIView):
    def get(self,request,*args,**kwargs):
        # 獲取所有資料
        ret=models.Book.objects.all()
        # 建立分頁物件
        page=PageNumberPagination()
        # 在資料庫中獲取分頁的資料
        page_list=page.paginate_queryset(ret,request,view=self)
        # 對分頁進行序列化
        ser=BookSerializer1(instance=page_list,many=True)
        return Response(ser.data)
# 二 自定製 url=http://127.0.0.1:8000/pager/?page=2&size=3
# size=30,無效,最多5條
class Mypage(PageNumberPagination):
    page_size = 2
    page_query_param = 'page'
    # 定製傳參
    page_size_query_param = 'size'
    # 最大一頁的資料
    max_page_size = 5
class  Pager(APIView):
    def get(self,request,*args,**kwargs):
        # 獲取所有資料
        ret=models.Book.objects.all()
        # 建立分頁物件
        page=Mypage()
        # 在資料庫中獲取分頁的資料
        page_list=page.paginate_queryset(ret,request,view=self)
        # 對分頁進行序列化
        ser=BookSerializer1(instance=page_list,many=True)
        # return Response(ser.data)
        # 這個也是返回Response物件,但是比基本的多了上一頁,下一頁,和總資料條數(瞭解即可)
        return page.get_paginated_response(ser.data)

setting裡

REST_FRAMEWORK = {
    # 每頁顯示兩條
    'PAGE_SIZE':2
}

路由:

url(r'^pager/$', views.Pager.as_view()),

Serializers

class BookSerializer1(serializers.ModelSerializer):
    class Meta:
        model=models.Book
        # fields="__all__"
        exclude=('authors',)

二 偏移分頁(在第n個位置,向後檢視n條資料)

# http://127.0.0.1:8000/pager/?offset=4&limit=3
from rest_framework.pagination import LimitOffsetPagination
# 也可以自定製,同簡單分頁
class  Pager(APIView):
    def get(self,request,*args,**kwargs):
        # 獲取所有資料
        ret=models.Book.objects.all()
        # 建立分頁物件
        page=LimitOffsetPagination()
        # 在資料庫中獲取分頁的資料
        page_list=page.paginate_queryset(ret,request,view=self)
        # 對分頁進行序列化
        ser=BookSerializer1(instance=page_list,many=True)
        # return page.get_paginated_response(ser.data)
        return Response(ser.data)

三 CursorPagination(加密分頁,只能看上一頁和下一頁,速度快)

思考:不重寫類,修改類屬性?

from rest_framework.pagination import CursorPagination
# 看原始碼,是通過sql查詢,大於id和小於id
class  Pager(APIView):
    def get(self,request,*args,**kwargs):
        # 獲取所有資料
        ret=models.Book.objects.all()
        # 建立分頁物件
        page=CursorPagination()
        page.ordering='nid'
        # 在資料庫中獲取分頁的資料
        page_list=page.paginate_queryset(ret,request,view=self)
        # 對分頁進行序列化
        ser=BookSerializer1(instance=page_list,many=True)
        # 可以避免頁碼被猜到
        return page.get_paginated_response(ser.data)

rest-framework之url控制

一 自定義路由(原始方式)

from django.conf.urls import url
from app01 import views
urlpatterns = [
    url(r'^books/$', views.BookView.as_view()),
    url(r'^books/(?P<pk>\d+)$', views.BookDetailView.as_view()),
]
class BookView(APIView):

    def get(self, request):
        book_list = models.Book.objects.all()
        bs = BookSerializers(book_list, many=True)
        return Response(bs.data)

    def post(self, request):
        # 新增一條資料
        print(request.data)

        bs=BookSerializers(data=request.data)
        if bs.is_valid():
            bs.save()  # 生成記錄
            return Response(bs.data)
        else:

            return Response(bs.errors)

class BookDetailView(APIView):
    def get(self,request,pk):
        book_obj=models.Book.objects.filter(pk=pk).first()
        bs=BookSerializers(book_obj,many=False)
        return Response(bs.data)
    def put(self,request,pk):
        book_obj = models.Book.objects.filter(pk=pk).first()

        bs=BookSerializers(data=request.data,instance=book_obj)
        if bs.is_valid():
            bs.save() # update
            return Response(bs.data)
        else:
            return Response(bs.errors)
    def delete(self,request,pk):
        models.Book.objects.filter(pk=pk).delete()

        return Response("")

二 半自動路由(檢視類繼承ModelViewSet

from django.conf.urls import url
from app01 import views
urlpatterns = [
    url(r'^publish/$', views.PublishView.as_view({'get':'list','post':'create'})),
    url(r'^publish/(?P<pk>\d+)/$', views.PublishView.as_view({'get':'retrieve','put':'update','delete':'destroy'})),

]
from rest_framework.viewsets import ModelViewSet
class PublishView(ModelViewSet):
    queryset=models.Publish.objects.all()
    serializer_class=PublishSerializers

三 全自動路由(自動生成路由)

from django.conf.urls import url,include
from app01 import views
from rest_framework import routers
router=routers.DefaultRouter()
# 兩個引數,一個是匹配的路由,一個是檢視中寫的CBV的類
router.register('publish',views.PublishView)
urlpatterns = [
    # http://127.0.0.1:8000/publish/format=json(渲染器通過這個判斷,返回渲染的頁面)
    # url(r'^publish/', views.PublishView.as_view({'get':'list','post':'create'})),
    # http://127.0.0.1:8000/publish.json(渲染器通過這個判斷,返回渲染的頁面)
    # url(r'^publish\.(?P<format>\w+)$', views.PublishView.as_view({'get':'list','post':'create'})),
    
    # 可以用 以下方式訪問
    # 1 http://127.0.0.1:8000/publish/
    # 2 http://127.0.0.1:8000/publish.json
    # 3 http://127.0.0.1:8000/publish/3
    # 4 http://127.0.0.1:8000/publish/3.json   
    url(r'',include(router.urls))
]
from rest_framework.viewsets import ModelViewSet
class PublishView(ModelViewSet):
    queryset=models.Publish.objects.all()
    serializer_class=PublishSerializers