本文翻译自《Go Concurrency Patterns》,是个PPT。
1 Go Concurrency Patterns
Rob Pike
2 视频
这个演讲是在2012年6月在Google I/O大会上做的。
3 介绍
4 Go中的并发特性
当Go语言首次发布时,人们似乎对Go的并发特性着迷。
问题:
- 为什么支持并发?
- 什么是并发?
- 想法从何而来?
- 到底有什么好处呢?
- 我该如何使用它?
5 为什么?
看看你周围。你看到了什么?
你是否看到一个单步的世界一次只做一件事?
或者你是否看到了一个由相互作用、行为独立的片段组成的复杂世界?
这就是为什么。顺序处理(Sequential processing)本身并不能模拟世界的行为。
6 什么是并发?
并发是独立地执行计算的组合。
并发是一种构建软件的方式,特别是编写与现实世界交互良好的干净代码的方式。
它不是并行。
7 并发不是并行
并发不是并行,尽管它支持并行。
如果你只有一个处理器,你的程序仍然可以并发但不能并行。
另一方面,编写良好的并发程序可能会在多处理器上高效地并行运行。那个属性可能很重要…
有关该区别的更多信息,请参阅下面的链接。这里讨论的太多了。
golang.org/s/concurrency-is-not-parallelism
8 软件构建模型
容易理解。
便于使用。
容易推理。
你不需要成为专家!
(比处理并行的细节(线程、信号量、锁、屏障等)要好得多。)
9 历史
对许多人来说,Go的并发特性似乎很新。
但它们植根于悠久的历史,可以追溯到1978年Hoare的CSP甚至Dijkstra的守卫命令(1975)。
具有相似特征的语言:
- Occam (May, 1983)
- Erlang (Armstrong, 1986)
- Newsqueak (Pike, 1988)
- Concurrent ML (Reppy, 1993)
- Alef (Winterbottom, 1995)
- Limbo (Dorward, Pike, Winterbottom, 1996).
10 区别
Go是Newsqueak-Alef-Limbo分支的最新成员,以第一公民的通道而著称。
Erlang更接近于原始CSP,你可以通过名字而不是通过通道与进程通信。
这些模型是等效的,但表达方式不同。
粗略的类比:按名字写入文件(进程,Erlang)与写入文件描述符(通道,Go)。
11 基本例子
12 一个无聊的函数
我们需要一个例子来展示并发原语的有趣属性。
为了避免分心,我们把它作为一个无聊的例子。
func boring(msg string) {
for i := 0; ; i++ {
fmt.Println(msg, i)
time.Sleep(time.Second)
}
}
13 不那么无聊
使消息之间的间隔不可预测(仍不到一秒)。
func boring(msg string) {
for i := 0; ; i++ {
fmt.Println(msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}
14 运行它
无聊的功能永远运行,就像一个无聊的派对客人。
func main() {
boring("boring!")
}
func boring(msg string) {
for i := 0; ; i++ {
fmt.Println(msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}
15 忽略它
go语句照常运行函数,但不会让调用者等待。
它启动一个goroutine。 该功能类似于shell命令末尾的&。
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
go boring("boring!")
}
16 少一点忽略
当main返回时,程序退出并带走boring函数。
我们可以闲逛一下,在路上显示main和已启动的goroutine都在运行。
func main() {
go boring("boring!")
fmt.Println("I'm listening.")
time.Sleep(2 * time.Second)
fmt.Println("You're boring; I'm leaving.")
}
17 协程(Goroutine)
什么是goroutine?它是一个独立执行的函数,由go语句启动。
它有自己的调用堆栈,可以根据需要增长和缩小。
它非常廉价。实践中,拥有数千甚至数十万个goroutine是很常见的。
这不是一个线程。
一个包含数千个goroutine的程序中可能只有一个线程。
相反,goroutines会根据需要动态地多路复用到线程上,以保持所有goroutines运行。
但是,如果你将其视为非常廉价的线程,那么你将不会走太远。
18 通信(Communication)
我们boring的例子被骗了:main函数看不到其他goroutine的输出。
它只是打印到屏幕上,我们假装看到了对话。
真正的对话需要通信。
19 通道(Channel)
Go中的通道提供了两个goroutine之间的连接,允许它们进行通信。
// 声明和初始化。
var c chan int
c = make(chan int)
// 或者
c := make(chan int)
// 在通道上发送数据。
c <- 1
// 从通道里取数据。
// 箭头指明了数据流的方向。
value = <-c
20 使用通道
通道连接main和boring的goroutine,以便它们可以通信。
func main() {
c := make(chan string)
go boring("boring!", c)
for i := 0; i < 5; i++ {
fmt.Printf("You say: %q\n", <-c) // 接收表达式仅是一个值。
}
fmt.Println("You're boring; I'm leaving.")
}
func boring(msg string, c chan string) {
for i := 0; ; i++ {
c <- fmt.Sprintf("%s %d", msg, i) // 将被发送的表达式可以是任意合适的值。
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}
21 同步(Synchronization)
当main函数执行<–c时,它会等待一个值被发送。
同样,当boring函数执行c <– value时,它会等待接收者准备好,再把值发送到通道。
发送者和接收者都必须准备好在通信中发挥自己的作用。否则我们会等待直到他们准备好。 因此,通道既可以通信又可以同步。
22 关于带缓冲区的通道的旁白
专家提示:Go也可以创建带缓冲区的通道。
缓冲区消除了同步。
通道的缓冲区使它们更像Erlang的邮箱。
带缓冲区的通道对于某些问题可能很重要,但它们需要更加微妙的推理。
我们今天不需要它们。
23 Go的使用方式
不要通过共享内存来通信,而是通过通信来共享内存。
24 模式
25 生成器:返回一个通道的函数
通道是一等公民(first-class)的值,就像字符串或整数一样。
c := boring("boring!") // 返回一个通道的函数。
for i := 0; i < 5; i++ {
fmt.Printf("You say: %q\n", <-c)
}
fmt.Println("You're boring; I'm leaving.")
func boring(msg string) <-chan string { // 返回只能用来接收值(receive-only)的通道
c := make(chan string)
go func() { // 我们在函数内部开启一个goroutine。
for i := 0; ; i++ {
c <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}()
return c // 把通道返回给本函数的调用者。
}
26 通道作为服务的句柄
我们的boring函数返回一个通道,让我们可以与它提供的boring服务进行通信。
我们可以有该服务的更多实例。
func main() {
joe := boring("Joe")
ann := boring("Ann")
for i := 0; i < 5; i++ {
fmt.Println(<-joe)
fmt.Println(<-ann)
}
fmt.Println("You're both boring; I'm leaving.")
}
27 多路复用
这些计划使Joe和Ann步调一致。
我们可以改为使用扇入功能让准备好的人说话。
func fanIn(input1, input2 <-chan string) <-chan string {
c := make(chan string)
go func() { for { c <- <-input1 } }()
go func() { for { c <- <-input2 } }()
return c
}
func main() {
c := fanIn(boring("Joe"), boring("Ann"))
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
fmt.Println("You're both boring; I'm leaving.")
}
28 扇入(Fan-in)
29 恢复顺序
在一个通道上发送一个通道,让goroutine等待轮到它。
接收所有消息,然后在专用通道上发送,再次启用它们。 首先,我们定义一个包含回复用的通道的消息类型。
type Message struct {
str string
wait chan bool
}
30 恢复顺序
每个发言者都必须等待批准。
for i := 0; i < 5; i++ {
msg1 := <-c; fmt.Println(msg1.str)
msg2 := <-c; fmt.Println(msg2.str)
msg1.wait <- true
msg2.wait <- true
}
waitForIt := make(chan bool) // 所有消息实例共享的通道。
// (译者注:以下是每个发言者协程的关键代码)
c <- Message{ fmt.Sprintf("%s: %d", msg, i), waitForIt }
time.Sleep(time.Duration(rand.Intn(2e3)) * time.Millisecond)
<-waitForIt
(译者注:上述程序实现每个发言者发言后,必须等待批准,才能继续运行下去。)
31 select
并发独有的控制结构。
和通道、goroutine一起内置在语言中。
32 select
select语句提供了另一种处理多个通道的方法。
这就像一个开关,但每个case都是一个通信:
– 评估所有通道。
– 阻塞选择,直到一个通信可以继续,然后它会继续。
– 如果有多个可以继续,伪随机选择其中一个。 – 如果没有一个通道准备好,则默认子句(如果存在)立即执行。
select {
case v1 := <-c1:
fmt.Printf("received %v from c1\n", v1)
case v2 := <-c2:
fmt.Printf("received %v from c2\n", v1)
case c3 <- 23:
fmt.Printf("sent %v to c3\n", 23)
default:
fmt.Printf("no one was ready to communicate\n")
}
33 再次扇入
重写我们原来的fanIn函数。实际上只需要一个goroutine就能实现。旧的:
func fanIn(input1, input2 <-chan string) <-chan string {
c := make(chan string)
go func() { for { c <- <-input1 } }()
go func() { for { c <- <-input2 } }()
return c
}
34 使用select扇入
重写我们原来的fanIn函数。只需要一个goroutine就能实现。新的:
func fanIn(input1, input2 <-chan string) <-chan string {
c := make(chan string)
go func() {
for {
select {
case s := <-input1: c <- s
case s := <-input2: c <- s
}
}
}()
return c
}
35 使用select超时
time.After函数返回一个阻塞指定持续时间段的通道。
在该时间段过去之后,通道将传递当前时间一次。
func main() {
c := boring("Joe")
for {
select {
case s := <-c:
fmt.Println(s)
case <-time.After(1 * time.Second):
fmt.Println("You're too slow.")
return
}
}
}
36 使用select的整个对话超时
在循环外创建一次计时器,以使整个对话超时。
(在前面的程序中,我们对每条消息都有一个超时。)
func main() {
c := boring("Joe")
timeout := time.After(5 * time.Second)
for {
select {
case s := <-c:
fmt.Println(s)
case <-timeout:
fmt.Println("You talk too much.")
return
}
}
}
37 退出通道
我们可以扭转这种局面,告诉Joe在我们听腻了他的时候停下来。
quit := make(chan bool)
c := boring("Joe", quit)
for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
quit <- true
select {
case c <- fmt.Sprintf("%s: %d", msg, i):
// do nothing
case <-quit:
return
}
38 从quit通道接收
我们怎么知道它已经完成了?等待它告诉我们它已经完成:在退出通道接收
quit := make(chan string)
c := boring("Joe", quit)
for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
quit <- "Bye!"
fmt.Printf("Joe says: %q\n", <-quit)
select {
case c <- fmt.Sprintf("%s: %d", msg, i):
// do nothing
case <-quit:
cleanup()
quit <- "See you!"
return
}
39 通道菊花链
func f(left, right chan int) {
left <- 1 + <-right
}
func main() {
const n = 10000
leftmost := make(chan int)
right := leftmost
left := leftmost
for i := 0; i < n; i++ {
right = make(chan int)
go f(left, right)
left = right
}
go func(c chan int) { c <- 1 }(right)
fmt.Println(<-leftmost)
}
40 中国耳语,地鼠风格
41 系统软件
Go是为编写系统软件而设计的。
让我们看看并发特性是如何发挥作用的。
42 示例:谷歌搜索
Q:谷歌搜索做什么?
A:给定一个查询,返回一页搜索结果(和一些广告)。
Q:我们如何获得搜索结果?
A:将查询发送到Web搜索、图像搜索、YouTube、地图、新闻等。然后混合结果。
我们如何实现这一点?
43 谷歌搜索:一个虚假的框架
我们可以模拟搜索功能,就像我们之前模拟对话一样。
var (
Web = fakeSearch("web")
Image = fakeSearch("image")
Video = fakeSearch("video")
)
type Search func(query string) Result
func fakeSearch(kind string) Search {
return func(query string) Result {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return Result(fmt.Sprintf("%s result for %q\n", kind, query))
}
}
44 Google 搜索:测试这个框架
func main() {
rand.Seed(time.Now().UnixNano())
start := time.Now()
results := Google("golang")
elapsed := time.Since(start)
fmt.Println(results)
fmt.Println(elapsed)
}
45 谷歌搜索 1.0
Google函数接受查询并返回结果片段(只是字符串)。
Google会依次调用Web、图像和视频搜索,并将它们附加到结果切片中。
func Google(query string) (results []Result) {
results = append(results, Web(query))
results = append(results, Image(query))
results = append(results, Video(query))
return
}
46 谷歌搜索 2.0
同时运行Web、图像和视频搜索,并等待所有结果。
没有锁。没有条件变量。没有回调。
func Google(query string) (results []Result) {
c := make(chan Result)
go func() { c <- Web(query) } ()
go func() { c <- Image(query) } ()
go func() { c <- Video(query) } ()
for i := 0; i < 3; i++ {
result := <-c
results = append(results, result)
}
return
}
47 谷歌搜索 2.1
不等待慢速服务器。没有锁。没有条件变量。没有回调。
c := make(chan Result)
go func() { c <- Web(query) } ()
go func() { c <- Image(query) } ()
go func() { c <- Video(query) } ()
timeout := time.After(80 * time.Millisecond)
for i := 0; i < 3; i++ {
select {
case result := <-c:
results = append(results, result)
case <-timeout:
fmt.Println("timed out")
return
}
}
return
(译者注:增加了超时逻辑。)
48 避免超时
问:我们如何避免丢弃来自慢速服务器的结果?
答:复制服务器。向多个副本发送请求,并使用第一个响应。
func First(query string, replicas ...Search) Result {
c := make(chan Result)
searchReplica := func(i int) { c <- replicas[i](query) }
for i := range replicas {
go searchReplica(i)
}
return <-c
}
49 使用第一个函数
func main() {
rand.Seed(time.Now().UnixNano())
start := time.Now()
result := First("golang",
fakeSearch("replica 1"),
fakeSearch("replica 2"))
elapsed := time.Since(start)
fmt.Println(result)
fmt.Println(elapsed)
}
50 谷歌搜索 3.0
使用复制的搜索服务器减少尾部延迟。
c := make(chan Result)
go func() { c <- First(query, Web1, Web2) } ()
go func() { c <- First(query, Image1, Image2) } ()
go func() { c <- First(query, Video1, Video2) } ()
timeout := time.After(80 * time.Millisecond)
for i := 0; i < 3; i++ {
select {
case result := <-c:
results = append(results, result)
case <-timeout:
fmt.Println("timed out")
return
}
}
return
51 还是…
没有锁。没有条件变量。没有回调。
52 总结
在几个简单的变换示例中,我们使用Go的并发原语来转换
- 缓慢的
- 顺序的
- 故障敏感的
程序为一个
- 快速的
- 并发的
- 有副本的
- 健壮的
程序。
53 更多派对技巧
使用这些工具有无穷无尽的方法,许多在别处都有介绍。
Chatroulette toy:
Load balancer:
Concurrent prime sieve:
Concurrent power series (by McIlroy):
54 不要过用
玩起来很有趣,但不要过度使用这些想法。
Goroutines和通道是大创意。它们是程序构建的工具。
但有时你只需要一个引用计数器。
Go有”sync”和”sync/atomic”包,它们提供互斥锁、条件变量等。它们为较小的问题提供了工具。
通常,这些东西会一起解决更大的问题。
始终为工作使用正确的工具。
55 结论
Goroutines和channels使得表达复杂的操作变得容易
- 多个输入
- 多个输出
- 超时
- 失败
而且它们使用起来很有趣。
56 链接
Go主页:
Go Tour (在浏览器里学习Go):
包(Package)文档:
Articles galore:
并发不是并行:
golang.org/s/concurrency-is-not-parallelism
57 致谢
Rob Pike
@rob_pike