Go言語で並行処理

2.4K Views

January 23, 23

スライド概要

Go言語での並行処理を実装するうえで気にすると良いこと

profile-image

LIFULL HOME'Sを運営する株式会社LIFULLのアカウントです。 LIFULLが主催するエンジニア向けイベント「Ltech」等で公開されたスライド等をこちらで共有しております。

シェア

またはPlayer版

埋め込む »CMSなどでJSが使えない場合

関連スライド

各ページのテキスト
1.

Go言語で並行処理 社内技術勉強会 テクノロジー本部事業基盤UプラットフォームG 宮崎泰輔 1 © LIFULL Co.,Ltd. 本書の無断転載、複製を固く禁じます。

2.

並行処理はなぜ難しいか 競合状態 var data int go func() { data++ }() if data == 0 { fmt.Printf(“the value is %v.\n”, data) } これはどうなる? 1. 何も表示されない 2. the value is 0. が表示される 3. the value is 1. が表示される

3.

並行処理はなぜ難しいか アトミック性 異なるコンテキストで、アトミックであることが保証できないと、 平行なコンテキストで安全なプログラムにならない。 i++ => i = i + 1 1. iの値を取得する 2. iの値を1増やす 3. iの値を保存する

4.
[beta]
並行処理はなぜ難しいか

デッドロック

- 2つのgoroutineがそれぞれ相手の

type value struct {
mu sync.Mutex
value int
}
var wg sync.WaitGroup
printSum := func(v1, v2 *value) {
defer wg.Done()
v1.mu.Lock()

完了を待っている

defer v1.mu.Unlock()
time.Sleep(2 * time.Second)
v2.mu.Lock()
defer v2.mu.Unlock()

そこから進むことはない

fmt.Printf("sum = %v\n", v1.value + v2.value)
}
var a, b value
wg.Add(2)
go printSum(&a, &b)
go printSum(&b, &a)
wg.Wait()

5.

並行処理はなぜ難しいか - Goを使っても、並行処理は依然難しいもの - ただ、Goの並行処理のプリミティブを使えば、簡潔に書ける そのパターンや、注意点についていくつか紹介

6.

Goの並行プリミティブ go 文: go に続く関数を別のgoroutineで動かす channel: 異なるgoroutine間でメッセージをやり取りするためのもの select文: 複数のchannelからメッセージを待ち受けるためのもの sync.WaitGroup: 安全なカウンターみたいなもの sync.Mutex, sync.RWMutex: クリティカルセクションを保護するもの sync.Once: 平行であろうと、1度しかコードを実行しない sync.Pool: オブジェクトプールを並行に使えるようにしたもの

7.

sync.WaitGroup goroutine起動時に、Addしておいて、goroutineの中で defer Done() するのが定番 var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() fmt.Println("1st goroutine sleeping...") time.Sleep(1) }() wg.Add(1) go func() { defer wg.Done() fmt.Println("2nd goroutine sleeping...") time.Sleep(2) }() wg.Wait() fmt.Println("All goroutines complete.")

8.
[beta]
sync.WaitGroup

forで起動する場合はこんな感じ
const numGreeters = 5
var wg sync.WaitGroup
wg.Add(numGreeters)
for i := 0; i < numGreeters; i++ {
// i := i
go func(i int) {
defer wg.Done()
fmt.Printf("Hello from %v!\n", i)
}(i)
}
wg.Wait()

i := i としてるのは、ループのiを参照していると、goroutine起動後に
書き換わることがあるため

9.
[beta]
channelのパターン

generator

関数の中でchannelを作成して

generator := func(ctx context.Context, integers ...int) <-chan int {
stream := make(chan int, len(integers))
go func() {
defer close(stream)
for _, i := range integers {
select {
case <-ctx.Done():
return
case stream <- i:
}
}

goroutineの中でchannelに値を入れる
defer で、goroutineを抜けるときに
channelをcloseすることで、
チャンネルからの読み取りが終了する

}()
return stream
}
for i := range generator(context.Background(), 1, 2, 3, 4, 5) {
fmt.Println(i)
}

10.
[beta]
sync.Once

sync.Once#Do は、何回呼ばれても一度しか実行されない
var count int
increment := func() {
count++
}
var once sync.Once
var increments sync.WaitGroup
increments.Add(100)
for i := 0; i < 100; i++ {
go func() {
defer increments.Done()
once.Do(increment)
}()
}
increments.Wait()
fmt.Printf("Count is %v!\n", count)
=> Count is 1!

11.
[beta]
sync.Pool

使うものを決まった数だけ作る方法

workerが1024*1024動いているが、
実際のメモリ確保は、数KBのみ
Poolを使っていない場合、
最悪1024B * 1024 * 1024 = 1GB

var numCalcsCreated int
calcPool := &sync.Pool{
New: func() interface{} {
numCalcsCreated += 1
mem := make([]byte, 1024)
return &mem
},
}
calcPool.Put(calcPool.New())
calcPool.Put(calcPool.New())
calcPool.Put(calcPool.New())
calcPool.Put(calcPool.New())
const numWorkers = 1024 * 1024
var wg sync.WaitGroup
wg.Add(numWorkers)
for i := numWorkers; i > 0; i-- {
go func() {
defer wg.Done()
mem := calcPool.Get().(*[]byte)
defer calcPool.Put(mem)
}()

オブジェクトを使い回せる場合に便利

}
wg.Wait()
fmt.Printf("%d calculators were created.", numCalcsCreated)

12.
[beta]
channelのパターン

for-selectループ

無限ループしつつ、doneに
値が入ったらbreakするようなコード

100[ms]毎に終了すべきか
チェックしているとも言える

done := make(chan struct{})
go func() {
time.Sleep(1 * time.Second)
done <- struct{}{}
}()
Loop:
for {
select {
case <-done:
break Loop
default:
}
fmt.Println("hi")
time.Sleep(100 * time.Millisecond)
}

13.
[beta]
channelのパターン

for-selectループ
time.Afterを使う場合はこんな感じ

default節を書かないと、
timeoutChから値が来るまでずっと
ブロックし続けるので注意
(ミスりやすい)
time.NewTimerを使う

timeoutCh := time.After(1 * time.Second)
Loop:
for {
select {
case <-timeoutCh: // 内部のgoroutineがリークする
break Loop
case <-ctx.Done():
return
default:
}
fmt.Println("hi")
time.Sleep(100 * time.Millisecond)
}
fmt.Println("finished")

14.
[beta]
channelのパターン

便利な関数
これがあると、

func orDone[T any](ctx context.Context, c <-chan T) <-chan T {
stream := make(chan T)
go func() {
defer close(stream)

for val := range orDone(ctx, myChan) {
// valに対してなにかする
}

for {
select {
case <-ctx.Done():
return nil
case v, ok := <-c:
if !ok {
return
}
select {
case stream <- v:
case <-ctx.Done():
}
}
}

こう書くだけでcontextが終了するか
myChanがcloseされるまでのループを
安全にできる。

}()
return stream
}

15.
[beta]
channelのパターン

fan-in

複数のチャンネルをまとめて
1つのチャンネルにする

パイプラインパターンのときに
よく出てくる

var wg sync.WaitGroup
multiplexed := make(chan T)
multiplex := func(c <-chan T) {
defer wg.Done()
for i := range c {
{
select {
case <-ctx.Done():
return
case multiplexed <- i:
}
}
}
}
wg.Add(len(channels))
for _, c := range channels {
go multiplex(c)
}
go func() {
wg.Wait()
close(multiplexed)
}()
return multiplexed

16.

channelのパターン パイプライン processB input processA processC processB それぞれのステージは、goroutine それらをつないでいるのがchannel つまり、ステージはchannelを受け取ってchannelを返す output

17.
[beta]
channelのパターン

だいたい全部こんな感じになる

それらのステージを

func processA(ctx context.Context, input <-chan int) <-chan int {
ret := make(chan int)
go func() {
defer close(ret)

つなぎ合わせて処理する

for i := range input {
// ここでなんか処理する
select {
case <-ctx.Done():
return
case ret <- i:
}
}
}()
return ret
}

18.

個人的に良くないと思ってるパターン 都度goroutineを起動する - forでループのたびにgoroutineを起動すると、goroutineの起動数に制限を かけておかないとgoroutineが起動しすぎてしまう。 制限をかけていれば問題ない(webサーバーとかはこのパターン) よく考えずchannelのバッファーを1以上にする - バッファを設定しても、それぞれのステージがブロック状態になる 時間を短くできるだけ 全体としての処理時間が早くなるわけではない

19.

注意点 goroutineがリークしないように注意 - 関数が終わることで、channelから読み込まれるものが無い状態にならな いことを注意したい

20.

他にも有用なパターン - ハートビート - 流量制限 - errgroup