FastAPIでJWTによる認証機能を実装する方法

スポンサーリンク

以前、 FastAPIでユーザ認証機能を実装する方法 としてユーザ認証機能の実装を紹介しましたが、JWTを発行する機会があったのでそちらを紹介します。
実装はこれをベースに追記していく形で行います。

JWTとは

JWTとは JSON Web Token の略で、トークン内に情報を保持し、それを署名、暗号化する事ができる技術です。
署名を検証する事でユーザの認証が可能であり、内容に認可の情報を含める事で認証から認可まで行う事が可能になります。

RFC7519

JWTの発行

ユーザ認証が完了した際、サーバ側に保存されている秘密鍵を使用し、トークンに署名を行います。
トークン内には認証後に必要な情報を含めておくことで、ユーザのトークン情報をデータベースで管理しなくても、認証、認可を行う事が可能になります。

秘密鍵/公開鍵の作成

JWTの署名に使用する秘密鍵を作成します。
JWKを作成するために同時に公開鍵も作成しておきます。

openssl genrsa 2048 > private_key.pem
openssl rsa -in private_key.pem -pubout -out public_key.pem

JWKの作成

JWKとは

JWKとは JSON Web Key の略で、暗号鍵をJSONを使用して表現するための方法です。
JWKは暗号化の方法や、公開鍵などがJSON形式で保存されます。
JWTを使用して認証するサービスは、認証プロバイダが公開しているJWKを使用し、署名の検証を行うことで正しく認証プロバイダが発行したトークンかどうかを判断する事ができます。

RFC7517

PEMからJWKを作成

作成した公開鍵(PEM形式)をJWKに変換します。
今回は pem-jwk を使用して作成します。

npm install -g pem-jwk
pem-jwk public_key.pem

コマンドで生成された値を、以下の内容に組み込みます。

{
    "keys": [
        {
            "alg": "RS256",
            "kid": "{ランダムに生成された文字列(鍵のID)}",
            "n": "{pem-jwkで作成された値}", 
            "use": "sig",
            "e": "{pem-jwkで作成された値}",
            "kty": "{pem-jwkで作成された値}"
        }
    ]
}

設定ファイルとして実装

作成したこれらの内容を、Pythonから呼び出せるように組み込んでいきます。

  • app/certs/private_key.pem
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAvT+gy8DXpqERHPISXXj9ZjleSeCnabcZ96LP4k9LiMyd3JSR
LUPGNJ8/NckD+e/x/qK9LJfrTVAzXDV1HmIbq4zcb3lqWl+ZgroqeyhzlHnUuIKc
...
vYc6kPGEJq4rRcD9rINAfoRO6TFX98e4H0M/0CJsXwQSlMMqYaPSe0rXfoDIFgHW
ie2XZJV/zZNOmhOD/DtDxwYl/I5kL2hiEEtTETEmjerX0vq742eFyg==
-----END RSA PRIVATE KEY-----
  • app/settings.py
import datetime

PRIVATE_PEM = open("app/certs/private_key.pem", "r").read()

TOKEN_EXPIRE = datetime.datetime.utcnow() + datetime.timedelta(minutes=60)

JWKS = {
    "keys": [
        {
            "alg": "RS256",
            "kid": "{ランダムに生成された文字列(鍵のID)}",
            "n": "{pem-jwkで作成された値}", 
            "use": "sig",
            "e": "{pem-jwkで作成された値}",
            "kty": "{pem-jwkで作成された値}"
        }
    ]
}

ユーザの認証

ユーザの認証を行った際、JWTを発行し返却します。
今回は PyJWT を使用してJWTの発行、検証を行います。

※ コードは抜粋、詳細はGitHubから

  • app/lib/authenticate.py
from app.settings import JWKS, PRIVATE_PEM, TOKEN_EXPIRE

import jwt

async def authenticate_user(
        db: Session = Depends(get_db),
        credentials: HTTPBasicCredentials = Depends(security)):
    user = crud.authenticate_user(db,
                                  credentials.username,
                                  credentials.password)
    if user:
        payload = {
            "exp": TOKEN_EXPIRE,
            "email": user.email,
            "user": user.name
        }
        token = jwt.encode(payload,
                           PRIVATE_PEM,
                           algorithm=JWKS["keys"][0]["alg"],
                           headers={"kid": JWKS["keys"][0]["kid"]})

        return {"access_token": token}
    else:
        raise HTTPException(
                status_code=401,
                detail="Incorrect email or password",
                headers={"WWW-Authenticate": "Basic"})

payloadには認証後に使用する情報を含めておきます。
今回はメールアドレスとユーザ名を含めています。

※ 値は任意に指定できますが、予約されたものが存在します (expなど)、詳細は予約済みクレーム名 を参照ください。

  • app/user/view.py
from app.lib.authenticate import authenticate_user

from fastapi import APIRouter, Depends

router = APIRouter()

@router.post("/user/_authenticate", tags=["user"])
async def authenticate(token: str = Depends(authenticate_user)):
    return token

JWTによる認証

JWTの署名を公開鍵を使用して検証する事で、その鍵が正しく認証プロバイダから発行されたかどうかを検証します。
今回は発行したJWTを外部サービスから利用する事も想定し、JWKを使用して認証を行います。

  • app/lib/authorize.py
import json

from app.settings import JWKS

from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import APIKeyHeader

import jwt

router = APIRouter()

api_key = APIKeyHeader(name="Authorization", auto_error=False)

async def authorize_user(
        authorization: str = Depends(api_key)):
    credentials_exception = HTTPException(
        status_code=401,
        detail="Could not validate token",
        headers={"WWW-Authenticate": authorization},
    )

    if authorization:
        auth = authorization.split(" ")
    if len(auth) != 2:
        raise credentials_exception
    if auth[0] != "Bearer":
        raise credentials_exception

    id_token = auth[1]

    public_key = jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(JWKS["keys"][0]))
    try:
        payload = jwt.decode(
                id_token,
                public_key,
                algorituhms=JWKS["keys"][0]["alg"])
    except Exception:
        raise credentials_exception

    return {"user": payload["user"], "email": payload["email"]}
  • app/user/view.py
from app.lib.authenticate import authenticate_user
from app.lib.authorize import authorize_user

from fastapi import APIRouter, Depends

router = APIRouter()

@router.post("/user/_authenticate", tags=["user"])
async def authenticate(token: str = Depends(authenticate_user)):
    return token

@router.get("/user", tags=["user"])
async def authorize(user: bool = Depends(authorize_user)):
    return user

あとがき

JWTを使用した認証方法を記述してきました。
今回はシンプルにJWTの発行、検証だけでしたが、OAuth 2.0 フローに従えば OAuth 2.0の認証プロバイダを作成する事もできます。
サービスの規模や必要に応じて、JWTだけを使用したシンプルな仕組みにするか、OAuth 2.0 のような仕組みを活用するか、判断していけば良いと思います。

今回のコードはGitHubに動くコードして上がっているので、参考にしてみてください。

GitHub - fealone/auth_example_with_fastapi: Authenticate example with FastAPI.
Authenticate example with FastAPI. Contribute to fealone/auth_example_with_fastapi development by creating an account on GitHub.