Go并发模式

本文翻译自《Go Concurrency Patterns》,是个PPT。

1 Go Concurrency Patterns

Rob Pike

Google

2 视频

这个演讲是在2012年6月在Google I/O大会上做的。

可在YouTube上观看这个演讲

3 介绍

4 Go中的并发特性

当Go语言首次发布时,人们似乎对Go的并发特性着迷。

问题:

  • 为什么支持并发?
  • 什么是并发?
  • 想法从何而来?
  • 到底有什么好处呢?
  • 我该如何使用它?

5 为什么?

看看你周围。你看到了什么?

你是否看到一个单步的世界一次只做一件事?

或者你是否看到了一个由相互作用、行为独立的片段组成的复杂世界?

这就是为什么。顺序处理(Sequential processing)本身并不能模拟世界的行为。

6 什么是并发?

并发是独立地执行计算的组合。

并发是一种构建软件的方式,特别是编写与现实世界交互良好的干净代码的方式。

它不是并行。

7 并发不是并行

并发不是并行,尽管它支持并行。

如果你只有一个处理器,你的程序仍然可以并发但不能并行。

另一方面,编写良好的并发程序可能会在多处理器上高效地并行运行。那个属性可能很重要…

有关该区别的更多信息,请参阅下面的链接。这里讨论的太多了。

golang.org/s/concurrency-is-not-parallelism

8 软件构建模型

容易理解。

便于使用。

容易推理。

你不需要成为专家!

(比处理并行的细节(线程、信号量、锁、屏障等)要好得多。)

9 历史

对许多人来说,Go的并发特性似乎很新。

但它们植根于悠久的历史,可以追溯到1978年Hoare的CSP甚至Dijkstra的守卫命令(1975)。

具有相似特征的语言:

  • Occam (May, 1983)
  • Erlang (Armstrong, 1986)
  • Newsqueak (Pike, 1988)
  • Concurrent ML (Reppy, 1993)
  • Alef (Winterbottom, 1995)
  • Limbo (Dorward, Pike, Winterbottom, 1996).

10 区别

Go是Newsqueak-Alef-Limbo分支的最新成员,以第一公民的通道而著称。

Erlang更接近于原始CSP,你可以通过名字而不是通过通道与进程通信。

这些模型是等效的,但表达方式不同。

粗略的类比:按名字写入文件(进程,Erlang)与写入文件描述符(通道,Go)。

11 基本例子

12 一个无聊的函数

我们需要一个例子来展示并发原语的有趣属性。

为了避免分心,我们把它作为一个无聊的例子。

func boring(msg string) {
    for i := 0; ; i++ {
        fmt.Println(msg, i)
        time.Sleep(time.Second)
    }
}

13 不那么无聊

使消息之间的间隔不可预测(仍不到一秒)。

func boring(msg string) {
    for i := 0; ; i++ {
        fmt.Println(msg, i)
        time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
}

14 运行它

无聊的功能永远运行,就像一个无聊的派对客人。

func main() {
    boring("boring!")
}

func boring(msg string) {
    for i := 0; ; i++ {
        fmt.Println(msg, i)
        time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
}

15 忽略它

go语句照常运行函数,但不会让调用者等待。

它启动一个goroutine。 该功能类似于shell命令末尾的&。

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    go boring("boring!")
}

16 少一点忽略

当main返回时,程序退出并带走boring函数。

我们可以闲逛一下,在路上显示main和已启动的goroutine都在运行。

func main() {
    go boring("boring!")
    fmt.Println("I'm listening.")
    time.Sleep(2 * time.Second)
    fmt.Println("You're boring; I'm leaving.")
}

17 协程(Goroutine)

什么是goroutine?它是一个独立执行的函数,由go语句启动。

它有自己的调用堆栈,可以根据需要增长和缩小。

它非常廉价。实践中,拥有数千甚至数十万个goroutine是很常见的。

这不是一个线程。

一个包含数千个goroutine的程序中可能只有一个线程。

相反,goroutines会根据需要动态地多路复用到线程上,以保持所有goroutines运行。

但是,如果你将其视为非常廉价的线程,那么你将不会走太远。

18 通信(Communication)

我们boring的例子被骗了:main函数看不到其他goroutine的输出。

它只是打印到屏幕上,我们假装看到了对话。

真正的对话需要通信。

19 通道(Channel)

Go中的通道提供了两个goroutine之间的连接,允许它们进行通信。

// 声明和初始化。
var c chan int
c = make(chan int)
// 或者
c := make(chan int)
// 在通道上发送数据。
c <- 1
// 从通道里取数据。
// 箭头指明了数据流的方向。
value = <-c

20 使用通道

通道连接main和boring的goroutine,以便它们可以通信。

func main() {
    c := make(chan string)
    go boring("boring!", c)
    for i := 0; i < 5; i++ {
        fmt.Printf("You say: %q\n", <-c) // 接收表达式仅是一个值。
    }
    fmt.Println("You're boring; I'm leaving.")
}

func boring(msg string, c chan string) {
    for i := 0; ; i++ {
        c <- fmt.Sprintf("%s %d", msg, i) // 将被发送的表达式可以是任意合适的值。
        time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
}

21 同步(Synchronization)

当main函数执行<–c时,它会等待一个值被发送。

同样,当boring函数执行c <– value时,它会等待接收者准备好,再把值发送到通道。

发送者和接收者都必须准备好在通信中发挥自己的作用。否则我们会等待直到他们准备好。 因此,通道既可以通信又可以同步。

22 关于带缓冲区的通道的旁白

专家提示:Go也可以创建带缓冲区的通道。

缓冲区消除了同步。

通道的缓冲区使它们更像Erlang的邮箱。

带缓冲区的通道对于某些问题可能很重要,但它们需要更加微妙的推理。

我们今天不需要它们。

23 Go的使用方式

不要通过共享内存来通信,而是通过通信来共享内存。

24 模式

25 生成器:返回一个通道的函数

通道是一等公民(first-class)的值,就像字符串或整数一样。

c := boring("boring!") // 返回一个通道的函数。
    for i := 0; i < 5; i++ {
        fmt.Printf("You say: %q\n", <-c)
    }
    fmt.Println("You're boring; I'm leaving.")

func boring(msg string) <-chan string { // 返回只能用来接收值(receive-only)的通道
    c := make(chan string)
    go func() { // 我们在函数内部开启一个goroutine。
        for i := 0; ; i++ {
            c <- fmt.Sprintf("%s %d", msg, i)
            time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
        }
    }()
    return c // 把通道返回给本函数的调用者。
}

26 通道作为服务的句柄

我们的boring函数返回一个通道,让我们可以与它提供的boring服务进行通信。

我们可以有该服务的更多实例。

func main() {
    joe := boring("Joe")
    ann := boring("Ann")
    for i := 0; i < 5; i++ {
        fmt.Println(<-joe)
        fmt.Println(<-ann)
    }
    fmt.Println("You're both boring; I'm leaving.")
}

27 多路复用

这些计划使Joe和Ann步调一致。

我们可以改为使用扇入功能让准备好的人说话。

func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() { for { c <- <-input1 } }()
    go func() { for { c <- <-input2 } }()
    return c
}
func main() {
    c := fanIn(boring("Joe"), boring("Ann"))
    for i := 0; i < 10; i++ {
        fmt.Println(<-c)
    }
    fmt.Println("You're both boring; I'm leaving.")
}

28 扇入(Fan-in)

29 恢复顺序

在一个通道上发送一个通道,让goroutine等待轮到它。

接收所有消息,然后在专用通道上发送,再次启用它们。 首先,我们定义一个包含回复用的通道的消息类型。

type Message struct {
    str string
    wait chan bool
}

30 恢复顺序

每个发言者都必须等待批准。

for i := 0; i < 5; i++ {
        msg1 := <-c; fmt.Println(msg1.str)
        msg2 := <-c; fmt.Println(msg2.str)
        msg1.wait <- true
        msg2.wait <- true
}

waitForIt := make(chan bool) // 所有消息实例共享的通道。

// (译者注:以下是每个发言者协程的关键代码)
    c <- Message{ fmt.Sprintf("%s: %d", msg, i), waitForIt }
    time.Sleep(time.Duration(rand.Intn(2e3)) * time.Millisecond)
    <-waitForIt

(译者注:上述程序实现每个发言者发言后,必须等待批准,才能继续运行下去。)

31 select

并发独有的控制结构。

和通道、goroutine一起内置在语言中。

32 select

select语句提供了另一种处理多个通道的方法。

这就像一个开关,但每个case都是一个通信:

– 评估所有通道。

– 阻塞选择,直到一个通信可以继续,然后它会继续。

– 如果有多个可以继续,伪随机选择其中一个。 – 如果没有一个通道准备好,则默认子句(如果存在)立即执行。

select {
    case v1 := <-c1:
        fmt.Printf("received %v from c1\n", v1)
    case v2 := <-c2:
        fmt.Printf("received %v from c2\n", v1)
    case c3 <- 23:
        fmt.Printf("sent %v to c3\n", 23)
    default:
        fmt.Printf("no one was ready to communicate\n")
    }

33 再次扇入

重写我们原来的fanIn函数。实际上只需要一个goroutine就能实现。旧的:

func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() { for { c <- <-input1 } }()
    go func() { for { c <- <-input2 } }()
    return c
}

34 使用select扇入

重写我们原来的fanIn函数。只需要一个goroutine就能实现。新的:

func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() {
        for {
            select {
            case s := <-input1:  c <- s
            case s := <-input2:  c <- s
            }
        }
    }()
    return c
}

35 使用select超时

time.After函数返回一个阻塞指定持续时间段的通道。

在该时间段过去之后,通道将传递当前时间一次。

func main() {
    c := boring("Joe")
    for {
        select {
        case s := <-c:
            fmt.Println(s)
        case <-time.After(1 * time.Second):
            fmt.Println("You're too slow.")
            return
        }
    }
}

36 使用select的整个对话超时

在循环外创建一次计时器,以使整个对话超时。

(在前面的程序中,我们对每条消息都有一个超时。)

func main() {
    c := boring("Joe")
    timeout := time.After(5 * time.Second)
    for {
        select {
        case s := <-c:
            fmt.Println(s)
        case <-timeout:
            fmt.Println("You talk too much.")
            return
        }
    }
}

37 退出通道

我们可以扭转这种局面,告诉Joe在我们听腻了他的时候停下来。

quit := make(chan bool)
    c := boring("Joe", quit)
    for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
quit <- true

select {
            case c <- fmt.Sprintf("%s: %d", msg, i):
                // do nothing
            case <-quit:
                return
            }

38 从quit通道接收

我们怎么知道它已经完成了?等待它告诉我们它已经完成:在退出通道接收

quit := make(chan string)
    c := boring("Joe", quit)
    for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
    quit <- "Bye!"
fmt.Printf("Joe says: %q\n", <-quit)

select {
            case c <- fmt.Sprintf("%s: %d", msg, i):
                // do nothing
            case <-quit:
                cleanup()
                quit <- "See you!"
                return
            }

39 通道菊花链

func f(left, right chan int) {
    left <- 1 + <-right
}

func main() {
    const n = 10000
    leftmost := make(chan int)
    right := leftmost
    left := leftmost
    for i := 0; i < n; i++ {
        right = make(chan int)
        go f(left, right)
        left = right
    }
    go func(c chan int) { c <- 1 }(right)
    fmt.Println(<-leftmost)
}

40 中国耳语,地鼠风格

41 系统软件

Go是为编写系统软件而设计的。

让我们看看并发特性是如何发挥作用的。

42 示例:谷歌搜索

Q:谷歌搜索做什么?

A:给定一个查询,返回一页搜索结果(和一些广告)。

Q:我们如何获得搜索结果?

A:将查询发送到Web搜索、图像搜索、YouTube、地图、新闻等。然后混合结果。

我们如何实现这一点?

43 谷歌搜索:一个虚假的框架

我们可以模拟搜索功能,就像我们之前模拟对话一样。

var (
    Web = fakeSearch("web")
    Image = fakeSearch("image")
    Video = fakeSearch("video")
)

type Search func(query string) Result

func fakeSearch(kind string) Search {
        return func(query string) Result {
              time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
              return Result(fmt.Sprintf("%s result for %q\n", kind, query))
        }
}

44 Google 搜索:测试这个框架

func main() {
    rand.Seed(time.Now().UnixNano())
    start := time.Now()
    results := Google("golang")
    elapsed := time.Since(start)
    fmt.Println(results)
    fmt.Println(elapsed)
}

45 谷歌搜索 1.0

Google函数接受查询并返回结果片段(只是字符串)。

Google会依次调用Web、图像和视频搜索,并将它们附加到结果切片中。

func Google(query string) (results []Result) {
    results = append(results, Web(query))
    results = append(results, Image(query))
    results = append(results, Video(query))
    return
}

46 谷歌搜索 2.0

同时运行Web、图像和视频搜索,并等待所有结果。

没有锁。没有条件变量。没有回调。

func Google(query string) (results []Result) {
    c := make(chan Result)
    go func() { c <- Web(query) } ()
    go func() { c <- Image(query) } ()
    go func() { c <- Video(query) } ()

    for i := 0; i < 3; i++ {
        result := <-c
        results = append(results, result)
    }
    return
}

47 谷歌搜索 2.1

不等待慢速服务器。没有锁。没有条件变量。没有回调。

    c := make(chan Result)
    go func() { c <- Web(query) } ()
    go func() { c <- Image(query) } ()
    go func() { c <- Video(query) } ()

    timeout := time.After(80 * time.Millisecond)
    for i := 0; i < 3; i++ {
        select {
        case result := <-c:
            results = append(results, result)
        case <-timeout:
            fmt.Println("timed out")
            return
        }
    }
    return

(译者注:增加了超时逻辑。)

48 避免超时

问:我们如何避免丢弃来自慢速服务器的结果?

答:复制服务器。向多个副本发送请求,并使用第一个响应。

func First(query string, replicas ...Search) Result {
    c := make(chan Result)
    searchReplica := func(i int) { c <- replicas[i](query) }
    for i := range replicas {
        go searchReplica(i)
    }
    return <-c
}

49 使用第一个函数

func main() {
    rand.Seed(time.Now().UnixNano())
    start := time.Now()
    result := First("golang",
        fakeSearch("replica 1"),
        fakeSearch("replica 2"))
    elapsed := time.Since(start)
    fmt.Println(result)
    fmt.Println(elapsed)
}

50 谷歌搜索 3.0

使用复制的搜索服务器减少尾部延迟。

c := make(chan Result)
    go func() { c <- First(query, Web1, Web2) } ()
    go func() { c <- First(query, Image1, Image2) } ()
    go func() { c <- First(query, Video1, Video2) } ()
    timeout := time.After(80 * time.Millisecond)
    for i := 0; i < 3; i++ {
        select {
        case result := <-c:
            results = append(results, result)
        case <-timeout:
            fmt.Println("timed out")
            return
        }
    }
    return

51 还是…

没有锁。没有条件变量。没有回调。

52 总结

在几个简单的变换示例中,我们使用Go的并发原语来转换

  • 缓慢的
  • 顺序的
  • 故障敏感的

程序为一个

  • 快速的
  • 并发的
  • 有副本的
  • 健壮的

程序。

53 更多派对技巧

使用这些工具有无穷无尽的方法,许多在别处都有介绍。

Chatroulette toy:

golang.org/s/chat-roulette

Load balancer:

golang.org/s/load-balancer

Concurrent prime sieve:

golang.org/s/prime-sieve

Concurrent power series (by McIlroy):

golang.org/s/power-series

54 不要过用

玩起来很有趣,但不要过度使用这些想法。

Goroutines和通道是大创意。它们是程序构建的工具。

但有时你只需要一个引用计数器。

Go有”sync”和”sync/atomic”包,它们提供互斥锁、条件变量等。它们为较小的问题提供了工具。

通常,这些东西会一起解决更大的问题。

始终为工作使用正确的工具。

55 结论

Goroutines和channels使得表达复杂的操作变得容易

  • 多个输入
  • 多个输出
  • 超时
  • 失败

而且它们使用起来很有趣。

56 链接

Go主页:

golang.org

Go Tour (在浏览器里学习Go):

tour.golang.org

包(Package)文档:

golang.org/pkg

Articles galore:

golang.org/doc

并发不是并行:

golang.org/s/concurrency-is-not-parallelism

57 致谢

Rob Pike

Google

http://golang.org/s/plusrob

@rob_pike

http://golang.org

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注