SpringCloud+Tornado基于jwt实现请求安全校验功能
项目背景
在实际项目中,Tornado项目作为一个微服务纳入SpringCloud体系,该过程中涉及到Tornado与Spring体系的安全验证,也就是权限调用校验,在该项目中Tornado是通过SpringCloud中的Feign调用的,经过一系列实验,最后选用jwt来实现这个权限效验的过程。
实现思路
用户进行登陆认证(后台微服务),认证成功后调用Tornado项目的认证接口生成token,该值返回到后台微服务保存在会话中,下一次请求时带上该token值让服务器进行校验,校验成功则返回正常的响应,否则返回错误信息。
项目结构
common - authServer.py是认证接口
common - basicServer.py是示例接口
handlers - baseHandler.py中放了两种校验方式,会在basicServer.py的调用示例中贴出
utils - jwtUtils.py是生成与校验token的
utils - responseUtils.py是返回结果工具类
具体实现
jwtUtils.py
# -*- coding:utf-8 -*-
import jwt
import datetime
from jwt import exceptions
from utils.responseUtils import JsonUtils
JWT_SALT = '1qazxdr5'
def create_token(payload, timeout=12):
"""
创建token
:param payload: 例如:{'user_id':1,'username':'xxx@xxx.xx'}用户信息
:param timeout: token的过期时间,默认20分钟
:return:
"""
headers = {
'typ': 'jwt',
'alg': 'HS256'
}
payload['exp'] = datetime.datetime.utcnow() + datetime.timedelta(minutes=timeout)
result = jwt.encode(payload=payload, key=JWT_SALT, algorithm="HS256", headers=headers).decode('utf-8')
return result
def parse_payload(token):
"""
对token进行校验并获取payload
:param token:
:return:
"""
try:
verified_payload = jwt.decode(token, JWT_SALT, True)
print(verified_payload)
return JsonUtils.success('认证通过')
except exceptions.ExpiredSignatureError:
return JsonUtils.noAuth('token已失效')
except jwt.DecodeError:
return JsonUtils.noAuth('token认证失败')
except jwt.InvalidTokenError:
return JsonUtils.noAuth('非法的token')
baseHandler.py
# -*- coding:utf-8 -*-
import functools
import json
import tornado.web
from utils import jwtUtils
from utils.responseUtils import JsonUtils
# 方式一:authenticated 装饰器
def authenticated(method):
@functools.wraps(method)
def wrapper(self, *args, **kwargs):
"""
这里调用的是 current_user 的 get 方法(property装饰),
"""
# 通过token请求头传递token
head = self.request.headers
token = head.get("token", "")
if not token:
self.write(JsonUtils.noAuth("未获取到token请求头"))
self.set_header('Content-Type', 'application/json')
return
result = json.loads(jwtUtils.parse_payload(token)) # 将json解码
print(result)
token_msg = json.dumps(result)
if result['sta'] != '00':
self.write(token_msg)
self.set_header('Content-Type', 'application/json')
return
return method(self, *args, **kwargs)
return wrapper
# 方式二:进行预设 继承tornado的RequestHandler
class BaseHandler(tornado.web.RequestHandler):
def prepare(self):
super(BaseHandler, self).prepare()
def set_default_headers(self):
super().set_default_headers()
# 进行token校验,继承上面的BaseHandler
class TokenHandler(BaseHandler):
def prepare(self):
# 通过token请求头传递token
head = self.request.headers
token = head.get("token","")
if not token:
self.authMsg = json.dumps(JsonUtils.noAuth("未获取到token请求头"))
result = json.loads(jwtUtils.parse_payload(token)) # 将json解码
print(result)
if result['sta'] != '00':
self.isAuth = False
else:
self.isAuth = True
self.authMsg = json.dumps(result)
authServer.py
import tornado.web
from utils import jwtUtils
from utils.responseUtils import JsonUtils
class authHandler(tornado.web.RequestHandler):
def post(self, *args, **kwargs):
"""
安全认证接口
:param args:
:param kwargs:
:return:
"""
username = self.get_argument("username")
print("authHandler:" + username)
if not username:
self.write(JsonUtils.error("参数异常"))
else:
token = jwtUtils.create_token({"username": username})
print("token:" + token)
self.write(JsonUtils.success(token))
self.set_header('Content-Type', 'application/json')
basicServer.py
import tornado.web
import json
from pandas.core.frame import DataFrame
from handlers import baseHandler
from utils.responseUtils import JsonUtils
from handlers.baseHandler import authenticated
class StringHandler(baseHandler.TokenHandler, tornado.web.RequestHandler):
"""
*** TokenHandler验证,对应baseHandler.py中的方式二 ***
"""
def get(self):
username = self.get_argument('username', 'Hello')
# 权限认证通过
if self.isAuth:
self.write(JsonUtils.success(username))
else:
self.write(self.authMsg))
self.set_header('Content-Type', 'application/json')
class TestHandler(tornado.web.RequestHandler):
"""
*** authenticated验证,对应baseHandler.py中的方式一 ***
"""
@authenticated
def post(self):
username = self.get_argument('username', 'Hello')
self.write(JsonUtils.success(username))
self.set_header('Content-Type', 'application/json')
responseUtils.py
from tornado.escape import json_encode, utf8
class JsonUtils(object):
@staticmethod
def success(response):
"""
正确返回
:param response: 返回结果
:return: string, {"message": "ok", "sta": "00", "data": }
"""
return json_encode({"message": "ok", "sta": "00", "data": response})
@staticmethod
def info(message):
"""
提示返回
:param message: 提示信息
:return: string,
"""
return json_encode({"message": str(message), "sta": "99001", "data": None})
@staticmethod
def error(message):
"""
错误返回
:param message: 错误信息
:return: string,
"""
return json_encode({"message": str(message), "sta": "9999", "data": None})
@staticmethod
def noAuth(message):
"""
无权限返回
:param message: 错误信息
:return: string,
"""
return json_encode({"message": str(message), "sta": "403", "data": None})
下面是一些调用的结果图示:
.end
到此这篇关于SpringCloud+Tornado基于jwt实现请求安全校验的文章就介绍到这了,更多相关SpringCloud+Tornado实现请求安全校验内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
以上是 SpringCloud+Tornado基于jwt实现请求安全校验功能 的全部内容, 来源链接: utcz.com/z/317631.html