FastAPIでユーザ認証機能を実装する方法【認証機能編】

FastAPIを使用し、Basic認証を実現するための方法を紹介します。
前回は最低限動作するFastAPI環境を作成したので、今回は認証機能編となります。
認証機能の実装から動作の確認まで行います。

環境構築編

ユーザ認証の実装

データベースとの接続

データベースを接続するためのライブラリを作成します。
接続先の情報はdocker-composeでMySQLに設定したものを使用します。
通常であれば設定ファイルや環境変数に接続先情報を外だしするべきですが、今回は簡略化のためリテラルで書き込みます。

  • src/app/lib/database.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DB_URL = "mysql+pymysql://example:password@mysql/example?charset=utf8"

engine = create_engine(
        SQLALCHEMY_DB_URL,
        connect_args={}
        )

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

モデルの作成

ユーザの認証に使用するモデルを作成します。
今回はログインに使用するIDをメールアドレスとします。

  • src/app/user/models.py
from sqlalchemy import Boolean, Column, Integer, String
from sqlalchemy.orm import relationship

from app.lib.database import Base

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String(254), unique=True, index=True)
    name = Column(String(128), unique=True, index=True)
    hashed_password = Column(String(128))
    is_active = Column(Boolean, default=True)

認証機能の作成

モデルを作成できたので、次は認証機能を作成します。
まずはユーザのCRUD操作として認証のロジック部分を作成します。

  • src/app/user/crud.py
from passlib.context import CryptContext
from sqlalchemy.orm import Session

from . import models

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def get_user_by_email(db: Session, email: str):
    return db.query(models.User).filter(models.User.email == email).first()

def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def authenticate_user(
        db: Session,
        email: str,
        password: str,
        expire: int,
        reuse: bool):
    user = get_user_by_email(db, email)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return True

def get_password_hash(password):
    return pwd_context.hash(password)

次に、他のAPIからも認証機能を呼び出せるようにライブラリとして認証機能を作成します。

  • src/app/lib/authenticate.py
from fastapi import Depends, HTTPException
from sqlalchemy.orm import Session
from fastapi import APIRouter
from fastapi.security import (
        HTTPBasic,
        HTTPBasicCredentials)

from app.user import crud, models
from app.lib.database import SessionLocal, engine

router = APIRouter()

security = HTTPBasic()

models.Base.metadata.create_all(bind=engine)

def get_db():
    try:
        db = SessionLocal()
        yield db
    finally:
        db.close()

async def authenticate_user(
        db: Session = Depends(get_db),
        credentials: HTTPBasicCredentials = Depends(security)):
    if crud.authenticate_user(db,
                              credentials.username,
                              credentials.password):
        return True
    else:
        raise HTTPException(
                status_code=401,
                detail="Incorrect email or password",
                headers={"WWW-Authenticate": "Basci"})

HelloWorldの実装

HelloWorldAPIの実装

最後に、動作確認するためのHelloWorldのAPIを作成します。

  • src/app/user/view.py
from fastapi import Depends
from fastapi import APIRouter
from app.lib.authenticate import authenticate_user

router = APIRouter()

@router.get("/user/helloworld", tags=["user"])
async def get_helloworld(is_auth: bool = Depends(authenticate_user)):
    return "HelloWorld"

HelloWorldAPIを追加

APIを実装したので、FastAPIにAPIを追加します。
最初に作成したFastAPIの実行ファイルを修正します。

  • src/app/main.py
 from fastapi import FastAPI
+from .user import view as user

 app = FastAPI()
+app.include_router(user.router)

動作テスト

ユーザの作成

認証機能の実装が終わったので、動作確認を行うためにユーザを作成します。
ユーザ登録用のAPIを作成しても良いのですが、特権管理機能を作成していないので今回はスクリプトを作成して登録します。
docker-composeでMySQLのポートをホストに開放していないので、DockerコマンドからIPアドレスを確認し、直接接続するようにします。

CONTAINER_ID=$(docker ps --filter name=examplefastapi_mysql_1 -q)
TARGET_IP=$(docker inspect ${CONTAINER_ID}  -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}')
echo $TARGET_IP
mysql -h ${TARGET_IP} -uexample -ppassword example -e"insert into users (email,name,hashed_password) values('example@example.com','example','$2b$12$sUi6SEyXzXH8qR9wXRHjPeahHPPf4It0nrfL/7c2xRcKdwek7OCKq');"

ハッシュ化されたパスワードの作成方法

パスワードを変えたい場合は、Pythonで以下の方法で作成する事ができます。
パスワードの値を任意に作成したものに置き換えてください。

from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
pwd_context.hash("password")

openAPIによる動作テスト

[http://localhost/docs]にアクセスし、データベースに登録したユーザで動作確認を行います。
HelloWorld がレスポンスとして帰ってきたら正常に動作完了です。

終わりに

FastAPIの環境構築から認証機能の実装まで説明してきました。
通常であればトークンを発行し認可の機能も必要ですが、認可の機能はロールなど考慮する点が多いため今回は省略しました。
機会があれば、認可の実装も解説していきたいと思います。

追記

JWTを発行する記事も書きました。
https://lonesec.com/2020/12/18/user-auth-with-jwt-fastapi/

GitHubへのサンプル公開

GitHubへサンプルを公開しています。

fealone/auth_example_with_fastapi
Authenticate example with FastAPI. Contribute to fealone/auth_example_with_fastapi development by creating an account on GitHub.