This tutorial takes a test-first approach to implementing token-based authentication in a Flask app using JSON Web Tokens (JWTs).
Objectives
By the end of this tutorial, you will be able to…
Discuss the benefits of using JWTs versus sessions and cookies for authentication
Implement user authentication with JWTs
Blacklist user tokens when necessary
Write tests to create and verify JWTs and user authentication
Practice test-driven development
Introduction
JSON Web Tokens (or JWTs) provide a means of transmitting information from the client to the server in a stateless, secure way.
On the server, JWTs are generated by signing user information with a secret key; the resulting tokens are then stored securely on the client. This form of auth works well with modern, single-page applications. For more on this, along with the pros and cons of using JWTs vs. session and cookie-based auth, please review the following articles:
NOTE: There may be some variation on the above commands, for creating a database, based upon your version of Postgres. Check for the correct command in the Postgres documentation.
Before applying the database migrations we need to update the config file found in project/server/config.py. Simply update the database_name:
test_app_is_development (test__config.TestDevelopmentConfig) ... ok
test_app_is_production (test__config.TestProductionConfig) ... ok
test_app_is_testing (test__config.TestTestingConfig) ... ok
----------------------------------------------------------------------
Ran 3 tests in 0.007s
OK
Migrations
Add a models.py file to the “server” directory:
# project/server/models.py

import datetime

from project.server import app, db, bcrypt


class User(db.Model):
    """ User Model for storing user related details """
    __tablename__ = "users"

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    email = db.Column(db.String(255), unique=True, nullable=False)
    password = db.Column(db.String(255), nullable=False)
    registered_on = db.Column(db.DateTime, nullable=False)
    admin = db.Column(db.Boolean, nullable=False, default=False)

    def __init__(self, email, password, admin=False):
        self.email = email
        self.password = bcrypt.generate_password_hash(
            password, app.config.get('BCRYPT_LOG_ROUNDS')
        ).decode()
        self.registered_on = datetime.datetime.now()
        self.admin = admin
In the above snippet, we define a basic user model, which uses the Flask-Bcrypt extension to hash the password.
(env)$ python manage.py create_db
(env)$ python manage.py db init
(env)$ python manage.py db migrate
Sanity Check
Did it work?
(env)$ psql
# \c flask_jwt_auth
You are now connected to database "flask_jwt_auth" as user "michael.herman".
# \d
List of relations
Schema | Name | Type | Owner
--------+-----------------+----------+----------
public | alembic_version | table | postgres
public | users | table | postgres
public | users_id_seq | sequence | postgres
(3 rows)
JWT Setup
The auth workflow works as follows:
Client provides email and password, which is sent to the server
Server then verifies that email and password are correct and responds with an auth token
Client stores the token and sends it along with all subsequent requests to the API
Server decodes the token and validates it
This cycle repeats until the token expires or is revoked. In the latter case, the server issues a new token.
The tokens themselves are divided into three parts:
Header
Payload
Signature
We’ll dive a bit deeper into the payload, but if you’re curious, you can read more about each part from the Introduction to JSON Web Tokens article.
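To make the three parts concrete, here is a minimal sketch that uses only the standard library. It hand-builds an HS256-style token so that it runs without PyJWT installed. The helper names (`b64url_encode`, `b64url_decode`, `build_demo_token`, `split_token`) and the secret `not-a-real-secret` are illustrative assumptions, not part of this tutorial's project.

```python
import base64
import hashlib
import hmac
import json


def b64url_encode(raw: bytes) -> str:
    """Base64url-encode bytes and strip the '=' padding, as JWTs do."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring any stripped padding."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def build_demo_token(payload: dict, secret: bytes) -> str:
    """Hand-roll an HS256 token purely for illustration (hypothetical helper)."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url_encode(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return signing_input + "." + b64url_encode(signature)


def split_token(token: str):
    """Return (header, payload, signature_bytes) without verifying anything."""
    header_b64, payload_b64, signature_b64 = token.split(".")
    return (
        json.loads(b64url_decode(header_b64)),
        json.loads(b64url_decode(payload_b64)),
        b64url_decode(signature_b64),
    )


token = build_demo_token({"sub": 1}, b"not-a-real-secret")
header, payload, signature = split_token(token)
print(header)          # the decoded header dictionary
print(payload)         # the decoded payload dictionary
print(len(signature))  # HMAC-SHA256 digests are 32 bytes long
```

Note that the header and payload are only encoded, not encrypted: anyone holding the token can read them, so the payload is not a place for sensitive data.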
To work with JSON Web Tokens in our app, install the PyJWT package:
Add the following method to the User() class in project/server/models.py:
def encode_auth_token(self, user_id):
    """
    Generates the Auth Token
    :return: string
    """
    try:
        payload = {
            'exp': datetime.datetime.utcnow() + datetime.timedelta(days=0, seconds=5),
            'iat': datetime.datetime.utcnow(),
            'sub': user_id
        }
        return jwt.encode(
            payload,
            app.config.get('SECRET_KEY'),
            algorithm='HS256'
        )
    except Exception as e:
        return e
Don’t forget to add the import:
import jwt
So, given a user id, this method creates and returns a token from the payload and the secret key set in the config.py file. The payload is where we add metadata about the token and information about the user. This info is often referred to as JWT Claims. We utilize the following “claims”:
exp: expiration date of the token
iat: the time the token is generated
sub: the subject of the token (the user whom it identifies)
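As a rough illustration of how these three claims relate, the sketch below assembles them as Unix timestamps and checks expiry by hand. It uses only the standard library; `build_claims`, `is_expired`, and the `lifetime_seconds` parameter are hypothetical names introduced here, and in the tutorial's code the expiry check is left to PyJWT.

```python
import datetime


def build_claims(user_id, lifetime_seconds=5, now=None):
    """Assemble the exp/iat/sub claims as Unix timestamps (illustrative helper)."""
    now = now or datetime.datetime.now(datetime.timezone.utc)
    issued_at = int(now.timestamp())
    return {
        "exp": issued_at + lifetime_seconds,  # when the token stops being valid
        "iat": issued_at,                     # when the token was generated
        "sub": user_id,                       # whom the token identifies
    }


def is_expired(claims, now=None):
    """True once the current time has reached the exp claim."""
    now = now or datetime.datetime.now(datetime.timezone.utc)
    return int(now.timestamp()) >= claims["exp"]


issued = datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc)
claims = build_claims(user_id=42, lifetime_seconds=5, now=issued)
print(claims["exp"] - claims["iat"])                                   # 5
print(is_expired(claims, now=issued + datetime.timedelta(seconds=4)))  # False
print(is_expired(claims, now=issued + datetime.timedelta(seconds=6)))  # True
```

Passing `now` explicitly keeps the check deterministic, which is convenient when testing expiry without sleeping.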
The secret key must be random and only accessible server-side. Use the Python interpreter to generate a key:
(env)$ python
>>> import os
>>> os.urandom(24)
b"\xf9'\xe4p(\xa9\x12\x1a!\x94\x8d\x1c\x99l\xc7\xb7e\xc7c\x86\x02MJ\xa0"
>>>
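The standard library's `secrets` module is another way to produce such a key. The sketch below assumes Python 3.6 or later; the variable name `secret_key` is only illustrative.

```python
import secrets

# 24 random bytes from the operating system's cryptographically secure source,
# rendered as 48 hexadecimal characters so the value pastes cleanly into a config file.
secret_key = secrets.token_hex(24)
print(len(secret_key))  # 48
```

Whichever method you use, keep the generated value out of version control, for example by loading it from an environment variable.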
Similarly, to decode a token, add the following method to the User() class:
@staticmethod
def decode_auth_token(auth_token):
    """
    Decodes the auth token
    :param auth_token:
    :return: integer|string
    """
    try:
        payload = jwt.decode(
            auth_token,
            app.config.get('SECRET_KEY'),
            algorithms=['HS256']  # accept only the algorithm used to sign the token
        )
        return payload['sub']
    except jwt.ExpiredSignatureError:
        return 'Signature expired. Please log in again.'
    except jwt.InvalidTokenError:
        return 'Invalid token. Please log in again.'
We need to decode the auth token with every API request and verify its signature to be sure of the user’s authenticity. To verify the auth_token, we used the same SECRET_KEY used to encode a token.
If the auth_token is valid, we get the user id from the sub index of the payload. If invalid, there could be two exceptions:
Expired Signature: When the token is used after it has expired, an ExpiredSignatureError exception is raised. This means the time specified in the payload’s exp field has passed.
Invalid Token: When the token supplied is not correct or malformed, then an InvalidTokenError exception is raised.
NOTE: We have used a static method since it does not relate to the class’s instance.
NOTE: We will handle invalid tokens by blacklisting them later.
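To see why decoding needs the same SECRET_KEY that was used for encoding, here is a minimal standard-library sketch of the idea behind an HS256 signature check: recompute the HMAC over the first two segments and compare it with the third. This is not PyJWT's implementation; `sign`, `verify`, and the secrets used are hypothetical stand-ins.

```python
import base64
import hashlib
import hmac


def sign(signing_input: str, secret: bytes) -> str:
    """Compute a base64url HMAC-SHA256 signature over 'header.payload'."""
    digest = hmac.new(secret, signing_input.encode("utf-8"), hashlib.sha256).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")


def verify(token: str, secret: bytes) -> bool:
    """Recompute the signature and compare it in constant time."""
    try:
        signing_input, signature = token.rsplit(".", 1)
    except ValueError:
        return False  # no '.' separator at all, so the token is malformed
    expected = sign(signing_input, secret)
    return hmac.compare_digest(expected.encode("utf-8"), signature.encode("utf-8"))


# The two segments below are stand-in strings rather than real encoded JSON.
signing_input = "header-segment.payload-segment"
token = signing_input + "." + sign(signing_input, b"server-secret")

print(verify(token, b"server-secret"))   # True: same key that signed it
print(verify(token, b"another-secret"))  # False: wrong key
print(verify(token.replace("payload", "tampered"), b"server-secret"))  # False
```

Because only the server knows the key, a client cannot alter the payload (for example, the sub claim) without invalidating the signature.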
Route Setup
Now we can configure the auth routes using a test-first approach:
/auth/register
/auth/login
/auth/logout
/auth/status
Start by creating a new folder called “auth” in “project/server”. Then, within “auth” add two files, __init__.py and views.py. Finally, add the following code to views.py:
def test_registration(self):
    """ Test for user registration """
    with self.client:
        response = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'success')
        self.assertTrue(data['message'] == 'Successfully registered.')
        self.assertTrue(data['auth_token'])
        self.assertTrue(response.content_type == 'application/json')
        self.assertEqual(response.status_code, 201)
Make sure to add the import:
import json
Run the tests. You should see the following error:
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
Now, let’s write the code to get the test to pass. Add the following to project/server/auth/views.py:
class RegisterAPI(MethodView):
    """
    User Registration Resource
    """

    def post(self):
        # get the post data
        post_data = request.get_json()
        # check if user already exists
        user = User.query.filter_by(email=post_data.get('email')).first()
        if not user:
            try:
                user = User(
                    email=post_data.get('email'),
                    password=post_data.get('password')
                )
                # insert the user
                db.session.add(user)
                db.session.commit()
                # generate the auth token
                auth_token = user.encode_auth_token(user.id)
                responseObject = {
                    'status': 'success',
                    'message': 'Successfully registered.',
                    'auth_token': auth_token.decode()
                }
                return make_response(jsonify(responseObject)), 201
            except Exception as e:
                responseObject = {
                    'status': 'fail',
                    'message': 'Some error occurred. Please try again.'
                }
                return make_response(jsonify(responseObject)), 401
        else:
            responseObject = {
                'status': 'fail',
                'message': 'User already exists. Please Log in.',
            }
            return make_response(jsonify(responseObject)), 202


# define the API resources
registration_view = RegisterAPI.as_view('register_api')

# add Rules for API Endpoints
auth_blueprint.add_url_rule(
    '/auth/register',
    view_func=registration_view,
    methods=['POST']
)
Here, we register a new user and generate a new auth token for further requests, which we send back to the client.
Run the tests to ensure they all pass:
Ran 6 tests in 0.132s
OK
Next, let’s add one more test to ensure the registration fails if the user already exists:
def test_registered_with_already_registered_user(self):
    """ Test registration with already registered email"""
    user = User(
        email='joe@gmail.com',
        password='test'
    )
    db.session.add(user)
    db.session.commit()
    with self.client:
        response = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'fail')
        self.assertTrue(
            data['message'] == 'User already exists. Please Log in.')
        self.assertTrue(response.content_type == 'application/json')
        self.assertEqual(response.status_code, 202)
Run the tests again before moving on to the next route. All should pass.
Login Route
Again, start with a test. To verify the login API, let’s test for two cases:
def test_registered_user_login(self):
    """ Test for login of registered-user login """
    with self.client:
        # user registration
        resp_register = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json',
        )
        data_register = json.loads(resp_register.data.decode())
        self.assertTrue(data_register['status'] == 'success')
        self.assertTrue(
            data_register['message'] == 'Successfully registered.'
        )
        self.assertTrue(data_register['auth_token'])
        self.assertTrue(resp_register.content_type == 'application/json')
        self.assertEqual(resp_register.status_code, 201)
        # registered user login
        response = self.client.post(
            '/auth/login',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'success')
        self.assertTrue(data['message'] == 'Successfully logged in.')
        self.assertTrue(data['auth_token'])
        self.assertTrue(response.content_type == 'application/json')
        self.assertEqual(response.status_code, 200)
In this test case, the registered user tries to log in and, as expected, our application should allow this.
Run the tests. They should fail. Now write the code:
class LoginAPI(MethodView):
    """
    User Login Resource
    """
    def post(self):
        # get the post data
        post_data = request.get_json()
        try:
            # fetch the user data
            user = User.query.filter_by(
                email=post_data.get('email')
            ).first()
            auth_token = user.encode_auth_token(user.id)
            if auth_token:
                responseObject = {
                    'status': 'success',
                    'message': 'Successfully logged in.',
                    'auth_token': auth_token.decode()
                }
                return make_response(jsonify(responseObject)), 200
        except Exception as e:
            print(e)
            responseObject = {
                'status': 'fail',
                'message': 'Try again'
            }
            return make_response(jsonify(responseObject)), 500


# define the API resources
registration_view = RegisterAPI.as_view('register_api')
login_view = LoginAPI.as_view('login_api')

# add Rules for API Endpoints
auth_blueprint.add_url_rule(
    '/auth/register',
    view_func=registration_view,
    methods=['POST']
)
auth_blueprint.add_url_rule(
    '/auth/login',
    view_func=login_view,
    methods=['POST']
)
Run the tests again. Do they pass? They should. Don’t move on until all tests pass.
Non-Registered user login
Add the test:
def test_non_registered_user_login(self):
    """ Test for login of non-registered user """
    with self.client:
        response = self.client.post(
            '/auth/login',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'fail')
        self.assertTrue(data['message'] == 'User does not exist.')
        self.assertTrue(response.content_type == 'application/json')
        self.assertEqual(response.status_code, 404)
In this case, a non-registered user attempts to log in and, as expected, our application should not allow this.
class LoginAPI(MethodView):
    """
    User Login Resource
    """
    def post(self):
        # get the post data
        post_data = request.get_json()
        try:
            # fetch the user data
            user = User.query.filter_by(
                email=post_data.get('email')
            ).first()
            if user and bcrypt.check_password_hash(
                user.password, post_data.get('password')
            ):
                auth_token = user.encode_auth_token(user.id)
                if auth_token:
                    responseObject = {
                        'status': 'success',
                        'message': 'Successfully logged in.',
                        'auth_token': auth_token.decode()
                    }
                    return make_response(jsonify(responseObject)), 200
            else:
                responseObject = {
                    'status': 'fail',
                    'message': 'User does not exist.'
                }
                return make_response(jsonify(responseObject)), 404
        except Exception as e:
            print(e)
            responseObject = {
                'status': 'fail',
                'message': 'Try again'
            }
            return make_response(jsonify(responseObject)), 500
What did we change? Do the tests pass? What if the email is correct but the password is incorrect? What happens? Write a test for this!
User Status Route
In order to get the user details of the currently logged in user, the auth token must be sent with the request within the header.
Start with a test:
def test_user_status(self):
    """ Test for user status """
    with self.client:
        resp_register = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        response = self.client.get(
            '/auth/status',
            headers=dict(
                Authorization='Bearer ' + json.loads(
                    resp_register.data.decode()
                )['auth_token']
            )
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'success')
        self.assertTrue(data['data'] is not None)
        self.assertTrue(data['data']['email'] == 'joe@gmail.com')
        # the original check (is 'true' or 'false') was always truthy;
        # the JSON boolean is parsed into a Python bool
        self.assertIn(data['data']['admin'], (True, False))
        self.assertEqual(response.status_code, 200)
The test should fail. Now, in the handler class, we should:
extract the auth token and check its validity
grab the user id from the payload and get the user details (if the token is valid, of course)
class UserAPI(MethodView):
    """
    User Resource
    """
    def get(self):
        # get the auth token
        auth_header = request.headers.get('Authorization')
        if auth_header:
            auth_token = auth_header.split(" ")[1]
        else:
            auth_token = ''
        if auth_token:
            resp = User.decode_auth_token(auth_token)
            if not isinstance(resp, str):
                user = User.query.filter_by(id=resp).first()
                responseObject = {
                    'status': 'success',
                    'data': {
                        'user_id': user.id,
                        'email': user.email,
                        'admin': user.admin,
                        'registered_on': user.registered_on
                    }
                }
                return make_response(jsonify(responseObject)), 200
            responseObject = {
                'status': 'fail',
                'message': resp
            }
            return make_response(jsonify(responseObject)), 401
        else:
            responseObject = {
                'status': 'fail',
                'message': 'Provide a valid auth token.'
            }
            return make_response(jsonify(responseObject)), 401
So, if the token is valid and not expired, we get the user id from the token’s payload, which is then used to get the user data from the database.
NOTE: We still need to check if a token is blacklisted. We’ll get to this shortly.
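One detail worth noting in the handler: `auth_header.split(" ")[1]` raises an IndexError when the header contains no space (for example, just `Bearer`). The sketch below shows one slightly more defensive way to pull out the token. `extract_bearer_token` is a hypothetical helper, not part of the tutorial's code, and it is stricter than the original in that it also checks for the `Bearer` scheme.

```python
def extract_bearer_token(auth_header):
    """Return the token from an 'Authorization: Bearer <token>' value, or ''."""
    if not auth_header:
        return ""
    parts = auth_header.split()
    # Expect exactly two pieces: the scheme and the token itself.
    if len(parts) != 2 or parts[0].lower() != "bearer":
        return ""
    return parts[1]


print(extract_bearer_token("Bearer abc.def.ghi"))  # abc.def.ghi
print(repr(extract_bearer_token("Bearer")))        # ''
print(repr(extract_bearer_token(None)))            # ''
```

Returning an empty string for every malformed case lets the existing `if auth_token:` branch respond with 'Provide a valid auth token.' instead of an unhandled error.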
def test_valid_logout(self):
    """ Test for logout before token expires """
    with self.client:
        # user registration
        resp_register = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json',
        )
        data_register = json.loads(resp_register.data.decode())
        self.assertTrue(data_register['status'] == 'success')
        self.assertTrue(
            data_register['message'] == 'Successfully registered.')
        self.assertTrue(data_register['auth_token'])
        self.assertTrue(resp_register.content_type == 'application/json')
        self.assertEqual(resp_register.status_code, 201)
        # user login
        resp_login = self.client.post(
            '/auth/login',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        data_login = json.loads(resp_login.data.decode())
        self.assertTrue(data_login['status'] == 'success')
        self.assertTrue(data_login['message'] == 'Successfully logged in.')
        self.assertTrue(data_login['auth_token'])
        self.assertTrue(resp_login.content_type == 'application/json')
        self.assertEqual(resp_login.status_code, 200)
        # valid token logout
        response = self.client.post(
            '/auth/logout',
            headers=dict(
                Authorization='Bearer ' + json.loads(
                    resp_login.data.decode()
                )['auth_token']
            )
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'success')
        self.assertTrue(data['message'] == 'Successfully logged out.')
        self.assertEqual(response.status_code, 200)
In this first test, we register a new user, log them in, and then attempt to log them out before the token expires.
def test_invalid_logout(self):
    """ Testing logout after the token expires """
    with self.client:
        # user registration
        resp_register = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json',
        )
        data_register = json.loads(resp_register.data.decode())
        self.assertTrue(data_register['status'] == 'success')
        self.assertTrue(
            data_register['message'] == 'Successfully registered.')
        self.assertTrue(data_register['auth_token'])
        self.assertTrue(resp_register.content_type == 'application/json')
        self.assertEqual(resp_register.status_code, 201)
        # user login
        resp_login = self.client.post(
            '/auth/login',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        data_login = json.loads(resp_login.data.decode())
        self.assertTrue(data_login['status'] == 'success')
        self.assertTrue(data_login['message'] == 'Successfully logged in.')
        self.assertTrue(data_login['auth_token'])
        self.assertTrue(resp_login.content_type == 'application/json')
        self.assertEqual(resp_login.status_code, 200)
        # invalid token logout
        time.sleep(6)
        response = self.client.post(
            '/auth/logout',
            headers=dict(
                Authorization='Bearer ' + json.loads(
                    resp_login.data.decode()
                )['auth_token']
            )
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'fail')
        self.assertTrue(
            data['message'] == 'Signature expired. Please log in again.')
        self.assertEqual(response.status_code, 401)
Like the last test, we register a user, log them in, and then attempt to log them out. In this case, the token is invalid since it has expired.
Add the import:
import time
Now, the code must:
validate the auth token
blacklist the token (if valid, of course)
Before writing the route handler, let’s create a new model for blacklisting tokens…
Blacklist
Add the following code to project/server/models.py:
class BlacklistToken(db.Model):
    """
    Token Model for storing JWT tokens
    """
    __tablename__ = 'blacklist_tokens'

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    token = db.Column(db.String(500), unique=True, nullable=False)
    blacklisted_on = db.Column(db.DateTime, nullable=False)

    def __init__(self, token):
        self.token = token
        self.blacklisted_on = datetime.datetime.now()

    def __repr__(self):
        # closing '>' added so the representation is balanced
        return '<id: token: {}>'.format(self.token)
Then create and apply the migrations. Once done, your database should have the following tables:
Schema | Name | Type | Owner
--------+-------------------------+----------+----------
public | alembic_version | table | postgres
public | blacklist_tokens | table | postgres
public | blacklist_tokens_id_seq | sequence | postgres
public | users | table | postgres
public | users_id_seq | sequence | postgres
(5 rows)
class LogoutAPI(MethodView):
    """
    Logout Resource
    """
    def post(self):
        # get auth token
        auth_header = request.headers.get('Authorization')
        if auth_header:
            auth_token = auth_header.split(" ")[1]
        else:
            auth_token = ''
        if auth_token:
            resp = User.decode_auth_token(auth_token)
            if not isinstance(resp, str):
                # mark the token as blacklisted
                blacklist_token = BlacklistToken(token=auth_token)
                try:
                    # insert the token
                    db.session.add(blacklist_token)
                    db.session.commit()
                    responseObject = {
                        'status': 'success',
                        'message': 'Successfully logged out.'
                    }
                    return make_response(jsonify(responseObject)), 200
                except Exception as e:
                    responseObject = {
                        'status': 'fail',
                        # an exception object is not JSON serializable, so send its text
                        'message': str(e)
                    }
                    return make_response(jsonify(responseObject)), 200
            else:
                responseObject = {
                    'status': 'fail',
                    'message': resp
                }
                return make_response(jsonify(responseObject)), 401
        else:
            responseObject = {
                'status': 'fail',
                'message': 'Provide a valid auth token.'
            }
            return make_response(jsonify(responseObject)), 403


# define the API resources
registration_view = RegisterAPI.as_view('register_api')
login_view = LoginAPI.as_view('login_api')
user_view = UserAPI.as_view('user_api')
logout_view = LogoutAPI.as_view('logout_api')

# add Rules for API Endpoints
auth_blueprint.add_url_rule(
    '/auth/register',
    view_func=registration_view,
    methods=['POST']
)
auth_blueprint.add_url_rule(
    '/auth/login',
    view_func=login_view,
    methods=['POST']
)
auth_blueprint.add_url_rule(
    '/auth/status',
    view_func=user_view,
    methods=['GET']
)
auth_blueprint.add_url_rule(
    '/auth/logout',
    view_func=logout_view,
    methods=['POST']
)
When a user logs out, the token should no longer be accepted, so we add it to the blacklist.
NOTE: Often, larger applications have a way to renew blacklisted tokens every now and then so that the system does not run out of valid tokens.
Run the tests:
Ran 12 tests in 6.418s
OK
Refactoring
Finally, we need to ensure that a token has not been blacklisted, right after the token has been decoded – decode_auth_token() – within the logout and user status routes.
def test_valid_blacklisted_token_logout(self):
    """ Test for logout after a valid token gets blacklisted """
    with self.client:
        # user registration
        resp_register = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json',
        )
        data_register = json.loads(resp_register.data.decode())
        self.assertTrue(data_register['status'] == 'success')
        self.assertTrue(
            data_register['message'] == 'Successfully registered.')
        self.assertTrue(data_register['auth_token'])
        self.assertTrue(resp_register.content_type == 'application/json')
        self.assertEqual(resp_register.status_code, 201)
        # user login
        resp_login = self.client.post(
            '/auth/login',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        data_login = json.loads(resp_login.data.decode())
        self.assertTrue(data_login['status'] == 'success')
        self.assertTrue(data_login['message'] == 'Successfully logged in.')
        self.assertTrue(data_login['auth_token'])
        self.assertTrue(resp_login.content_type == 'application/json')
        self.assertEqual(resp_login.status_code, 200)
        # blacklist a valid token
        blacklist_token = BlacklistToken(
            token=json.loads(resp_login.data.decode())['auth_token'])
        db.session.add(blacklist_token)
        db.session.commit()
        # blacklisted valid token logout
        response = self.client.post(
            '/auth/logout',
            headers=dict(
                Authorization='Bearer ' + json.loads(
                    resp_login.data.decode()
                )['auth_token']
            )
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'fail')
        self.assertTrue(
            data['message'] == 'Token blacklisted. Please log in again.')
        self.assertEqual(response.status_code, 401)
In this test, we blacklist the token just before the logout route gets hit, which makes our otherwise valid token unusable.
Now update the decode_auth_token function to check for blacklisted tokens right after the decode and respond with an appropriate message.
@staticmethod
def decode_auth_token(auth_token):
    """
    Validates the auth token
    :param auth_token:
    :return: integer|string
    """
    try:
        payload = jwt.decode(
            auth_token,
            app.config.get('SECRET_KEY'),
            algorithms=['HS256']  # accept only the algorithm used to sign the token
        )
        is_blacklisted_token = BlacklistToken.check_blacklist(auth_token)
        if is_blacklisted_token:
            return 'Token blacklisted. Please log in again.'
        else:
            return payload['sub']
    except jwt.ExpiredSignatureError:
        return 'Signature expired. Please log in again.'
    except jwt.InvalidTokenError:
        return 'Invalid token. Please log in again.'
Finally, add the check_blacklist() function to project/server/models.py in the BlacklistToken class:
@staticmethod
def check_blacklist(auth_token):
    # check whether auth token has been blacklisted
    res = BlacklistToken.query.filter_by(token=str(auth_token)).first()
    if res:
        return True
    else:
        return False
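The order of the checks can be sketched without a database. In the example below, a plain set stands in for the blacklist_tokens table and the payload is assumed to have been decoded already; `InMemoryBlacklist` and `resolve_subject` are hypothetical names used only for illustration.

```python
class InMemoryBlacklist:
    """Stand-in for the BlacklistToken table, backed by a set (illustrative only)."""

    def __init__(self):
        self._tokens = set()

    def add(self, auth_token):
        self._tokens.add(auth_token)

    def check_blacklist(self, auth_token):
        # Same question the model's query answers: has this token been stored?
        return auth_token in self._tokens


def resolve_subject(auth_token, payload, blacklist):
    """Consult the blacklist before trusting an already-decoded payload."""
    if blacklist.check_blacklist(auth_token):
        return 'Token blacklisted. Please log in again.'
    return payload['sub']


blacklist = InMemoryBlacklist()
token, payload = 'demo-token', {'sub': 7}

print(resolve_subject(token, payload, blacklist))  # 7
blacklist.add(token)                               # what the logout route does
print(resolve_subject(token, payload, blacklist))  # Token blacklisted. Please log in again.
```

As in the real method, a blacklisted token yields a string while a usable token yields the user id, which is why the route handlers can branch on `isinstance(resp, str)`.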
Before you run the test, update test_decode_auth_token to convert the bytes object to a string:
In a similar fashion, add one more test for the user status route.
def test_valid_blacklisted_token_user(self):
    """ Test for user status with a blacklisted valid token """
    with self.client:
        resp_register = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        # blacklist a valid token
        blacklist_token = BlacklistToken(
            token=json.loads(resp_register.data.decode())['auth_token'])
        db.session.add(blacklist_token)
        db.session.commit()
        response = self.client.get(
            '/auth/status',
            headers=dict(
                Authorization='Bearer ' + json.loads(
                    resp_register.data.decode()
                )['auth_token']
            )
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'fail')
        self.assertTrue(
            data['message'] == 'Token blacklisted. Please log in again.')
        self.assertEqual(response.status_code, 401)
Similar to the last test, we blacklist the token before the user status route gets hit.
Run the tests for one final time:
Ran 14 tests in 10.206s
OK
Code Smell
Finally, take a look at test_auth.py. Notice the duplicate code? For example:
Now, anywhere you need to register a user, you can call the helper:
register_user(self, 'joe@gmail.com', '123456')
How about logging in a user? Refactor it on your own. What else can you refactor? Comment below.
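As one possible starting point for that refactor, a login helper could mirror the call shape of register_user() shown above. The name `login_user` and its signature are assumptions made for this sketch; the request itself is the same one the login tests already send.

```python
import json


def login_user(self, email, password):
    """Post credentials to /auth/login using the test case's client."""
    return self.client.post(
        '/auth/login',
        data=json.dumps(dict(
            email=email,
            password=password
        )),
        content_type='application/json',
    )
```

Inside a test, `resp_login = login_user(self, 'joe@gmail.com', '123456')` would then replace the repeated `self.client.post(...)` block.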
Conclusion
In this tutorial, we went through the process of adding authentication to a Flask app with JSON Web Tokens. Turn back to the objectives from the beginning of this tutorial. Can you put each one into action? What did you learn?