はじめに
これはGoにおける並行処理 - goroutine編 - oinume journalの続きの記事。goroutineに引き続き、Goの並行処理を支える重要な概念であるchannelについて説明する。
channelとは?
channelはメモリに対するアクセスを同期するためやgoroutine間の通信(データの受け渡し)として使うものである。Go言語による並行処理
では
水が流れる川のように、チャネルは情報の流れの水路として機能します。値はチャネルに沿って 渡され、そこから下流に読み込まれます。
と説明されている。例えば以下のプログラムでは、main関数からHello World
をChannel messageCh
に書き込んで、それを無名goroutineが読み取って出力している(Playground)
package main
import (
"fmt"
)
func main() {
messageCh := make(chan string)
go func() {
fmt.Println(<-messageCh)
}()
messageCh <- "Hello World"
}
channelを使う上で覚えておくべき基本的なものとしては以下である。
- channelにデータを書き込む送信側とデータを受け取る受信側が存在している必要がある
- 例えば上のプログラムで
fmt.Println(<-messageCh)
がない場合、 fatal error: all goroutines are asleep - deadlock!
というエラーになる
- 受信処理はchannelに書き込みがあるまでブロックされる
- 上のプログラムで
messageCh <- "Hello World"
の書き込みがない場合は、goroutineとして実行されている無名関数がずっとブロックされ何も起こらずにプログラムが終了する。
- 送信処理はchannelがいっぱいの場合、空きができるまでブロックされる
- channelは一般的には
make
で生成する
make(chan, 3)
のように、バッファをつけて生成することができる。この例だと3個のバッファがあるため、4個目を書き込む時にブロックされる。
- 送信専用channelと受信専用channelがある
- channelをcloseする
channelのブロック
通常であれば、goroutineはsync.WaitGroupを使って起動元のgoroutineにjoinするようにしないと、タイミングにもよるがほぼ確実に実行されずにmain関数が終了されてしまう。しかし、上のHello Worldを出力するgoroutineでは、必ずHello Worldと出力される。これはいったいなぜだろうか?
理由は、channelからデータを受信する場合データが送信されるまでブロックするからである。つまり、上のプログラムの実行順序は以下のようになる。
func main() {
messageCh := make(chan string)
go func() {
fmt.Println(<-messageCh)
}()
messageCh <- "Hello World"
}
つまりchannelとgoroutineを使うことで、何かデータを受信するまで待って、受信したら処理を開始する
というようなWorker処理を簡単に実装することができる。
channelのバッファ
ch := make(chan int, 3)
のようにmakeの第2引数にバッファを指定することができる。バッファを指定することで、channelの読み込みが一度も行われなくても、キャパシティが3のバッファ付きchannelであればgoroutineは3回まで書き込みが可能になる。バッファを指定しない場合は1回書き込むとブロックされる。
送信・受信専用channel
channelを宣言する時に何も指定しなければ書き込みも読み込みも可能になるが、以下のように宣言することで書き込み専用・読み込み専用のchannelを作ることができる。
chan<- int
: 読み込み専用
<-chan int
: 書き込み専用
この文法は、関数の引数にchannelを受け取る場合や戻り値としてchannelを返す場合に「これは書き込みしかできない」「これは読み込みしかできない」ということを明示できるため有用である。
channelのclose
channelは使い終わったらcloseすることができる。closeされたchannelからはデータを読み込むことはできない。例えば以下のプログラムは
- goroutineで
ch
に書き込み、最後にchannelをcloseしている
- mainの中のforループでチャネル
ch
からデータを読み込んでいる
というものであるが、closeされているのでその段階でchannelからの読み込みはブロックされずにforループが終了する。
package main
import "fmt"
func main() {
ch := make(chan int)
go func() {
defer close(ch)
for i := 1; i <= 5; i++ {
ch <- i
}
}()
for value := range ch {
fmt.Printf("%v ", value)
}
}
実践的な例
最後に実践的なchannelの使い方を載せておく。上で紹介した以下のchannelの特徴を使っている。
- バッファ
- 送信・受信専用channel
- close
このプログラムが行っていることは、URLをchannelで送って、http.Getした結果をchannelで返すというものである。main関数の中で以下のgoroutineを立ち上げて、最後にchannel results
を読み込んで標準出力にurl, status, errを出力している。
- httpGetを実行する(channel
urls
を読み込んで、results
に書き込む)
- channel
urls
に書き込む
package main
import (
"fmt"
"net/http"
)
type result struct {
url string
status int
err error
}
func main() {
urls := make(chan string, 3)
results := make(chan result, 3)
go httpGet(urls, results)
go func() {
targetURLs := []string{
"https://journal.lampetty.net/entry/review-2019-07",
"https://journal.lampetty.net/entry/review-2019-06",
"https://journal.lampetty.net/entry/review-2019-05",
"https://journal.lampetty.net/entry/review-2019-04",
"https://journal.lampetty.net/entry/review-2019-03",
"https://journal.lampetty.net/entry/review-2019-02",
"https://journal.lampetty.net/entry/review-2019-01",
}
for _, url := range targetURLs {
urls <- url
}
close(urls)
}()
for r := range results {
fmt.Printf("url = %v, status = %v, err = %v\n", r.url, r.status, r.err)
}
}
func httpGet(urls <-chan string, results chan<- result) {
for url := range urls {
resp, err := http.Get(url)
if err != nil {
results <- result{
url: url,
err: err,
}
return
}
results <- result{
url: url,
status: resp.StatusCode,
}
_ = resp.Body.Close()
}
close(results)
}
まとめ
Goの並行処理の基礎となっているchannelについて説明した。次のパートでは、応用編としてgoroutineとchannelを使った並行処理の実践的なパターンを紹介したいと思う。