Python Flask RESTful token 验证
服务端:
#!/usr/bin/env python
import os
import time
from flask import Flask, abort, request, jsonify, g, url_for
from flask_sqlalchemy import SQLAlchemy
from flask_httpauth import HTTPBasicAuth
import jwt
from werkzeug.security import generate_password_hash, check_password_hash
# initialization
app = Flask(__name__)
# Prefer a secret supplied through the environment so the token-signing key
# is not fixed in the source; the literal is kept only as a fallback so the
# example still runs when the variable is unset.
app.config['SECRET_KEY'] = os.environ.get(
    'SECRET_KEY', 'the quick brown fox jumps over the lazy dog')
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///db.sqlite'
# NOTE(review): newer Flask-SQLAlchemy releases may no longer honour this
# setting -- confirm against the installed version.
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
# extensions
db = SQLAlchemy(app)
auth = HTTPBasicAuth()
class User(db.Model):
    """User account row holding a password hash; issues and checks JWTs."""

    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(32), index=True)
    password_hash = db.Column(db.String(128))

    def hash_password(self, password):
        """Hash ``password`` and store the result in ``password_hash``."""
        self.password_hash = generate_password_hash(password)

    def verify_password(self, password):
        """Return whether ``password`` matches the stored ``password_hash``."""
        return check_password_hash(self.password_hash, password)

    def generate_auth_token(self, expires_in=600):
        """Return an HS256-signed JWT carrying this user's id.

        ``expires_in`` is the token lifetime in seconds; it is added to the
        current time to form the ``exp`` claim.
        """
        return jwt.encode(
            {'id': self.id, 'exp': time.time() + expires_in},
            app.config['SECRET_KEY'], algorithm='HS256')

    @staticmethod
    def verify_auth_token(token):
        """Return the user identified by ``token``, or ``None`` if invalid.

        ``None`` is returned when the token cannot be decoded or verified
        (malformed, bad signature, expired), when it has no ``id`` claim, or
        when no user with that id exists.
        """
        try:
            data = jwt.decode(token, app.config['SECRET_KEY'],
                              algorithms=['HS256'])
        except jwt.InvalidTokenError:
            # Only token problems mean "not authenticated"; anything else
            # (e.g. a programming error) should surface instead of being
            # silently swallowed.
            return None
        user_id = data.get('id')
        if user_id is None:
            # A validly signed token without an id cannot identify anyone.
            return None
        return User.query.get(user_id)
@auth.verify_password
def verify_password(username_or_token, password):
    """Authenticate a request by token first, then by username/password.

    On success the matching user is stored on ``g.user`` and True is
    returned; otherwise False is returned and ``g.user`` is left untouched.
    """
    # Token authentication: the first credential field carries the token.
    account = User.verify_auth_token(username_or_token)
    if account:
        g.user = account
        return True
    # Fall back to treating the credentials as username and password.
    account = User.query.filter_by(username=username_or_token).first()
    if account and account.verify_password(password):
        g.user = account
        return True
    return False
@app.route('/api/users', methods=['POST'])
def new_user():
    """Register a new user from a JSON body with ``username``/``password``.

    Responds 201 with the username and a ``Location`` header pointing at the
    new user. Aborts with 400 when the body is not a JSON object, when either
    field is missing, or when the username is already taken.
    """
    # silent=True yields None instead of raising on a missing/invalid body,
    # so all malformed input goes through the same 400 path below.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        abort(400)  # body missing, not JSON, or not a JSON object
    username = payload.get('username')
    password = payload.get('password')
    if username is None or password is None:
        abort(400)  # missing arguments
    if User.query.filter_by(username=username).first() is not None:
        abort(400)  # existing user
    user = User(username=username)
    user.hash_password(password)
    db.session.add(user)
    db.session.commit()
    return (jsonify({'username': user.username}), 201,
            {'Location': url_for('get_user', id=user.id, _external=True)})
@app.route('/api/users/<int:id>')
def get_user(id):
    """Return the username of the user with primary key ``id`` as JSON.

    Aborts with 400 when no such user exists.
    """
    record = User.query.get(id)
    if record:
        return jsonify({'username': record.username})
    abort(400)
@app.route('/api/token')
@auth.login_required
def get_auth_token():
    """Issue a token for the authenticated user and report its lifetime."""
    lifetime = 600  # seconds; used for both the token and the reported duration
    return jsonify({'token': g.user.generate_auth_token(lifetime),
                    'duration': lifetime})
@app.route('/api/resource')
@auth.login_required
def get_resource():
    """Return a JSON greeting addressed to the authenticated user."""
    greeting = 'Hello, %s!' % g.user.username
    return jsonify({'data': greeting})
@app.route('/api/hello')
def sayhello():
    """Return a fixed plain-text greeting; no authentication is required."""
    message = "hello,jcstone"
    return message
if __name__ == '__main__':
    # Create the tables inside an application context so the call does not
    # depend on the extension finding the app implicitly. create_all() skips
    # tables that already exist, so it is run unconditionally rather than
    # guarded by a check for 'db.sqlite' in the current working directory,
    # which may not be where the database file actually lives.
    with app.app_context():
        db.create_all()
    # debug=True enables the reloader and interactive debugger; development
    # use only.
    app.run(debug=True)
客户端:
import requests, base64
import json
urltoken = "http://127.0.0.1:5000/api/token"
name = "jcstone"
pwd = "123456"
# Passing auth=(name, pwd) lets requests build the HTTP Basic header; it is
# equivalent to sending 'Authorization: Basic <base64 of "name:pwd">' by hand.

# Obtain a token
headers = {'Content-Type': 'application/json'}
response = requests.get(urltoken, auth=(name, pwd), headers=headers,
                        timeout=10)
# Fail loudly on 401/5xx instead of hitting a confusing KeyError below.
response.raise_for_status()
reptokens = response.json()
print(reptokens["token"])

# Use the token to access the protected RESTful API
urlapi = 'http://127.0.0.1:5000/api/resource'
token = reptokens["token"]
headers = {
    # Present the client as a Chrome browser via the User-Agent header
    'User-Agent':
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36',
}
# The token goes in the username slot; the password is unused for token auth.
response = requests.get(urlapi, auth=(token, ''), headers=headers, timeout=10)
response.raise_for_status()
# Print the decoded text rather than a bytes repr (b'...').
print(response.text)