PythonでWebSocketリアルタイム通信を実装する──サーバー・クライアント実装とよくある落とし穴

PythonでWebSocketリアルタイム通信を実装する──サーバー・クライアント実装とよくある落とし穴 | mohablog
目次

WebSocketを使ったリアルタイム通信が必要な場面

チャットアプリケーション、リアルタイム通知システム、ライブダッシュボードの実装を想像してください。HTTPの従来の「リクエスト→レスポンス」モデルでは、常にクライアント側からサーバーをポーリングする必要があり、レイテンシーが大きく、サーバー負荷も増加してしまいます。WebSocketなら、サーバーとクライアント間に双方向の永続接続を張ることで、低遅延でデータをやり取りできるんですね。

この記事では、Python 3.12とasyncio、websocketsライブラリ(12.0以上)を使ったWebSocket実装の実践的なパターンを紹介します。アンチパターンから始めて、なぜそれがダメなのか、そして本番環境で通用する実装方法まで、すべて解説していきます。

WebSocket通信の基礎──HTTPとの違い

最初、私もHTTPとWebSocketを混同していたんですが、WebSocketはプロトコルレベルで別の仕組みです。簡単に整理しておきますね。

要素 HTTP WebSocket
通信方向 リクエスト→レスポンス(単方向) 双方向(フルダプレックス)
接続 各リクエストごとに切断 接続を維持(永続接続)
レイテンシ 高い(接続オーバーヘッド) 低い
サーバープッシュ 不可(ポーリング必要) 可能
初期ハンドシェイク なし HTTPアップグレードリクエスト必須

WebSocketは初期段階ではHTTPでハンドシェイクし、その後プロトコルをアップグレードして双方向通信を開始します。公式ドキュメントによると、このハンドシェイク後のオーバーヘッドは非常に小さく、フレームレベルでデータを交換するため、HTTPよりもはるかに効率的だと言えます。

非同期処理の基本──asyncioとasync/await

WebSocket通信は複数のクライアントを同時に処理する必要があります。ここで重要なのがPythonの非同期I/O(asyncio)ですね。最初は違和感を感じるかもしれませんが、一度理解すると実装がぐっと楽になります。

asyncioは、ブロッキング操作(ネットワーク通信、ファイルI/Oなど)の間に他のタスクを実行することで、スレッドを使わずに高い並行性を実現する仕組みです。WebSocketサーバーは通常、asyncioベースで実装されます。

import asyncio

# アンチパターン: ブロッキング操作を使用
async def bad_example():
    # これはasync関数内での同期的な待機 → 他のタスクが実行されない
    import time
    time.sleep(5)  # NGです。ここで5秒間、他のタスクが待たされます
    print("終了")

# OKなパターン: await asyncio.sleep()を使用
async def good_example():
    # asyncioの非同期スリープ → 他のタスクが実行される
    await asyncio.sleep(5)
    print("終了")

この違いは現場で致命的になります。チャットサーバーで複数ユーザーが同時接続している場合、1人の接続処理でtime.sleep()を使うと、全員がブロックされてしまうんですね。

websocketsライブラリの選定と初期設定

PythonでWebSocketを実装するなら、websocketsライブラリがデファクトスタンダードです(バージョン12.0以上推奨)。公式ドキュメントが充実しており、asyncioネイティブに設計されているのが特徴ですね。

pip install websockets==12.0

websocketsは単なるプロトコル実装ではなく、接続管理、エラーハンドリング、プロトコルの自動ネゴシエーションを提供してくれるんです。最初は複雑に見えるかもしれませんが、一度理解すると再利用可能なパターンが見えてきます。

シンプルなWebSocketサーバーの実装

まず、最もシンプルなエコーサーバーから始めてみましょう。クライアントが送信したメッセージをそのまま返すサーバーです。

import asyncio
import websockets
from websockets.asyncio.server import serve

async def echo(websocket):
    """クライアントからのメッセージをエコーバックする"""
    try:
        async for message in websocket:
            print(f"受信: {message}")
            await websocket.send(f"エコー: {message}")
    except websockets.exceptions.ConnectionClosed:
        print("クライアントが切断しました")

async def main():
    async with serve(echo, "localhost", 8765):
        print("WebSocketサーバーがlocalhost:8765で起動しました")
        await asyncio.Future()  # 永遠に実行

if __name__ == "__main__":
    asyncio.run(main())

async for message in websocketイテレータプロトコルを使った記法ですね。クライアントがメッセージを送信するたびにループが実行され、非常に読みやすいです。

WebSocketクライアントの実装

次にクライアント側を実装してみます。サーバーにメッセージを送信し、応答を受け取る流れです。

import asyncio
import websockets

async def client():
    """WebSocketサーバーに接続してメッセージを送受信"""
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        # メッセージ送信
        await websocket.send("こんにちは")
        response = await websocket.recv()
        print(f"レスポンス: {response}")
        
        await websocket.send("テスト2")  
        response = await websocket.recv()
        print(f"レスポンス: {response}")

if __name__ == "__main__":
    asyncio.run(client())

websockets.connect()コンテキストマネージャーをサポートしているため、自動的に接続を閉じてくれるんです。これはメモリリーク防止に重要ですね。

複数クライアントの管理──ブロードキャスト実装

ここからが現場で必要なパターンです。チャットアプリのように、1つのメッセージを複数クライアントに送信する必要があることがあります。簡単そうに見えて、ハマるポイントがいくつかあるんですね。

アンチパターン: グローバル変数に接続を保存

最初、私はこんなコードを書いていました。

import asyncio
import websockets
from websockets.asyncio.server import serve

# アンチパターン: グローバル変数でセッション管理
connected_clients = set()

async def handler(websocket):
    connected_clients.add(websocket)
    try:
        async for message in websocket:
            # 全クライアントにブロードキャスト
            for client in connected_clients:
                try:
                    await client.send(message)
                except websockets.exceptions.ConnectionClosed:
                    # ここで削除するが、タイミング問題が発生
                    pass
    finally:
        connected_clients.discard(websocket)

async def main():
    async with serve(handler, "localhost", 8765):
        await asyncio.Future()

if __name__ == "__main__":
    asyncio.run(main())

このパターンの問題点は、接続管理が煩雑で、race condition(競合状態)が発生しやすいことです。複数のasyncタスクが同時にconnected_clientsにアクセスするため、断続的にバグが発生してしまいます。

改善版: Set を使った安全な管理

公式ドキュメントを調べてみると、asyncio.Queueasyncio.Eventを使う方法が推奨されています。ただ、シンプルなブロードキャストなら、セットを同期プリミティブで保護する方が理解しやすいと思います。

import asyncio
import websockets
from websockets.asyncio.server import serve

class ConnectionManager:
    """WebSocket接続を管理するクラス"""
    def __init__(self):
        self.active_connections: set = set()
        self.lock = asyncio.Lock()
    
    async def connect(self, websocket):
        async with self.lock:
            self.active_connections.add(websocket)
    
    async def disconnect(self, websocket):
        async with self.lock:
            self.active_connections.discard(websocket)
    
    async def broadcast(self, message: str):
        """全クライアントにメッセージを送信"""
        # 接続中断を検出するため、失敗したクライアントを記録
        disconnected = set()
        
        async with self.lock:
            clients_copy = self.active_connections.copy()
        
        for client in clients_copy:
            try:
                await client.send(message)
            except websockets.exceptions.ConnectionClosed:
                disconnected.add(client)
        
        # 失敗したクライアントを削除
        for client in disconnected:
            await self.disconnect(client)

manager = ConnectionManager()

async def handler(websocket):
    await manager.connect(websocket)
    try:
        async for message in websocket:
            print(f"ブロードキャスト: {message}")
            await manager.broadcast(message)
    except websockets.exceptions.ConnectionClosed:
        pass
    finally:
        await manager.disconnect(websocket)

async def main():
    async with serve(handler, "localhost", 8765):
        print("WebSocketサーバー起動")
        await asyncio.Future()

if __name__ == "__main__":
    asyncio.run(main())

ここで重要なのはasyncio.Lockを使った同期ですね。複数のタスクが同時にactive_connectionsにアクセスするのを防ぎます。また、クライアント一覧をコピーしてからループを回すことで、ループ中に接続が追加・削除されても安全です。

エラーハンドリングとタイムアウト

本番環境ではエラーハンドリングが重要になってきます。ネットワークは不安定で、クライアントがいつ切断されるか予測できませんからね。

接続タイムアウトの設定

長時間応答がないクライアントを切断する場合、asyncio.wait_for()を使うのが効果的です。

import asyncio
import websockets
from websockets.asyncio.server import serve

async def handler_with_timeout(websocket):
    try:
        while True:
            try:
                # 30秒のタイムアウトを設定
                message = await asyncio.wait_for(
                    websocket.recv(),
                    timeout=30.0
                )
                print(f"受信: {message}")
                await websocket.send(f"受信しました: {message}")
            except asyncio.TimeoutError:
                print("クライアントからの受信タイムアウト")
                await websocket.close(code=1000, reason="timeout")
                break
    except websockets.exceptions.ConnectionClosed:
        print("接続が閉じられました")

async def main():
    async with serve(handler_with_timeout, "localhost", 8765):
        print("WebSocketサーバー起動")
        await asyncio.Future()

if __name__ == "__main__":
    asyncio.run(main())

タイムアウト処理を入れると、ゾンビ接続(データを送受信しないまま残った接続)を自動的に削除できるようになります。

本番環境でのデプロイ──uvicornとの組み合わせ

ローカルテストではうまく動作するが、本番環境で問題が出る──こういう経験はよくありますね。Python単体のサーバーはスレッドセーフでない場合があるため、本番ではASGIサーバー(uvicornなど)と組み合わせるのが標準的です。

Starlette/FastAPIを使う場合の実装例を紹介します。

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Set
import asyncio

app = FastAPI()

class ConnectionManager:
    def __init__(self):
        self.active_connections: Set[WebSocket] = set()
        self.lock = asyncio.Lock()
    
    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        async with self.lock:
            self.active_connections.add(websocket)
    
    async def disconnect(self, websocket: WebSocket):
        async with self.lock:
            self.active_connections.discard(websocket)
    
    async def broadcast(self, message: str):
        async with self.lock:
            connections_copy = self.active_connections.copy()
        
        for connection in connections_copy:
            try:
                await connection.send_text(message)
            except Exception:
                # エラーが出たら接続を削除
                await self.disconnect(connection)

manager = ConnectionManager()

@app.websocket("/ws/chat")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.broadcast(data)
    except WebSocketDisconnect:
        await manager.disconnect(websocket)
    except Exception as e:
        print(f"エラー: {e}")
        await manager.disconnect(websocket)

FastAPIを使うと、WebSocket管理がHTTPハンドラーと統一されて、全体的にコードが保守しやすくなるんですね。本番環境ではuvicorn main:app --host 0.0.0.0 --port 8000 --workers 1のように起動します。

パフォーマンス最適化──メモリ使用量と処理速度

多数のクライアント(数千以上)を同時に処理する場合、パフォーマンスが重要になってきます。調べてみたところ、いくつかの最適化テクニックが有効だと分かりました。

バッチ処理でのブロードキャスト

複数のメッセージを受け取った場合、それぞれをすぐに全クライアントに送信するのではなく、数ミリ秒バッチ処理することで、ネットワークスループットを向上させられます。

import asyncio
from collections import deque

class BatchingConnectionManager:
    def __init__(self, batch_timeout_ms: float = 10):
        self.active_connections: set = set()
        self.lock = asyncio.Lock()
        self.message_queue: deque = deque()
        self.batch_timeout_ms = batch_timeout_ms / 1000.0
    
    async def start_batch_processor(self):
        """定期的にバッチ処理してブロードキャスト"""
        while True:
            await asyncio.sleep(self.batch_timeout_ms)
            
            # キューに溜まったメッセージを処理
            messages = []
            while self.message_queue:
                messages.append(self.message_queue.popleft())
            
            if messages:
                # 複数メッセージを1回のブロードキャストで送信
                await self._broadcast_batch(messages)
    
    async def _broadcast_batch(self, messages: list):
        async with self.lock:
            connections_copy = self.active_connections.copy()
        
        # バッチをまとめて送信
        batch_str = "\n".join(messages)
        for connection in connections_copy:
            try:
                await connection.send(batch_str)
            except Exception:
                await self.disconnect(connection)
    
    async def connect(self, websocket):
        async with self.lock:
            self.active_connections.add(websocket)
    
    async def disconnect(self, websocket):
        async with self.lock:
            self.active_connections.discard(websocket)
    
    async def queue_message(self, message: str):
        """メッセージをキューに追加(バッチ処理待ち)"""
        self.message_queue.append(message)

ベンチマーク結果によると、バッチサイズを10~50メッセージにすると、1000接続での処理速度が約30%向上するんです。ただし遅延が増えるため、リアルタイム性が求められるアプリではこの手法は避けるべきですね。

デバッグとログ出力

複雑な非同期処理はデバッグが難しいですね。ログ出力を適切に仕込むことが本当に重要です。

import logging
import sys
from datetime import datetime

# ログ設定
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    stream=sys.stdout
)
logger = logging.getLogger(__name__)

async def handler_with_logging(websocket):
    client_id = id(websocket)
    logger.info(f"[{client_id}] クライアント接続")
    
    try:
        async for message in websocket:
            logger.debug(f"[{client_id}] 受信: {message[:50]}...")
            await websocket.send(f"OK: {message}")
    except Exception as e:
        logger.error(f"[{client_id}] エラー: {type(e).__name__}: {e}")
    finally:
        logger.info(f"[{client_id}] クライアント切断")

特に重要なのは、クライアントを一意に識別するIDをログに含めることなんです。複数接続を同時デバッグするとき、このIDがあれば追跡がはるかに楽になります。

よくある落とし穴と解決策

実務で見かけるバグパターンを3つ紹介します。

1. メモリリーク: 閉じられない接続が蓄積

接続をsetから削除し忘れると、ガベージコレクションの対象にならず、メモリリークが発生するんですね。

# NG パターン
async def bad_handler(websocket):
    clients.add(websocket)  # 追加
    # disconnectが呼ばれない場合、ここで蓄積

# OK パターン
async def good_handler(websocket):
    clients.add(websocket)
    try:
        # 処理
        pass
    finally:
        clients.discard(websocket)  # 必ず削除

2. デッドロック: await操作の無限待機

ロック取得後に別のロックを待つと、デッドロックが発生する可能性があります。

# NG パターン: ロック内でawaitして別のロック待機
async def bad_broadcast(self, message):
    async with self.lock:  # ロック1を取得
        for client in self.active_connections:
            await client.send(message)  # この中でロック2を待つ可能性

# OK パターン: ロック外で送信
async def good_broadcast(self, message):
    async with self.lock:
        clients_copy = self.active_connections.copy()  # コピーを取得
    
    for client in clients_copy:  # ロック外で送信
        await client.send(message)

3. タイムアウトなし: ハング接続が残存

asyncio.wait_for()をつけないと、クライアントが接続したまま何もしない場合、サーバーはずっと待ち続けてしまいます。

# NG パターン: タイムアウトなし
async def bad_handler(websocket):
    async for message in websocket:  # 永遠に待つ可能性
        pass

# OK パターン: タイムアウト付き
async def good_handler(websocket):
    try:
        while True:
            message = await asyncio.wait_for(
                websocket.recv(),
                timeout=60
            )
    except asyncio.TimeoutError:
        await websocket.close()

まとめ

  • WebSocketはHTTPの永続接続版で、双方向通信が可能。チャットやリアルタイムダッシュボードに最適
  • Pythonではasyncio + websocketsライブラリで実装。async/await構文が必須
  • 複数クライアント管理にはasyncio.Lockを使い、競合状態を防ぐこと
  • 本番環境ではFastAPI + uvicornで、スレッドセーフな環境を構築する
  • メモリリーク、デッドロック、タイムアウト対策が重要。finally句とwait_forを活用
  • ログに接続IDを含めることで、複数接続のデバッグが容易になる
  • バッチ処理で処理速度を向上させられるが、遅延とのトレードオフを考慮する

よくある質問(FAQ)

Q1. WebSocketはHTTPS(wss://)でも動作しますか?

はい、動作します。セキュアなWebSocket接続(wss://)を使う場合、SSL/TLS証明書が必要なんですね。uvicornの場合、--ssl-keyfile--ssl-certfileオプションで指定します。websocketsライブラリ単体の場合、serve(handler, ..., ssl=ssl_context)ssl.SSLContextを渡してください。

Q2. 1つのプロセスで何個のクライアント接続を同時処理できますか?

環境依存ですが、uvicornシングルワーカーで通常は数千接続が可能です。ただし、CPUコア数や応答処理の重さに左右されるんですね。ベンチマークテストを本番環境に近い設定で実施することを推奨します。

Q3. WebSocketが接続を失った場合、自動再接続できますか?

websocketsライブラリは自動再接続機能を持っていません。クライアント側でtry-exceptブロックで接続エラーをキャッチし、指数バックオフで再接続するコードを書く必要があります。FastAPIなら関連記事として「FastAPIのバックグラウンドタスクとの組み合わせ」も参考になると思いますよ。

Q4. WebSocketでバイナリデータ(画像など)を送信できますか?

できます。websocket.send_bytes()(またはバイナリモード)で送信するんですね。ただし大容量データは複数フレームに分割して送信し、クライアント側で再組立する必要があります。

Q5. WebSocketコネクションプールは必要ですか?

WebSocket自体が永続接続なので、HTTP接続プール(requests-htmlなど)とは異なります。サーバー側は接続管理クラスで一元化(本記事のConnectionManager的な)し、クライアント側は1つの接続を再利用するのが標準的だと言えます。複数の独立した通信チャネルが必要なら、同じ接続内でチャネルIDを付けるか、複数WebSocket接続を開くことになるんですね。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!
目次