光润真空完成首轮天使轮融资,无锡天使基金独家完成
06-18
/ python生产实践60秒系统安全认证实践/上一节主要讲解了目前主流的认证规范/协议并对JWT进行了深入的研究和分析,最后给出了解决方案在生产环境中如何生成有效的Token,以及如何基于Python语言在生产环境中进行有效的安全认证?在上一节中,我们还对基于JWT Token认证流程的登录认证和请求认证进行了理论分析,并以图形方式呈现了数据的流程。本节我们将从代码层面再次带大家过一遍这个过程,一方面加深大家对上一节理论部分的理解,另一方面也为大家提供了一套“模板”以便在工程过程中快速应用到项目中。
1 准备工具 1.1 密码安全 为了数据安全,我们使用PassLib对数据库中存储的用户密码进行加密,推荐的加密算法为“Bcrypt”。我们需要安装依赖包: 代码语言:javascript copy pip install passlibpip install bcrypt 我们简单介绍一下这两个库: 1. passlib 是 python2&3 的密码哈希库。
它提供了 30 多种密码哈希算法的跨平台实现。并作为管理现有密码哈希的框架。
它被设计用于广泛的任务,从验证 /etc/shadow 中找到的哈希值到为多用户应用程序提供全强度密码哈希值。 2. bcrypt模块是Python中用于生成强哈希的库。
注:如果您对以上两个库的详细信息感兴趣,可以去Python官网查看相关信息。本节的核心是实现整个流程。
这个具体的库内容就不详细介绍了。 2 用户登录流程 用户通过终端将用户名和密码发送到后端。
后端收到数据后,进行以下操作: 1、用户信息验证:检查当前系统中是否存在该用户,密码是否正确。 2. 如果用户存在,则生成JWT token并返回。
JWT 有效负载可以携带自定义内容。数据代码语言: javascript copy# -*- 编码: utf-8 -*-from datetime import datetime, timedeltafrom Typing importOptionalfrom fastapi import Depends, FastAPI, HTTPException, statusfrom fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestFormimport jwtfrom pydantic import BaseModelfrom passlib.context import CryptContext# 得到如下字符串:# openssl rand -hex 32SECRET_KEY = "09d25efaa6cac66b7ab93ff6f0f4caa6cf63b88e8d3e7"ALGORITHM = "HS" ACCESS_TOKEN_EXPIRE_MINUTES = 25# 模拟数据库数据 fake_users_db = { "hiashiniu" : { "用户名": "hiashiniu", " full_name": "HaiShiNiu", "email": "", "hashed_pa??ssword": "$2b$12$uG4n5CK4.kCid/MfQ3ns9.uRslWuWr9EL5FR4HYNg6SNo6wEn8OPC", "disabled": False, }}class Token(BaseModel): access_token: str token_type: strclass User(BaseModel): 用户名: str 电子邮件: 可选[str] = None full_name: 可选[str] = None 禁用: 可选[bool] = Noneclass UserInDB( User): hashed_pa??ssword: strpwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")app = FastAPI()# 验证密码 def verify_password(plain_password, hashed_pa??ssword) : return pwd_context.verify(plain_password, hashed_pa??ssword)# 密码哈希 def get_password_hash(password): # pwd_context.hash('9') # '$2b$12$uG4n5CK4.kCid/MfQ3ns9.uRslWuWr9EL5FR4HYNg6SNo6wEn8OPC' return pwd_context.hash(password) #模拟从数据库读取用户信息 def get_user(db, username: str): user_dict = db.get(username,None) if user_dict: return UserInDB(**user_dict)# 用户信息验证:用户名和密码分别验证 def verify_user( fake_db,用户名:str,密码:str):user = get_user(fake_db, username) print(user) if not user: return False if not verify_password(password, user.hashed_pa??ssword): return False return user# 生成带过期时间的 token def create_access_token(data: dict, expires_delta:Optional[ timedelta] = None): to_encode = data.copy() # 默认有效时间为25分钟 if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(分钟 =15 )to_encode.update({“exp”:过期})encoded_jwt = jwt.encode(to_encode,SECRET_KEY,算法=算法)返回(“/ token”,response_model = Token)async def login_for_access_token(form_data:OAuth2PasswordRequestForm = Depends()): # 首先验证用户信息 print(form_data.password) print(form_data.username) user =authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP__UNAUTHORIZED, detail="用户名或密码不正确", headers={"WWW-Authenticate": "Bearer"}, ) # 生成并返回token信息 access_token_expires = timedelta(分钟=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data= {"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}3 请求数据流端获取token信息后,必须包含在Authorization header中后续请求的信息只有持有 Bearer token 才可以被允许访问。
我们添加验证函数来验证请求的合法性,读取token内容,解析并验证。代码语言:javascript copy# -*- 编码:utf-8 -*-async def get_current_user(token: str = Depends(oauth2_scheme)):credentials_exception = HTTPException( status_code=status.HTTP__UNAUTHORIZED,detail="无法验证凭据", headers={"WWW-Authenticate": "Bearer"}, ) 尝试:payload = jwt.decode(token, SECRET_KEY,algorithms=[ALGORITHM]) username: str = Payload.get("sub") 如果用户名为 None:引发凭证异常,除了 jwt.PyJWTError 作为 ex: print(ex) 引发凭证异常 user = get_user(fake_users_db, username=username) 如果用户为 None:引发凭证异常 return ("/users/me/", response_model=User )async def read_users_me(current_user: User = Depends(get_current_user)): return current_user4 效果展示本节我们整合上面的代码来展示效果。
代码语言:javascript copy# -*- 编码:utf-8 -*-from datetime import datetime, timedeltafrom getting a string like this run:# openssl rand -hex 32SECRET_KEY = "09d25efaa6cac66b7ab93ff6f0f4caa6 cf63b88e8d3e7 "ALGORITHM = "HS"ACCESS_TOKEN_EXPIRE_MINUTES = 25# 模拟数据库数据fake_users_db = { "hiashiniu": { "username": "hiashiniu", "full_name": "HaiShiNiu", "email": "", " hashed_pa??ssword": "$2b$12$uG4n5CK4.kCid/MfQ3ns9.uRslWuWr9EL5FR4HYNg6SNo6wEn8OPC", "已禁用": False, }}类令牌(BaseModel): access_token: str token_type: strclass 用户(BaseModel): 用户名: str 电子邮件: 可选[str ] = 无 full_na我:可选[str] = None 禁用:可选[bool] = Noneclass UserInDB(用户):hashed_pa??ssword:strpwd_context = CryptContext(schemes = [“bcrypt”],不推荐=“auto”)oauth2_scheme = OAuth2PasswordBearer(tokenUrl =“/ token” ")app = FastAPI()# 验证密码 def verify_password(plain_password, hashed_pa??ssword): return pwd_context.verify(plain_password, hashed_pa??ssword)# 密码哈希 def get_password_hash(password): # pwd_context.hash('9') # '$2 b$12 $uG4n5CK4.kCid/MfQ3ns9.uRslWuWr9EL5FR4HYNg6SNo6wEn8OPC' return pwd_context.hash(password)# 模拟从数据库读取用户信息 def get_user(db, username: str): user_dict = db.get(username,None) if user_dict: return UserInDB( **user_dict)# 用户信息验证:分别验证用户名和密码 defauthenticate_user(fake_db, username:str,password:str): user = get_user(fake_db, username) if not user: return False if not verify_password(password, user .hashed_pa??ssword): 返回 Falsereturn user# 生成带过期时间的 token def create_access_token(data: dict,expires_delta:Optional[timedelta] = None): to_encode = data.copy() # 有效期默认时间为 25 分钟 if expires_delta:expire = datetime .utcnow () +expires_delta else:expire = datetime.utcnow() + timedelta(分钟=15) to_encode.update({"exp":expire})encoded_jwt = jwt.encode(to_encode, SECRET_KEY,algorithm=ALGORITHM) returnencoded_jwt @app .post("/token", response_model=Token)async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): # 首先验证用户信息 print(form_data.password) print(form_data.username) user =authenticate_user(fake_users_db , form_data .username, form_data.password) 如果不是用户:引发 HTTPException( status_code=status.HTTP__UNAUTHORIZED, detail="用户名或密码不正确", headers={"WWW-Authenticate": "Bearer"},) # 生成并返回token信息 access_token_expires = timedelta(分钟=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer" }async def get_current_user(token: str = Depends(oauth2_scheme)):credentials_exception = HTTPException(status_code=status.HTTP__UNAUTHORIZED,detail="无法验证凭据", headers={"WWW-Authenticate": "Bearer"},) 尝试:payload = jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])用户名:str = Payload.get(“sub”)如果用户名为None:引发凭证_异常,除了jwt.PyJWTError作为ex:print(ex)引发凭证_异常用户= get_user(fake_users_db, username=username) 如果用户为 None: raise凭证异常返回(“/ users / me /”,response_model = User)async def read_users_me(current_user:User = Depends(get_current_user)):返回current_user1。使用命令启动项目: 代码语言: javascript copy uvicorn main ;对原创性有新的认识并不容易。
只希望对那些需要这些内容的同事或者刚入行的人有所帮助。你们的每一次点赞和分享都是我继续创作的动力。
希望能够在路上推广Python技术。我会尽我所能,尽我所能。
欢迎大家在评论区向我提问。我将一一解答。
版权声明:本文内容由互联网用户自发贡献,本站不拥有所有权,不承担相关法律责任。如果发现本站有涉嫌抄袭的内容,欢迎发送邮件 举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。
标签:
相关文章
06-18
06-17
06-17
06-08
06-18
06-18
06-18
最新文章
【玩转GPU】ControlNet初学者生存指南
【实战】获取小程序中用户的城市信息(附源码)
包雪雪简单介绍Vue.js:开学
Go进阶:使用Gin框架简单实现服务端渲染
线程池介绍及实际案例分享
JMeter 注释 18 - JMeter 常用配置组件介绍
基于Sentry的大数据权限解决方案
【云+社区年度征文集】GPE监控介绍及使用