亲宝软件园·资讯

展开

SpringCloud+Tornado实现请求安全校验 SpringCloud+Tornado基于jwt实现请求安全校验功能

95.8℃ 人气:0

项目背景

在实际项目中,Tornado项目作为一个微服务纳入SpringCloud体系,该过程中涉及到TornadoSpring体系的安全验证,也就是权限调用校验,在该项目中Tornado是通过SpringCloud中的Feign调用的,经过一系列实验,最后选用jwt来实现这个权限效验的过程。

实现思路

用户进行登陆认证(后台微服务),认证成功后调用Tornado项目的认证接口生成token,该值返回到后台微服务保存在会话中,下一次请求时带上该token值让服务器进行校验,校验成功则返回正常的响应,否则返回错误信息。

项目结构

在这里插入图片描述

common - authServer.py是认证接口
common - basicServer.py是示例接口
handlers - baseHandler.py中放了两种校验方式,会在basicServer.py的调用示例中贴出
utils - jwtUtils.py是生成与校验token的
utils - responseUtils.py是返回结果工具类

具体实现

jwtUtils.py

# -*- coding:utf-8 -*-
import jwt
import datetime
from jwt import exceptions
from utils.responseUtils import JsonUtils

JWT_SALT = '1qazxdr5'


def create_token(payload, timeout=12):
 """
 创建token
 :param payload: 例如:{'user_id':1,'username':'xxx@xxx.xx'}用户信息
 :param timeout: token的过期时间,默认20分钟
 :return:
 """
 headers = {
  'typ': 'jwt',
  'alg': 'HS256'
 }
 payload['exp'] = datetime.datetime.utcnow() + datetime.timedelta(minutes=timeout)
 result = jwt.encode(payload=payload, key=JWT_SALT, algorithm="HS256", headers=headers).decode('utf-8')
 return result


def parse_payload(token):
 """
 对token进行校验并获取payload
 :param token:
 :return:
 """
 try:
  verified_payload = jwt.decode(token, JWT_SALT, True)
  print(verified_payload)
  return JsonUtils.success('认证通过')
 except exceptions.ExpiredSignatureError:
  return JsonUtils.noAuth('token已失效')
 except jwt.DecodeError:
  return JsonUtils.noAuth('token认证失败')
 except jwt.InvalidTokenError:
  return JsonUtils.noAuth('非法的token')

baseHandler.py

# -*- coding:utf-8 -*-
import functools
import json
import tornado.web
from utils import jwtUtils
from utils.responseUtils import JsonUtils


# 方式一:authenticated 装饰器
def authenticated(method):
 @functools.wraps(method)
 def wrapper(self, *args, **kwargs):
  """
  这里调用的是 current_user 的 get 方法(property装饰),
  """
  # 通过token请求头传递token
  head = self.request.headers
  token = head.get("token", "")
  if not token:
   self.write(JsonUtils.noAuth("未获取到token请求头"))
   self.set_header('Content-Type', 'application/json')
   return

  result = json.loads(jwtUtils.parse_payload(token)) # 将json解码
  print(result)
  token_msg = json.dumps(result)

  if result['sta'] != '00':
   self.write(token_msg)
   self.set_header('Content-Type', 'application/json')
   return

  return method(self, *args, **kwargs)
 return wrapper


# 方式二:进行预设 继承tornado的RequestHandler
class BaseHandler(tornado.web.RequestHandler):
 def prepare(self):
  super(BaseHandler, self).prepare()

 def set_default_headers(self):
  super().set_default_headers()


# 进行token校验,继承上面的BaseHandler
class TokenHandler(BaseHandler):
 def prepare(self):
  # 通过token请求头传递token
  head = self.request.headers
  token = head.get("token","")
  if not token:
   self.authMsg = json.dumps(JsonUtils.noAuth("未获取到token请求头"))

  result = json.loads(jwtUtils.parse_payload(token)) # 将json解码
  print(result)

  if result['sta'] != '00':
   self.isAuth = False
  else:
   self.isAuth = True

  self.authMsg = json.dumps(result)

authServer.py

import tornado.web
from utils import jwtUtils
from utils.responseUtils import JsonUtils


class authHandler(tornado.web.RequestHandler):
 def post(self, *args, **kwargs):
  """
  安全认证接口
  :param args:
  :param kwargs:
  :return:
  """
  username = self.get_argument("username")
  print("authHandler:" + username)

  if not username:
   self.write(JsonUtils.error("参数异常"))
  else:
   token = jwtUtils.create_token({"username": username})
   print("token:" + token)
   self.write(JsonUtils.success(token))

  self.set_header('Content-Type', 'application/json')

basicServer.py

import tornado.web
import json
from pandas.core.frame import DataFrame
from handlers import baseHandler
from utils.responseUtils import JsonUtils
from handlers.baseHandler import authenticated


class StringHandler(baseHandler.TokenHandler, tornado.web.RequestHandler):
 """
 *** TokenHandler验证,对应baseHandler.py中的方式二 ***
 """
 def get(self):
  username = self.get_argument('username', 'Hello')

  # 权限认证通过
  if self.isAuth:
   self.write(JsonUtils.success(username))
  else:
   self.write(self.authMsg))

  self.set_header('Content-Type', 'application/json')


class TestHandler(tornado.web.RequestHandler):
 """
 *** authenticated验证,对应baseHandler.py中的方式一 ***
 """
 @authenticated
 def post(self):
  username = self.get_argument('username', 'Hello')
  self.write(JsonUtils.success(username))
  self.set_header('Content-Type', 'application/json')

responseUtils.py

from tornado.escape import json_encode, utf8


class JsonUtils(object):
 @staticmethod
 def success(response):
  """
  正确返回
  :param response: 返回结果
  :return: string, {"message": "ok", "sta": "00", "data": }
  """
  return json_encode({"message": "ok", "sta": "00", "data": response})

 @staticmethod
 def info(message):
  """
  提示返回
  :param message: 提示信息
  :return: string,
  """
  return json_encode({"message": str(message), "sta": "99001", "data": None})

 @staticmethod
 def error(message):
  """
  错误返回
  :param message: 错误信息
  :return: string,
  """
  return json_encode({"message": str(message), "sta": "9999", "data": None})

 @staticmethod
 def noAuth(message):
  """
  无权限返回
  :param message: 错误信息
  :return: string,
  """
  return json_encode({"message": str(message), "sta": "403", "data": None})

下面是一些调用的结果图示:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

.end

加载全部内容

相关教程
猜你喜欢
用户评论