高级Go并发模式

本文翻译自《Advanced Go Concurrency Patterns》。

Sameer Ajmani

May 2013

1 高级Go并发模式

Sameer Ajmani

Google

2 视频

该演讲于2013年5月在Google I/O上发表。

在YouTube上观看演讲

3 做好准备

4 Go支持并发

在语言和运行时(runtime)。不是一个库。

这会改变你构建程序的方式。

5 协程(Goroutine)和通道(Channel)

Goroutine是在同一地址空间中独立执行的函数。

go f()
go g(1, 2)

Channel是允许goroutine同步和交换信息的类型值。

c := make(chan int)
go func() { c <- 3 }()
n := <-c

有关基础知识的更多信息,请观看Go并发模式 (Pike, 2012)

6 示例:乒乓球(ping-pong)

type Ball struct{ hits int }

func main() {
    table := make(chan *Ball)
    go player("ping", table)
    go player("pong", table)

    table <- new(Ball) // 开始游戏:投球
    time.Sleep(1 * time.Second)
    <-table // 结束游戏:抓住这个球
}

func player(name string, table chan *Ball) {
    for {
        ball := <-table
        ball.hits++
        fmt.Println(name, ball.hits)
        time.Sleep(100 * time.Millisecond)
        table <- ball
    }
}

7 死锁检测

type Ball struct{ hits int }

func main() {
    table := make(chan *Ball)
    go player("ping", table)
    go player("pong", table)

    // table <- new(Ball) // 开始游戏:投球
    time.Sleep(1 * time.Second)
    <-table // 结束游戏:抓住这个球
}

func player(name string, table chan *Ball) {
    for {
        ball := <-table
        ball.hits++
        fmt.Println(name, ball.hits)
        time.Sleep(100 * time.Millisecond)
        table <- ball
    }
}

8 使用panic输出堆栈信息

type Ball struct{ hits int }

func main() {
    table := make(chan *Ball)
    go player("ping", table)
    go player("pong", table)

    table <- new(Ball) // 开始游戏:投球
    time.Sleep(1 * time.Second)
    <-table // 结束游戏:抓住这个球

    panic("show me the stacks")
}

func player(name string, table chan *Ball) {
    for {
        ball := <-table
        ball.hits++
        fmt.Println(name, ball.hits)
        time.Sleep(100 * time.Millisecond)
        table <- ball
    }
}

9 走(go)很容易,但如何停下来?

长时间运行的程序需要清理。

让我们看看如何编写处理通信、周期性事件和取消的程序。

核心是Go的select语句:就像一个switch,但决定是基于通信能力做出的。

select {
case xc <- x:
    // 把x发送到xc通道
case y := <-yc:
    // 从yc通道接收y
}

10 示例:feed阅读器

我最喜欢的feed阅读器消失了。我需要一个新的。

为什么不写一个?

我们从哪里开始?

11 查找RSS客户端

godoc.org中搜索“rss”会出现几个命中,其中一个提供:

// Fetch获取uri的Items并返回应该尝试下一次获取的时刻。失败时,Fetch返回错误。
func Fetch(uri string) (items []Item, next time.Time, err error)

type Item struct{
    Title, Channel, GUID string // RSS字段的子集
}

但我想要一个流:

<-chan Item

我想要多个订阅。

12 这就是我们所拥有的

type Fetcher interface {
    Fetch() (items []Item, next time.Time, err error)
}

func Fetch(domain string) Fetcher {...} // 从域名获取条目

13 这就是我们想要的

type Subscription interface {
    Updates() <-chan Item // 条目流(stream of Items)
    Close() error         // 关闭流
}

func Subscribe(fetcher Fetcher) Subscription {...} // 把获取到的数据转换为一个流

func Merge(subs ...Subscription) Subscription {...} // 合并多个流

14 示例

func main() {
    // 订阅一些feed,并创建一个合并的更新流。
    merged := Merge(
        Subscribe(Fetch("blog.golang.org")),
        Subscribe(Fetch("googleblog.blogspot.com")),
        Subscribe(Fetch("googledevelopers.blogspot.com")))

    // 在一段时间后关闭所有订阅(subscription)。
    time.AfterFunc(3*time.Second, func() {
        fmt.Println("closed:", merged.Close())
    })

    // 输出流中数据。
    for it := range merged.Updates() {
        fmt.Println(it.Channel, it.Title)
    }

    panic("show me the stacks")
}

15 订阅(Subscribe)

Subscribe创建一个新的Subscription,它重复获取条目,直到调用Close。

func Subscribe(fetcher Fetcher) Subscription {
    s := &sub{
        fetcher: fetcher,
        updates: make(chan Item), // for Updates
    }
    go s.loop()
    return s
}

// sub实现了Subscription接口。
type sub struct {
    fetcher Fetcher   // 获取条目
    updates chan Item // 把条目分发给用户
}

// loop使用s.fetcher方法获取条目并使用s.updates方法发送它们。当调用s.Close方法时,退出loop。
func (s *sub) loop() {...}

16 实现Subscription

要实现Subscription接口,请定义Updates方法和Close方法。

func (s *sub) Updates() <-chan Item {
    return s.updates
}
func (s *sub) Close() error {
    // TODO: 让loop退出
    // TODO: 发现任何错误
    return err
}

17 loop有什么作用?

  • 定期调用Fetch
  • 在Updates通道上发送获取的条目
  • 调用Close时退出,并报告错误

18 幼稚的实现

    for {
        if s.closed {
            close(s.updates)
            return
        }
        items, next, err := s.fetcher.Fetch()
        if err != nil {
            s.err = err                 
            time.Sleep(10 * time.Second)
            continue
        }
        for _, item := range items {
            s.updates <- item
        }
        if now := time.Now(); next.After(now) {
            time.Sleep(next.Sub(now))
        }
     }

func (s *naiveSub) Close() error {
    s.closed = true
    return s.err   
}

19 Bug 1:对s.closed/s.err的非同步访问

    for {
        if s.closed {
            close(s.updates)
            return
        }
        items, next, err := s.fetcher.Fetch()
        if err != nil {
            s.err = err
            time.Sleep(10 * time.Second)
            continue
        }
        for _, item := range items {
            s.updates <- item
        }
        if now := time.Now(); next.After(now) {
            time.Sleep(next.Sub(now))
        }
    }

func (s *naiveSub) Close() error {
    s.closed = true
    return s.err
}

20 竞态(Race)检测器

go run -race naivemain.go
    for {
        if s.closed {
            close(s.updates)
            return
        }
        items, next, err := s.fetcher.Fetch()
        if err != nil {
            s.err = err

func (s *naiveSub) Close() error {
    s.closed = true
    return s.err
}

21 Bug 2:time.Sleep可能会保持循环运行

    for {
        if s.closed {
            close(s.updates)
            return
        }
        items, next, err := s.fetcher.Fetch()
        if err != nil {
            s.err = err                 
            time.Sleep(10 * time.Second)
            continue
        }
        for _, item := range items {
            s.updates <- item
        }
        if now := time.Now(); next.After(now) {
            time.Sleep(next.Sub(now))
        }
    }

22 Bug 3:循环可能会在s.updates上永远阻塞

    for {
        if s.closed {
            close(s.updates)
            return
        }
        items, next, err := s.fetcher.Fetch()
        if err != nil {
            s.err = err                 
            time.Sleep(10 * time.Second)
            continue
        }
        for _, item := range items {
            s.updates <- item
        }
        if now := time.Now(); next.After(now) {
            time.Sleep(next.Sub(now))
        }
    }

23 解决方案

将循环体更改为具有三种情况的select:

  • Close被调用时
  • 调用Fetch时
  • 在s.updates上发送条目

24 结构:for-select循环

循环在它自己的goroutine中运行。

select让循环避免在任何一种状态下无限期地阻塞。

func (s *sub) loop() {
    ... declare mutable state ...
    for {
        ... set up channels for cases ...
        select {
        case <-c1:
            ... read/write state ...
        case c2 <- x:
            ... read/write state ...
        case y := <-c3:
            ... read/write state ...
        }
    }
}

这些case通过循环中的本地状态进行交互。

25 case 1:Close

Close通过s.closure与循环通信。

type sub struct {
    closing chan chan error
}

服务(loop)在其通道(s.closure)上监听请求。

客户端(Close)在s.closure上发送请求:退出并回复错误

在这种情况下,请求中唯一的内容就是回复通道。

26 Case 1: Close

Close要求循环退出并等待响应。

func (s *sub) Close() error {
    errc := make(chan error)
    s.closing <- errc
    return <-errc
}

循环通过回复Fetch错误来处理Close并退出。

    var err error // 当Fetch失败时设置错误
    for {
        select {
        case errc := <-s.closing:
            errc <- err
            close(s.updates) // 告诉接收者我们做完了
            return
        }
    }

27 Case 2: Fetch

在延迟一段时间后安排下一次Fetch。

    var pending []Item // 被fetch追加;被send消费
    var next time.Time // 初始化值是January 1, year 0
    var err error
    for {
        var fetchDelay time.Duration // 初始化值是0(没有延迟)
        if now := time.Now(); next.After(now) {
            fetchDelay = next.Sub(now)
        }
        startFetch := time.After(fetchDelay)

        select {
        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            pending = append(pending, fetched...)
        }
    }

28 Case 3: Send

一次发送一个获取的条目。

var pending []Item // 被fetch追加;被send消费
for {
    select {
    case s.updates <- pending[0]:
        pending = pending[1:]
    }
}

哎呀。这崩溃了。

29 select和nil通道

在nil通道上发送和接收会被阻塞。

select从不选择阻塞的case。

func main() {
    a, b := make(chan string), make(chan string)
    go func() { a <- "a" }()
    go func() { b <- "b" }()
    if rand.Intn(2) == 0 {
        a = nil
        fmt.Println("nil a")
    } else {
        b = nil
        fmt.Println("nil b")
    }
    select {
    case s := <-a:
        fmt.Println("got", s)
    case s := <-b:
        fmt.Println("got", s)
    }
}

30 Case 3: Send (修复的)

仅在pending非空时启用send。

    var pending []Item // 被fetch追加;被send消费
    for {
        var first Item
        var updates chan Item
        if len(pending) > 0 {
            first = pending[0]
            updates = s.updates // 启用send case
        }

        select {
        case updates <- first:
            pending = pending[1:]
        }
     }

31 select

将三个case放在一起:

   select {
        case errc := <-s.closing:
            errc <- err
            close(s.updates)
            return
        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            pending = append(pending, fetched...)
        case updates <- first:
            pending = pending[1:]
      }

这些case通过err、next和pending交互。

没有锁,没有条件变量,没有回调函数。

32 已修复的Bug

  • Bug 1:对s.closed和s.err的非同步(unsynchronized)访问
  • Bug 2:time.Sleep可能会保持循环运行
  • Bug 3:循环可能会永远阻塞在发送s.updates上
        select {
        case errc := <-s.closing:
            errc <- err
            close(s.updates)
            return
        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            pending = append(pending, fetched...)
        case updates <- first:
            pending = pending[1:]
        }

33 我们可以进一步改进循环

34 问题:Fetch可能会返回重复项

    var pending []Item
    var next time.Time
    var err error

        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            pending = append(pending, fetched...)

35 修复:在添加到pending之前过滤条目

    var pending []Item
    var next time.Time
    var err error
    var seen = make(map[string]bool) // item.GUIDs集合

        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            for _, item := range fetched {
                if !seen[item.GUID] {
                    pending = append(pending, item)
                    seen[item.GUID] = true
                }
            }

36 问题:pending队列无限制地增长

        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            for _, item := range fetched {
                if !seen[item.GUID] {
                    pending = append(pending, item)
                    seen[item.GUID] = true
                }
            }

37 问题:pending队列无限制地增长

const maxPending = 10

        var fetchDelay time.Duration
        if now := time.Now(); next.After(now) {
            fetchDelay = next.Sub(now)
        }
        var startFetch <-chan time.Time
        if len(pending) < maxPending {
            startFetch = time.After(fetchDelay) // 启用fetch case
        }

可以改为从pending的头部删除较旧的条目。

38 问题:Fetch上的循环被阻塞

        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            for _, item := range fetched {
                if !seen[item.GUID] {
                    pending = append(pending, item)
                    seen[item.GUID] = true         
                }
            }

39 修复:异步运行Fetch

为fetchDone添加一个新的select case。

type fetchResult struct{ fetched []Item; next time.Time; err error }

    var fetchDone chan fetchResult // 如果不是nil,则表示Fetch正在运行

        var startFetch <-chan time.Time
        if fetchDone == nil && len(pending) < maxPending {
            startFetch = time.After(fetchDelay) // 启用fetch case
        }

        select {
        case <-startFetch:
            fetchDone = make(chan fetchResult, 1)
            go func() {
                fetched, next, err := s.fetcher.Fetch()
                fetchDone <- fetchResult{fetched, next, err}
            }()
        case result := <-fetchDone:
            fetchDone = nil
            // 使用result.fetched, result.next, result.err

40 实现订阅

响应式。干净。易于阅读和更改。

三种技术:

  • for-select循环
  • 服务通道,回复通道(chan chan err)
  • select case里的nil通道

更多详细信息在线,包括Merge。

41 总结

并发编程可能很棘手。

Go让它变得更容易:

  • 通道传送数据、定时器事件、取消信号
  • goroutines序列化对本地可变状态的访问
  • 堆栈跟踪和死锁检测器
  • 竞态检测器

42 链接

Go并发模式(2012)

go.dev/talks/2012/concurrency.slide

并发不是并行

golang.org/s/concurrency-is-not-parallelism

通过通信共享内存

golang.org/doc/codewalk/sharemem

Go Tour (在你的浏览器里学习Go)

tour.golang.org

43 致谢

Sameer Ajmani

Google

http://profiles.google.com/ajmani

@Sajma

http://golang.org

Go并发模式

本文翻译自《Go Concurrency Patterns》,是个PPT。

1 Go Concurrency Patterns

Rob Pike

Google

2 视频

这个演讲是在2012年6月在Google I/O大会上做的。

可在YouTube上观看这个演讲

3 介绍

4 Go中的并发特性

当Go语言首次发布时,人们似乎对Go的并发特性着迷。

问题:

  • 为什么支持并发?
  • 什么是并发?
  • 想法从何而来?
  • 到底有什么好处呢?
  • 我该如何使用它?

5 为什么?

看看你周围。你看到了什么?

你是否看到一个单步的世界一次只做一件事?

或者你是否看到了一个由相互作用、行为独立的片段组成的复杂世界?

这就是为什么。顺序处理(Sequential processing)本身并不能模拟世界的行为。

6 什么是并发?

并发是独立地执行计算的组合。

并发是一种构建软件的方式,特别是编写与现实世界交互良好的干净代码的方式。

它不是并行。

7 并发不是并行

并发不是并行,尽管它支持并行。

如果你只有一个处理器,你的程序仍然可以并发但不能并行。

另一方面,编写良好的并发程序可能会在多处理器上高效地并行运行。那个属性可能很重要…

有关该区别的更多信息,请参阅下面的链接。这里讨论的太多了。

golang.org/s/concurrency-is-not-parallelism

8 软件构建模型

容易理解。

便于使用。

容易推理。

你不需要成为专家!

(比处理并行的细节(线程、信号量、锁、屏障等)要好得多。)

9 历史

对许多人来说,Go的并发特性似乎很新。

但它们植根于悠久的历史,可以追溯到1978年Hoare的CSP甚至Dijkstra的守卫命令(1975)。

具有相似特征的语言:

  • Occam (May, 1983)
  • Erlang (Armstrong, 1986)
  • Newsqueak (Pike, 1988)
  • Concurrent ML (Reppy, 1993)
  • Alef (Winterbottom, 1995)
  • Limbo (Dorward, Pike, Winterbottom, 1996).

10 区别

Go是Newsqueak-Alef-Limbo分支的最新成员,以第一公民的通道而著称。

Erlang更接近于原始CSP,你可以通过名字而不是通过通道与进程通信。

这些模型是等效的,但表达方式不同。

粗略的类比:按名字写入文件(进程,Erlang)与写入文件描述符(通道,Go)。

11 基本例子

12 一个无聊的函数

我们需要一个例子来展示并发原语的有趣属性。

为了避免分心,我们把它作为一个无聊的例子。

func boring(msg string) {
    for i := 0; ; i++ {
        fmt.Println(msg, i)
        time.Sleep(time.Second)
    }
}

13 不那么无聊

使消息之间的间隔不可预测(仍不到一秒)。

func boring(msg string) {
    for i := 0; ; i++ {
        fmt.Println(msg, i)
        time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
}

14 运行它

无聊的功能永远运行,就像一个无聊的派对客人。

func main() {
    boring("boring!")
}

func boring(msg string) {
    for i := 0; ; i++ {
        fmt.Println(msg, i)
        time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
}

15 忽略它

go语句照常运行函数,但不会让调用者等待。

它启动一个goroutine。 该功能类似于shell命令末尾的&。

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    go boring("boring!")
}

16 少一点忽略

当main返回时,程序退出并带走boring函数。

我们可以闲逛一下,在路上显示main和已启动的goroutine都在运行。

func main() {
    go boring("boring!")
    fmt.Println("I'm listening.")
    time.Sleep(2 * time.Second)
    fmt.Println("You're boring; I'm leaving.")
}

17 协程(Goroutine)

什么是goroutine?它是一个独立执行的函数,由go语句启动。

它有自己的调用堆栈,可以根据需要增长和缩小。

它非常廉价。实践中,拥有数千甚至数十万个goroutine是很常见的。

这不是一个线程。

一个包含数千个goroutine的程序中可能只有一个线程。

相反,goroutines会根据需要动态地多路复用到线程上,以保持所有goroutines运行。

但是,如果你将其视为非常廉价的线程,那么你将不会走太远。

18 通信(Communication)

我们boring的例子被骗了:main函数看不到其他goroutine的输出。

它只是打印到屏幕上,我们假装看到了对话。

真正的对话需要通信。

19 通道(Channel)

Go中的通道提供了两个goroutine之间的连接,允许它们进行通信。

// 声明和初始化。
var c chan int
c = make(chan int)
// 或者
c := make(chan int)
// 在通道上发送数据。
c <- 1
// 从通道里取数据。
// 箭头指明了数据流的方向。
value = <-c

20 使用通道

通道连接main和boring的goroutine,以便它们可以通信。

func main() {
    c := make(chan string)
    go boring("boring!", c)
    for i := 0; i < 5; i++ {
        fmt.Printf("You say: %q\n", <-c) // 接收表达式仅是一个值。
    }
    fmt.Println("You're boring; I'm leaving.")
}

func boring(msg string, c chan string) {
    for i := 0; ; i++ {
        c <- fmt.Sprintf("%s %d", msg, i) // 将被发送的表达式可以是任意合适的值。
        time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
}

21 同步(Synchronization)

当main函数执行<–c时,它会等待一个值被发送。

同样,当boring函数执行c <– value时,它会等待接收者准备好,再把值发送到通道。

发送者和接收者都必须准备好在通信中发挥自己的作用。否则我们会等待直到他们准备好。 因此,通道既可以通信又可以同步。

22 关于带缓冲区的通道的旁白

专家提示:Go也可以创建带缓冲区的通道。

缓冲区消除了同步。

通道的缓冲区使它们更像Erlang的邮箱。

带缓冲区的通道对于某些问题可能很重要,但它们需要更加微妙的推理。

我们今天不需要它们。

23 Go的使用方式

不要通过共享内存来通信,而是通过通信来共享内存。

24 模式

25 生成器:返回一个通道的函数

通道是一等公民(first-class)的值,就像字符串或整数一样。

c := boring("boring!") // 返回一个通道的函数。
    for i := 0; i < 5; i++ {
        fmt.Printf("You say: %q\n", <-c)
    }
    fmt.Println("You're boring; I'm leaving.")

func boring(msg string) <-chan string { // 返回只能用来接收值(receive-only)的通道
    c := make(chan string)
    go func() { // 我们在函数内部开启一个goroutine。
        for i := 0; ; i++ {
            c <- fmt.Sprintf("%s %d", msg, i)
            time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
        }
    }()
    return c // 把通道返回给本函数的调用者。
}

26 通道作为服务的句柄

我们的boring函数返回一个通道,让我们可以与它提供的boring服务进行通信。

我们可以有该服务的更多实例。

func main() {
    joe := boring("Joe")
    ann := boring("Ann")
    for i := 0; i < 5; i++ {
        fmt.Println(<-joe)
        fmt.Println(<-ann)
    }
    fmt.Println("You're both boring; I'm leaving.")
}

27 多路复用

这些计划使Joe和Ann步调一致。

我们可以改为使用扇入功能让准备好的人说话。

func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() { for { c <- <-input1 } }()
    go func() { for { c <- <-input2 } }()
    return c
}
func main() {
    c := fanIn(boring("Joe"), boring("Ann"))
    for i := 0; i < 10; i++ {
        fmt.Println(<-c)
    }
    fmt.Println("You're both boring; I'm leaving.")
}

28 扇入(Fan-in)

29 恢复顺序

在一个通道上发送一个通道,让goroutine等待轮到它。

接收所有消息,然后在专用通道上发送,再次启用它们。 首先,我们定义一个包含回复用的通道的消息类型。

type Message struct {
    str string
    wait chan bool
}

30 恢复顺序

每个发言者都必须等待批准。

for i := 0; i < 5; i++ {
        msg1 := <-c; fmt.Println(msg1.str)
        msg2 := <-c; fmt.Println(msg2.str)
        msg1.wait <- true
        msg2.wait <- true
}

waitForIt := make(chan bool) // 所有消息实例共享的通道。

// (译者注:以下是每个发言者协程的关键代码)
    c <- Message{ fmt.Sprintf("%s: %d", msg, i), waitForIt }
    time.Sleep(time.Duration(rand.Intn(2e3)) * time.Millisecond)
    <-waitForIt

(译者注:上述程序实现每个发言者发言后,必须等待批准,才能继续运行下去。)

31 select

并发独有的控制结构。

和通道、goroutine一起内置在语言中。

32 select

select语句提供了另一种处理多个通道的方法。

这就像一个开关,但每个case都是一个通信:

– 评估所有通道。

– 阻塞选择,直到一个通信可以继续,然后它会继续。

– 如果有多个可以继续,伪随机选择其中一个。 – 如果没有一个通道准备好,则默认子句(如果存在)立即执行。

select {
    case v1 := <-c1:
        fmt.Printf("received %v from c1\n", v1)
    case v2 := <-c2:
        fmt.Printf("received %v from c2\n", v1)
    case c3 <- 23:
        fmt.Printf("sent %v to c3\n", 23)
    default:
        fmt.Printf("no one was ready to communicate\n")
    }

33 再次扇入

重写我们原来的fanIn函数。实际上只需要一个goroutine就能实现。旧的:

func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() { for { c <- <-input1 } }()
    go func() { for { c <- <-input2 } }()
    return c
}

34 使用select扇入

重写我们原来的fanIn函数。只需要一个goroutine就能实现。新的:

func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() {
        for {
            select {
            case s := <-input1:  c <- s
            case s := <-input2:  c <- s
            }
        }
    }()
    return c
}

35 使用select超时

time.After函数返回一个阻塞指定持续时间段的通道。

在该时间段过去之后,通道将传递当前时间一次。

func main() {
    c := boring("Joe")
    for {
        select {
        case s := <-c:
            fmt.Println(s)
        case <-time.After(1 * time.Second):
            fmt.Println("You're too slow.")
            return
        }
    }
}

36 使用select的整个对话超时

在循环外创建一次计时器,以使整个对话超时。

(在前面的程序中,我们对每条消息都有一个超时。)

func main() {
    c := boring("Joe")
    timeout := time.After(5 * time.Second)
    for {
        select {
        case s := <-c:
            fmt.Println(s)
        case <-timeout:
            fmt.Println("You talk too much.")
            return
        }
    }
}

37 退出通道

我们可以扭转这种局面,告诉Joe在我们听腻了他的时候停下来。

quit := make(chan bool)
    c := boring("Joe", quit)
    for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
quit <- true

select {
            case c <- fmt.Sprintf("%s: %d", msg, i):
                // do nothing
            case <-quit:
                return
            }

38 从quit通道接收

我们怎么知道它已经完成了?等待它告诉我们它已经完成:在退出通道接收

quit := make(chan string)
    c := boring("Joe", quit)
    for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
    quit <- "Bye!"
fmt.Printf("Joe says: %q\n", <-quit)

select {
            case c <- fmt.Sprintf("%s: %d", msg, i):
                // do nothing
            case <-quit:
                cleanup()
                quit <- "See you!"
                return
            }

39 通道菊花链

func f(left, right chan int) {
    left <- 1 + <-right
}

func main() {
    const n = 10000
    leftmost := make(chan int)
    right := leftmost
    left := leftmost
    for i := 0; i < n; i++ {
        right = make(chan int)
        go f(left, right)
        left = right
    }
    go func(c chan int) { c <- 1 }(right)
    fmt.Println(<-leftmost)
}

40 中国耳语,地鼠风格

41 系统软件

Go是为编写系统软件而设计的。

让我们看看并发特性是如何发挥作用的。

42 示例:谷歌搜索

Q:谷歌搜索做什么?

A:给定一个查询,返回一页搜索结果(和一些广告)。

Q:我们如何获得搜索结果?

A:将查询发送到Web搜索、图像搜索、YouTube、地图、新闻等。然后混合结果。

我们如何实现这一点?

43 谷歌搜索:一个虚假的框架

我们可以模拟搜索功能,就像我们之前模拟对话一样。

var (
    Web = fakeSearch("web")
    Image = fakeSearch("image")
    Video = fakeSearch("video")
)

type Search func(query string) Result

func fakeSearch(kind string) Search {
        return func(query string) Result {
              time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
              return Result(fmt.Sprintf("%s result for %q\n", kind, query))
        }
}

44 Google 搜索:测试这个框架

func main() {
    rand.Seed(time.Now().UnixNano())
    start := time.Now()
    results := Google("golang")
    elapsed := time.Since(start)
    fmt.Println(results)
    fmt.Println(elapsed)
}

45 谷歌搜索 1.0

Google函数接受查询并返回结果片段(只是字符串)。

Google会依次调用Web、图像和视频搜索,并将它们附加到结果切片中。

func Google(query string) (results []Result) {
    results = append(results, Web(query))
    results = append(results, Image(query))
    results = append(results, Video(query))
    return
}

46 谷歌搜索 2.0

同时运行Web、图像和视频搜索,并等待所有结果。

没有锁。没有条件变量。没有回调。

func Google(query string) (results []Result) {
    c := make(chan Result)
    go func() { c <- Web(query) } ()
    go func() { c <- Image(query) } ()
    go func() { c <- Video(query) } ()

    for i := 0; i < 3; i++ {
        result := <-c
        results = append(results, result)
    }
    return
}

47 谷歌搜索 2.1

不等待慢速服务器。没有锁。没有条件变量。没有回调。

    c := make(chan Result)
    go func() { c <- Web(query) } ()
    go func() { c <- Image(query) } ()
    go func() { c <- Video(query) } ()

    timeout := time.After(80 * time.Millisecond)
    for i := 0; i < 3; i++ {
        select {
        case result := <-c:
            results = append(results, result)
        case <-timeout:
            fmt.Println("timed out")
            return
        }
    }
    return

(译者注:增加了超时逻辑。)

48 避免超时

问:我们如何避免丢弃来自慢速服务器的结果?

答:复制服务器。向多个副本发送请求,并使用第一个响应。

func First(query string, replicas ...Search) Result {
    c := make(chan Result)
    searchReplica := func(i int) { c <- replicas[i](query) }
    for i := range replicas {
        go searchReplica(i)
    }
    return <-c
}

49 使用第一个函数

func main() {
    rand.Seed(time.Now().UnixNano())
    start := time.Now()
    result := First("golang",
        fakeSearch("replica 1"),
        fakeSearch("replica 2"))
    elapsed := time.Since(start)
    fmt.Println(result)
    fmt.Println(elapsed)
}

50 谷歌搜索 3.0

使用复制的搜索服务器减少尾部延迟。

c := make(chan Result)
    go func() { c <- First(query, Web1, Web2) } ()
    go func() { c <- First(query, Image1, Image2) } ()
    go func() { c <- First(query, Video1, Video2) } ()
    timeout := time.After(80 * time.Millisecond)
    for i := 0; i < 3; i++ {
        select {
        case result := <-c:
            results = append(results, result)
        case <-timeout:
            fmt.Println("timed out")
            return
        }
    }
    return

51 还是…

没有锁。没有条件变量。没有回调。

52 总结

在几个简单的变换示例中,我们使用Go的并发原语来转换

  • 缓慢的
  • 顺序的
  • 故障敏感的

程序为一个

  • 快速的
  • 并发的
  • 有副本的
  • 健壮的

程序。

53 更多派对技巧

使用这些工具有无穷无尽的方法,许多在别处都有介绍。

Chatroulette toy:

golang.org/s/chat-roulette

Load balancer:

golang.org/s/load-balancer

Concurrent prime sieve:

golang.org/s/prime-sieve

Concurrent power series (by McIlroy):

golang.org/s/power-series

54 不要过用

玩起来很有趣,但不要过度使用这些想法。

Goroutines和通道是大创意。它们是程序构建的工具。

但有时你只需要一个引用计数器。

Go有”sync”和”sync/atomic”包,它们提供互斥锁、条件变量等。它们为较小的问题提供了工具。

通常,这些东西会一起解决更大的问题。

始终为工作使用正确的工具。

55 结论

Goroutines和channels使得表达复杂的操作变得容易

  • 多个输入
  • 多个输出
  • 超时
  • 失败

而且它们使用起来很有趣。

56 链接

Go主页:

golang.org

Go Tour (在浏览器里学习Go):

tour.golang.org

包(Package)文档:

golang.org/pkg

Articles galore:

golang.org/doc

并发不是并行:

golang.org/s/concurrency-is-not-parallelism

57 致谢

Rob Pike

Google

http://golang.org/s/plusrob

@rob_pike

http://golang.org

Go并发模式:管道和取消

本文翻译自《Go Concurrency Patterns: Pipelines and cancellation》。

Sameer Ajmani

13 March 2014

介绍

Go的并发原语使构建数据流管道变得容易,从而有效地利用I/O和多个CPU。本文介绍了此类管道的示例,重点介绍了操作失败时出现的细微差别,并介绍了干净地处理故障的技术。

什么是管道?

Go中没有关于管道的正式定义。它只是多种并发程序中的一种。通俗地说,管道(pipeline)是由通道(channel)连接的一系列阶段(stage),其中每个阶段是一组运行相同功能的goroutine。在每个阶段,goroutines

  • 通过入口(inbound)通道从上游接收值
  • 对该数据执行某些功能,通常会产生新值
  • 通过出口(outbound)通道向下游发送值

每个阶段都有任意数量的入口和出口通道,除了第一个阶段和最后一个阶段,它们分别只有出口或入口通道。第一阶段有时称为源头或生产者;最后一个阶段,称为水池(sink)或消费者。(译者注:一个阶段在代码实现上就是一个函数,入口通道或出口通道作为该函数的参数或返回值,在函数体内启动一个或多个goroutines来处理这一阶段的业务逻辑。见下文的各个例子。)

我们将从一个简单的示例管道开始来解释这些想法和技术。稍后,我们将展示一个更现实的例子。

平方数

考虑一个具有三个阶段的管道。

第一阶段gen是一个函数,将一个整数列表转换为一个通道,发出列表中的整数。gen函数启动一个goroutine,在通道上发出整数,并在发送完所有值后关闭该通道:

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

第二阶段sq从通道接收整数并返回一个通道,该通道发出每个接收到的整数的平方值。在入口通道关闭并且此阶段已将所有值发送到下游后,它会关闭出口通道:

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

main函数设置管道并运行最后一个阶段:它从第二阶段接收值并打印输出每个值,直到通道关闭:

func main() {
    // 安装这个管道。
    c := gen(2, 3)
    out := sq(c)

    // 消费输出的值。
    fmt.Println(<-out) // 4
    fmt.Println(<-out) // 9
}

由于sq的入口通道和出口通道具有相同的类型,因此我们可以任意多次组合它。我们还可以将main重写为范围循环(译者注:即使用for-range语句),就像其他阶段一样:

func main() {
    // 安装管道并消费输出的值。
    for n := range sq(sq(gen(2, 3))) {
        fmt.Println(n) // 16 then 81
    }
}

扇出,扇入

多个函数可以从同一个通道读取,直到该通道关闭;这称为扇出(fan-out)。这提供了一种在一组工作者(workers)之间分配工作以并行化使用CPU和I/O的方法。

一个函数可以从多个输入中读取并继续执行,直到所有输入都关闭,方法是将输入通道多路复用到一个通道上,当所有输入都关闭时,该通道才关闭。这称为扇入(fan-in)

我们可以改变我们的管道来运行sq的两个实例,每个实例都从同一个输入通道读取。我们引入了一个新函数,merge,以扇入结果:

func main() {
    in := gen(2, 3)

    // 将sq工作分配给两个都从in通道读取的goroutine。
    c1 := sq(in)
    c2 := sq(in)

    // 消费来自c1和c2的合并输出。
    for n := range merge(c1, c2) {
        fmt.Println(n) // 4 then 9, or 9 then 4
    }
}

merge函数通过为每个入口通道启动一个goroutine,将值复制到唯一的出口通道,将通道列表转换为单个通道。一旦所有的输出goroutine都启动了,merge会再启动一个goroutine以在该通道上的所有发送完成后关闭出口通道。

往关闭的通道发送值将引发panic,因此在调用close之前确保所有发送都完成很重要。sync.WaitGroup类型提供了一种安排此同步的简单方法:

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // 为每个cs中的输入通道启动一个输出goroutine。output将值从c复制到out,直到c关闭,然后调用wg.Done。
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // 一旦所有输出goroutines都完成任务,就启动一个goroutine来关闭out通道。这必须在调用wg.Add之后开始。
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

止损

我们的管道函数有一个模式:

  • 当所有发送操作完成后,阶段关闭它们的出口通道。
  • 阶段不断从入口通道接收值,直到这些通道关闭。

这种模式允许将每个接收阶段编写为range循环,并确保一旦所有值都成功发送到下游,所有goroutines都会退出。

但在管道的实际使用中,并不总是接收所有入口值。有时这是设计使然:接收者可能只需要值的一个子集来取得进展。更常见的情况是,一个阶段提前退出,因为某个入口值表示前面阶段出现错误。在任何一种情况下,接收者都不应该等待剩余的值到达,我们希望前面阶段停止生产后面阶段不需要的值。

在我们的示例管道中,如果一个阶段未能消耗所有入口值,则尝试发送这些值的goroutine将无限期地阻塞:

    // 消耗输出中的第一个值。
    out := merge(c1, c2)
    fmt.Println(<-out) // 4 or 9
    return
    // 由于我们没有从out接收第二个值,因此其中一个输出值的goroutine被挂起,试图发送值。
}

这是资源泄漏:goroutine消耗内存和运行时资源,goroutine堆栈中的堆引用防止数据被垃圾回收。Goroutines不会被垃圾回收;他们必须自行退出。

即使下游阶段无法接收所有入口值,我们也需要安排管道的上游阶段退出。一种方法是将出口通道更改为具有缓冲区的通道。一个缓冲区可以保存固定数量的值;如果缓冲区中有空间,则发送操作立即完成:

c := make(chan int, 2) // 缓冲区大小是2
c <- 1  // 立即完成发生
c <- 2  // 立即完成发生
c <- 3  // 阻塞直到另一个goroutine执行<-c并且接收到1

当在创建通道时知道要发送的值的数量时,使用缓冲区可以简化代码。例如,我们可以重写gen以将整数列表复制到缓冲区通道中,并避免创建新的goroutine:

func gen(nums ...int) <-chan int {
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}

回到我们管道中被阻塞的goroutine,我们可以考虑在merge返回的出口通道中添加一个缓冲区:

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int, 1) // 为未读输入提供足够的空间
    // ...剩余代码不改变...

虽然这修复了该程序中被阻塞的goroutine,但这是错误的代码。这里设置缓冲区大小为1,因为我们知道merge将接收的值的数量以及下游阶段将消耗的值的数量。这很脆弱:如果我们向gen传递一个额外的值,或者如果下游阶段读取的值更少,我们将再次阻塞goroutine。

相反,我们需要为下游阶段提供一种方法来向发送者表明他们将停止接受输入。

显式取消

main决定不从out接收所有值就退出时,它必须告诉上游阶段的goroutine放弃它们试图发送的值。它通过在一个名为done的通道上发送值来实现。它发送两个值,因为可能有两个被阻塞的发送者:

func main() {
    in := gen(2, 3)

    // 将sq工作分配给两个都从in读取的goroutine。
    c1 := sq(in)
    c2 := sq(in)

    // 从输出消费第一个值。
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // 告诉剩余的发送者我们准备离开了。
    done <- struct{}{}
    done <- struct{}{}
}

发送值的goroutine用一个select语句替换它们的发送操作,该语句在值发送给out时或者当它们从done接收到值时继续运行。done的值的类型是空结构体,因为值无关紧要:它只是一个接收事件,用来指示应该放弃给out继续发送值。输出值的goroutine继续在其入口通道c上循环,因此上游阶段不会被阻塞。(我们稍后会讨论如何让这个循环提前返回。)

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // 为每个在cs中的输入通道启动一个output goroutine。output将值从c复制到out,直到c关闭或从done接收到值,然后output调用wg.Done。
    output := func(c <-chan int) {
        for n := range c {
            select {
            case out <- n:
            case <-done:
            }
        }
        wg.Done()
    }
    // ...剩余代码不改变...

这种方法有一个问题:每个下游接收者都需要知道可能被阻塞的上游发送者的数量,并安排在提前返回时向这些发送者发出信号。跟踪这些数量既繁琐又容易出错。

我们需要一种方法来告诉未知且没有限定数量的goroutine停止向下游发送它们的值。在Go中,我们可以通过关闭通道来做到这一点,因为关闭通道上的接收操作总是可以立即进行,产生元素类型的零值

这意味着main可以通过关闭done通道来解除所有发送者的阻塞。这种关闭实际上是对所有发送者的一个广播信号。我们扩展了我们的每个管道函数以接受done作为参数,并通过defer语句安排关闭的发生,因此来自main的所有返回路径都会发出让管道阶段退出的信号。

func main() {
    // 设置一个由整个管道共享的done通道,并在该管道退出时关闭该通道,作为我们开启的所有goroutines的退出信号。
    done := make(chan struct{})
    defer close(done)          

    in := gen(done, 2, 3)

    // 将sq工作分配给两个都从in读取的goroutine。
    c1 := sq(done, in)
    c2 := sq(done, in)

    // 从output消耗第一个值。
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // done将被推迟的close调用关闭。
}

现在,一旦done关闭,我们的管道的每个阶段都可以自由返回。merge中的output例程可以在其入口通道没有排完水(不消费完入口通道的所有值)的情况下返回,因为它知道上游发送方sq将在done关闭时停止发送。output确保通过defer语句在所有返回路径上调用wg.Done

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // 为每个cs中的输入通道启动一个output goroutine。output将值从c复制到out,直到c或done关闭,然后调用wg.Done。
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    // ...剩余代码不改变...

同样,sq可以在done关闭后立即返回。sq通过defer语句确保其输出通道在所有返回路径上都关闭:

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

(译者注:除了源头和水池,每个阶段都有一个或多个入口通道和一个或多个出口通道,用来接收或发送业务数据和错误信息,此外也可以有控制通道,用来接收或发送对通道的控制信息,这里的done就是控制通道。)

以下是管道建设的指导方针:

  • 当所有发送操作都完成后,阶段关闭它们的出口通道。
  • 阶段继续从入口通道接收值,直到这些通道关闭或发送者被解除阻塞。

管道通过确保所有发送的值有足够的缓冲区或者在接收者可能废弃通道时显式地向发送者发出信号来解除对发送者的阻塞。(译者注:本文通过done通道来控制阶段是否取消运行,是比较原始的方式,更加先进的方式是使用Go 1.14版本加入的context包提供的机制。)

计算一棵目录树里的所有文件的摘要值

让我们考虑一个更实际的管道。

MD5是一种消息摘要算法,可用作文件校验值。命令行实用程序md5sum打印文件列表的摘要值。

% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

我们的示例程序与md5sum类似,但将单个目录作为参数,并打印该目录下每个常规文件的摘要值,按路径名排序。

% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

我们程序的主函数调用一个辅助函数MD5All,它返回一个从路径名到摘要值的映射,然后对结果进行排序并打印:

func main() {
    // 计算指定目录下所有文件的MD5总和,然后按路径名排序打印结果。
    m, err := MD5All(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    var paths []string
    for path := range m {
        paths = append(paths, path)
    }
    sort.Strings(paths)
    for _, path := range paths {
        fmt.Printf("%x  %s\n", m[path], path)
    }
}

MD5All函数是我们讨论的重点。在serial.go中,不使用并发,并且在遍历树时简单地读取和汇总每个文件。

// MD5All读取以root为根目录的文件树中的所有文件,并返回从文件路径到文件内容的MD5值的映射。如果目录遍历失败或任何读取操作失败,MD5All将返回错误。
func MD5All(root string) (map[string][md5.Size]byte, error) {
    m := make(map[string][md5.Size]byte)
    err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if !info.Mode().IsRegular() {
            return nil
        }
        data, err := ioutil.ReadFile(path)
        if err != nil {
            return err
        }
        m[path] = md5.Sum(data)
        return nil
    })
    if err != nil {
        return nil, err
    }
    return m, nil
}

并行计算摘要值

parallel.go中,我们将MD5All拆分为两个阶段的管道。第一阶段sumFiles遍历目录树,在一个新的goroutine中计算每个文件的摘要值,并将结果发送到值类型为result的通道上:

type result struct {
    path string
    sum  [md5.Size]byte
    err  error
}

sumFiles返回两个通道:一个用于result,另一个用于filepath.Walk返回的错误。walk函数启动一个新的goroutine来处理每个常规文件,然后检查done。如果done已关闭,则立即停止遍历目录树:

func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    // 对于每个常规文件,启动一个对文件计算摘要值并将结果发送到c的 goroutine。在errc上发送walk的结果。
    c := make(chan result)
    errc := make(chan error, 1)
    go func() {
        var wg sync.WaitGroup
        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            wg.Add(1)
            go func() {
                data, err := ioutil.ReadFile(path)
                select {
                case c <- result{path, md5.Sum(data), err}:
                case <-done:
                }
                wg.Done()
            }()
            // 如果done关闭了就停止遍历。
            select {
            case <-done:
                return errors.New("walk canceled")
            default:
                return nil
            }
        })
        // Walk已返回,因此对wg.Add的所有调用都已完成。完成所有发送后,启动一个goroutine以关闭c。
        go func() {
            wg.Wait()
            close(c)
        }()
        // 这里不需要select,因为errc是有缓冲区的通道。
        errc <- err
    }()
    return c, errc
}

MD5Allc. MD5All接收摘要值。MD5All在出错时提前返回,通过defer关闭done

func MD5All(root string) (map[string][md5.Size]byte, error) {
    // MD5All返回时关闭done通道;它可以在接收到来自c和errc的所有值之前这样做。
    done := make(chan struct{})
    defer close(done)          

    c, errc := sumFiles(done, root)

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

有上界的并行数

parallel.go中的MD5All为每个文件启动一个新的goroutine。在包含许多大文件的目录中,这可能会分配比机器上可用的内存更多的内存。

我们可以通过限制并行读取的文件数量来限制内存分配。在bounded.go中,我们通过创建固定数量的goroutine来读取文件来做到这一点。我们的管道现在具有三个阶段:遍历目录树、读取和计算文件的摘要值以及收集摘要值。

第一阶段walkFiles发送目录树中常规文件的路径:

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
    paths := make(chan string)
    errc := make(chan error, 1)
    go func() {
        // Walk返回时关闭paths通道。
        defer close(paths)
        // 这里的发送无需select,因为errc是有缓冲区的通道。
        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            select {
            case paths <- path:
            case <-done:
                return errors.New("walk canceled")
            }
            return nil
        })
    }()
    return paths, errc
}

中间阶段启动固定数量的digester goroutine,它们从paths接收文件名并在通道c上发送结果:

func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
    for path := range paths {
        data, err := ioutil.ReadFile(path)
        select {
        case c <- result{path, md5.Sum(data), err}:
        case <-done:
            return
        }
    }
}

与我们之前的示例不同,digester不会关闭其输出通道,因为多个goroutine在共享通道上发送。相反,MD5All中的代码安排在所有digester完成后关闭通道:

 // 启动固定数量的goroutine来读取和计算文件的摘要值。
    c := make(chan result)
    var wg sync.WaitGroup
    const numDigesters = 20
    wg.Add(numDigesters)
    for i := 0; i < numDigesters; i++ {
        go func() {
            digester(done, paths, c)
            wg.Done()
        }()
    }
    go func() {
        wg.Wait()
        close(c)
    }()

我们可以让每个digester创建并返回它自己的输出通道,但是我们需要额外的goroutine来扇入结果。

最后一个阶段接收来自c的所有results,然后检查来自errc的错误。此检查不能更早发生,因为在此之前,walkFiles可能会阻止向下游发送值:

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    // 检查Walk是否失败。
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

总结

本文介绍了在Go中构建流数据管道的技术。处理此类管道中的故障很棘手,因为管道中的每个阶段都可能被阻塞而不能向下游发送值,并且下游阶段可能不再关心传入的数据值。我们展示了关闭通道如何向管道启动的所有goroutine广播“完成”信号,并定义了正确构建管道的准则。

进一步阅读: