Screaming Loud

日々是精進

fastAPIのbackgroundTasks内での同期処理にご注意

最近Pythonを書いており、Goの雰囲気でfastAPIのbackgroundTasks内で重い同期処理を実行するコードを書いたら、他のレスポンスが止まる実装になってしまって困ったので、その際の備忘録として残しておきます。

結論

  • backgroundTasksは async def を渡すか def を渡すかで変わる
  • そもそもasyncはシングルスレッド

backgroundTasksに def を渡すか async def を渡すか

とりあえず重い処理はあとに回して、レスポンスを返したいときに使えるbackgroundTasksです

バックグラウンドタスク - FastAPI

実はbackgroundTasksは async def を渡すか def を渡すかで処理が変わります。 以下のコードはas_asyncパラメータによってどちらかを渡すかの処理になっています。

import threading
from fastapi import BackgroundTasks, Depends

router = FastAPI()
@router.get("/")
async def heavy(as_async: int, background_tasks: BackgroundTasks):
    print(f"main thread: {threading.get_ident()}")
    if as_async:
        background_tasks.add_task(async_heavy_task)
    else:
        background_tasks.add_task(heavy_task)
    return "ok"


async def async_heavy_task():
    print(f"async bg thread: {threading.get_ident()}")
    time.sleep(10)

def heavy_task():
    print(f"sync bg thread: {threading.get_ident()}")
    time.sleep(10)

実行してみると以下のようにasyncで関数を渡しているほうがレスポンスに10sかかっています。

$ time curl "http://localhost:80/?as_async=0"
"ok"curl "http://localhost:80/?as_async=0"  0.00s user 0.00s system 41% cpu 0.019 total

$ time curl "http://localhost:80/?as_async=1"
"ok"curl "http://localhost:80/?as_async=1"  0.00s user 0.01s system 0% cpu 10.047 total

これをprintしたログを見てみると以下のように def で渡した場合は別スレッドで実行していますが、 async def で渡した場合はmainスレッドで実行しています。

## async defの場合
 main thread: 281473547505696
 async bg thread: 281473547505696
## defの場合
 main thread: 281473547505696
 sync bg thread: 281473494217088

以上のことからasyncでない重い処理を async def としてメインスレッドが固まってレスポンスが遅くなります。 またメインスレッドを止めているので、他の処理も詰まってしまうのでAPI全体がブロックされてしまうので、非常に危険です。

こちらの記事が参考になりました。

qiita.com

zenn.dev

余談

上記のことがわかったあと、同期関数にするよう実装を変更しようとしました。 SQLAlchemyの AsyncSession をつかっていたので、同期の Session を作成しようと新しくコネクションを作成する実装を書いたのですが、以下のエラーが出てどうしても繋がりませんでした。

MissingGreenlet("greenlet_spawn has not been called; can't call await_only() here. Was IO attempted in an unexpected place?"

この文言で検索しても、大体eager load周りの話ばかりで、そもそもjoinしてないクエリも通らないからおかしいと思っていました。

が、結論は dsnが間違っていただけでした。

async_sessionは postgresql+asyncpg であるのに対し、 同期の session は postgresql+psycopg2 です。(他にもコネクタはいくつかありますが)

参考: PostgreSQL — SQLAlchemy 2.0 Documentation

これを気づくのに結構時間がかかってしまったので誰かの助けになれば。