本文翻译自《Go Concurrency Patterns: Pipelines and cancellation》。
Sameer Ajmani
13 March 2014
介绍
Go的并发原语使构建数据流管道变得容易,从而有效地利用I/O和多个CPU。本文介绍了此类管道的示例,重点介绍了操作失败时出现的细微差别,并介绍了干净地处理故障的技术。
什么是管道?
Go中没有关于管道的正式定义。它只是多种并发程序中的一种。通俗地说,管道(pipeline)是由通道(channel)连接的一系列阶段(stage),其中每个阶段是一组运行相同功能的goroutine。在每个阶段,goroutines
- 通过入口(inbound)通道从上游接收值
- 对该数据执行某些功能,通常会产生新值
- 通过出口(outbound)通道向下游发送值
每个阶段都有任意数量的入口和出口通道,除了第一个阶段和最后一个阶段,它们分别只有出口或入口通道。第一阶段有时称为源头或生产者;最后一个阶段,称为水池(sink)或消费者。(译者注:一个阶段在代码实现上就是一个函数,入口通道或出口通道作为该函数的参数或返回值,在函数体内启动一个或多个goroutines来处理这一阶段的业务逻辑。见下文的各个例子。)
我们将从一个简单的示例管道开始来解释这些想法和技术。稍后,我们将展示一个更现实的例子。
平方数
考虑一个具有三个阶段的管道。
第一阶段gen
是一个函数,将一个整数列表转换为一个通道,发出列表中的整数。gen
函数启动一个goroutine,在通道上发出整数,并在发送完所有值后关闭该通道:
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
第二阶段sq
从通道接收整数并返回一个通道,该通道发出每个接收到的整数的平方值。在入口通道关闭并且此阶段已将所有值发送到下游后,它会关闭出口通道:
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
main
函数设置管道并运行最后一个阶段:它从第二阶段接收值并打印输出每个值,直到通道关闭:
func main() {
// 安装这个管道。
c := gen(2, 3)
out := sq(c)
// 消费输出的值。
fmt.Println(<-out) // 4
fmt.Println(<-out) // 9
}
由于sq
的入口通道和出口通道具有相同的类型,因此我们可以任意多次组合它。我们还可以将main
重写为范围循环(译者注:即使用for-range语句),就像其他阶段一样:
func main() {
// 安装管道并消费输出的值。
for n := range sq(sq(gen(2, 3))) {
fmt.Println(n) // 16 then 81
}
}
扇出,扇入
多个函数可以从同一个通道读取,直到该通道关闭;这称为扇出(fan-out)。这提供了一种在一组工作者(workers)之间分配工作以并行化使用CPU和I/O的方法。
一个函数可以从多个输入中读取并继续执行,直到所有输入都关闭,方法是将输入通道多路复用到一个通道上,当所有输入都关闭时,该通道才关闭。这称为扇入(fan-in)。
我们可以改变我们的管道来运行sq
的两个实例,每个实例都从同一个输入通道读取。我们引入了一个新函数,merge
,以扇入结果:
func main() {
in := gen(2, 3)
// 将sq工作分配给两个都从in通道读取的goroutine。
c1 := sq(in)
c2 := sq(in)
// 消费来自c1和c2的合并输出。
for n := range merge(c1, c2) {
fmt.Println(n) // 4 then 9, or 9 then 4
}
}
merge
函数通过为每个入口通道启动一个goroutine,将值复制到唯一的出口通道,将通道列表转换为单个通道。一旦所有的输出goroutine都启动了,merge
会再启动一个goroutine以在该通道上的所有发送完成后关闭出口通道。
往关闭的通道发送值将引发panic,因此在调用close之前确保所有发送都完成很重要。sync.WaitGroup
类型提供了一种安排此同步的简单方法:
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 为每个cs中的输入通道启动一个输出goroutine。output将值从c复制到out,直到c关闭,然后调用wg.Done。
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// 一旦所有输出goroutines都完成任务,就启动一个goroutine来关闭out通道。这必须在调用wg.Add之后开始。
go func() {
wg.Wait()
close(out)
}()
return out
}
止损
我们的管道函数有一个模式:
- 当所有发送操作完成后,阶段关闭它们的出口通道。
- 阶段不断从入口通道接收值,直到这些通道关闭。
这种模式允许将每个接收阶段编写为range
循环,并确保一旦所有值都成功发送到下游,所有goroutines都会退出。
但在管道的实际使用中,并不总是接收所有入口值。有时这是设计使然:接收者可能只需要值的一个子集来取得进展。更常见的情况是,一个阶段提前退出,因为某个入口值表示前面阶段出现错误。在任何一种情况下,接收者都不应该等待剩余的值到达,我们希望前面阶段停止生产后面阶段不需要的值。
在我们的示例管道中,如果一个阶段未能消耗所有入口值,则尝试发送这些值的goroutine将无限期地阻塞:
// 消耗输出中的第一个值。
out := merge(c1, c2)
fmt.Println(<-out) // 4 or 9
return
// 由于我们没有从out接收第二个值,因此其中一个输出值的goroutine被挂起,试图发送值。
}
这是资源泄漏:goroutine消耗内存和运行时资源,goroutine堆栈中的堆引用防止数据被垃圾回收。Goroutines不会被垃圾回收;他们必须自行退出。
即使下游阶段无法接收所有入口值,我们也需要安排管道的上游阶段退出。一种方法是将出口通道更改为具有缓冲区的通道。一个缓冲区可以保存固定数量的值;如果缓冲区中有空间,则发送操作立即完成:
c := make(chan int, 2) // 缓冲区大小是2
c <- 1 // 立即完成发生
c <- 2 // 立即完成发生
c <- 3 // 阻塞直到另一个goroutine执行<-c并且接收到1
当在创建通道时知道要发送的值的数量时,使用缓冲区可以简化代码。例如,我们可以重写gen
以将整数列表复制到缓冲区通道中,并避免创建新的goroutine:
func gen(nums ...int) <-chan int {
out := make(chan int, len(nums))
for _, n := range nums {
out <- n
}
close(out)
return out
}
回到我们管道中被阻塞的goroutine,我们可以考虑在merge
返回的出口通道中添加一个缓冲区:
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int, 1) // 为未读输入提供足够的空间
// ...剩余代码不改变...
虽然这修复了该程序中被阻塞的goroutine,但这是错误的代码。这里设置缓冲区大小为1,因为我们知道merge
将接收的值的数量以及下游阶段将消耗的值的数量。这很脆弱:如果我们向gen
传递一个额外的值,或者如果下游阶段读取的值更少,我们将再次阻塞goroutine。
相反,我们需要为下游阶段提供一种方法来向发送者表明他们将停止接受输入。
显式取消
当main
决定不从out
接收所有值就退出时,它必须告诉上游阶段的goroutine放弃它们试图发送的值。它通过在一个名为done
的通道上发送值来实现。它发送两个值,因为可能有两个被阻塞的发送者:
func main() {
in := gen(2, 3)
// 将sq工作分配给两个都从in读取的goroutine。
c1 := sq(in)
c2 := sq(in)
// 从输出消费第一个值。
done := make(chan struct{}, 2)
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
// 告诉剩余的发送者我们准备离开了。
done <- struct{}{}
done <- struct{}{}
}
发送值的goroutine用一个select
语句替换它们的发送操作,该语句在值发送给out
时或者当它们从done
接收到值时继续运行。done
的值的类型是空结构体,因为值无关紧要:它只是一个接收事件,用来指示应该放弃给out
继续发送值。输出值的goroutine继续在其入口通道c上循环,因此上游阶段不会被阻塞。(我们稍后会讨论如何让这个循环提前返回。)
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 为每个在cs中的输入通道启动一个output goroutine。output将值从c复制到out,直到c关闭或从done接收到值,然后output调用wg.Done。
output := func(c <-chan int) {
for n := range c {
select {
case out <- n:
case <-done:
}
}
wg.Done()
}
// ...剩余代码不改变...
这种方法有一个问题:每个下游接收者都需要知道可能被阻塞的上游发送者的数量,并安排在提前返回时向这些发送者发出信号。跟踪这些数量既繁琐又容易出错。
我们需要一种方法来告诉未知且没有限定数量的goroutine停止向下游发送它们的值。在Go中,我们可以通过关闭通道来做到这一点,因为关闭通道上的接收操作总是可以立即进行,产生元素类型的零值。
这意味着main
可以通过关闭done
通道来解除所有发送者的阻塞。这种关闭实际上是对所有发送者的一个广播信号。我们扩展了我们的每个管道函数以接受done
作为参数,并通过defer
语句安排关闭的发生,因此来自main
的所有返回路径都会发出让管道阶段退出的信号。
func main() {
// 设置一个由整个管道共享的done通道,并在该管道退出时关闭该通道,作为我们开启的所有goroutines的退出信号。
done := make(chan struct{})
defer close(done)
in := gen(done, 2, 3)
// 将sq工作分配给两个都从in读取的goroutine。
c1 := sq(done, in)
c2 := sq(done, in)
// 从output消耗第一个值。
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
// done将被推迟的close调用关闭。
}
现在,一旦done
关闭,我们的管道的每个阶段都可以自由返回。merge
中的output
例程可以在其入口通道没有排完水(不消费完入口通道的所有值)的情况下返回,因为它知道上游发送方sq
将在done
关闭时停止发送。output
确保通过defer
语句在所有返回路径上调用wg.Done
:
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 为每个cs中的输入通道启动一个output goroutine。output将值从c复制到out,直到c或done关闭,然后调用wg.Done。
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-done:
return
}
}
}
// ...剩余代码不改变...
同样,sq
可以在done
关闭后立即返回。sq
通过defer
语句确保其输出通道在所有返回路径上都关闭:
func sq(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
return
}
}
}()
return out
}
(译者注:除了源头和水池,每个阶段都有一个或多个入口通道和一个或多个出口通道,用来接收或发送业务数据和错误信息,此外也可以有控制通道,用来接收或发送对通道的控制信息,这里的done
就是控制通道。)
以下是管道建设的指导方针:
- 当所有发送操作都完成后,阶段关闭它们的出口通道。
- 阶段继续从入口通道接收值,直到这些通道关闭或发送者被解除阻塞。
管道通过确保所有发送的值有足够的缓冲区或者在接收者可能废弃通道时显式地向发送者发出信号来解除对发送者的阻塞。(译者注:本文通过done
通道来控制阶段是否取消运行,是比较原始的方式,更加先进的方式是使用Go 1.14版本加入的context
包提供的机制。)
计算一棵目录树里的所有文件的摘要值
让我们考虑一个更实际的管道。
MD5是一种消息摘要算法,可用作文件校验值。命令行实用程序md5sum打印文件列表的摘要值。
% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
我们的示例程序与md5sum
类似,但将单个目录作为参数,并打印该目录下每个常规文件的摘要值,按路径名排序。
% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
我们程序的主函数调用一个辅助函数MD5All
,它返回一个从路径名到摘要值的映射,然后对结果进行排序并打印:
func main() {
// 计算指定目录下所有文件的MD5总和,然后按路径名排序打印结果。
m, err := MD5All(os.Args[1])
if err != nil {
fmt.Println(err)
return
}
var paths []string
for path := range m {
paths = append(paths, path)
}
sort.Strings(paths)
for _, path := range paths {
fmt.Printf("%x %s\n", m[path], path)
}
}
MD5All
函数是我们讨论的重点。在serial.go中,不使用并发,并且在遍历树时简单地读取和汇总每个文件。
// MD5All读取以root为根目录的文件树中的所有文件,并返回从文件路径到文件内容的MD5值的映射。如果目录遍历失败或任何读取操作失败,MD5All将返回错误。
func MD5All(root string) (map[string][md5.Size]byte, error) {
m := make(map[string][md5.Size]byte)
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
data, err := ioutil.ReadFile(path)
if err != nil {
return err
}
m[path] = md5.Sum(data)
return nil
})
if err != nil {
return nil, err
}
return m, nil
}
并行计算摘要值
在parallel.go中,我们将MD5All
拆分为两个阶段的管道。第一阶段sumFiles
遍历目录树,在一个新的goroutine中计算每个文件的摘要值,并将结果发送到值类型为result
的通道上:
type result struct {
path string
sum [md5.Size]byte
err error
}
sumFiles
返回两个通道:一个用于result
,另一个用于filepath.Walk
返回的错误。walk
函数启动一个新的goroutine来处理每个常规文件,然后检查done
。如果done
已关闭,则立即停止遍历目录树:
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
// 对于每个常规文件,启动一个对文件计算摘要值并将结果发送到c的 goroutine。在errc上发送walk的结果。
c := make(chan result)
errc := make(chan error, 1)
go func() {
var wg sync.WaitGroup
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
wg.Add(1)
go func() {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
}
wg.Done()
}()
// 如果done关闭了就停止遍历。
select {
case <-done:
return errors.New("walk canceled")
default:
return nil
}
})
// Walk已返回,因此对wg.Add的所有调用都已完成。完成所有发送后,启动一个goroutine以关闭c。
go func() {
wg.Wait()
close(c)
}()
// 这里不需要select,因为errc是有缓冲区的通道。
errc <- err
}()
return c, errc
}
MD5All
从c. MD5All
接收摘要值。MD5All
在出错时提前返回,通过defer
关闭done
:
func MD5All(root string) (map[string][md5.Size]byte, error) {
// MD5All返回时关闭done通道;它可以在接收到来自c和errc的所有值之前这样做。
done := make(chan struct{})
defer close(done)
c, errc := sumFiles(done, root)
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
有上界的并行数
parallel.go中的MD5All
为每个文件启动一个新的goroutine。在包含许多大文件的目录中,这可能会分配比机器上可用的内存更多的内存。
我们可以通过限制并行读取的文件数量来限制内存分配。在bounded.go中,我们通过创建固定数量的goroutine来读取文件来做到这一点。我们的管道现在具有三个阶段:遍历目录树、读取和计算文件的摘要值以及收集摘要值。
第一阶段walkFiles
发送目录树中常规文件的路径:
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
paths := make(chan string)
errc := make(chan error, 1)
go func() {
// Walk返回时关闭paths通道。
defer close(paths)
// 这里的发送无需select,因为errc是有缓冲区的通道。
errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
select {
case paths <- path:
case <-done:
return errors.New("walk canceled")
}
return nil
})
}()
return paths, errc
}
中间阶段启动固定数量的digester
goroutine,它们从paths
接收文件名并在通道c
上发送结果:
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
for path := range paths {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
return
}
}
}
与我们之前的示例不同,digester
不会关闭其输出通道,因为多个goroutine在共享通道上发送。相反,MD5All
中的代码安排在所有digester
完成后关闭通道:
// 启动固定数量的goroutine来读取和计算文件的摘要值。
c := make(chan result)
var wg sync.WaitGroup
const numDigesters = 20
wg.Add(numDigesters)
for i := 0; i < numDigesters; i++ {
go func() {
digester(done, paths, c)
wg.Done()
}()
}
go func() {
wg.Wait()
close(c)
}()
我们可以让每个digester
创建并返回它自己的输出通道,但是我们需要额外的goroutine来扇入结果。
最后一个阶段接收来自c
的所有results
,然后检查来自errc
的错误。此检查不能更早发生,因为在此之前,walkFiles
可能会阻止向下游发送值:
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
// 检查Walk是否失败。
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
总结
本文介绍了在Go中构建流数据管道的技术。处理此类管道中的故障很棘手,因为管道中的每个阶段都可能被阻塞而不能向下游发送值,并且下游阶段可能不再关心传入的数据值。我们展示了关闭通道如何向管道启动的所有goroutine广播“完成”信号,并定义了正确构建管道的准则。
进一步阅读:
- Go并发模式(视频)介绍了Go并发原语的基础知识以及应用它们的几种方法。
- 高级Go并发模式(视频)涵盖了Go原语的更复杂用途,尤其是
select
。 - Douglas McIlroy的论文Squinting at Power Series展示了类似Go的并发如何为复杂的计算提供优雅的支持。