Goのerrgroupで並行処理のエラーを集約する—WaitGroupとの使い分け

Goのerrgroupで並行処理のエラーを集約する—WaitGroupとの使い分け | mohablog

3つのAPIを並列に叩いて、1本でも失敗したら残りを止めたい。sync.WaitGroup でやると、エラーの受け渡しと中断処理を自分で組むことになります。errgroup はその2つを1つの型に畳み込みます。

目次

WaitGroupで複数goroutineのエラーを拾う面倒さ

並列処理の素朴な出発点は sync.WaitGroup です。ただ、各goroutineが返すエラーを集めようとした瞬間に話がややこしくなります。

共有変数とMutexが増えていく

WaitGroupはカウンタしか持ちません。エラーを外へ渡す口が無い。だから共有変数とロックを足すことになります。

func fetch(url string) error {
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	resp.Body.Close()
	return nil
}

func main() {
	urls := []string{"https://example.com", "https://bad.invalid"}

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		firstErr error
	)
	for _, url := range urls {
		wg.Go(func() { // Go 1.25 の WaitGroup.Go
			if err := fetch(url); err != nil {
				mu.Lock()
				if firstErr == nil {
					firstErr = err
				}
				mu.Unlock()
			}
		})
	}
	wg.Wait()
	if firstErr != nil {
		fmt.Println("failed:", firstErr)
	}
}

実行結果。

failed: Get "https://bad.invalid": dial tcp: lookup bad.invalid: no such host

エラーを1つ保持するだけで MutexfirstErr の番をすることになります。「最初のエラーが出たら他を止める」を足すと、ここに contextcancel がさらに乗ってきます。

Go 1.25で起動は楽になったが集約は別問題

上のコードで使った wg.GoGo 1.25 で入った新メソッドです。リリースノートはこう書いています。

The new WaitGroup.Go method makes the common pattern of creating and counting goroutines more convenient.

Add(1)defer Done() が消えて起動は短くなりました。ただ、エラーの集約と中断は WaitGroup の守備範囲の外。そこを引き取るのが errgroup です。

errgroupで2つのHTTPフェッチを並列化する

最小の形から。golang.org/x/sync/errgroup はゼロ値の Group がそのまま使えます。

package main

import (
	"fmt"
	"net/http"

	"golang.org/x/sync/errgroup"
)

func main() {
	urls := []string{"https://example.com", "https://pkg.go.dev"}

	var g errgroup.Group
	for _, url := range urls {
		g.Go(func() error {
			resp, err := http.Get(url)
			if err != nil {
				return err
			}
			defer resp.Body.Close()
			fmt.Printf("%s -> %d\n", url, resp.StatusCode)
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		fmt.Println("failed:", err)
		return
	}
	fmt.Println("all ok")
}

実行結果。出力順はgoroutineのスケジューリング次第で入れ替わります。

https://pkg.go.dev -> 200
https://example.com -> 200
all ok

Goメソッドにfunc() errorを渡すだけ

g.Gofunc() error を取ります。WaitGroupのときの MutexfirstErr も要りません。エラーは戻り値で返すだけ。ループ変数を退避する url := urlGo 1.22 以降は不要です。各反復で別変数になります。

Waitが最初のエラーを返す

g.Wait() は全goroutineの終了を待ち、最初に返った non-nil エラーを1つ返します。2本目以降のエラーは捨てられる。複数のエラーを全部見たいなら、自前でスライスに集める作りが要ります。

WithContextは最初のエラーで残りを止める

WithContext を使うと挙動が変わります。1本がエラーを返した瞬間に、派生 Context がキャンセルされます。

func main() {
	g, ctx := errgroup.WithContext(context.Background())

	// 30msで失敗するワーカー
	g.Go(func() error {
		time.Sleep(30 * time.Millisecond)
		return errors.New("query failed")
	})

	// 本来1秒かかるワーカー。ctxの中断を見ている
	g.Go(func() error {
		select {
		case <-time.After(1 * time.Second):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	})

	start := time.Now()
	err := g.Wait()
	fmt.Printf("waited %v, err: %v\n", time.Since(start).Round(time.Millisecond), err)
}

実行結果。

waited 30ms, err: query failed

2本目は1秒待つはずでした。が、1本目が30msで失敗して ctx がキャンセルされ、ctx.Done() が発火して即座に抜けます。1秒待たずに 30msWait が返りました。

キャンセルのタイミングは公式が明記している

派生 Context がいつキャンセルされるかは、WithContext のドキュメントにそのまま書いてあります。

The derived Context is canceled the first time a function passed to Go returns a non-nil error or the first time Wait returns, whichever occurs first.

キャンセルされるのは「最初のエラー」か「Waitの完了」の早い方。だから各ワーカーは ctx を受け取り、ctx.Done() を見る作りにしておく必要があります。見ていないワーカーは中断されず、最後まで走り切ります。

ctxを実際のI/Oに渡さないと中断は効かない

ここを踏みやすい。WithContext が返す ctx は、各ワーカーが実際のI/Oに渡して初めて効きます。前の最小例の http.Get のままでは ctx を見ないので、グループがキャンセルされても通信は走り続けます。

g.Go(func() error {
	req, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err // ctxキャンセル時は context.Canceled が返る
	}
	defer resp.Body.Close()
	return nil
})

他のワーカーが先に失敗したときの実行結果。

failed: Get "https://example.com": context canceled

DBドライバなら QueryContext、自前のループなら ctx.Done() の監視。ctx を末端まで配るのが前提です。配り忘れると WithContext は中断の口だけ作って実際には何も止めません。

Waitが返すのは最初のエラーだけ

上の出力で errquery failed でした。2本目が返した ctx.Err()(context.Canceled)ではありません。Wait が返すのは時系列で最初の non-nil エラー。中断側のエラーで上書きされることはない。

SetLimitで同時実行数に上限をかける

1万件を一気に Go へ渡すと、goroutineが1万本立ちます。SetLimit で同時数に天井を付けられます。

var g errgroup.Group
g.SetLimit(3) // 同時に走るのは最大3本

for i := 0; i < 9; i++ {
	id := i
	g.Go(func() error {
		fmt.Printf("start %d\n", id)
		time.Sleep(100 * time.Millisecond)
		return nil
	})
}
g.Wait()

実行結果。3本ずつ約100ms間隔で動きます(各バッチ内の順序は不定)。

start 0
start 1
start 2
   (約100ms後)
start 3
start 4
start 5
   (約100ms後)
start 6
start 7
start 8

SetLimit に負数を渡すと無制限、0 なら全goroutineがブロックされます。制限はgoroutineが動いている間に変えてはいけません。空きが無いとき即座に false を返す TryGo も用意されています。

errgroupとWaitGroupはどちらを使うか

起動の手軽さだけなら Go 1.25 の WaitGroup.Go でも足ります。差が出るのはエラーと中断の扱い。

観点sync.WaitGroup (Go 1.25)errgroup
goroutineの起動wg.Go(func(){...})g.Go(func() error {...})
エラーの集約共有変数+Mutexを自前でWait が最初のnon-nilを返す
最初のエラーで中断context/channelを自前でWithContext で自動
同時実行数の制限セマフォを自前でSetLimit
panicの回復しないしない(v0.20.0時点)

エラーの集約も中断も要らない「ただ待つだけ」なら WaitGroup.Go で十分です。1つでも失敗したら全部畳みたい、同時数を絞りたい。そこが要件に入った時点で errgroup に寄せると、自前のロックとキャンセル配線を消せます。

panicはerrgroupが素通しする

panicは別扱いです。errgroup はワーカー内のpanicを回復しません。1本がpanicするとプロセスごと落ちます。

var g errgroup.Group
g.Go(func() error {
	var m map[string]int
	m["x"] = 1 // nil mapへの書き込み -> panic
	return nil
})
if err := g.Wait(); err != nil {
	fmt.Println("failed:", err) // ここには到達しない
}

実行結果。

panic: assignment to entry in nil map

goroutine 6 [running]:
main.main.func1(...)
	/path/to/main.go:11
exit status 2

Waiterr は受け取れません。recover() がどこにも無いので、panicはそのままランタイムへ抜けます。並列バッチの1本がnil参照でpanicし、Wait のエラーログだけ眺めて原因にたどり着けなかったことがあります。落ちていたのは Wait の手前でした。

ワーカー側でrecoverしてerrorに変える

panicをエラーとして扱いたいなら、ワーカーの中で recover し、名前付き戻り値に詰めます。

g.Go(func() (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered: %v", r)
		}
	}()
	var m map[string]int
	m["x"] = 1
	return nil
})

実行結果。

failed: recovered: assignment to entry in nil map

これでpanicが Wait のエラーに変わり、WithContext 経由なら他のワーカーも中断されます。

panic伝播の提案は受理済みだが未リリース

この挙動を変える提案が動いています。panicを Wait 側へ伝播させる golang/go#53757 が受理済みで、PanicValue / PanicError 型で Wait がpanicを再送する設計です。ただし現行の golang.org/x/sync/errgroup v0.20.0 には未反映。今のところワーカー側の recover が要ります。

まとめ

errgroupは WaitGroup にエラー集約とキャンセルを足した薄いラッパーです。要点を並べます。

  • g.Go(func() error) でエラーは戻り値。共有変数とMutexが要らない
  • Wait が返すのは時系列で最初の non-nil エラー1つ。残りは捨てられる
  • WithContext の派生Contextは「最初のエラー」か「Wait完了」の早い方でキャンセルされる
  • SetLimit で同時実行数に上限。負数で無制限、0 で全停止
  • panicは回復されず(v0.20.0時点)プロセスが落ちる。ワーカー内の recover でerrorに変える
  • 起動の手軽さだけなら Go 1.25 の WaitGroup.Go で足りる。集約と中断が要るなら errgroup

並行処理の基礎はGo言語の並行処理入門:goroutineとchannelの使い方を完全解説で、起動したまま終わらないケースはGoのgoroutineリーク対策で扱っています。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!
目次