はじめに
これは以下の記事の続きの記事。以下の2つではgoroutineとchannelについて説明したので、これらを使って具体的な並行処理のユースケースを書いてみる。
goroutineによる並行処理がすべて終わるまで待つ
処理を複数のgoroutineで並行で実行したい、というのはよくある例。起動したgoroutineがすべて終わるまで待ちたいときには、以下のようにsync.WaitGroupを使う。
package main import ( "fmt" "sync" ) func main() { var wg sync.WaitGroup for i := 1; i <= 20; i++ { wg.Add(1) // カウンターをインクリメントする go func(i int) { fmt.Printf("i = %02d, fibonacci = %04d\n", i, fibonacci(i)) defer wg.Done() // 処理が終わったのでカウンターをデクリメントする }(i) } wg.Wait() // カウンターが0になるまで待つ fmt.Println("Done") } func fibonacci(n int) int { if n <= 1 { return n } return fibonacci(n-1) + fibonacci(n-2) }
goroutineで並行処理する数に上限を設けたい
よくある要件として「同時に実行する数を制限したい」があるが、この場合はセマフォというパターンで実装する。具体的には、Goであればchannelにバッファが付けられるので、このバッファの数で同時実行数を制限できる。
package main import ( "flag" "fmt" "io/ioutil" "net/http" "regexp" "sync" "time" ) var ( concurrency = flag.Int("c", 1, "num of concurrency") ) func main() { flag.Parse() semaphore := make(chan struct{}, *concurrency) // 同時実行数 var wg sync.WaitGroup urls := []string{ "https://journal.lampetty.net/entry/what-i-like-about-heroku", "https://journal.lampetty.net/entry/e2e-test-with-agouti-in-go", "https://journal.lampetty.net/entry/heroku-custom-clock-processes", "https://journal.lampetty.net/entry/mac-settings-on-sierra", "https://journal.lampetty.net/entry/mysqldump-option-where", "https://journal.lampetty.net/entry/introducing-lekcije", "https://journal.lampetty.net/entry/intellij-shortcuts-for-reading-source-code", "https://journal.lampetty.net/entry/introducing-dead-mans-snitch", "https://journal.lampetty.net/entry/concurrency-in-go-channels", "https://journal.lampetty.net/entry/concurrency-in-go-goroutines", "https://journal.lampetty.net/entry/cancel-and-timeout-with-context-in-go", "https://journal.lampetty.net/entry/gcp-cloud-pubsub-memo", "https://journal.lampetty.net/entry/oauth2-client-handson-in-go-authorization-code-grant", "https://journal.lampetty.net/entry/satisfying-a-large-interface-quickly-in-go", } for _, u := range urls { wg.Add(1) u := u go func() { defer wg.Done() fetch(semaphore, u) }() } wg.Wait() } var r = regexp.MustCompile(`<title>(.*)</title>`) func fetch(semaphore chan struct{}, url string) { semaphore <- struct{}{} defer func() { <-semaphore }() time.Sleep(3 * time.Second) resp, err := http.Get(url) if err != nil { fmt.Printf("err = %v\n", err) return } defer func() { _ = resp.Body.Close() }() bytes, err := ioutil.ReadAll(resp.Body) if err != nil { fmt.Printf("err = %v\n", err) return } body := string(bytes) if group := r.FindStringSubmatch(body); len(group) > 0 { fmt.Printf("%v\n", group[1]) } }
上記のプログラムはURLのリストからhttp.Getしてtitleだけを表示するプログラムである。ここではsemaphore
というバッファ付きのchannelを使って、以下のように同時実行数を制限できるようにしている。
- mainの中でsemaphoreを生成
- fetch関数にsemaphoreを渡す
- fetchの内部で、処理を実行する前にsemaphoreにデータを入れて、処理が終わったらdeferでsemaphoreからデータを取り出している。これによりchannelのバッファを超える場合は処理がブロックされるようになる
以下のように実行時に-c 3
と指定することで、同時にhttp.Getする数が3つに制限されてコンソールに出力される数が3行ずつになるはず。
$ go run semaphore.go -c 3 Herokuの好きなところ - oinume journal Better Heroku Schedulerを探したらCustom clock processesにたどり着いた - oinume journal WebアプリケーションのE2EテストをGoで書く - oinume journal ...
バックグラウンドで一定の間隔で処理を行いたい
これはtime.Tickerとselectを使うことで簡単に実装できる。以下は1秒ごとにdoSomethingを呼び出すプログラム。
package main import ( "fmt" "time" ) func main() { ticker := time.NewTicker(time.Second) defer ticker.Stop() for i := 0; i < 10; i++ { select { case <-ticker.C: doSomething(i) } } } func doSomething(v int) { fmt.Printf("%d\n", v) }
まとめ
ユースケースについては探せばもっとありそうだけど、よくありそうな並行処理の実装パターン3つを紹介した。何かのためになれば幸いです。
- 作者:Katherine Cox-Buday
- 出版社/メーカー: オライリージャパン
- 発売日: 2018/10/26
- メディア: 単行本(ソフトカバー)