
Token-Based Authentication With Flask

This tutorial takes a test-first approach to implementing token-based authentication in a Flask app using JSON Web Tokens (JWTs).

Objectives

By the end of this tutorial, you will be able to…

  1. Discuss the benefits of using JWTs versus sessions and cookies for authentication
  2. Implement user authentication with JWTs
  3. Blacklist user tokens when necessary
  4. Write tests to create and verify JWTs and user authentication
  5. Practice test-driven development

Introduction

JSON Web Tokens (or JWTs) provide a means of transmitting information from the client to the server in a stateless, secure way.

On the server, JWTs are generated by signing user information with a secret key; the resulting tokens are then securely stored on the client. This form of auth works well with modern, single-page applications. For more on this, along with the pros and cons of using JWTs vs. session and cookie-based auth, please review the following articles:

NOTE: Keep in mind that since a JWT is signed rather than encrypted it should never contain sensitive information like a user’s password.
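To make the note above concrete, here is a minimal sketch using only the standard library. It assumes nothing beyond the fact that the middle segment of a JWT is base64url-encoded JSON; the helper names and the sample payload are hypothetical.

```python
import base64
import json


def b64url_encode(data: bytes) -> str:
    """Base64url-encode and drop the '=' padding, as JWT segments do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def b64url_decode(segment: str) -> bytes:
    """Restore the stripped padding, then decode."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


# Hypothetical payload, encoded the way a JWT payload segment would be
payload_segment = b64url_encode(json.dumps({"sub": 1, "admin": False}).encode())

# No secret key is needed to read it back
print(json.loads(b64url_decode(payload_segment)))  # {'sub': 1, 'admin': False}
```

Anyone who holds the token can run the decoding step, which is why a password or similar secret should never be placed in the payload.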

Getting Started

Enough theory, let’s start implementing some code!

Project Setup

Start by cloning the project boilerplate and then creating a new branch:

$ git clone https://github.com/realpython/flask-jwt-auth.git
$ cd flask-jwt-auth
$ git checkout tags/1.0.0 -b jwt-auth

Create and activate a virtualenv and install the dependencies:

$ python3.6 -m venv env
$ source env/bin/activate
(env)$ pip install -r requirements.txt

This is optional, but it’s a good idea to create a new GitHub repository and update the remote:

(env)$ git remote set-url origin <newurl>

Database Setup

Let’s set up Postgres.

NOTE: If you’re on a Mac, check out Postgres app.

Once the local Postgres server is running, use psql to create two new databases named after your project, one for development and one for testing:

(env)$ psql
# create database flask_jwt_auth;
CREATE DATABASE
# create database flask_jwt_auth_test;
CREATE DATABASE
# \q

NOTE: The commands for creating a database may vary slightly depending on your version of Postgres. Check the Postgres documentation for the correct command.

Before applying the database migrations we need to update the config file found in project/server/config.py. Simply update the database_name:

database_name = 'flask_jwt_auth'

Set the APP_SETTINGS environment variable in the terminal:

(env)$ export APP_SETTINGS="project.server.config.DevelopmentConfig"

Update the following tests in project/tests/test__config.py:

class TestDevelopmentConfig(TestCase):
    def create_app(self):
        app.config.from_object('project.server.config.DevelopmentConfig')
        return app

    def test_app_is_development(self):
        self.assertTrue(app.config['DEBUG'] is True)
        self.assertFalse(current_app is None)
        self.assertTrue(
            app.config['SQLALCHEMY_DATABASE_URI'] == 'postgresql://postgres:@localhost/flask_jwt_auth'
        )


class TestTestingConfig(TestCase):
    def create_app(self):
        app.config.from_object('project.server.config.TestingConfig')
        return app

    def test_app_is_testing(self):
        self.assertTrue(app.config['DEBUG'])
        self.assertTrue(
            app.config['SQLALCHEMY_DATABASE_URI'] == 'postgresql://postgres:@localhost/flask_jwt_auth_test'
        )

Run them to ensure they still pass:

(env)$ python manage.py test

You should see:

test_app_is_development (test__config.TestDevelopmentConfig) ... ok
test_app_is_production (test__config.TestProductionConfig) ... ok
test_app_is_testing (test__config.TestTestingConfig) ... ok

----------------------------------------------------------------------
Ran 3 tests in 0.007s

OK

Migrations

Add a models.py file to the “server” directory:

# project/server/models.py

import datetime

from project.server import app, db, bcrypt


class User(db.Model):
    """ User Model for storing user related details """
    __tablename__ = "users"

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    email = db.Column(db.String(255), unique=True, nullable=False)
    password = db.Column(db.String(255), nullable=False)
    registered_on = db.Column(db.DateTime, nullable=False)
    admin = db.Column(db.Boolean, nullable=False, default=False)

    def __init__(self, email, password, admin=False):
        self.email = email
        self.password = bcrypt.generate_password_hash(
            password, app.config.get('BCRYPT_LOG_ROUNDS')
        ).decode()
        self.registered_on = datetime.datetime.now()
        self.admin = admin

In the above snippet, we define a basic user model, which uses the Flask-Bcrypt extension to hash the password.
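As a rough illustration of what "hashing the password" buys us, the sketch below uses PBKDF2 from the standard library as a stand-in. It is not bcrypt and not how Flask-Bcrypt works internally; the function names and the storage format are hypothetical.

```python
import hashlib
import hmac
import os


def hash_password(password: str, rounds: int = 100_000) -> str:
    """Derive a salted hash; only this string would be stored, never the password."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, rounds)
    return f"{rounds}${salt.hex()}${digest.hex()}"


def check_password(stored: str, candidate: str) -> bool:
    """Re-derive the hash with the stored salt and compare in constant time."""
    rounds, salt_hex, digest_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", candidate.encode(), bytes.fromhex(salt_hex), int(rounds)
    )
    return hmac.compare_digest(digest.hex(), digest_hex)


stored = hash_password("test")
print(check_password(stored, "test"))   # True
print(check_password(stored, "wrong"))  # False
```

The rounds parameter plays a role similar to BCRYPT_LOG_ROUNDS in the model above: a higher value makes each hash slower to compute and therefore harder to brute-force.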

Install psycopg2 to connect to Postgres:

(env)$ pip install psycopg2==2.6.2
(env)$ pip freeze > requirements.txt

Within manage.py, change:

from project.server import app, db

to:

from project.server import app, db, models

Apply the migration:

(env)$ python manage.py create_db
(env)$ python manage.py db init
(env)$ python manage.py db migrate

Sanity Check

Did it work?

(env)$ psql
# \c flask_jwt_auth
You are now connected to database "flask_jwt_auth" as user "michael.herman".
# \d

               List of relations
 Schema |      Name       |   Type   |  Owner
--------+-----------------+----------+----------
 public | alembic_version | table    | postgres
 public | users           | table    | postgres
 public | users_id_seq    | sequence | postgres
(3 rows)

JWT Setup

The auth workflow works as follows:

  • Client provides email and password, which is sent to the server
  • Server then verifies that email and password are correct and responds with an auth token
  • Client stores the token and sends it along with all subsequent requests to the API
  • Server decodes the token and validates it

This cycle repeats until the token expires or is revoked, at which point the user has to log in again so that the server can issue a new token.
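The third step of the workflow, sending the token with each request, could look roughly like this on the client side. The sketch assumes the token travels in an Authorization header using the Bearer scheme and that the API runs at http://localhost:5000; neither detail is fixed by the text above, and the helper name is hypothetical.

```python
import urllib.request


def build_authenticated_request(url: str, auth_token: str) -> urllib.request.Request:
    """Attach the stored token to an outgoing request (the request is not sent here)."""
    return urllib.request.Request(
        url,
        headers={"Authorization": "Bearer " + auth_token},
    )


# "abc.def.ghi" is a placeholder standing in for a real token
req = build_authenticated_request("http://localhost:5000/auth/user", "abc.def.ghi")
print(req.get_header("Authorization"))  # Bearer abc.def.ghi
```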

The tokens themselves are divided into three parts:

  • Header
  • Payload
  • Signature

We’ll dive a bit deeper into the payload, but if you’re curious, you can read more about each part in the Introduction to JSON Web Tokens article.
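To see how the three parts fit together, here is a hand-rolled sketch of an HS256-style token built with the standard library. It is for illustration only (the app itself relies on PyJWT), and the function names and the demo secret are hypothetical.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def sign_token(payload: dict, secret: bytes) -> str:
    """Join header, payload, and signature with dots."""
    header_seg = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload_seg = b64url(json.dumps(payload).encode())
    signing_input = (header_seg + "." + payload_seg).encode()
    signature_seg = b64url(hmac.new(secret, signing_input, hashlib.sha256).digest())
    return ".".join([header_seg, payload_seg, signature_seg])


def signature_is_valid(token: str, secret: bytes) -> bool:
    """Recompute the signature over header.payload and compare."""
    header_seg, payload_seg, signature_seg = token.split(".")
    signing_input = (header_seg + "." + payload_seg).encode()
    expected = b64url(hmac.new(secret, signing_input, hashlib.sha256).digest())
    return hmac.compare_digest(expected, signature_seg)


secret = b"demo-secret"  # hypothetical key, for this example only
token = sign_token({"sub": 1}, secret)

# Swap in a different payload while keeping the old signature
header_seg, _, signature_seg = token.split(".")
forged = ".".join([header_seg, b64url(json.dumps({"sub": 2}).encode()), signature_seg])

print(signature_is_valid(token, secret))   # True
print(signature_is_valid(forged, secret))  # False
```

Because the signature covers both the header and the payload, changing either one without knowing the secret invalidates the token.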

To work with JSON Web Tokens in our app, install the PyJWT package:

(env)$ pip install pyjwt==1.4.2
(env)$ pip freeze > requirements.txt

Encode Token

Add the following method to the User() class in project/server/models.py:

def encode_auth_token(self, user_id):
    """
    Generates the Auth Token
    :return: bytes
    """
    try:
        payload = {
            'exp': datetime.datetime.utcnow() + datetime.timedelta(days=0, seconds=5),
            'iat': datetime.datetime.utcnow(),
            'sub': user_id
        }
        return jwt.encode(
            payload,
            app.config.get('SECRET_KEY'),
            algorithm='HS256'
        )
    except Exception as e:
        return e

Don’t forget to add the import:

import jwt

So, given a user id, this method creates and returns a token from the payload and the secret key set in the config.py file. The payload is where we add metadata about the token and information about the user. This info is often referred to as JWT Claims. We utilize the following “claims”:

  • exp: expiration date of the token
  • iat: the time the token is generated
  • sub: the subject of the token (the user whom it identifies)
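The sketch below shows the three claims as they end up inside a token: exp and iat are stored as integer seconds since the Unix epoch, which is the form PyJWT converts datetime values to when encoding. The helper names are hypothetical, and the expiry check is a simplified stand-in for what the library does during decoding. The five-second lifetime mirrors the seconds=5 used in encode_auth_token().

```python
import datetime


def build_claims(user_id: int, issued_at: datetime.datetime,
                 lifetime: datetime.timedelta) -> dict:
    """Return the exp/iat/sub claims with timestamps as integer seconds."""
    return {
        "exp": int((issued_at + lifetime).timestamp()),
        "iat": int(issued_at.timestamp()),
        "sub": user_id,
    }


def is_expired(claims: dict, now: datetime.datetime) -> bool:
    """Simplified check: the token is expired once 'now' is past 'exp'."""
    return int(now.timestamp()) > claims["exp"]


issued = datetime.datetime(2024, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)
claims = build_claims(1, issued, datetime.timedelta(seconds=5))

print(claims["exp"] - claims["iat"])                                # 5
print(is_expired(claims, issued + datetime.timedelta(seconds=3)))  # False
print(is_expired(claims, issued + datetime.timedelta(seconds=6)))  # True
```

A lifetime this short is convenient for testing expiry, but a real deployment would presumably use a much longer one.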

The secret key must be random and only accessible server-side. Use the Python interpreter to generate a key:

>>> import os
>>> os.urandom(24)
b"\xf9'\xe4p(\xa9\x12\x1a!\x94\x8d\x1c\x99l\xc7\xb7e\xc7c\x86\x02MJ\xa0"

Set the key as an environment variable:

(env)$ export SECRET_KEY="\xf9'\xe4p(\xa9\x12\x1a!\x94\x8d\x1c\x99l\xc7\xb7e\xc7c\x86\x02MJ\xa0"
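A raw byte string like the one above contains quotes, backslashes, and other characters that can be awkward to paste into a shell. One possible alternative, sketched here as a suggestion rather than as part of the original setup, is to generate a hex-only key with the standard library's secrets module (Python 3.6+). The function name is hypothetical.

```python
import secrets


def generate_secret_key(num_bytes: int = 32) -> str:
    """Return a random key as a hex string that is safe to paste into a shell."""
    return secrets.token_hex(num_bytes)


key = generate_secret_key()
print(len(key))  # 64 hex characters for 32 random bytes
```

The resulting value can be exported as SECRET_KEY in the same way as the key generated with os.urandom().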

Then have the BaseConfig() class in project/server/config.py read SECRET_KEY from the environment, falling back to a default value:

SECRET_KEY = os.getenv('SECRET_KEY', 'my_precious')

Update the tests within project/tests/test__config.py to ensure the variable is set correctly:

def test_app_is_development(self):
    self.assertFalse(app.config['SECRET_KEY'] == 'my_precious')
    self.assertTrue(app.config['DEBUG'] is True)
    self.assertFalse(current_app is None)
    self.assertTrue(
        app.config['SQLALCHEMY_DATABASE_URI'] == 'postgresql://postgres:@localhost/flask_jwt_auth'
    )


class TestTestingConfig(TestCase):
    def create_app(self):
        app.config.from_object('project.server.config.TestingConfig')
        return app

    def test_app_is_testing(self):
        self.assertFalse(app.config['SECRET_KEY'] == 'my_precious')
        self.assertTrue(app.config['DEBUG'])
        self.assertTrue(
            app.config['SQLALCHEMY_DATABASE_URI'] == 'postgresql://postgres:@localhost/flask_jwt_auth_test'
        )

Before moving on, let’s write a quick unit test for the user model. Add the following code to a new file called test_user_model.py in “project/tests”:

# project/tests/test_user_model.py

import unittest

from project.server import db
from project.server.models import User
from project.tests.base import BaseTestCase


class TestUserModel(BaseTestCase):

    def test_encode_auth_token(self):
        user = User(
            email='[email protected]',
            password='test'
        )
        db.session.add(user)
        db.session.commit()
        auth_token = user.encode_auth_token(user.id)
        self.assertTrue(isinstance(auth_token, bytes))

if __name__ == '__main__':
    unittest.main()

Run the tests. They all should pass.

Decode Token

Similarly, to decode a token, add the following method to the User() class:

@staticmethod
def decode_auth_token(auth_token):
    """
    Decodes the auth token
    :param auth_token:
    :return: integer|string
    """
    try:
        # restrict decoding to the algorithm used when encoding
        payload = jwt.decode(
            auth_token,
            app.config.get('SECRET_KEY'),
            algorithms=['HS256']
        )
        return payload['sub']
    except jwt.ExpiredSignatureError:
        return 'Signature expired. Please log in again.'
    except jwt.InvalidTokenError:
        return 'Invalid token. Please log in again.'

We need to decode the auth token with every API request and verify its signature to be sure of the user’s authenticity. To verify the auth_token, we used the same SECRET_KEY used to encode a token.

If the auth_token is valid, we get the user id from the sub key of the payload. If it is invalid, one of two exceptions can be raised:

  1. Expired Signature: When the token is used after it has expired, an ExpiredSignatureError exception is raised. This means the time specified in the payload’s exp field has passed.
  2. Invalid Token: When the token supplied is incorrect or malformed, an InvalidTokenError exception is raised.

NOTE: We use a static method because decoding a token does not depend on any particular instance of the class.
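Since decode_auth_token() returns either the user id or an error message, calling code has to tell the two apart. A minimal sketch of one way to do that, assuming user ids are integers as in the User model; the helper name is hypothetical.

```python
def user_id_or_error(decoded):
    """Split a decode_auth_token() result into a (user_id, error_message) pair."""
    if isinstance(decoded, str):
        # A string means decoding failed and carries the reason
        return None, decoded
    return decoded, None


print(user_id_or_error(1))
# (1, None)
print(user_id_or_error('Invalid token. Please log in again.'))
# (None, 'Invalid token. Please log in again.')
```

Checking the type works only because a valid sub is never a string here; if ids were strings, the method would need a different way to signal failure, such as raising an exception.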

Add a test to test_user_model.py:

def test_decode_auth_token(self):
    user = User(
        email='[email protected]',
        password='test'
    )
    db.session.add(user)
    db.session.commit()
    auth_token = user.encode_auth_token(user.id)
    self.assertTrue(isinstance(auth_token, bytes))
    self.assertTrue(User.decode_auth_token(auth_token) == 1)

Make sure the tests pass before moving on.

NOTE: We will handle invalid tokens by blacklisting them later.

Route Setup

Now we can configure the auth routes using a test-first approach:

  • /auth/register
  • /auth/login
  • /auth/logout
  • /auth/user

Start by creating a new folder called “auth” in “project/server”. Then, within “auth” add two files, __init__.py and views.py. Finally, add the following code to views.py:

# project/server/auth/views.py

from flask import Blueprint, request, make_response, jsonify
from flask.views import MethodView

from project.server import bcrypt, db
from project.server.models import User

auth_blueprint = Blueprint('auth', __name__)

To register the new Blueprint with the app, add the following to the bottom of project/server/__init__.py:

from project.server.auth.views import auth_blueprint
app.register_blueprint(auth_blueprint)

Now, add a new file called test_auth.py within “project/tests” to hold all our tests for this Blueprint:

# project/tests/test_auth.py

import unittest

from project.server import db
from project.server.models import User
from project.tests.base import BaseTestCase


class TestAuthBlueprint(BaseTestCase):
    pass


if __name__ == '__main__':
    unittest.main()

Register Route

Start with a test:

def test_registration(self):
    """ Test for user registration """
    with self.client:
        response = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='[email protected]',
                password='123456'
            )),
            content_type='application/json'
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'success')
        self.assertTrue(data['message'] == 'Successfully registered.')
        self.assertTrue(data['auth_token'])
        self.assertTrue(response.content_type == 'application/json')
        self.assertEqual(response.status_code, 201)

Make sure to add the import:

import json

Run the tests. You should see the following error:

raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

Now, let’s write the code to get the test to pass. Add the following to project/server/auth/views.py:

class RegisterAPI(MethodView):
    """
    User Registration Resource
    """

    def post(self):
        # get the post data
        post_data = request.get_json()
        # check if user already exists
        user = User.query.filter_by(email=post_data.get('email')).first()
        if not user:
            try:
                user = User(
                    email=post_data.get('email'),
                    password=post_data.get('password')
                )

                # insert the user
                db.session.add(user)
                db.session.commit()
                # generate the auth token
                auth_token = user.encode_auth_token(user.id)
                responseObject = {
                    'status': 'success',
                    'message': 'Successfully registered.',
                    'auth_token': auth_token.decode()
                }
                return make_response(jsonify(responseObject)), 201
            except Exception as e:
                responseObject = {
                    'status': 'fail',
                    'message': 'Some error occurred. Please try again.'
                }
                return make_response(jsonify(responseObject)), 401
        else:
            responseObject = {
                'status': 'fail',
                'message': 'User already exists. Please Log in.',
            }
            return make_response(jsonify(responseObject)), 202

# define the API resources
registration_view = RegisterAPI.as_view('register_api')

# add Rules for API Endpoints
auth_blueprint.add_url_rule(
    '/auth/register',
    view_func=registration_view,
    methods=['POST']
)

Here, we register a new user and generate a new auth token for further requests, which we send back to the client.
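From the client's point of view, a registration call might be assembled as in the sketch below. The view reads the body with request.get_json(), so the request is marked as JSON, just as the test client does with content_type='application/json'. The base URL, credentials, and helper name are assumptions for illustration, and the request is only built, not sent.

```python
import json
import urllib.request


def build_register_request(base_url: str, email: str,
                           password: str) -> urllib.request.Request:
    """Build (but do not send) a JSON POST request for /auth/register."""
    body = json.dumps({"email": email, "password": password}).encode()
    return urllib.request.Request(
        base_url + "/auth/register",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical host and credentials
req = build_register_request("http://localhost:5000", "user@example.com", "123456")
print(req.get_method())      # POST
print(json.loads(req.data))  # {'email': 'user@example.com', 'password': '123456'}
```

On success, the response body would carry the status, message, and auth_token fields assembled in RegisterAPI.post().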

Run the tests to ensure they all pass:

Ran 6 tests in 0.132s

OK

Next, let’s add one more test to ensure the registration fails if the user already exists:

def test_registered_with_already_registered_user(self):
    """ Test registration with already registered email"""
    user = User(
        email='[email protected]',
        password='test'
    )
    db.session.add(user)
    db.session.commit()
    with self.client:
        response = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='[email protected]',
                password='123456'
            )),
            content_type='application/json'
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'fail')
        self.assertTrue(
            data['message'] == 'User already exists. Please Log in.')
        self.assertTrue(response.content_type == 'application/json')
        self.assertEqual(response.status_code, 202)

Run the tests again before moving on to the next route. All should pass.

Login Route

Again, start with a test. To verify the login API, let’s test for two cases:

  1. Registered user login
  2. Non-registered user login

Registered user login

def test_registered_user_login(self):
    """ Test for login of registered user """
    with self.client:
        # user registration
        resp_register = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='[email protected]',
                password='123456'
            )),
            content_type='application/json',
        )
        data_register = json.loads(resp_register.data.decode())
        self.assertTrue(data_register['status'] == 'success')
        self.assertTrue(
            data_register['message'] == 'Successfully registered.'
        )
        self.assertTrue(data_register['auth_token'])
        self.assertTrue(resp_register.content_type == 'application/json')
        self.assertEqual(resp_register.status_code, 201)
        # registered user login
        response = self.client.post(
            '/auth/login',
            data=json.dumps(dict(
                email='[email protected]',
                password='123456'
            )),
            content_type='application/json'
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'success')
        self.assertTrue(data['message'] == 'Successfully logged in.')
        self.assertTrue(data['auth_token'])
        self.assertTrue(response.content_type == 'application/json')
        self.assertEqual(response.status_code, 200)

In this test case, the registered user tries to log in and, as expected, our application should allow this.

Run the tests. They should fail. Now write the code:

class LoginAPI(MethodView):
    """
    User Login Resource
    """
    def post(self):
        # get the post data
        post_data = request.get_json()
        try:
            # fetch the user data
            user = User.query.filter_by(
                email=post_data.get('email')
            ).first()
            auth_token = user.encode_auth_token(user.id)
            if auth_token:
                responseObject = {
                    'status': 'success',
                    'message': 'Successfully logged in.',
                    'auth_token': auth_token.decode()
                }
                return make_response(jsonify(responseObject)), 200
        except Exception as e:
            print(e)
            responseObject = {
                'status': 'fail',
                'message': 'Try again'
            }
            return make_response(jsonify(responseObject)), 500


# define the API resources
registration_view = RegisterAPI.as_view('register_api')
login_view = LoginAPI.as_view('login_api')

# add Rules for API Endpoints
auth_blueprint.add_url_rule(
    '/auth/register',
    view_func=registration_view,
    methods=['POST']
)
auth_blueprint.add_url_rule(
    '/auth/login',
    view_func=login_view,
    methods=['POST']
)

Run the tests again. Do they pass? They should. Don’t move on until all tests pass.

Non-Registered user login

Add the test:

def test_non_registered_user_login(self):
    """ Test for login of non-registered user """
    with self.client:
        response = self.client.post(
            '/auth/login',
            data=json.dumps(dict(
                email='[email protected]',
                password='123456'
            )),
            content_type='application/json'
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'fail')
        self.assertTrue