本文翻译自《Advanced Go Concurrency Patterns》。
Sameer Ajmani
May 2013
1 高级Go并发模式
Sameer Ajmani
2 视频
该演讲于2013年5月在Google I/O上发表。
3 做好准备
4 Go支持并发
在语言和运行时(runtime)。不是一个库。
这会改变你构建程序的方式。
5 协程(Goroutine)和通道(Channel)
Goroutine是在同一地址空间中独立执行的函数。
go f()
go g(1, 2)
Channel是允许goroutine同步和交换信息的类型值。
c := make(chan int)
go func() { c <- 3 }()
n := <-c
有关基础知识的更多信息,请观看Go并发模式 (Pike, 2012)。
6 示例:乒乓球(ping-pong)
type Ball struct{ hits int }
func main() {
table := make(chan *Ball)
go player("ping", table)
go player("pong", table)
table <- new(Ball) // 开始游戏:投球
time.Sleep(1 * time.Second)
<-table // 结束游戏:抓住这个球
}
func player(name string, table chan *Ball) {
for {
ball := <-table
ball.hits++
fmt.Println(name, ball.hits)
time.Sleep(100 * time.Millisecond)
table <- ball
}
}
7 死锁检测
type Ball struct{ hits int }
func main() {
table := make(chan *Ball)
go player("ping", table)
go player("pong", table)
// table <- new(Ball) // 开始游戏:投球
time.Sleep(1 * time.Second)
<-table // 结束游戏:抓住这个球
}
func player(name string, table chan *Ball) {
for {
ball := <-table
ball.hits++
fmt.Println(name, ball.hits)
time.Sleep(100 * time.Millisecond)
table <- ball
}
}
8 使用panic输出堆栈信息
type Ball struct{ hits int }
func main() {
table := make(chan *Ball)
go player("ping", table)
go player("pong", table)
table <- new(Ball) // 开始游戏:投球
time.Sleep(1 * time.Second)
<-table // 结束游戏:抓住这个球
panic("show me the stacks")
}
func player(name string, table chan *Ball) {
for {
ball := <-table
ball.hits++
fmt.Println(name, ball.hits)
time.Sleep(100 * time.Millisecond)
table <- ball
}
}
9 走(go)很容易,但如何停下来?
长时间运行的程序需要清理。
让我们看看如何编写处理通信、周期性事件和取消的程序。
核心是Go的select语句:就像一个switch,但决定是基于通信能力做出的。
select {
case xc <- x:
// 把x发送到xc通道
case y := <-yc:
// 从yc通道接收y
}
10 示例:feed阅读器
我最喜欢的feed阅读器消失了。我需要一个新的。
为什么不写一个?
我们从哪里开始?
11 查找RSS客户端
在godoc.org中搜索“rss”会出现几个命中,其中一个提供:
// Fetch获取uri的Items并返回应该尝试下一次获取的时刻。失败时,Fetch返回错误。
func Fetch(uri string) (items []Item, next time.Time, err error)
type Item struct{
Title, Channel, GUID string // RSS字段的子集
}
但我想要一个流:
<-chan Item
我想要多个订阅。
12 这就是我们所拥有的
type Fetcher interface {
Fetch() (items []Item, next time.Time, err error)
}
func Fetch(domain string) Fetcher {...} // 从域名获取条目
13 这就是我们想要的
type Subscription interface {
Updates() <-chan Item // 条目流(stream of Items)
Close() error // 关闭流
}
func Subscribe(fetcher Fetcher) Subscription {...} // 把获取到的数据转换为一个流
func Merge(subs ...Subscription) Subscription {...} // 合并多个流
14 示例
func main() {
// 订阅一些feed,并创建一个合并的更新流。
merged := Merge(
Subscribe(Fetch("blog.golang.org")),
Subscribe(Fetch("googleblog.blogspot.com")),
Subscribe(Fetch("googledevelopers.blogspot.com")))
// 在一段时间后关闭所有订阅(subscription)。
time.AfterFunc(3*time.Second, func() {
fmt.Println("closed:", merged.Close())
})
// 输出流中数据。
for it := range merged.Updates() {
fmt.Println(it.Channel, it.Title)
}
panic("show me the stacks")
}
15 订阅(Subscribe)
Subscribe创建一个新的Subscription,它重复获取条目,直到调用Close。
func Subscribe(fetcher Fetcher) Subscription {
s := &sub{
fetcher: fetcher,
updates: make(chan Item), // for Updates
}
go s.loop()
return s
}
// sub实现了Subscription接口。
type sub struct {
fetcher Fetcher // 获取条目
updates chan Item // 把条目分发给用户
}
// loop使用s.fetcher方法获取条目并使用s.updates方法发送它们。当调用s.Close方法时,退出loop。
func (s *sub) loop() {...}
16 实现Subscription
要实现Subscription接口,请定义Updates方法和Close方法。
func (s *sub) Updates() <-chan Item {
return s.updates
}
func (s *sub) Close() error {
// TODO: 让loop退出
// TODO: 发现任何错误
return err
}
17 loop有什么作用?
- 定期调用Fetch
- 在Updates通道上发送获取的条目
- 调用Close时退出,并报告错误
18 幼稚的实现
for {
if s.closed {
close(s.updates)
return
}
items, next, err := s.fetcher.Fetch()
if err != nil {
s.err = err
time.Sleep(10 * time.Second)
continue
}
for _, item := range items {
s.updates <- item
}
if now := time.Now(); next.After(now) {
time.Sleep(next.Sub(now))
}
}
func (s *naiveSub) Close() error {
s.closed = true
return s.err
}
19 Bug 1:对s.closed/s.err的非同步访问
for {
if s.closed {
close(s.updates)
return
}
items, next, err := s.fetcher.Fetch()
if err != nil {
s.err = err
time.Sleep(10 * time.Second)
continue
}
for _, item := range items {
s.updates <- item
}
if now := time.Now(); next.After(now) {
time.Sleep(next.Sub(now))
}
}
func (s *naiveSub) Close() error {
s.closed = true
return s.err
}
20 竞态(Race)检测器
go run -race naivemain.go
for {
if s.closed {
close(s.updates)
return
}
items, next, err := s.fetcher.Fetch()
if err != nil {
s.err = err
func (s *naiveSub) Close() error {
s.closed = true
return s.err
}
21 Bug 2:time.Sleep可能会保持循环运行
for {
if s.closed {
close(s.updates)
return
}
items, next, err := s.fetcher.Fetch()
if err != nil {
s.err = err
time.Sleep(10 * time.Second)
continue
}
for _, item := range items {
s.updates <- item
}
if now := time.Now(); next.After(now) {
time.Sleep(next.Sub(now))
}
}
22 Bug 3:循环可能会在s.updates上永远阻塞
for {
if s.closed {
close(s.updates)
return
}
items, next, err := s.fetcher.Fetch()
if err != nil {
s.err = err
time.Sleep(10 * time.Second)
continue
}
for _, item := range items {
s.updates <- item
}
if now := time.Now(); next.After(now) {
time.Sleep(next.Sub(now))
}
}
23 解决方案
将循环体更改为具有三种情况的select:
- Close被调用时
- 调用Fetch时
- 在s.updates上发送条目
24 结构:for-select循环
循环在它自己的goroutine中运行。
select让循环避免在任何一种状态下无限期地阻塞。
func (s *sub) loop() {
... declare mutable state ...
for {
... set up channels for cases ...
select {
case <-c1:
... read/write state ...
case c2 <- x:
... read/write state ...
case y := <-c3:
... read/write state ...
}
}
}
这些case通过循环中的本地状态进行交互。
25 case 1:Close
Close通过s.closure与循环通信。
type sub struct {
closing chan chan error
}
服务(loop)在其通道(s.closure)上监听请求。
客户端(Close)在s.closure上发送请求:退出并回复错误
在这种情况下,请求中唯一的内容就是回复通道。
26 Case 1: Close
Close要求循环退出并等待响应。
func (s *sub) Close() error {
errc := make(chan error)
s.closing <- errc
return <-errc
}
循环通过回复Fetch错误来处理Close并退出。
var err error // 当Fetch失败时设置错误
for {
select {
case errc := <-s.closing:
errc <- err
close(s.updates) // 告诉接收者我们做完了
return
}
}
27 Case 2: Fetch
在延迟一段时间后安排下一次Fetch。
var pending []Item // 被fetch追加;被send消费
var next time.Time // 初始化值是January 1, year 0
var err error
for {
var fetchDelay time.Duration // 初始化值是0(没有延迟)
if now := time.Now(); next.After(now) {
fetchDelay = next.Sub(now)
}
startFetch := time.After(fetchDelay)
select {
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
pending = append(pending, fetched...)
}
}
28 Case 3: Send
一次发送一个获取的条目。
var pending []Item // 被fetch追加;被send消费
for {
select {
case s.updates <- pending[0]:
pending = pending[1:]
}
}
哎呀。这崩溃了。
29 select和nil通道
在nil通道上发送和接收会被阻塞。
select从不选择阻塞的case。
func main() {
a, b := make(chan string), make(chan string)
go func() { a <- "a" }()
go func() { b <- "b" }()
if rand.Intn(2) == 0 {
a = nil
fmt.Println("nil a")
} else {
b = nil
fmt.Println("nil b")
}
select {
case s := <-a:
fmt.Println("got", s)
case s := <-b:
fmt.Println("got", s)
}
}
30 Case 3: Send (修复的)
仅在pending非空时启用send。
var pending []Item // 被fetch追加;被send消费
for {
var first Item
var updates chan Item
if len(pending) > 0 {
first = pending[0]
updates = s.updates // 启用send case
}
select {
case updates <- first:
pending = pending[1:]
}
}
31 select
将三个case放在一起:
select {
case errc := <-s.closing:
errc <- err
close(s.updates)
return
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
pending = append(pending, fetched...)
case updates <- first:
pending = pending[1:]
}
这些case通过err、next和pending交互。
没有锁,没有条件变量,没有回调函数。
32 已修复的Bug
- Bug 1:对s.closed和s.err的非同步(unsynchronized)访问
- Bug 2:time.Sleep可能会保持循环运行
- Bug 3:循环可能会永远阻塞在发送s.updates上
select {
case errc := <-s.closing:
errc <- err
close(s.updates)
return
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
pending = append(pending, fetched...)
case updates <- first:
pending = pending[1:]
}
33 我们可以进一步改进循环
34 问题:Fetch可能会返回重复项
var pending []Item
var next time.Time
var err error
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
pending = append(pending, fetched...)
35 修复:在添加到pending之前过滤条目
var pending []Item
var next time.Time
var err error
var seen = make(map[string]bool) // item.GUIDs集合
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
for _, item := range fetched {
if !seen[item.GUID] {
pending = append(pending, item)
seen[item.GUID] = true
}
}
36 问题:pending队列无限制地增长
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
for _, item := range fetched {
if !seen[item.GUID] {
pending = append(pending, item)
seen[item.GUID] = true
}
}
37 问题:pending队列无限制地增长
const maxPending = 10
var fetchDelay time.Duration
if now := time.Now(); next.After(now) {
fetchDelay = next.Sub(now)
}
var startFetch <-chan time.Time
if len(pending) < maxPending {
startFetch = time.After(fetchDelay) // 启用fetch case
}
可以改为从pending的头部删除较旧的条目。
38 问题:Fetch上的循环被阻塞
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
for _, item := range fetched {
if !seen[item.GUID] {
pending = append(pending, item)
seen[item.GUID] = true
}
}
39 修复:异步运行Fetch
为fetchDone添加一个新的select case。
type fetchResult struct{ fetched []Item; next time.Time; err error }
var fetchDone chan fetchResult // 如果不是nil,则表示Fetch正在运行
var startFetch <-chan time.Time
if fetchDone == nil && len(pending) < maxPending {
startFetch = time.After(fetchDelay) // 启用fetch case
}
select {
case <-startFetch:
fetchDone = make(chan fetchResult, 1)
go func() {
fetched, next, err := s.fetcher.Fetch()
fetchDone <- fetchResult{fetched, next, err}
}()
case result := <-fetchDone:
fetchDone = nil
// 使用result.fetched, result.next, result.err
40 实现订阅
响应式。干净。易于阅读和更改。
三种技术:
- for-select循环
- 服务通道,回复通道(chan chan err)
- select case里的nil通道
更多详细信息在线,包括Merge。
41 总结
并发编程可能很棘手。
Go让它变得更容易:
- 通道传送数据、定时器事件、取消信号
- goroutines序列化对本地可变状态的访问
- 堆栈跟踪和死锁检测器
- 竞态检测器
42 链接
Go并发模式(2012)
go.dev/talks/2012/concurrency.slide
并发不是并行
golang.org/s/concurrency-is-not-parallelism
通过通信共享内存
golang.org/doc/codewalk/sharemem
Go Tour (在你的浏览器里学习Go)
43 致谢
Sameer Ajmani
http://profiles.google.com/ajmani
@Sajma
Your article gave me a lot of inspiration, I hope you can explain your point of view in more detail, because I have some doubts, thank you.