Go言語における並行処理 - ユースケース編
·3 分で読めます
はじめに
これは以下の記事の続きの記事。以下の2つではgoroutineとchannelについて説明したので、これらを使って具体的な並行処理のユースケースを書いてみる。
goroutineによる並行処理がすべて終わるまで待つ
処理を複数のgoroutineで並行で実行したい、というのはよくある例。起動したgoroutineがすべて終わるまで待ちたいときには、以下のようにsync.WaitGroupを使う。
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
for i := 1; i <= 20; i++ {
wg.Add(1) // カウンターをインクリメントする
go func(i int) {
fmt.Printf("i = %02d, fibonacci = %04d\n", i, fibonacci(i))
defer wg.Done() // 処理が終わったのでカウンターをデクリメントする
}(i)
}
wg.Wait() // カウンターが0になるまで待つ
fmt.Println("Done")
}
func fibonacci(n int) int {
if n <= 1 {
return n
}
return fibonacci(n-1) + fibonacci(n-2)
}goroutineで並行処理する数に上限を設けたい
よくある要件として「同時に実行する数を制限したい」があるが、この場合はセマフォというパターンで実装する。具体的には、Goであればchannelにバッファが付けられるので、このバッファの数で同時実行数を制限できる。
package main
import (
"flag"
"fmt"
"io/ioutil"
"net/http"
"regexp"
"sync"
"time"
)
var (
concurrency = flag.Int("c", 1, "num of concurrency")
)
func main() {
flag.Parse()
semaphore := make(chan struct{}, *concurrency) // 同時実行数
var wg sync.WaitGroup
urls := []string{
"https://journal.lampetty.net/entry/what-i-like-about-heroku",
"https://journal.lampetty.net/entry/e2e-test-with-agouti-in-go",
"https://journal.lampetty.net/entry/heroku-custom-clock-processes",
"https://journal.lampetty.net/entry/mac-settings-on-sierra",
"https://journal.lampetty.net/entry/mysqldump-option-where",
"https://journal.lampetty.net/entry/introducing-lekcije",
"https://journal.lampetty.net/entry/intellij-shortcuts-for-reading-source-code",
"https://journal.lampetty.net/entry/introducing-dead-mans-snitch",
"https://journal.lampetty.net/entry/concurrency-in-go-channels",
"https://journal.lampetty.net/entry/concurrency-in-go-goroutines",
"https://journal.lampetty.net/entry/cancel-and-timeout-with-context-in-go",
"https://journal.lampetty.net/entry/gcp-cloud-pubsub-memo",
"https://journal.lampetty.net/entry/oauth2-client-handson-in-go-authorization-code-grant",
"https://journal.lampetty.net/entry/satisfying-a-large-interface-quickly-in-go",
}
for _, u := range urls {
wg.Add(1)
u := u
go func() {
defer wg.Done()
fetch(semaphore, u)
}()
}
wg.Wait()
}
var r = regexp.MustCompile(`<title>(.*)</title>`)
func fetch(semaphore chan struct{}, url string) {
semaphore <- struct{}{}
defer func() {
<-semaphore
}()
time.Sleep(3 * time.Second)
resp, err := http.Get(url)
if err != nil {
fmt.Printf("err = %v\n", err)
return
}
defer func() { _ = resp.Body.Close() }()
bytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Printf("err = %v\n", err)
return
}
body := string(bytes)
if group := r.FindStringSubmatch(body); len(group) > 0 {
fmt.Printf("%v\n", group[1])
}
}上記のプログラムはURLのリストからhttp.Getしてtitleだけを表示するプログラムである。ここではsemaphoreというバッファ付きのchannelを使って、以下のように同時実行数を制限できるようにしている。
- mainの中でsemaphoreを生成
- fetch関数にsemaphoreを渡す
- fetchの内部で、処理を実行する前にsemaphoreにデータを入れて、処理が終わったらdeferでsemaphoreからデータを取り出している。これによりchannelのバッファを超える場合は処理がブロックされるようになる
以下のように実行時に-c 3と指定することで、同時にhttp.Getする数が3つに制限されてコンソールに出力される数が3行ずつになるはず。
$ go run semaphore.go -c 3
Herokuの好きなところ - oinume journal
Better Heroku Schedulerを探したらCustom clock processesにたどり着いた - oinume journal
WebアプリケーションのE2EテストをGoで書く - oinume journal
...バックグラウンドで一定の間隔で処理を行いたい
これはtime.Tickerとselectを使うことで簡単に実装できる。以下は1秒ごとにdoSomethingを呼び出すプログラム。
package main
import (
"fmt"
"time"
)
func main() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for i := 0; i < 10; i++ {
select {
case <-ticker.C:
doSomething(i)
}
}
}
func doSomething(v int) {
fmt.Printf("%d\n", v)
}まとめ
ユースケースについては探せばもっとありそうだけど、よくありそうな並行処理の実装パターン3つを紹介した。何かのためになれば幸いです。
- 作者:Katherine Cox-Buday
- 出版社/メーカー: オライリージャパン
- 発売日: 2018/10/26
- メディア: 単行本(ソフトカバー)
関連記事
Goにおける並行処理 - channel編
2019-09-10
Goにおける並行処理 - goroutine編
2019-08-28
Goのcontextによるキャンセルやタイムアウト
2019-06-17