oinume journal

Scratchpad of what I learned

Go言語における並行処理 - ユースケース編

はじめに

これは以下の記事の続きの記事。以下の2つではgoroutineとchannelについて説明したので、これらを使って具体的な並行処理のユースケースを書いてみる。

goroutineによる並行処理がすべて終わるまで待つ

処理を複数のgoroutineで並行で実行したい、というのはよくある例。起動したgoroutineがすべて終わるまで待ちたいときには、以下のようにsync.WaitGroupを使う。

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    for i := 1; i <= 20; i++ {
        wg.Add(1) // カウンターをインクリメントする
        go func(i int) {
            fmt.Printf("i = %02d, fibonacci = %04d\n", i, fibonacci(i))
            defer wg.Done() // 処理が終わったのでカウンターをデクリメントする
        }(i)
    }
    wg.Wait() // カウンターが0になるまで待つ
    fmt.Println("Done")
}

func fibonacci(n int) int {
    if n <= 1 {
        return n
    }
    return fibonacci(n-1) + fibonacci(n-2)
}

goroutineが完了するまで待ち、エラーが発生した場合はそれをすべて取得する

goroutineで並行処理する数に上限を設けたい

よくある要件として「同時に実行する数を制限したい」があるが、この場合はセマフォというパターンで実装する。具体的には、Goであればchannelにバッファが付けられるので、このバッファの数で同時実行数を制限できる。

package main

import (
    "flag"
    "fmt"
    "io/ioutil"
    "net/http"
    "regexp"
    "sync"
    "time"
)

var (
    concurrency = flag.Int("c", 1, "num of concurrency")
)

func main() {
    flag.Parse()
    semaphore := make(chan struct{}, *concurrency) // 同時実行数
    var wg sync.WaitGroup
    urls := []string{
        "https://journal.lampetty.net/entry/what-i-like-about-heroku",
        "https://journal.lampetty.net/entry/e2e-test-with-agouti-in-go",
        "https://journal.lampetty.net/entry/heroku-custom-clock-processes",
        "https://journal.lampetty.net/entry/mac-settings-on-sierra",
        "https://journal.lampetty.net/entry/mysqldump-option-where",
        "https://journal.lampetty.net/entry/introducing-lekcije",
        "https://journal.lampetty.net/entry/intellij-shortcuts-for-reading-source-code",
        "https://journal.lampetty.net/entry/introducing-dead-mans-snitch",
        "https://journal.lampetty.net/entry/concurrency-in-go-channels",
        "https://journal.lampetty.net/entry/concurrency-in-go-goroutines",
        "https://journal.lampetty.net/entry/cancel-and-timeout-with-context-in-go",
        "https://journal.lampetty.net/entry/gcp-cloud-pubsub-memo",
        "https://journal.lampetty.net/entry/oauth2-client-handson-in-go-authorization-code-grant",
        "https://journal.lampetty.net/entry/satisfying-a-large-interface-quickly-in-go",
    }
    for _, u := range urls {
        wg.Add(1)
        u := u
        go func() {
            defer wg.Done()
            fetch(semaphore, u)
        }()
    }
    wg.Wait()
}

var r = regexp.MustCompile(`<title>(.*)</title>`)

func fetch(semaphore chan struct{}, url string) {
    semaphore <- struct{}{}
    defer func() {
        <-semaphore
    }()
    time.Sleep(3 * time.Second)
    resp, err := http.Get(url)
    if err != nil {
        fmt.Printf("err = %v\n", err)
        return
    }
    defer func() { _ = resp.Body.Close() }()

    bytes, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        fmt.Printf("err = %v\n", err)
        return
    }
    body := string(bytes)
    if group := r.FindStringSubmatch(body); len(group) > 0 {
        fmt.Printf("%v\n", group[1])
    }
}

上記のプログラムはURLのリストからhttp.Getしてtitleだけを表示するプログラムである。ここではsemaphoreというバッファ付きのchannelを使って、以下のように同時実行数を制限できるようにしている。

  • mainの中でsemaphoreを生成
  • fetch関数にsemaphoreを渡す
  • fetchの内部で、処理を実行する前にsemaphoreにデータを入れて、処理が終わったらdeferでsemaphoreからデータを取り出している。これによりchannelのバッファを超える場合は処理がブロックされるようになる

以下のように実行時に-c 3と指定することで、同時にhttp.Getする数が3つに制限されてコンソールに出力される数が3行ずつになるはず。

$ go run semaphore.go -c 3

Herokuの好きなところ - oinume journal
Better Heroku Schedulerを探したらCustom clock processesにたどり着いた - oinume journal
WebアプリケーションのE2EテストをGoで書く - oinume journal
...

バックグラウンドで一定の間隔で処理を行いたい

これはtime.Tickerとselectを使うことで簡単に実装できる。以下は1秒ごとにdoSomethingを呼び出すプログラム。

package main

import (
    "fmt"
    "time"
)

func main() {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    for i := 0; i < 10; i++ {
        select {
        case <-ticker.C:
            doSomething(i)
        }
    }
}

func doSomething(v int) {
    fmt.Printf("%d\n", v)
}

まとめ

ユースケースについては探せばもっとありそうだけど、よくありそうな並行処理の実装パターン3つを紹介した。何かのためになれば幸いです。

Go言語による並行処理

Go言語による並行処理

  • 作者: Katherine Cox-Buday,山口能迪
  • 出版社/メーカー: オライリージャパン
  • 発売日: 2018/10/26
  • メディア: 単行本(ソフトカバー)
  • この商品を含むブログを見る

2019年9月の振り返り

8月の振り返りをサボってしまったのでまとめて。

アウトプット

Goの並行処理の記事とMySQL 8.0へアップデートする記事を書いた。

睡眠

1日6時間以上寝る、ということを今年の目標にしている。ヘルスケアアプリによると

  • 8月: 6h10min
  • 9月: 5h59min

という感じで9月はギリギリで未達成。しかもこれ平均で平日は大体5時間ぐらいなのでかなりやばめ。

英語

9月にIELTSのテストを受けてきた。詳細については以下にまとめたけど、感触としては悪くない気がする。ちなみに11月にTOEICも受ける予定。あと来年前半にTOEFLもおそらく受ける。

https://journal.lampetty.net/entry/taking-ielts-test-first-time:ttile

アルゴリズム

特に進捗なし

Go, gRPC, SQLの復習

引き続き Go言語による並行処理 を読んでいる。あと一つ記事を書きたい。

まとめ

まぁボチボチ頑張る。

初めてIELTSを受験した

IELTSってなに?

国際的な英語の試験。大学の入学の条件に使われたり、国によっては永住権を申請する時のポイントとして使われるもの。

会場

自分は東京会場で、場所は幸ビルディングという新橋付近の貸し会議室っぽいところだった。

持ち込みできるもの

中に持っていけるものは

  • 鉛筆
  • 消しゴム
  • パスポート
  • パスポートのコピー

のみOK。なお、鉛筆はお願いすれば試験の最中に試験官が削ってくれるので、鉛筆削りは持っていなくても大丈夫。

会場に着いてから

  1. 荷物を預ける
  2. IDチェック
  3. 試験室に入室

という流れ。

会場のビルの会議室でビニール袋に荷物を詰めて預ける。8:40にはこの荷物置き場が物理的に閉鎖されるので、それまでに会場に着いて荷物を預ける必要がある。

その後IDチェック。これをやったあとにトイレに行こうとすると再度指紋認証する必要がありめんどいので先にトイレは済ませておいたほうが良い。IDチェックでは指紋を採取されさらにカメラで写真を取られる。パスポートの顔写真と同一人物かも当然チェックされる。

終わると、試験官と一緒に試験する別の部屋に行く。

感想

受験者は予想通り若い人が多かった印象。驚いたのは、20%ぐらいは外国人(見た目で明らかに日本人じゃないとわかる人)だったこと。アフリカ系の人やインド系の人が多かった。もしかしたら中国人や韓国人も結構いたのかもしれない。カップルで受験している人もちらほらいた。自分みたいな40代のおっさんは1%ぐらいしかいない雰囲気だった。

IELTSの試験

意外なことに

  1. Writing
  2. Reading
  3. Listnening

という順番だった。というのは、参考書ではListening, Reading, Writing, Spreakingの順番で問題が構成されていたから、順番が全然違うことにびっくりした。もろもろのアナウンスが終わって9:20頃から試験がスタートした気がする。

  • Writing: 40分
  • Reading: 60分
  • Listening: 30分

という時間配分で、12:30ごろには終わった。それぞれのパートの合間で問題や解答用紙を回収する時間があるので、少し休めるのは良かった。TOEICと比べるとこの休憩時間があるおかげでそこまで疲労感を感じなかった気がする。なお、Speakingは午後にあり、自分は15:20開始だったので、昼ごはんを食べて新橋駅付近のカフェで時間を潰していた。

Writing

トピックは

  • Task1: 友人に仕事(求人)を薦める時に、仕事内容、なぜその人にオススメするのか、求人への応募方法をレター風味で書く
  • Task2: 保育園に子供を預けることと、預けないで家族が育てることについての意見を述べよ

というもので、どちらも自分にとっては馴染みのあるものだった。なので、書く内容を日本語でまとめておいて、あとはそれをひたすら英語に訳すというスタイルで書いていった。Task1は130 word以上で、ギリギリかけた。一方、Task2は250 word以上なので、圧倒的に時間が足りなくて文章の途中で終わってしまった。Writingは「字数が足りないからあとから文章を付け足す」ということがとてもやりにくいので、英文を書き始める前にちゃんと書く内容を決めておくことが大事だと思った。Computer Basedなテストだったら文章の間に別の文を挿入したりするのが簡単にできたりするんだろうか、などと考えた。

とにかく仕事ではこういう風に文章を書くことがあまりないので(デジタルなら後からいくらでも文章を付け足したり入れ替えたりできるので)、時間配分も含めて痛いミスをしてしまったように思う。Task2はおそらく100 wordぐらいしか書けていない。

Reading

最後のsection以外はそこまで難しくなかった。あと時間が圧倒的に足りないかなと思ったけど、ギリギリ終わるレベルだった。例えばTOEICだと明らかに時間内に終わる量ではないので、それに比べれば時間的な厳しさはなかったように思う。

Listening

Section1はたしかに一番簡単だった。ただ、選択式ではなく単語を埋めるタイプなので、冠詞をつけるかつけないかでけっこう悩んだ。Section4がやはり最高に難しくて、2,3問合っていればいいか、ぐらいのノリ。

Speaking

これは外国人の試験官に連れられて小さい会議室で実施された。"How are you?" とか少し雑談してice breakしてから本試験が始まる。内容としては

  • 学生 or 仕事している?
  • 仕事で一番興味深いことはなに?
  • どんな音楽を聴く?
  • 音楽を聴くときは一人で聴くのが好き?それとも他の人と聴くのが好き?

というような質問をされた。案の定Speakingが一番難易度が高いというか、普段の実力値が出てしまうのでこれが一番点数低そうだなと思った。

まとめ

試験の2週間前にIELTSの問題集をやって「うわっなにこれ難しすぎ」と絶望していたのだけど、本試験は意外と難しくなく、少しホッとした。スコアは10/13頃に送付されるらしいので、どのぐらいできているのか楽しみ。5.0〜6.0の間ぐらいだと思うんだけどどうなることやら。

【音声ダウンロード付】IELTSブリティッシュ・カウンシル公認問題集

【音声ダウンロード付】IELTSブリティッシュ・カウンシル公認問題集

Problems when updating MySQL from 5.7 to 8.0

Introduction

I updated MySQL from 5.7 to 8.0. There were some problems when updating. This is just a memo how to solve the problems.

InnoDB deprecated file format parameters

These parameters are deprecated in 8.0.

  • innodb_file_format
  • innodb_file_format_check
  • innodb_file_format_max
  • innodb_large_prefix

ref: MySQL :: WL#7704: InnoDB: Remove deprecated file format parameters in 8.0

Query cache parameters are deprecated

These parammeters are deprecated in 8.0.

  • query_cache_limit
  • query_cache_size

ref: https://mysqlserverteam.com/mysql-8-0-retiring-support-for-the-query-cache/

innodb_support_xa

innodb_support_xa is deprecated as well.

ref: MySQL :: WL#8843: Deprecate and remove the parameter innodb_support_xa

Can't create users with GRANT

You can't create users with GRANT operation. For example, you need to change

GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, INDEX, ALTER, LOCK TABLES ON db.* TO 'user'@'%' IDENTIFIED BY 'yourpassword'

like this:

CREATE USER IF NOT EXISTS 'user'@'%' identified by 'yourpassword';
GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, INDEX, ALTER, LOCK TABLES ON db.* TO 'user'@'%';

ref: How to grant all privileges to root user in MySQL 8.0 - Stack Overflow

default-authentication-plugin

The default value of default-authentication-plugin is caching_sha2_password. If your MySQL driver doesn't support the authentication method, you'll get an error this authentication plugin is not supported. mysql_native_password is the default value in MySQL 5.7 so you can specify it like this:

default-authentication-plugin = mysql_native_password

Other problems I've faced with

TABLE_NAMES from information_schema

My program executes following query.

SELECT table_name FROM information_schema.tables WHERE table_schema = 'mydb';

+------------------------------------+
| TABLE_NAME                         |
+------------------------------------+
| event_log_email                    |
| following_teacher                  |
| goose_db_version                   |
| lesson                             |
| lesson_status_log                  |
| m_country                          |
+------------------------------------+

The result of the query is TABLE_NAME in 8.0 although it was table_name in 5.7. As a result, I changed my query like this:

SELECT TABLE_NAME AS table_name FROM information_schema.tables WHERE table_schema = 'mydb';

Goにおける並行処理 - channel編

はじめに

これはGoにおける並行処理 - goroutine編 - oinume journalの続きの記事。goroutineに引き続き、Goの並行処理を支える重要な概念であるchannelについて説明する。

channelとは?

channelはメモリに対するアクセスを同期するためやgoroutine間の通信(データの受け渡し)として使うものである。Go言語による並行処理では

水が流れる川のように、チャネルは情報の流れの水路として機能します。値はチャネルに沿って 渡され、そこから下流に読み込まれます。

と説明されている。例えば以下のプログラムでは、main関数からHello WorldをChannel messageChに書き込んで、それを無名goroutineが読み取って出力している(Playground)

package main

import (
    "fmt"
)

func main() {
    messageCh := make(chan string)
    go func() {
        fmt.Println(<-messageCh)
    }()
    messageCh <- "Hello World" 
}

channelを使う上で覚えておくべき基本的なものとしては以下である。

  1. channelにデータを書き込む送信側とデータを受け取る受信側が存在している必要がある
    • 例えば上のプログラムで fmt.Println(<-messageCh) がない場合、 fatal error: all goroutines are asleep - deadlock! というエラーになる
  2. 受信処理はchannelに書き込みがあるまでブロックされる
    • 上のプログラムで messageCh <- "Hello World" の書き込みがない場合は、goroutineとして実行されている無名関数がずっとブロックされ何も起こらずにプログラムが終了する。
  3. 送信処理はchannelがいっぱいの場合、空きができるまでブロックされる
  4. channelは一般的にはmakeで生成する
    • make(chan, 3) のように、バッファをつけて生成することができる。この例だと3個のバッファがあるため、4個目を書き込む時にブロックされる。
  5. 送信専用channelと受信専用channelがある
  6. channelをcloseする

channelのブロック

通常であれば、goroutineはsync.WaitGroupを使って起動元のgoroutineにjoinするようにしないと、タイミングにもよるがほぼ確実に実行されずにmain関数が終了されてしまう。しかし、上のHello Worldを出力するgoroutineでは、必ずHello Worldと出力される。これはいったいなぜだろうか?

理由は、channelからデータを受信する場合データが送信されるまでブロックするからである。つまり、上のプログラムの実行順序は以下のようになる。

func main() {
    messageCh := make(chan string)  // (1) channelを宣言
    go func() { // (2) goroutineの実行
        fmt.Println(<-messageCh) // (4) channelからデータを受信(送信されるまでブロックされている)
    }()
    messageCh <- "Hello World"      // (3) channelへデータを送信
}

つまりchannelとgoroutineを使うことで、何かデータを受信するまで待って、受信したら処理を開始する というようなWorker処理を簡単に実装することができる。

channelのバッファ

ch := make(chan int, 3)

のようにmakeの第2引数にバッファを指定することができる。バッファを指定することで、channelの読み込みが一度も行われなくても、キャパシティが3のバッファ付きchannelであればgoroutineは3回まで書き込みが可能になる。バッファを指定しない場合は1回書き込むとブロックされる。

送信・受信専用channel

channelを宣言する時に何も指定しなければ書き込みも読み込みも可能になるが、以下のように宣言することで書き込み専用・読み込み専用のchannelを作ることができる。

  • chan<- int: 読み込み専用
  • <-chan int: 書き込み専用

この文法は、関数の引数にchannelを受け取る場合や戻り値としてchannelを返す場合に「これは書き込みしかできない」「これは読み込みしかできない」ということを明示できるため有用である。

channelのclose

channelは使い終わったらcloseすることができる。closeされたchannelからはデータを読み込むことはできない。例えば以下のプログラムは

  • goroutineでchに書き込み、最後にchannelをcloseしている
  • mainの中のforループでチャネルchからデータを読み込んでいる

というものであるが、closeされているのでその段階でchannelからの読み込みはブロックされずにforループが終了する。

package main

import "fmt"

func main() {
    ch := make(chan int)
    go func() {
        defer close(ch)
        for i := 1; i <= 5; i++ {
            ch <- i
        }
    }()

    for value := range ch {
        fmt.Printf("%v ", value)
    }
}

実践的な例

最後に実践的なchannelの使い方を載せておく。上で紹介した以下のchannelの特徴を使っている。

  • バッファ
  • 送信・受信専用channel
  • close

このプログラムが行っていることは、URLをchannelで送って、http.Getした結果をchannelで返すというものである。main関数の中で以下のgoroutineを立ち上げて、最後にchannel resultsを読み込んで標準出力にurl, status, errを出力している。

  • httpGetを実行する(channel urlsを読み込んで、resultsに書き込む)
  • channel urlsに書き込む
package main

import (
    "fmt"
    "net/http"
)

type result struct {
    url    string
    status int
    err    error
}

func main() {
    urls := make(chan string, 3)
    results := make(chan result, 3)

    // Consumer
    go httpGet(urls, results)

    // Producer
    go func() {
        targetURLs := []string{
            "https://journal.lampetty.net/entry/review-2019-07",
            "https://journal.lampetty.net/entry/review-2019-06",
            "https://journal.lampetty.net/entry/review-2019-05",
            "https://journal.lampetty.net/entry/review-2019-04",
            "https://journal.lampetty.net/entry/review-2019-03",
            "https://journal.lampetty.net/entry/review-2019-02",
            "https://journal.lampetty.net/entry/review-2019-01",
        }
        for _, url := range targetURLs {
            urls <- url
        }
        close(urls)
    }()

    for r := range results {
        fmt.Printf("url = %v, status = %v, err = %v\n", r.url, r.status, r.err)
    }
}

func httpGet(urls <-chan string, results chan<- result) {
    for url := range urls {
        resp, err := http.Get(url)
        if err != nil {
            results <- result{
                url: url,
                err: err,
            }
            return
        }
        results <- result{
            url:    url,
            status: resp.StatusCode,
        }
        _ = resp.Body.Close()
    }
    close(results)
}

まとめ

Goの並行処理の基礎となっているchannelについて説明した。次のパートでは、応用編としてgoroutineとchannelを使った並行処理の実践的なパターンを紹介したいと思う。

Go言語による並行処理

Go言語による並行処理

  • 作者: Katherine Cox-Buday,山口能迪
  • 出版社/メーカー: オライリージャパン
  • 発売日: 2018/10/26
  • メディア: 単行本(ソフトカバー)
  • この商品を含むブログを見る