高级Go并发模式

本文翻译自《Advanced Go Concurrency Patterns》。

Sameer Ajmani

May 2013

1 高级Go并发模式

Sameer Ajmani

Google

2 视频

该演讲于2013年5月在Google I/O上发表。

在YouTube上观看演讲

3 做好准备

4 Go支持并发

在语言和运行时(runtime)。不是一个库。

这会改变你构建程序的方式。

5 协程(Goroutine)和通道(Channel)

Goroutine是在同一地址空间中独立执行的函数。

go f()
go g(1, 2)

Channel是允许goroutine同步和交换信息的类型值。

c := make(chan int)
go func() { c <- 3 }()
n := <-c

有关基础知识的更多信息,请观看Go并发模式 (Pike, 2012)

6 示例:乒乓球(ping-pong)

type Ball struct{ hits int }

func main() {
    table := make(chan *Ball)
    go player("ping", table)
    go player("pong", table)

    table <- new(Ball) // 开始游戏:投球
    time.Sleep(1 * time.Second)
    <-table // 结束游戏:抓住这个球
}

func player(name string, table chan *Ball) {
    for {
        ball := <-table
        ball.hits++
        fmt.Println(name, ball.hits)
        time.Sleep(100 * time.Millisecond)
        table <- ball
    }
}

7 死锁检测

type Ball struct{ hits int }

func main() {
    table := make(chan *Ball)
    go player("ping", table)
    go player("pong", table)

    // table <- new(Ball) // 开始游戏:投球
    time.Sleep(1 * time.Second)
    <-table // 结束游戏:抓住这个球
}

func player(name string, table chan *Ball) {
    for {
        ball := <-table
        ball.hits++
        fmt.Println(name, ball.hits)
        time.Sleep(100 * time.Millisecond)
        table <- ball
    }
}

8 使用panic输出堆栈信息

type Ball struct{ hits int }

func main() {
    table := make(chan *Ball)
    go player("ping", table)
    go player("pong", table)

    table <- new(Ball) // 开始游戏:投球
    time.Sleep(1 * time.Second)
    <-table // 结束游戏:抓住这个球

    panic("show me the stacks")
}

func player(name string, table chan *Ball) {
    for {
        ball := <-table
        ball.hits++
        fmt.Println(name, ball.hits)
        time.Sleep(100 * time.Millisecond)
        table <- ball
    }
}

9 走(go)很容易,但如何停下来?

长时间运行的程序需要清理。

让我们看看如何编写处理通信、周期性事件和取消的程序。

核心是Go的select语句:就像一个switch,但决定是基于通信能力做出的。

select {
case xc <- x:
    // 把x发送到xc通道
case y := <-yc:
    // 从yc通道接收y
}

10 示例:feed阅读器

我最喜欢的feed阅读器消失了。我需要一个新的。

为什么不写一个?

我们从哪里开始?

11 查找RSS客户端

godoc.org中搜索“rss”会出现几个命中,其中一个提供:

// Fetch获取uri的Items并返回应该尝试下一次获取的时刻。失败时,Fetch返回错误。
func Fetch(uri string) (items []Item, next time.Time, err error)

type Item struct{
    Title, Channel, GUID string // RSS字段的子集
}

但我想要一个流:

<-chan Item

我想要多个订阅。

12 这就是我们所拥有的

type Fetcher interface {
    Fetch() (items []Item, next time.Time, err error)
}

func Fetch(domain string) Fetcher {...} // 从域名获取条目

13 这就是我们想要的

type Subscription interface {
    Updates() <-chan Item // 条目流(stream of Items)
    Close() error         // 关闭流
}

func Subscribe(fetcher Fetcher) Subscription {...} // 把获取到的数据转换为一个流

func Merge(subs ...Subscription) Subscription {...} // 合并多个流

14 示例

func main() {
    // 订阅一些feed,并创建一个合并的更新流。
    merged := Merge(
        Subscribe(Fetch("blog.golang.org")),
        Subscribe(Fetch("googleblog.blogspot.com")),
        Subscribe(Fetch("googledevelopers.blogspot.com")))

    // 在一段时间后关闭所有订阅(subscription)。
    time.AfterFunc(3*time.Second, func() {
        fmt.Println("closed:", merged.Close())
    })

    // 输出流中数据。
    for it := range merged.Updates() {
        fmt.Println(it.Channel, it.Title)
    }

    panic("show me the stacks")
}

15 订阅(Subscribe)

Subscribe创建一个新的Subscription,它重复获取条目,直到调用Close。

func Subscribe(fetcher Fetcher) Subscription {
    s := &sub{
        fetcher: fetcher,
        updates: make(chan Item), // for Updates
    }
    go s.loop()
    return s
}

// sub实现了Subscription接口。
type sub struct {
    fetcher Fetcher   // 获取条目
    updates chan Item // 把条目分发给用户
}

// loop使用s.fetcher方法获取条目并使用s.updates方法发送它们。当调用s.Close方法时,退出loop。
func (s *sub) loop() {...}

16 实现Subscription

要实现Subscription接口,请定义Updates方法和Close方法。

func (s *sub) Updates() <-chan Item {
    return s.updates
}
func (s *sub) Close() error {
    // TODO: 让loop退出
    // TODO: 发现任何错误
    return err
}

17 loop有什么作用?

  • 定期调用Fetch
  • 在Updates通道上发送获取的条目
  • 调用Close时退出,并报告错误

18 幼稚的实现

    for {
        if s.closed {
            close(s.updates)
            return
        }
        items, next, err := s.fetcher.Fetch()
        if err != nil {
            s.err = err                 
            time.Sleep(10 * time.Second)
            continue
        }
        for _, item := range items {
            s.updates <- item
        }
        if now := time.Now(); next.After(now) {
            time.Sleep(next.Sub(now))
        }
     }

func (s *naiveSub) Close() error {
    s.closed = true
    return s.err   
}

19 Bug 1:对s.closed/s.err的非同步访问

    for {
        if s.closed {
            close(s.updates)
            return
        }
        items, next, err := s.fetcher.Fetch()
        if err != nil {
            s.err = err
            time.Sleep(10 * time.Second)
            continue
        }
        for _, item := range items {
            s.updates <- item
        }
        if now := time.Now(); next.After(now) {
            time.Sleep(next.Sub(now))
        }
    }

func (s *naiveSub) Close() error {
    s.closed = true
    return s.err
}

20 竞态(Race)检测器

go run -race naivemain.go
    for {
        if s.closed {
            close(s.updates)
            return
        }
        items, next, err := s.fetcher.Fetch()
        if err != nil {
            s.err = err

func (s *naiveSub) Close() error {
    s.closed = true
    return s.err
}

21 Bug 2:time.Sleep可能会保持循环运行

    for {
        if s.closed {
            close(s.updates)
            return
        }
        items, next, err := s.fetcher.Fetch()
        if err != nil {
            s.err = err                 
            time.Sleep(10 * time.Second)
            continue
        }
        for _, item := range items {
            s.updates <- item
        }
        if now := time.Now(); next.After(now) {
            time.Sleep(next.Sub(now))
        }
    }

22 Bug 3:循环可能会在s.updates上永远阻塞

    for {
        if s.closed {
            close(s.updates)
            return
        }
        items, next, err := s.fetcher.Fetch()
        if err != nil {
            s.err = err                 
            time.Sleep(10 * time.Second)
            continue
        }
        for _, item := range items {
            s.updates <- item
        }
        if now := time.Now(); next.After(now) {
            time.Sleep(next.Sub(now))
        }
    }

23 解决方案

将循环体更改为具有三种情况的select:

  • Close被调用时
  • 调用Fetch时
  • 在s.updates上发送条目

24 结构:for-select循环

循环在它自己的goroutine中运行。

select让循环避免在任何一种状态下无限期地阻塞。

func (s *sub) loop() {
    ... declare mutable state ...
    for {
        ... set up channels for cases ...
        select {
        case <-c1:
            ... read/write state ...
        case c2 <- x:
            ... read/write state ...
        case y := <-c3:
            ... read/write state ...
        }
    }
}

这些case通过循环中的本地状态进行交互。

25 case 1:Close

Close通过s.closure与循环通信。

type sub struct {
    closing chan chan error
}

服务(loop)在其通道(s.closure)上监听请求。

客户端(Close)在s.closure上发送请求:退出并回复错误

在这种情况下,请求中唯一的内容就是回复通道。

26 Case 1: Close

Close要求循环退出并等待响应。

func (s *sub) Close() error {
    errc := make(chan error)
    s.closing <- errc
    return <-errc
}

循环通过回复Fetch错误来处理Close并退出。

    var err error // 当Fetch失败时设置错误
    for {
        select {
        case errc := <-s.closing:
            errc <- err
            close(s.updates) // 告诉接收者我们做完了
            return
        }
    }

27 Case 2: Fetch

在延迟一段时间后安排下一次Fetch。

    var pending []Item // 被fetch追加;被send消费
    var next time.Time // 初始化值是January 1, year 0
    var err error
    for {
        var fetchDelay time.Duration // 初始化值是0(没有延迟)
        if now := time.Now(); next.After(now) {
            fetchDelay = next.Sub(now)
        }
        startFetch := time.After(fetchDelay)

        select {
        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            pending = append(pending, fetched...)
        }
    }

28 Case 3: Send

一次发送一个获取的条目。

var pending []Item // 被fetch追加;被send消费
for {
    select {
    case s.updates <- pending[0]:
        pending = pending[1:]
    }
}

哎呀。这崩溃了。

29 select和nil通道

在nil通道上发送和接收会被阻塞。

select从不选择阻塞的case。

func main() {
    a, b := make(chan string), make(chan string)
    go func() { a <- "a" }()
    go func() { b <- "b" }()
    if rand.Intn(2) == 0 {
        a = nil
        fmt.Println("nil a")
    } else {
        b = nil
        fmt.Println("nil b")
    }
    select {
    case s := <-a:
        fmt.Println("got", s)
    case s := <-b:
        fmt.Println("got", s)
    }
}

30 Case 3: Send (修复的)

仅在pending非空时启用send。

    var pending []Item // 被fetch追加;被send消费
    for {
        var first Item
        var updates chan Item
        if len(pending) > 0 {
            first = pending[0]
            updates = s.updates // 启用send case
        }

        select {
        case updates <- first:
            pending = pending[1:]
        }
     }

31 select

将三个case放在一起:

   select {
        case errc := <-s.closing:
            errc <- err
            close(s.updates)
            return
        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            pending = append(pending, fetched...)
        case updates <- first:
            pending = pending[1:]
      }

这些case通过err、next和pending交互。

没有锁,没有条件变量,没有回调函数。

32 已修复的Bug

  • Bug 1:对s.closed和s.err的非同步(unsynchronized)访问
  • Bug 2:time.Sleep可能会保持循环运行
  • Bug 3:循环可能会永远阻塞在发送s.updates上
        select {
        case errc := <-s.closing:
            errc <- err
            close(s.updates)
            return
        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            pending = append(pending, fetched...)
        case updates <- first:
            pending = pending[1:]
        }

33 我们可以进一步改进循环

34 问题:Fetch可能会返回重复项

    var pending []Item
    var next time.Time
    var err error

        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            pending = append(pending, fetched...)

35 修复:在添加到pending之前过滤条目

    var pending []Item
    var next time.Time
    var err error
    var seen = make(map[string]bool) // item.GUIDs集合

        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            for _, item := range fetched {
                if !seen[item.GUID] {
                    pending = append(pending, item)
                    seen[item.GUID] = true
                }
            }

36 问题:pending队列无限制地增长

        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            for _, item := range fetched {
                if !seen[item.GUID] {
                    pending = append(pending, item)
                    seen[item.GUID] = true
                }
            }

37 问题:pending队列无限制地增长

const maxPending = 10

        var fetchDelay time.Duration
        if now := time.Now(); next.After(now) {
            fetchDelay = next.Sub(now)
        }
        var startFetch <-chan time.Time
        if len(pending) < maxPending {
            startFetch = time.After(fetchDelay) // 启用fetch case
        }

可以改为从pending的头部删除较旧的条目。

38 问题:Fetch上的循环被阻塞

        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            for _, item := range fetched {
                if !seen[item.GUID] {
                    pending = append(pending, item)
                    seen[item.GUID] = true         
                }
            }

39 修复:异步运行Fetch

为fetchDone添加一个新的select case。

type fetchResult struct{ fetched []Item; next time.Time; err error }

    var fetchDone chan fetchResult // 如果不是nil,则表示Fetch正在运行

        var startFetch <-chan time.Time
        if fetchDone == nil && len(pending) < maxPending {
            startFetch = time.After(fetchDelay) // 启用fetch case
        }

        select {
        case <-startFetch:
            fetchDone = make(chan fetchResult, 1)
            go func() {
                fetched, next, err := s.fetcher.Fetch()
                fetchDone <- fetchResult{fetched, next, err}
            }()
        case result := <-fetchDone:
            fetchDone = nil
            // 使用result.fetched, result.next, result.err

40 实现订阅

响应式。干净。易于阅读和更改。

三种技术:

  • for-select循环
  • 服务通道,回复通道(chan chan err)
  • select case里的nil通道

更多详细信息在线,包括Merge。

41 总结

并发编程可能很棘手。

Go让它变得更容易:

  • 通道传送数据、定时器事件、取消信号
  • goroutines序列化对本地可变状态的访问
  • 堆栈跟踪和死锁检测器
  • 竞态检测器

42 链接

Go并发模式(2012)

go.dev/talks/2012/concurrency.slide

并发不是并行

golang.org/s/concurrency-is-not-parallelism

通过通信共享内存

golang.org/doc/codewalk/sharemem

Go Tour (在你的浏览器里学习Go)

tour.golang.org

43 致谢

Sameer Ajmani

Google

http://profiles.google.com/ajmani

@Sajma

http://golang.org

“高级Go并发模式”的一个回复

  1. Your article gave me a lot of inspiration, I hope you can explain your point of view in more detail, because I have some doubts, thank you.

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注