PythonのCelery × Redisで作る非同期タスクキュー入門

PythonのCelery × Redisで作る非同期タスクキュー入門 | mohablog

Webリクエストの裏で5秒以上かかる処理を抱えると、リクエストはタイムアウトするかワーカースレッドを食い潰します。Celery と Redis を組み合わせれば、HTTPレイヤーから重い仕事を切り離してバックグラウンドで走らせられます。本記事は Celery 5.6.3(2026年1月リリース)を前提に、最小構成から本番運用に必要な設定、5.5系で入った Pydantic 引数検証までまとめます。

目次

Celeryでバックグラウンド処理を切り出す理由

同期処理のままだと何が止まるか

FastAPI や Django で重い処理を同期で書くと、対象リクエストが返らないだけでは済みません。Uvicorn のワーカープロセスがブロックされ、後続もキューに溜まります。workers=4 で 10秒の処理が並ぶと、5本目以降は待ち行列。

同期で済ませてよい線引きは P95レイテンシで300ms以内。それを超える処理(画像変換、PDF生成、外部API集計、メール送信)はバックグラウンドに逃がします。

CeleryとRedisを選ぶ判断軸

Python製の非同期タスクキューは Celery、RQ、Dramatiq、Huey が主要な選択肢。Celery を選ぶ理由は次のとおり。

項目Celery 5.6RQDramatiq
ブローカー選択肢Redis / RabbitMQ / SQS / GCP Pub/SubRedisのみRedis / RabbitMQ
定期実行Celery Beat 同梱rq-scheduler 別パッケージ標準サポート
Pydantic引数検証5.5以降ネイティブ対応未対応middleware自作
学習コスト高い(設定項目が多い)低い中程度

規模が小さければ RQ で十分です。ただし、ブローカー差し替えや Beat による定期実行、Pydantic で型検証された引数が必要なら Celery 一択。

最小構成のworkerを起動するまで

必要なものを3行で揃える

Redis をローカルで動かし、Celery 5.6.3 と redis ライブラリを入れます。

docker run -d -p 6379:6379 --name local-redis redis:7
uv add 'celery[redis]==5.6.3'
uv add redis

実行結果:

$ celery --version
5.6.3 (immunity)

Celery 5.6 は Python 3.9 以上が必須です。3.8 系のままで pip install celery すると 5.4 系がインストールされ、Pydantic サポートを欠く構成になります。

tasks.pyとworker起動

公式ドキュメントの “First Steps with Celery” にある最小構成をそのまま動かします。

# tasks.py
from celery import Celery

app = Celery(
    'demo',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',
)

@app.task
def add(x: int, y: int) -> int:
    return x + y

worker を立ち上げます。

celery -A tasks worker --loglevel=info --concurrency=4

実行結果(抜粋):

 -------------- celery@host v5.6.3 (immunity)
--- ***** -----
-- ******* ---- Linux-6.10
- ** ---------- [config]
- ** ---------- .> app:         demo:0x10b0f8d70
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost:6379/1
- *** --- * --- .> concurrency: 4 (prefork)
[tasks]
  . tasks.add
[2026-05-11 14:02:11,233: INFO/MainProcess] celery@host ready.

別シェルから呼び出すとキュー経由で結果が返ってきます。

>>> from tasks import add
>>> r = add.delay(2, 3)
>>> r.get(timeout=5)
5

broker_urlとresult_backendを分ける

上の例で brokerbackend に異なる Redis DB(0番と1番)を当てています。狙いは次のとおり。

  • FLUSHDB でブローカーキューだけクリアしたいとき、結果ストアと混ざっていると一発で消えてしまう
  • 結果ストアは TTL を短くしたいケースが多く、ブローカーとはライフサイクルが違う

本番では Redis インスタンス自体を分けるのが安全。同じインスタンスで運用する場合でも、DBは必ず分けます。

Pydanticで引数の型を保証する

5.5で入った”Argument validation with Pydantic”

Celery 5.5(2025年3月)で pydantic=True オプションが入りました。公式ドキュメント “Argument validation with Pydantic” セクションに記載があります。型ヒントに Pydantic モデルを書いておくと、ワーカー側で自動的に BaseModel.model_validate() を通します。

from celery import Celery
from pydantic import BaseModel, EmailStr

app = Celery('demo', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

class MailJob(BaseModel):
    to: EmailStr
    subject: str
    body: str

@app.task(pydantic=True)
def send_mail(job: MailJob) -> dict:
    # job は MailJob 型で確実に到着する
    return {'sent_to': job.to, 'len': len(job.body)}

呼び出し側は dict で渡せます。シリアライズは Celery が裏で面倒を見てくれます。

>>> send_mail.delay({'to': 'a@example.com', 'subject': 'hi', 'body': 'hello'}).get(timeout=5)
{'sent_to': 'a@example.com', 'len': 5}

不正な値を投げ込むと、ワーカー側で ValidationError が立ち上がりタスクは失敗扱いになります。

>>> send_mail.delay({'to': 'not-an-email', 'subject': 'x', 'body': 'y'}).get(timeout=5)
Traceback (most recent call last):
  ...
pydantic_core._pydantic_core.ValidationError: 1 validation error for MailJob
to
  value is not a valid email address ...

strict=Trueで暗黙変換を止める

デフォルトは strict=False で、文字列 "3" を int に変換するなど lenient な挙動になります。型を厳密に守らせたいときは strict を有効化。

@app.task(pydantic=True, pydantic_strict=True)
def calc(value: int) -> int:
    return value * 2

strict 有効下で calc.delay("3") を呼ぶと ValidationError で弾かれます。型ヒントで決めた契約をワーカー受信時にも担保できるので、本番では strict=True を既定にします。

autoretry_forとacks_lateで取りこぼしを防ぐ

外部API失敗を自動リトライ

外部APIを叩くタスクは、ネットワーク瞬断やレートリミットで定期的にコケます。autoretry_forretry_backoff を組み合わせると、指数バックオフでリトライしてくれます。

import httpx

@app.task(
    autoretry_for=(httpx.HTTPError,),
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True,
    max_retries=5,
)
def fetch_status(url: str) -> int:
    return httpx.get(url, timeout=5).status_code

retry_backoff=True で初回1秒・2秒・4秒…と指数で増えます。retry_backoff_max=600 で上限を10分にキャップ。retry_jitter はリトライ時刻をランダムにずらして同時リトライの集中(thundering herd)を回避します。

acks_lateで処理中worker死亡に備える

Celeryのデフォルト挙動では、ワーカーがメッセージを受信した瞬間に Redis から削除します。タスク実行中に worker が OOM Kill されると、そのタスクは消えます。acks_late=True にすると、タスク完了後に ack を返す挙動に変わり、途中で死んだら別workerが再実行します。

app.conf.task_acks_late = True
app.conf.worker_prefetch_multiplier = 1

業務で acks_late を入れずに動かしていた頃、Pod 再起動と重なってメール送信ジョブが10件ほど蒸発し、再送対応に半日溶かしたことがあります。at-least-once セマンティクスを取るなら acks_late は最初から有効

ただし副作用として、タスクは冪等(同じ入力で何度実行しても結果が変わらない)である必要があります。決済や課金など二重実行が許されない処理は、idempotency key を組み合わせて多重実行をガードします。

5.6で入ったメモリと安全性の改善

worker_eta_task_limitでメモリ枯渇を止める

Celery では apply_async(eta=...)countdown= でスケジュール済みタスクを大量に積むと、worker プロセスが全件メモリに保持してしまい OOM を起こします。5.6.0 で worker_eta_task_limit 設定が追加され、保持件数の上限を指定できるようになりました。

app.conf.worker_eta_task_limit = 10000

この上限を超えると、それ以降の ETA タスクはブローカー側に戻されます。スケジュール済みジョブが万単位で積まれるバッチ系では入れておきます。

ログのブローカー認証情報サニタイズ

5.6 ではブローカー URL に含まれるパスワードがログ出力時に自動でマスクされます。redis://:secret@host/0 を使っていても、ログに secret がそのまま出ません。自前で URL マスクのフィルタを書いていた現場では、5.6 アップグレードで消せる実装です。

soft shutdownで実行中タスクを守る

5.5で入った worker_soft_shutdown_timeout は、SIGTERM 受信後に実行中タスクの完了を待つ時間を秒で指定します。

app.conf.worker_soft_shutdown_timeout = 30

Kubernetes の terminationGracePeriodSeconds と揃えておくのがコツ。Pod 終了前に進行中タスクをきれいに終わらせ、acks_late と組み合わせれば、デプロイ起因のタスクロスをほぼゼロに抑えられます。

FastAPIから呼び出すときの定番

delay/apply_asyncの使い分け

FastAPI のエンドポイントから Celery タスクを呼ぶ最小コード。.delay().apply_async(args=()) のショートカットです。

from fastapi import FastAPI
from tasks import send_mail

api = FastAPI()

@api.post('/mails')
def enqueue_mail(payload: dict):
    result = send_mail.apply_async(
        args=[payload],
        queue='mail',
        priority=5,
    )
    return {'task_id': result.id}

レスポンス例:

{"task_id": "f1b9c4a2-9f33-4b8e-8df1-6c2a1e3f0a72"}

キュー名や優先度を指定したいときは apply_async。引数だけで投げるなら delay で十分です。

結果取得は別エンドポイントで

呼び出し側は task_id だけ受け取り、結果はポーリングする設計が定番。同じリクエスト内で result.get() を呼ぶと、せっかく非同期にした意味が消えます。

from celery.result import AsyncResult
from tasks import app as celery_app

@api.get('/mails/{task_id}')
def get_mail_status(task_id: str):
    r = AsyncResult(task_id, app=celery_app)
    return {
        'state': r.state,  # PENDING / STARTED / SUCCESS / FAILURE
        'result': r.result if r.ready() else None,
    }

リアルタイム性が要るなら WebSocket か SSE を被せます。HTTP ポーリングは2〜3秒間隔。これより詰めると AsyncResult ごとの Redis GET が比例して増え、Redis 側の CPU 使用率が跳ねます。

Celery Beatで定期実行

cron 的な定期実行も Beat で書けます。crontab 文字列を Python で記述する形。

from celery.schedules import crontab

app.conf.beat_schedule = {
    'cleanup-every-night': {
        'task': 'tasks.cleanup',
        'schedule': crontab(hour='3', minute='15'),
    },
}

Beat プロセスを worker と別に起動します。

celery -A tasks beat --loglevel=info

実行結果:

[2026-05-11 03:15:00,011: INFO/MainProcess] Scheduler: Sending due task cleanup-every-night (tasks.cleanup)

Beat は単一プロセスで動かす制約があります。複数 Pod でデプロイすると同じジョブが二重起動するので、replicas=1 を厳守。

本番運用で踏みやすい罠

worker_prefetch_multiplierの初期値

デフォルトの worker_prefetch_multiplier=4 は短時間タスク向け。長時間タスクが混じると、prefetch されたまま実行を待つタスクが大量に発生し、片方の worker だけ詰まる現象が起きます。長時間タスク中心なら 1 に下げます。

app.conf.worker_prefetch_multiplier = 1
app.conf.task_acks_late = True

result_backendを放置するとRedisが膨らむ

結果ストアの TTL を設定しないと、結果データが Redis に永久に残ります。result_expires を秒で指定。

app.conf.result_expires = 3600  # 1時間で消す

結果取得をしないタスク(fire-and-forget)が混ざるなら、タスクごとに ignore_result=True を付けて結果保存自体を止めます。

Flowerでキュー詰まりを可視化

Celery の状態を眺める純正の管理UIが Flower。

uv add flower
celery -A tasks flower --port=5555

http://localhost:5555 でキューの長さ、worker の生死、リトライ件数が一覧できます。本番投入前に必ず動かしておきます。

まとめ

Celery 5.6.3 + Redis でPythonの非同期タスクキューを組むときに覚えておきたい設定。

  • broker と result_backend は別の Redis DB に分け、result_expires を必ず指定する
  • 5.5 以降は pydantic=True でタスク引数を型検証し、本番は strict も有効にする
  • autoretry_for + retry_backoff + retry_jitter で外部API依存タスクの瞬断耐性を確保
  • task_acks_late=True と冪等設計のセットで at-least-once を実現する
  • 5.6 から使える worker_eta_task_limitworker_soft_shutdown_timeout で本番事故を予防
  • Celery Beat は replicas=1 厳守、Flower でキュー詰まりを常時監視

Redis と RabbitMQ で迷ったら、まず Redis で組んで運用感を掴みます。スループットが Redis の限界(数万msg/sec)を超えてきた段階で RabbitMQ や SQS への乗り換えを検討すれば十分間に合います。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!
目次