Webリクエストの裏で5秒以上かかる処理を抱えると、リクエストはタイムアウトするかワーカースレッドを食い潰します。Celery と Redis を組み合わせれば、HTTPレイヤーから重い仕事を切り離してバックグラウンドで走らせられます。本記事は Celery 5.6.3(2026年1月リリース)を前提に、最小構成から本番運用に必要な設定、5.5系で入った Pydantic 引数検証までまとめます。
Celeryでバックグラウンド処理を切り出す理由
同期処理のままだと何が止まるか
FastAPI や Django で重い処理を同期で書くと、対象リクエストが返らないだけでは済みません。Uvicorn のワーカープロセスがブロックされ、後続もキューに溜まります。workers=4 で 10秒の処理が並ぶと、5本目以降は待ち行列。
同期で済ませてよい線引きは P95レイテンシで300ms以内。それを超える処理(画像変換、PDF生成、外部API集計、メール送信)はバックグラウンドに逃がします。
CeleryとRedisを選ぶ判断軸
Python製の非同期タスクキューは Celery、RQ、Dramatiq、Huey が主要な選択肢。Celery を選ぶ理由は次のとおり。
| 項目 | Celery 5.6 | RQ | Dramatiq |
|---|---|---|---|
| ブローカー選択肢 | Redis / RabbitMQ / SQS / GCP Pub/Sub | Redisのみ | Redis / RabbitMQ |
| 定期実行 | Celery Beat 同梱 | rq-scheduler 別パッケージ | 標準サポート |
| Pydantic引数検証 | 5.5以降ネイティブ対応 | 未対応 | middleware自作 |
| 学習コスト | 高い(設定項目が多い) | 低い | 中程度 |
規模が小さければ RQ で十分です。ただし、ブローカー差し替えや Beat による定期実行、Pydantic で型検証された引数が必要なら Celery 一択。
最小構成のworkerを起動するまで
必要なものを3行で揃える
Redis をローカルで動かし、Celery 5.6.3 と redis ライブラリを入れます。
docker run -d -p 6379:6379 --name local-redis redis:7
uv add 'celery[redis]==5.6.3'
uv add redis
実行結果:
$ celery --version
5.6.3 (immunity)
Celery 5.6 は Python 3.9 以上が必須です。3.8 系のままで pip install celery すると 5.4 系がインストールされ、Pydantic サポートを欠く構成になります。
tasks.pyとworker起動
公式ドキュメントの “First Steps with Celery” にある最小構成をそのまま動かします。
# tasks.py
from celery import Celery
app = Celery(
'demo',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1',
)
@app.task
def add(x: int, y: int) -> int:
return x + y
worker を立ち上げます。
celery -A tasks worker --loglevel=info --concurrency=4
実行結果(抜粋):
-------------- celery@host v5.6.3 (immunity)
--- ***** -----
-- ******* ---- Linux-6.10
- ** ---------- [config]
- ** ---------- .> app: demo:0x10b0f8d70
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: redis://localhost:6379/1
- *** --- * --- .> concurrency: 4 (prefork)
[tasks]
. tasks.add
[2026-05-11 14:02:11,233: INFO/MainProcess] celery@host ready.
別シェルから呼び出すとキュー経由で結果が返ってきます。
>>> from tasks import add
>>> r = add.delay(2, 3)
>>> r.get(timeout=5)
5
broker_urlとresult_backendを分ける
上の例で broker と backend に異なる Redis DB(0番と1番)を当てています。狙いは次のとおり。
- FLUSHDB でブローカーキューだけクリアしたいとき、結果ストアと混ざっていると一発で消えてしまう
- 結果ストアは TTL を短くしたいケースが多く、ブローカーとはライフサイクルが違う
本番では Redis インスタンス自体を分けるのが安全。同じインスタンスで運用する場合でも、DBは必ず分けます。
Pydanticで引数の型を保証する
5.5で入った”Argument validation with Pydantic”
Celery 5.5(2025年3月)で pydantic=True オプションが入りました。公式ドキュメント “Argument validation with Pydantic” セクションに記載があります。型ヒントに Pydantic モデルを書いておくと、ワーカー側で自動的に BaseModel.model_validate() を通します。
from celery import Celery
from pydantic import BaseModel, EmailStr
app = Celery('demo', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')
class MailJob(BaseModel):
to: EmailStr
subject: str
body: str
@app.task(pydantic=True)
def send_mail(job: MailJob) -> dict:
# job は MailJob 型で確実に到着する
return {'sent_to': job.to, 'len': len(job.body)}
呼び出し側は dict で渡せます。シリアライズは Celery が裏で面倒を見てくれます。
>>> send_mail.delay({'to': 'a@example.com', 'subject': 'hi', 'body': 'hello'}).get(timeout=5)
{'sent_to': 'a@example.com', 'len': 5}
不正な値を投げ込むと、ワーカー側で ValidationError が立ち上がりタスクは失敗扱いになります。
>>> send_mail.delay({'to': 'not-an-email', 'subject': 'x', 'body': 'y'}).get(timeout=5)
Traceback (most recent call last):
...
pydantic_core._pydantic_core.ValidationError: 1 validation error for MailJob
to
value is not a valid email address ...
strict=Trueで暗黙変換を止める
デフォルトは strict=False で、文字列 "3" を int に変換するなど lenient な挙動になります。型を厳密に守らせたいときは strict を有効化。
@app.task(pydantic=True, pydantic_strict=True)
def calc(value: int) -> int:
return value * 2
strict 有効下で calc.delay("3") を呼ぶと ValidationError で弾かれます。型ヒントで決めた契約をワーカー受信時にも担保できるので、本番では strict=True を既定にします。
autoretry_forとacks_lateで取りこぼしを防ぐ
外部API失敗を自動リトライ
外部APIを叩くタスクは、ネットワーク瞬断やレートリミットで定期的にコケます。autoretry_for と retry_backoff を組み合わせると、指数バックオフでリトライしてくれます。
import httpx
@app.task(
autoretry_for=(httpx.HTTPError,),
retry_backoff=True,
retry_backoff_max=600,
retry_jitter=True,
max_retries=5,
)
def fetch_status(url: str) -> int:
return httpx.get(url, timeout=5).status_code
retry_backoff=True で初回1秒・2秒・4秒…と指数で増えます。retry_backoff_max=600 で上限を10分にキャップ。retry_jitter はリトライ時刻をランダムにずらして同時リトライの集中(thundering herd)を回避します。
acks_lateで処理中worker死亡に備える
Celeryのデフォルト挙動では、ワーカーがメッセージを受信した瞬間に Redis から削除します。タスク実行中に worker が OOM Kill されると、そのタスクは消えます。acks_late=True にすると、タスク完了後に ack を返す挙動に変わり、途中で死んだら別workerが再実行します。
app.conf.task_acks_late = True
app.conf.worker_prefetch_multiplier = 1
業務で acks_late を入れずに動かしていた頃、Pod 再起動と重なってメール送信ジョブが10件ほど蒸発し、再送対応に半日溶かしたことがあります。at-least-once セマンティクスを取るなら acks_late は最初から有効。
ただし副作用として、タスクは冪等(同じ入力で何度実行しても結果が変わらない)である必要があります。決済や課金など二重実行が許されない処理は、idempotency key を組み合わせて多重実行をガードします。
5.6で入ったメモリと安全性の改善
worker_eta_task_limitでメモリ枯渇を止める
Celery では apply_async(eta=...) や countdown= でスケジュール済みタスクを大量に積むと、worker プロセスが全件メモリに保持してしまい OOM を起こします。5.6.0 で worker_eta_task_limit 設定が追加され、保持件数の上限を指定できるようになりました。
app.conf.worker_eta_task_limit = 10000
この上限を超えると、それ以降の ETA タスクはブローカー側に戻されます。スケジュール済みジョブが万単位で積まれるバッチ系では入れておきます。
ログのブローカー認証情報サニタイズ
5.6 ではブローカー URL に含まれるパスワードがログ出力時に自動でマスクされます。redis://:secret@host/0 を使っていても、ログに secret がそのまま出ません。自前で URL マスクのフィルタを書いていた現場では、5.6 アップグレードで消せる実装です。
soft shutdownで実行中タスクを守る
5.5で入った worker_soft_shutdown_timeout は、SIGTERM 受信後に実行中タスクの完了を待つ時間を秒で指定します。
app.conf.worker_soft_shutdown_timeout = 30
Kubernetes の terminationGracePeriodSeconds と揃えておくのがコツ。Pod 終了前に進行中タスクをきれいに終わらせ、acks_late と組み合わせれば、デプロイ起因のタスクロスをほぼゼロに抑えられます。
FastAPIから呼び出すときの定番
delay/apply_asyncの使い分け
FastAPI のエンドポイントから Celery タスクを呼ぶ最小コード。.delay() は .apply_async(args=()) のショートカットです。
from fastapi import FastAPI
from tasks import send_mail
api = FastAPI()
@api.post('/mails')
def enqueue_mail(payload: dict):
result = send_mail.apply_async(
args=[payload],
queue='mail',
priority=5,
)
return {'task_id': result.id}
レスポンス例:
{"task_id": "f1b9c4a2-9f33-4b8e-8df1-6c2a1e3f0a72"}
キュー名や優先度を指定したいときは apply_async。引数だけで投げるなら delay で十分です。
結果取得は別エンドポイントで
呼び出し側は task_id だけ受け取り、結果はポーリングする設計が定番。同じリクエスト内で result.get() を呼ぶと、せっかく非同期にした意味が消えます。
from celery.result import AsyncResult
from tasks import app as celery_app
@api.get('/mails/{task_id}')
def get_mail_status(task_id: str):
r = AsyncResult(task_id, app=celery_app)
return {
'state': r.state, # PENDING / STARTED / SUCCESS / FAILURE
'result': r.result if r.ready() else None,
}
リアルタイム性が要るなら WebSocket か SSE を被せます。HTTP ポーリングは2〜3秒間隔。これより詰めると AsyncResult ごとの Redis GET が比例して増え、Redis 側の CPU 使用率が跳ねます。
Celery Beatで定期実行
cron 的な定期実行も Beat で書けます。crontab 文字列を Python で記述する形。
from celery.schedules import crontab
app.conf.beat_schedule = {
'cleanup-every-night': {
'task': 'tasks.cleanup',
'schedule': crontab(hour='3', minute='15'),
},
}
Beat プロセスを worker と別に起動します。
celery -A tasks beat --loglevel=info
実行結果:
[2026-05-11 03:15:00,011: INFO/MainProcess] Scheduler: Sending due task cleanup-every-night (tasks.cleanup)
Beat は単一プロセスで動かす制約があります。複数 Pod でデプロイすると同じジョブが二重起動するので、replicas=1 を厳守。
本番運用で踏みやすい罠
worker_prefetch_multiplierの初期値
デフォルトの worker_prefetch_multiplier=4 は短時間タスク向け。長時間タスクが混じると、prefetch されたまま実行を待つタスクが大量に発生し、片方の worker だけ詰まる現象が起きます。長時間タスク中心なら 1 に下げます。
app.conf.worker_prefetch_multiplier = 1
app.conf.task_acks_late = True
result_backendを放置するとRedisが膨らむ
結果ストアの TTL を設定しないと、結果データが Redis に永久に残ります。result_expires を秒で指定。
app.conf.result_expires = 3600 # 1時間で消す
結果取得をしないタスク(fire-and-forget)が混ざるなら、タスクごとに ignore_result=True を付けて結果保存自体を止めます。
Flowerでキュー詰まりを可視化
Celery の状態を眺める純正の管理UIが Flower。
uv add flower
celery -A tasks flower --port=5555
http://localhost:5555 でキューの長さ、worker の生死、リトライ件数が一覧できます。本番投入前に必ず動かしておきます。
まとめ
Celery 5.6.3 + Redis でPythonの非同期タスクキューを組むときに覚えておきたい設定。
- broker と result_backend は別の Redis DB に分け、result_expires を必ず指定する
- 5.5 以降は
pydantic=Trueでタスク引数を型検証し、本番は strict も有効にする autoretry_for+retry_backoff+retry_jitterで外部API依存タスクの瞬断耐性を確保task_acks_late=Trueと冪等設計のセットで at-least-once を実現する- 5.6 から使える
worker_eta_task_limitとworker_soft_shutdown_timeoutで本番事故を予防 - Celery Beat は replicas=1 厳守、Flower でキュー詰まりを常時監視
Redis と RabbitMQ で迷ったら、まず Redis で組んで運用感を掴みます。スループットが Redis の限界(数万msg/sec)を超えてきた段階で RabbitMQ や SQS への乗り換えを検討すれば十分間に合います。

