SpringCloud+Tornado基于jwt实现请求安全校验功能

项目背景

在实际项目中,Tornado项目作为一个微服务纳入SpringCloud体系,该过程中涉及到Tornado与Spring体系的安全验证,也就是权限调用校验,在该项目中Tornado是通过SpringCloud中的Feign调用的,经过一系列实验,最后选用jwt来实现这个权限效验的过程。

实现思路

用户进行登陆认证(后台微服务),认证成功后调用Tornado项目的认证接口生成token,该值返回到后台微服务保存在会话中,下一次请求时带上该token值让服务器进行校验,校验成功则返回正常的响应,否则返回错误信息。

项目结构

common - authServer.py是认证接口

common - basicServer.py是示例接口

handlers - baseHandler.py中放了两种校验方式,会在basicServer.py的调用示例中贴出

utils - jwtUtils.py是生成与校验token的

utils - responseUtils.py是返回结果工具类

具体实现

jwtUtils.py

# -*- coding:utf-8 -*-

import jwt

import datetime

from jwt import exceptions

from utils.responseUtils import JsonUtils

JWT_SALT = '1qazxdr5'

def create_token(payload, timeout=12):

"""

创建token

:param payload: 例如:{'user_id':1,'username':'xxx@xxx.xx'}用户信息

:param timeout: token的过期时间,默认20分钟

:return:

"""

headers = {

'typ': 'jwt',

'alg': 'HS256'

}

payload['exp'] = datetime.datetime.utcnow() + datetime.timedelta(minutes=timeout)

result = jwt.encode(payload=payload, key=JWT_SALT, algorithm="HS256", headers=headers).decode('utf-8')

return result

def parse_payload(token):

"""

对token进行校验并获取payload

:param token:

:return:

"""

try:

verified_payload = jwt.decode(token, JWT_SALT, True)

print(verified_payload)

return JsonUtils.success('认证通过')

except exceptions.ExpiredSignatureError:

return JsonUtils.noAuth('token已失效')

except jwt.DecodeError:

return JsonUtils.noAuth('token认证失败')

except jwt.InvalidTokenError:

return JsonUtils.noAuth('非法的token')

baseHandler.py

# -*- coding:utf-8 -*-

import functools

import json

import tornado.web

from utils import jwtUtils

from utils.responseUtils import JsonUtils

# 方式一:authenticated 装饰器

def authenticated(method):

@functools.wraps(method)

def wrapper(self, *args, **kwargs):

"""

这里调用的是 current_user 的 get 方法(property装饰),

"""

# 通过token请求头传递token

head = self.request.headers

token = head.get("token", "")

if not token:

self.write(JsonUtils.noAuth("未获取到token请求头"))

self.set_header('Content-Type', 'application/json')

return

result = json.loads(jwtUtils.parse_payload(token)) # 将json解码

print(result)

token_msg = json.dumps(result)

if result['sta'] != '00':

self.write(token_msg)

self.set_header('Content-Type', 'application/json')

return

return method(self, *args, **kwargs)

return wrapper

# 方式二:进行预设 继承tornado的RequestHandler

class BaseHandler(tornado.web.RequestHandler):

def prepare(self):

super(BaseHandler, self).prepare()

def set_default_headers(self):

super().set_default_headers()

# 进行token校验,继承上面的BaseHandler

class TokenHandler(BaseHandler):

def prepare(self):

# 通过token请求头传递token

head = self.request.headers

token = head.get("token","")

if not token:

self.authMsg = json.dumps(JsonUtils.noAuth("未获取到token请求头"))

result = json.loads(jwtUtils.parse_payload(token)) # 将json解码

print(result)

if result['sta'] != '00':

self.isAuth = False

else:

self.isAuth = True

self.authMsg = json.dumps(result)

authServer.py

import tornado.web

from utils import jwtUtils

from utils.responseUtils import JsonUtils

class authHandler(tornado.web.RequestHandler):

def post(self, *args, **kwargs):

"""

安全认证接口

:param args:

:param kwargs:

:return:

"""

username = self.get_argument("username")

print("authHandler:" + username)

if not username:

self.write(JsonUtils.error("参数异常"))

else:

token = jwtUtils.create_token({"username": username})

print("token:" + token)

self.write(JsonUtils.success(token))

self.set_header('Content-Type', 'application/json')

basicServer.py

import tornado.web

import json

from pandas.core.frame import DataFrame

from handlers import baseHandler

from utils.responseUtils import JsonUtils

from handlers.baseHandler import authenticated

class StringHandler(baseHandler.TokenHandler, tornado.web.RequestHandler):

"""

*** TokenHandler验证,对应baseHandler.py中的方式二 ***

"""

def get(self):

username = self.get_argument('username', 'Hello')

# 权限认证通过

if self.isAuth:

self.write(JsonUtils.success(username))

else:

self.write(self.authMsg))

self.set_header('Content-Type', 'application/json')

class TestHandler(tornado.web.RequestHandler):

"""

*** authenticated验证,对应baseHandler.py中的方式一 ***

"""

@authenticated

def post(self):

username = self.get_argument('username', 'Hello')

self.write(JsonUtils.success(username))

self.set_header('Content-Type', 'application/json')

responseUtils.py

from tornado.escape import json_encode, utf8

class JsonUtils(object):

@staticmethod

def success(response):

"""

正确返回

:param response: 返回结果

:return: string, {"message": "ok", "sta": "00", "data": }

"""

return json_encode({"message": "ok", "sta": "00", "data": response})

@staticmethod

def info(message):

"""

提示返回

:param message: 提示信息

:return: string,

"""

return json_encode({"message": str(message), "sta": "99001", "data": None})

@staticmethod

def error(message):

"""

错误返回

:param message: 错误信息

:return: string,

"""

return json_encode({"message": str(message), "sta": "9999", "data": None})

@staticmethod

def noAuth(message):

"""

无权限返回

:param message: 错误信息

:return: string,

"""

return json_encode({"message": str(message), "sta": "403", "data": None})

下面是一些调用的结果图示:



.end

到此这篇关于SpringCloud+Tornado基于jwt实现请求安全校验的文章就介绍到这了,更多相关SpringCloud+Tornado实现请求安全校验内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

以上是 SpringCloud+Tornado基于jwt实现请求安全校验功能 的全部内容, 来源链接: utcz.com/z/317631.html

回到顶部