以前、 FastAPIでユーザ認証機能を実装する方法 としてユーザ認証機能の実装を紹介しましたが、JWTを発行する機会があったのでそちらを紹介します。
実装はこれをベースに追記していく形で行います。
JWTとは
JWTとは JSON Web Token
の略で、トークン内に情報を保持し、それを署名、暗号化する事ができる技術です。
署名を検証する事でユーザの認証が可能であり、内容に認可の情報を含める事で認証から認可まで行う事が可能になります。
JWTの発行
ユーザ認証が完了した際、サーバ側に保存されている秘密鍵を使用し、トークンに署名を行います。
トークン内には認証後に必要な情報を含めておくことで、ユーザのトークン情報をデータベースで管理しなくても、認証、認可を行う事が可能になります。
秘密鍵/公開鍵の作成
JWTの署名に使用する秘密鍵を作成します。
JWKを作成するために同時に公開鍵も作成しておきます。
openssl genrsa 2048 > private_key.pem
openssl rsa -in private_key.pem -pubout -out public_key.pem
JWKの作成
JWKとは
JWKとは JSON Web Key
の略で、暗号鍵をJSONを使用して表現するための方法です。
JWKは暗号化の方法や、公開鍵などがJSON形式で保存されます。
JWTを使用して認証するサービスは、認証プロバイダが公開しているJWKを使用し、署名の検証を行うことで正しく認証プロバイダが発行したトークンかどうかを判断する事ができます。
PEMからJWKを作成
作成した公開鍵(PEM形式)をJWKに変換します。
今回は pem-jwk
を使用して作成します。
npm install -g pem-jwk
pem-jwk public_key.pem
コマンドで生成された値を、以下の内容に組み込みます。
{
"keys": [
{
"alg": "RS256",
"kid": "{ランダムに生成された文字列(鍵のID)}",
"n": "{pem-jwkで作成された値}",
"use": "sig",
"e": "{pem-jwkで作成された値}",
"kty": "{pem-jwkで作成された値}"
}
]
}
設定ファイルとして実装
作成したこれらの内容を、Pythonから呼び出せるように組み込んでいきます。
- app/certs/private_key.pem
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAvT+gy8DXpqERHPISXXj9ZjleSeCnabcZ96LP4k9LiMyd3JSR
LUPGNJ8/NckD+e/x/qK9LJfrTVAzXDV1HmIbq4zcb3lqWl+ZgroqeyhzlHnUuIKc
...
vYc6kPGEJq4rRcD9rINAfoRO6TFX98e4H0M/0CJsXwQSlMMqYaPSe0rXfoDIFgHW
ie2XZJV/zZNOmhOD/DtDxwYl/I5kL2hiEEtTETEmjerX0vq742eFyg==
-----END RSA PRIVATE KEY-----
- app/settings.py
import datetime
PRIVATE_PEM = open("app/certs/private_key.pem", "r").read()
TOKEN_EXPIRE = datetime.datetime.utcnow() + datetime.timedelta(minutes=60)
JWKS = {
"keys": [
{
"alg": "RS256",
"kid": "{ランダムに生成された文字列(鍵のID)}",
"n": "{pem-jwkで作成された値}",
"use": "sig",
"e": "{pem-jwkで作成された値}",
"kty": "{pem-jwkで作成された値}"
}
]
}
ユーザの認証
ユーザの認証を行った際、JWTを発行し返却します。
今回は PyJWT
を使用してJWTの発行、検証を行います。
※ コードは抜粋、詳細はGitHubから
- app/lib/authenticate.py
from app.settings import JWKS, PRIVATE_PEM, TOKEN_EXPIRE
import jwt
async def authenticate_user(
db: Session = Depends(get_db),
credentials: HTTPBasicCredentials = Depends(security)):
user = crud.authenticate_user(db,
credentials.username,
credentials.password)
if user:
payload = {
"exp": TOKEN_EXPIRE,
"email": user.email,
"user": user.name
}
token = jwt.encode(payload,
PRIVATE_PEM,
algorithm=JWKS["keys"][0]["alg"],
headers={"kid": JWKS["keys"][0]["kid"]})
return {"access_token": token}
else:
raise HTTPException(
status_code=401,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Basic"})
payloadには認証後に使用する情報を含めておきます。
今回はメールアドレスとユーザ名を含めています。
※ 値は任意に指定できますが、予約されたものが存在します (expなど)、詳細は予約済みクレーム名 を参照ください。
- app/user/view.py
from app.lib.authenticate import authenticate_user
from fastapi import APIRouter, Depends
router = APIRouter()
@router.post("/user/_authenticate", tags=["user"])
async def authenticate(token: str = Depends(authenticate_user)):
return token
JWTによる認証
JWTの署名を公開鍵を使用して検証する事で、その鍵が正しく認証プロバイダから発行されたかどうかを検証します。
今回は発行したJWTを外部サービスから利用する事も想定し、JWKを使用して認証を行います。
- app/lib/authorize.py
import json
from app.settings import JWKS
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import APIKeyHeader
import jwt
router = APIRouter()
api_key = APIKeyHeader(name="Authorization", auto_error=False)
async def authorize_user(
authorization: str = Depends(api_key)):
credentials_exception = HTTPException(
status_code=401,
detail="Could not validate token",
headers={"WWW-Authenticate": authorization},
)
if authorization:
auth = authorization.split(" ")
if len(auth) != 2:
raise credentials_exception
if auth[0] != "Bearer":
raise credentials_exception
id_token = auth[1]
public_key = jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(JWKS["keys"][0]))
try:
payload = jwt.decode(
id_token,
public_key,
algorituhms=JWKS["keys"][0]["alg"])
except Exception:
raise credentials_exception
return {"user": payload["user"], "email": payload["email"]}
- app/user/view.py
from app.lib.authenticate import authenticate_user
from app.lib.authorize import authorize_user
from fastapi import APIRouter, Depends
router = APIRouter()
@router.post("/user/_authenticate", tags=["user"])
async def authenticate(token: str = Depends(authenticate_user)):
return token
@router.get("/user", tags=["user"])
async def authorize(user: bool = Depends(authorize_user)):
return user
あとがき
JWTを使用した認証方法を記述してきました。
今回はシンプルにJWTの発行、検証だけでしたが、OAuth 2.0 フローに従えば OAuth 2.0の認証プロバイダを作成する事もできます。
サービスの規模や必要に応じて、JWTだけを使用したシンプルな仕組みにするか、OAuth 2.0 のような仕組みを活用するか、判断していけば良いと思います。
今回のコードはGitHubに動くコードして上がっているので、参考にしてみてください。