Go pgx v5入門—database/sqlにないbatchとCOPYを使う

Go pgx v5入門—database/sqlにないbatchとCOPYを使う | mohablog

GoでPostgreSQLを触るとき、database/sqllib/pq を挿す構成が長く定番でした。ただ database/sql は複数のDBを同じ形で扱うための抽象層。ここを通すと、PostgreSQL固有の機能が落ちます。pgx v5 はその抽象層を外し、PostgreSQLのワイヤプロトコルを直接話すドライバです。

目次

database/sql 経由では落ちる機能がある

pgx を選ぶ理由は速度だけではありません。公式READMEの “Choosing Between the pgx and database/sql Interfaces” にこう書かれています。

The pgx interface is faster. Many PostgreSQL specific features such as LISTEN / NOTIFY and COPY are not available through the database/sql interface.

つまり COPY による一括ロードと LISTEN / NOTIFY は、database/sql 越しには触れません。pgx には2つの顔があります。database/sqldriver.Driver として使う互換モードと、pgxpool から直接使うネイティブモード。後者でだけ、上の機能とジェネリクスによる行スキャンが開きます。

pgx v5 の現行バージョン

記事執筆時点の最新は pgx v5.10.0(2026年6月3日リリース)です。サポートGoバージョンはREADMEの “Supported Go and PostgreSQL Versions” の通り Go 1.25 以上。モジュールパスは github.com/jackc/pgx/v5、プールは github.com/jackc/pgx/v5/pgxpool に分かれています。

どちらのモードを選ぶか

判断はシンプルです。アプリがPostgreSQLしか使わず、database/sql を要求する他ライブラリも挟まないなら、ネイティブモードを選びます。逆に複数DBを抽象化したい、または database/sql 前提の既存資産があるなら互換モード。この記事はネイティブモードを扱います。

pgxpool で接続する

接続プールの概念そのものは Go のデータベース接続プーリングで何が変わる? で扱ったので、ここではpgx側のAPIだけ見ます。最小構成は pgxpool.New 一発です。

package main

import (
	"context"
	"log"
	"os"

	"github.com/jackc/pgx/v5/pgxpool"
)

func main() {
	pool, err := pgxpool.New(context.Background(), os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatalf("connect: %v", err)
	}
	defer pool.Close()

	var now string
	if err := pool.QueryRow(context.Background(), "select now()").Scan(&now); err != nil {
		log.Fatal(err)
	}
	log.Println("connected:", now)
}

実行結果。

2026/06/20 10:12:33 connected: 2026-06-20 10:12:33.482911+09

pgxpool.New が返す *pgxpool.Pool は並行安全。複数のgoroutineから同じプールを共有でき、QueryRow / Query / Exec は呼ぶたびにプールから接続を借りて自動で返します。database/sql*sql.DB と同じ感覚で使えます。

プールの上限は Config で決める

本番では MaxConns を握っておきたい場面が出ます。接続文字列をパースしてから値を差し込みます。

cfg, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
if err != nil {
	log.Fatal(err)
}
cfg.MaxConns = 10
cfg.MinConns = 2
cfg.MaxConnLifetime = time.Hour

pool, err := pgxpool.NewWithConfig(context.Background(), cfg)

デフォルトの MaxConns は4とCPU数の大きい方。PostgreSQL側の max_connections を超えない範囲で決めます。複数インスタンスから接続するなら、インスタンス数 × MaxConns がサーバ上限を割らないかを先に計算しておきます。

クエリ結果を構造体に流し込む

database/sql で一番だるいのは行スキャンです。列を1つ足すたびに rows.Scan(&u.ID, &u.Name, ...) の引数を直し、for rows.Next() ループを手で回す。pgx v5 はこの定型をジェネリクスで畳みます。

pgx.CollectRows で一括スキャン

パッケージドキュメントの “Query Interface” にある pgx.CollectRowspgx.RowToStructByName を組み合わせます。

type User struct {
	ID    int64  `db:"id"`
	Name  string `db:"name"`
	Email string `db:"email"`
}

func activeUsers(ctx context.Context, pool *pgxpool.Pool) ([]User, error) {
	rows, err := pool.Query(ctx,
		"select id, name, email from users where active = $1", true)
	if err != nil {
		return nil, err
	}
	return pgx.CollectRows(rows, pgx.RowToStructByName[User])
}

実行すると、行が []User に詰まって返ります。

[{1 Alice alice@example.com} {2 Bob bob@example.com}]

CollectRows は内部で rows.Close() まで面倒を見ます。for rows.Next()rows.Err() のチェックも要りません。同じことを database/sql で書くとこうなります。

rows, err := db.QueryContext(ctx,
	"select id, name, email from users where active = $1", true)
if err != nil {
	return nil, err
}
defer rows.Close()

var users []User
for rows.Next() {
	var u User
	if err := rows.Scan(&u.ID, &u.Name, &u.Email); err != nil {
		return nil, err
	}
	users = append(users, u)
}
if err := rows.Err(); err != nil {
	return nil, err
}

同じ結果に対して、定型のループとエラーチェックが消えます。列順とScan引数の対応ずれという定番のバグも、ここでは起きません。

db タグで列名を合わせる

RowToStructByName はフィールド名と列名を大文字小文字を無視して照合します。スネークケースの列とキャメルケースのフィールドがずれるときは、db タグで明示します。タグの仕組み自体は Go structタグを読み解く で整理しました。位置で合わせる RowToStructByPos もありますが、列順への依存が生まれるので名前照合を基本にします。

SendBatch で往復を1回にまとめる

往復回数を1回に畳めるのが、ネイティブモードのバッチ送信です。1000件のINSERTを Exec のループで投げると、クエリごとにサーバとの往復が1回ずつ発生します。ネットワーク往復が1ミリ秒なら、1000回のループは送信内容と無関係に1秒近くを往復待ちだけで消費します。

1行ずつ Exec は往復がボトルネックになる

まず素朴な書き方。これは件数が増えるほど遅くなります。

for _, u := range newUsers {
	_, err := pool.Exec(ctx,
		"insert into users (name, email) values ($1, $2)", u.Name, u.Email)
	if err != nil {
		return err
	}
}

1件あたり「送信 → 実行 → 応答」の往復が1セット。N件ならN往復です。DBの処理時間より、往復のレイテンシが支配的になります。

Batch にキューしてまとめて送る

pgx.Batch にクエリを積み、SendBatch で一度に送ります。ドキュメントの “Copy Protocol” の手前、バッチの節にある通り、キューした全クエリが1回の送信にまとまります。

batch := &pgx.Batch{}
for _, u := range newUsers {
	batch.Queue("insert into users (name, email) values ($1, $2)", u.Name, u.Email)
}

br := pool.SendBatch(ctx, batch)
defer br.Close()

for range newUsers {
	if _, err := br.Exec(); err != nil {
		return fmt.Errorf("batch exec: %w", err)
	}
}

batch.Queue はまだ送りません。クエリを積むだけです。実際の送信は SendBatch の1回。N件のINSERTがN往復から1往復に減ります。返ってきた BatchResults から、キューした順に Exec / Query を呼んで各クエリの結果を取り出します。

// 100件INSERTしたときの往復回数
Exec ループ : 100 round trips
SendBatch   :   1 round trip

BatchResults は必ず閉じる

SendBatch が返す BatchResults は、閉じるまでその接続を占有します。defer br.Close() を忘れると接続がプールに戻らず、プール枯渇につながります。もう1つ。バッチ内のクエリは暗黙のトランザクションで実行されるので、途中の1件が失敗すると全体がロールバックされます。明示的にトランザクション境界を変えたいときは、後述の tx.SendBatch を使います。

CopyFrom で大量ロードする

INSERTがさらに大量、数万行のシードや一括取り込みなら COPY プロトコルが効きます。これは database/sql 経由では触れない、ネイティブモード専用の機能です。

rows := [][]any{
	{"Alice", "alice@example.com"},
	{"Bob", "bob@example.com"},
	{"Carol", "carol@example.com"},
}

n, err := pool.CopyFrom(
	ctx,
	pgx.Identifier{"users"},
	[]string{"name", "email"},
	pgx.CopyFromRows(rows),
)
if err != nil {
	return err
}
log.Printf("copied %d rows", n)

実行結果。

2026/06/20 10:21:07 copied 3 rows

CopyFrom は行をテキストのINSERT文に組み立てず、バイナリ形式でまとめて流します。返り値は挿入した行数。pgx.CopyFromRows はスライスをそのまま渡すヘルパーですが、巨大なデータを全部メモリに載せたくないときは CopyFromSource インターフェースを自前実装して、1行ずつ供給することもできます。

SendBatch と CopyFrom の使い分け

どちらも一括処理ですが、向き先が違います。整理すると次の通り。

  • SendBatch: 種類の違うクエリ(INSERT・UPDATE・SELECTの混在)を1往復にまとめたいとき
  • CopyFrom: 同じテーブルへの大量INSERT1種類を、最速で流し込みたいとき

混在クエリのまとめは CopyFrom ではできません。逆に純粋な大量INSERTを SendBatch でやると、1文ずつのINSERTを積むぶん CopyFrom ほどは速くなりません。

pgx・database/sql・sqlx の比較

同じPostgreSQLアクセスでも、選んだ層で触れる機能が変わります。生SQLを型安全に書く sqlx や、ORMの GORM との位置関係を1枚にまとめます。

観点pgx(ネイティブ)database/sqlsqlx
接続の型pgxpool.Poolsql.DBsqlx.DB
行→構造体CollectRows + ジェネリクス手動ScanStructScan
COPYCopyFrom 可不可不可
LISTEN/NOTIFY不可不可
他DBへの移植不可(PostgreSQL専用)

移植性を捨てる代わりに、速度とCOPY・LISTEN/NOTIFYを取ります。MySQLとPostgreSQLを1つのコードで切り替えたいなら、pgxをネイティブで使う前提が崩れます。

トランザクションと PgBouncer の注意点

tx は Begin して defer Rollback

トランザクションは pool.Begin で開きます。defer tx.Rollback(ctx) を先に置いておくのが定石です。Commit 済みの RollbackErrTxClosed を返すだけで無害なので、commit前に早期returnしても接続が宙ぶらりんになりません。

tx, err := pool.Begin(ctx)
if err != nil {
	return err
}
defer tx.Rollback(ctx)

if _, err := tx.Exec(ctx,
	"update accounts set balance = balance - $1 where id = $2", amount, from); err != nil {
	return err
}
if _, err := tx.Exec(ctx,
	"update accounts set balance = balance + $1 where id = $2", amount, to); err != nil {
	return err
}
return tx.Commit(ctx)

片方の Exec が失敗したら、return err で抜けた先の defer がロールバックを走らせます。送金処理で片側だけ反映される事故を、構造で防げます。

PgBouncer 配下では QueryExecMode を変える

pgx はデフォルトでプリペアドステートメントを自動キャッシュします。ところがトランザクションプーリングモードの PgBouncer を挟むと、キャッシュした文ハンドルが別接続に割り当たってエラー。ドキュメントの “PgBouncer” の節にある通り、この構成では実行モードを落とします。

cfg, _ := pgxpool.ParseConfig(dsn)
cfg.ConnConfig.DefaultQueryExecMode = pgx.QueryExecModeSimpleProtocol

これでプリペアドステートメントを使わないシンプルプロトコルに切り替わります。PgBouncerを入れていて原因不明の prepared statement already exists が出たら、まずここを疑います。

まとめ

pgx v5 をネイティブモードで使うと、database/sql では閉じている機能が開きます。

  • pgxpool.New で接続し、プールは *pgxpool.Pool として並行安全に共有できる
  • pgx.CollectRowsRowToStructByName で、行スキャンの定型ループが消える
  • SendBatch はN件のクエリをN往復から1往復に畳む。BatchResults のClose漏れに注意
  • CopyFromdatabase/sql では触れないCOPYプロトコルで、大量INSERTを流し込む
  • 移植性を捨てる代わりにPostgreSQL固有機能を取る。これがネイティブモードの判断軸
  • PgBouncerのトランザクションプーリング下では QueryExecModeSimpleProtocol に落とす

PostgreSQLしか使わないと決めたサービスなら、database/sql の抽象層を外すだけで、COPYやバッチといった固有APIが直接使えます。現行は pgx v5.10.0、Go 1.25以上で動きます。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!
目次