Go:优雅成长的代码

本文翻译自《Go: code that grows with grace》,是个PPT。

Andrew Gerrand

Google Sydney

1 Go:优雅成长的代码

Andrew Gerrand

Google Sydney

2 视频

本次演讲的视频于2012年11月在瑞典马尔默的Øredev录制。

在Vimeo上观看演讲

3 Go

你可能听说过Go。

这是我最喜欢的语言。我想你也会喜欢的。

4 什么是Go?

一个开源(BSD 许可)项目:

  • 语言规范,
  • 小型运行时(垃圾收集器、调度器等),
  • 两个编译器(gc和gccgo),
  • “包括电池”标准库,
  • 工具(构建、获取、测试、文档、配置文件、格式),
  • 文档。

截至2012年9月,我们有300多名贡献者。

5 Go是关于组合的

Go是面向对象的,但不是以通常的方式实现。

  • 没有类(可以在任何类型上声明方法)
  • 没有子类继承
  • 隐式满足接口(结构类型)

结果:通过小接口连接起来的多个简单部件。

6 Go是关于并发的

Go提供了类似CSP的并发原语。

  • 轻量级线程(协程(goroutines))
  • 类型化线程安全通信和同步(通道(channels))

结果:可理解的并发代码。

7 Go是关于地鼠的

8 核心价值

Go是关于组合、并发和地鼠的。

记在脑子里。

9 Hello, go

package main

import "fmt"

func main() {
    fmt.Println("Hello, go")
}

10 Hello, net

package main

import (
    "fmt"
    "log"
    "net"
)

const listenAddr = "localhost:4000"

func main() {
    l, err := net.Listen("tcp", listenAddr)
    if err != nil {
        log.Fatal(err)
    }
    for {
        c, err := l.Accept()
        if err != nil {
            log.Fatal(err)
        }
        fmt.Fprintln(c, "Hello!")
        c.Close()
    }
}

11 接口

嘿内托!我们只是使用Fprintln写入网络连接。

这是因为Fprintln写入io.Writer,而net.Conn实现了io.Writer接口。

        fmt.Fprintln(c, "Hello!")

func Fprintln(w io.Writer, a ...interface{}) (n int, err error)

type Writer interface {
    Write(p []byte) (n int, err error)
}

type Conn interface {
    Read(b []byte) (n int, err error)
    Write(b []byte) (n int, err error)
    Close() error
    // ... 省略了一些额外函数 ...
}

12 回声服务器

package main

import (
    "io"
    "log"
    "net"
)

const listenAddr = "localhost:4000"

func main() {
    l, err := net.Listen("tcp", listenAddr)
    if err != nil {
        log.Fatal(err)
    }
    for {
        c, err := l.Accept()
        if err != nil {
            log.Fatal(err)
        }
        io.Copy(c, c)
    }
}

13 深入了解io.Copy

        io.Copy(c, c)

// 将副本从src复制到dst,直到在src上达到EOF或发生错误。它返回复制的字节数和复制时遇到的第一个错误(如果有)。
func Copy(dst Writer, src Reader) (written int64, err error)

type Conn interface {
    Read(b []byte) (n int, err error)
    Write(b []byte) (n int, err error)
    Close() error
    // ... 省略了一些额外函数 ...
}

type Writer interface {
    Write(p []byte) (n int, err error)
}

type Reader interface {
    Read(p []byte) (n int, err error)
}

14 协程(Goroutine)

Goroutine是由Go运行时管理的轻量级线程。要在新的goroutine中运行函数,只需在函数调用之前加上“go”关键字。

package main

import (
    "fmt"
    "time"
)

func main() {
    go say("let's go!", 3)
    go say("ho!", 2)
    go say("hey!", 1)
    time.Sleep(4 * time.Second)
}

func say(text string, secs int) {
    time.Sleep(time.Duration(secs) * time.Second)
    fmt.Println(text)
}

15 并发版本的回声服务器

package main

import (
    "io"
    "log"
    "net"
)

const listenAddr = "localhost:4000"

func main() {
    l, err := net.Listen("tcp", listenAddr)
    if err != nil {
        log.Fatal(err)
    }
    for {
        c, err := l.Accept()
        if err != nil {
            log.Fatal(err)
        }
        go io.Copy(c, c)
    }
}

16 “聊天轮盘”

在本次演讲中,我们将看一个基于流行的“聊天轮盘”网站的简单程序。

简而言之:

  • 用户连接,
  • 另一个用户连接,
  • 每一个用户输入的所有内容都会发送给另一个用户。

17 设计

聊天程序类似于回声程序。使用echo,我们将连接的传入数据复制回到同一连接。

对于聊天,我们必须将传入数据从一个用户的连接复制到另一个用户的连接。

复制数据很容易。就像在现实生活中一样,困难的部分是将一个用户与另一个用户匹配。

18 设计图

19 通道(Channel)

Goroutines通过通道进行通信。通道是一个类型化的管道,可以是同步的(无缓冲的)或异步的(有缓冲的)。

package main

import "fmt"

func main() {
    ch := make(chan int)
    go fibs(ch)
    for i := 0; i < 20; i++ {
        fmt.Println(<-ch)
    }
}

func fibs(ch chan int) {
    i, j := 0, 1
    for {
        ch <- j
        i, j = j, i+j
    }
}

20 选择(select)

一个select语句就像一个开关,但它选择通道的操作(并选择其中一个)。

package main

import (
    "fmt"
    "time"
)

func main() {
    ticker := time.NewTicker(time.Millisecond * 250)
    boom := time.After(time.Second * 1)
    for {
        select {
        case <-ticker.C:
            fmt.Println("tick")
        case <-boom:
            fmt.Println("boom!")
            return
        }
    }
}

21 修改回声程序以创建聊天程序

在Accept循环中,我们替换了对io.Copy的调用:

    for {
        c, err := l.Accept()
        if err != nil {
            log.Fatal(err)
        }
        go io.Copy(c, c)
    }

通过调用新函数,match:

    for {
        c, err := l.Accept()
        if err != nil {
            log.Fatal(err)
        }
        go match(c)
    }

22 匹配器

match函数尝试在通道上同时发送和接收连接。

  • 如果发送成功,则连接已移交给另一个goroutine,因此函数退出并且goroutine关闭。
  • 如果接收成功,则表示已从另一个goroutine接收到连接。然后当前的goroutine已有两个连接,因此它在它们之间启动一个聊天会话。
var partner = make(chan io.ReadWriteCloser)

func match(c io.ReadWriteCloser) {
    fmt.Fprint(c, "Waiting for a partner...")
    select {
    case partner <- c:
        // 现在由另一个goroutine处理
    case p := <-partner:
        chat(p, c)
    }
}

23 交谈

聊天功能向每个连接发送问候,然后将数据从一个连接复制到另一个,反之亦然。

请注意,它启动了另一个goroutine,以便复制操作可能同时发生。

func chat(a, b io.ReadWriteCloser) {
    fmt.Fprintln(a, "Found one! Say hi.")
    fmt.Fprintln(b, "Found one! Say hi.")
    go io.Copy(a, b)
    io.Copy(b, a)
}

24 演示(Demo)

25 错误处理

谈话结束后进行清理很重要。为此,我们将每个io.Copy调用的错误值发送到通道,记录任何非零值错误,并关闭两个连接。

func chat(a, b io.ReadWriteCloser) {
    fmt.Fprintln(a, "Found one! Say hi.")
    fmt.Fprintln(b, "Found one! Say hi.")
    errc := make(chan error, 1)
    go cp(a, b, errc)
    go cp(b, a, errc)
    if err := <-errc; err != nil {
        log.Println(err)
    }
    a.Close()
    b.Close()
}

func cp(w io.Writer, r io.Reader, errc chan<- error) {
    _, err := io.Copy(w, r)
    errc <- err
}

26 演示

27 把它带到网上

“可爱的程序,”你说,“但是谁想通过原始TCP连接来聊天呢?”

好观点。让我们通过将其转变为Web应用程序来对其进行现代化改造。

我们将使用websocket代替TCP套接字。

我们将使用Go的标准net/http包提供用户界面,并且websocket支持由go.net子存储库中的websocket包提供。

28 Hello, web

package main

import (
    "fmt"
    "log"
    "net/http"
)

const listenAddr = "localhost:4000"

func main() {
    http.HandleFunc("/", handler)
    err := http.ListenAndServe(listenAddr, nil)
    if err != nil {
        log.Fatal(err)
    }
}

func handler(w http.ResponseWriter, r *http.Request) {
    fmt.Fprint(w, "Hello, web")
}

29 Hello, WebSocket

var sock = new WebSocket("ws://localhost:4000/");
sock.onmessage = function(m) { console.log("Received:", m.data); }
sock.send("Hello!\n")

package main

import (
    "fmt"
    "golang.org/x/net/websocket"
    "net/http"
)

func main() {
    http.Handle("/", websocket.Handler(handler))
    http.ListenAndServe("localhost:4000", nil)
}

func handler(c *websocket.Conn) {
    var s string
    fmt.Fscan(c, &s)
    fmt.Println("Received:", s)
    fmt.Fprint(c, "How do you do?")
}

30 使用http和websocket包

package main

import (
    "fmt"
    "io"
    "log"
    "net/http"

    "golang.org/x/net/websocket"
)

const listenAddr = "localhost:4000"

func main() {
    http.HandleFunc("/", rootHandler)
    http.Handle("/socket", websocket.Handler(socketHandler))
    err := http.ListenAndServe(listenAddr, nil)
    if err != nil {
        log.Fatal(err)
    }
}

31 提供HTML和JavaScript

import "html/template"
func rootHandler(w http.ResponseWriter, r *http.Request) {
    rootTemplate.Execute(w, listenAddr)
}

var rootTemplate = template.Must(template.New("root").Parse(`
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<script>
    websocket = new WebSocket("ws://{{.}}/socket");
    websocket.onmessage = onMessage;
    websocket.onclose = onClose;
</html>
`))

32 添加套接字(socket)类型

我们不能只使用websocket.Conn而不是net.Conn,因为websocket.Conn由其处理函数保持打开状态。在这里,我们使用一个通道来保持处理程序运行,直到调用套接字的Close方法。

type socket struct {
    conn *websocket.Conn
    done chan bool
}

func (s socket) Read(b []byte) (int, error)  { return s.conn.Read(b) }
func (s socket) Write(b []byte) (int, error) { return s.conn.Write(b) }

func (s socket) Close() error {
    s.done <- true
    return nil
}

func socketHandler(ws *websocket.Conn) {
    s := socket{conn: ws, done: make(chan bool)}
    go match(s)
    <-s.done
}

33 结构嵌入

Go支持一种具有“结构嵌入”特性的“混合”功能。被嵌入结构的类型(下例中的B)可以直接调用嵌入结构(下例中的A)的方法(下例中的Hello)。

type A struct{}

func (A) Hello() {
    fmt.Println("Hello!")
}

type B struct {
    A
}

// func (b B) Hello() { b.A.Hello() } // (隐式的!)

func main() {
    var b B
    b.Hello()
}

34 嵌入websocket连接

通过将*websocket.Conn嵌入io.ReadWriter,我们可以删除套接字显式的Read和Write方法。

type socket struct {
    io.ReadWriter
    done          chan bool
}

func (s socket) Close() error {
    s.done <- true
    return nil
}

func socketHandler(ws *websocket.Conn) {
    s := socket{ws, make(chan bool)}
    go match(s)
    <-s.done
}

35 演示

36 缓解孤独

如果你连接了,但那里没有人怎么办?

如果我们能合成一个聊天伙伴不是很好吗?

我们开始做吧。

37 使用马尔可夫链生成文本

Source
"I am not a number! I am a free man!"

Prefix           Suffix 
"" ""            "I"
"" "I"           "am"
"I" "am"         "a"
"I" "am"         "not"
"a" "free"       "man!"
"am" "a"         "free"
"am" "not"       "a"
"a" "number!"    "I"
"number!" "I"    "am"
"not" "a"        "number!"

Generated sentences beginning with the prefix "I am"
"I am a free man!"
"I am not a number! I am a free man!"
"I am not a number! I am not a number! I am a free man!"
"I am not a number! I am not a number! I am not a number! I am a free man!"

38 使用马尔可夫链生成文本

幸运的是,Go doc包含一个马尔可夫链实现:

golang.org/doc/codewalk/markov

我们将使用经过修改以确保并发使用的版本。

// Chain包含前缀到后缀列表的映射map ("chain")。
// 前缀是由空格连接的prefixLen长度的字符串串。
// 后缀是一个单词。一个前缀可以有多个后缀。
type Chain struct {

// Write将字节解析为存储在Chain中的前缀和后缀。
func (c *Chain) Write(b []byte) (int, error) {

// Generate返回从Chain生成的最多n个单词的字符串。
func (c *Chain) Generate(n int) string {

39 给链(chan)喂食

我们将使用进入系统的所有文本来构建马尔可夫链。

为此,我们将套接字的ReadWriter拆分为Reader和Writer,并将所有传入数据提供给Chain实例。

type socket struct {
    io.Reader
    io.Writer
    done chan bool
}

var chain = NewChain(2) // 2个单词前缀

func socketHandler(ws *websocket.Conn) {
    r, w := io.Pipe()
    go func() {
        _, err := io.Copy(io.MultiWriter(w, chain), ws)
        w.CloseWithError(err)
    }()
    s := socket{r, ws, make(chan bool)}
    go match(s)
    <-s.done
}

40 马尔可夫机器人

// Bot返回一个io.ReadWriteCloser,它使用生成的句子响应给每个传入的写入。
func Bot() io.ReadWriteCloser {
    r, out := io.Pipe() // 用于传出数据
    return bot{r, out}
}

type bot struct {
    io.ReadCloser
    out io.Writer
}

func (b bot) Write(buf []byte) (int, error) {
    go b.speak()
    return len(buf), nil
}

func (b bot) speak() {
    time.Sleep(time.Second)
    msg := chain.Generate(10) // 最多10个单词
    b.out.Write([]byte(msg))
}

41 集成马尔可夫机器人

如果真正的伙伴不加入,那么加入一个机器人。

为此,我们在5秒后触发的select中添加一个case,开始用户的套接字和机器人之间的聊天。

func match(c io.ReadWriteCloser) {
    fmt.Fprint(c, "Waiting for a partner...")
    select {
    case partner <- c:
        // 现在由另一个goroutine处理
    case p := <-partner:
        chat(p, c)
    case <-time.After(5 * time.Second):
        chat(Bot(), c)
    }
}

42 演示

43 还有一件事情

44 同时使用TCP和HTTP

func main() {
    go netListen()
    http.HandleFunc("/", rootHandler)
    http.Handle("/socket", websocket.Handler(socketHandler))
    err := http.ListenAndServe(listenAddr, nil)
    if err != nil {
        log.Fatal(err)
    }
}

func netListen() {
    l, err := net.Listen("tcp", "localhost:4001")
    if err != nil {
        log.Fatal(err)
    }
    for {
        c, err := l.Accept()
        if err != nil {
            log.Fatal(err)
        }
        go match(c)
    }
}

45 演示

46 讨论

47 进一步阅读

关于Go:

golang.org

本次演讲的幻灯片:

go.dev/talks/2012/chat.slide

Rob Pike的“Go并发模式”:

golang.org/s/concurrency-patterns

48 致谢

Andrew Gerrand

Google Sydney

http://andrewgerrand.com/

@enneff

http://golang.org/

好的程序员有三种美德

Perl语言的发明人Larry Wall说,好的程序员有三种美德: 懒惰、急躁和傲慢(Laziness, Impatience and hubris)。

懒惰: 是这样一种品质,它使得你花大力气去避免消耗过多的精力。它敦促你写出节省体力的程序,同时别人也能利用它们。为此你会写出完善的文档,以免别人问你太多问题。

急躁: 是这样一种愤怒—-当你发现计算机懒洋洋地不给出结果。于是你写出更优秀的代码,能尽快真正的解决问题。至少看上去是这样。

傲慢: 极度的自信,使你有信心写出(或维护)别人挑不出毛病的程序。

参考

https://www.zhihu.com/question/435258463/answer/2648273894

对大公司和小公司的批判和认知

美团的二号位王慧文把大公司和小公司的批判和认知太到位,太戳心了。

给大家参考。

大公司的难处在于:

每个人都有职级有晋升的需求,而晋升的答辨委员会通常是被大的成熟的业务的老板们把持的,新业务线的员工在公司通常地位不高,而在发展速度快的业务线晋升也快,发展慢的业务线很难晋升,因为评审委员对新业务不太懂,如果有成果就会高抬贵手,没有成果就不给过。此外大公司内部有业务线之间的人员流动,所以大公司里大家都在追涨杀跌,一个业务势头好大家都过去了,势头不好大家都走了,如果一个业务长期没有进展,里面剩下的都是能力不足的人,即使机会来了也做不成

小公司的难处在于:

搞一段时间没有进展,团队里优秀的人会不断被人挖走。 第一个人被挖走的时候你觉得这哥们叛变革命了,多走几个人你就觉得革命叛变了自己,你就会质疑自己是方向选错了,还是行业选错了,还是做法有问题,还是自己能力不够,还是资源不够,还是投资人不行,会陷入自我否定。 此外帮人一起创业总有一个领头的,公司里领头的通常就是 CEO, CEO 平时要见投资人见媒体要招人,慢慢精力就不在业务上了,而 CTO 是实际管事的,业务发展方向是 CEO 定的,千了一段时间如果没有进展,实际干活的 CTO 就会受到很多职位的诱惑,并且会对业务的发展产生怀疑,如果 CEO 说没搞错大家接着干,CTO 会觉得 CEO 很难沟通,听不进团队意见反馈,可能自己不受认可和尊重,可能就离职了,如果 CEO 让 CTO 负责改版,CTO 改版通常不靠谱,这次改版可能把 CEO 原来的想法颠覆掉了,如果改版不成功,试个 2 次这个创业团队就会面临家里的压力,如果没有进展可能创业队就解散了,所以大部分创业团队会在第二年年底解散。

参考

https://www.zhihu.com/question/31116099/answer/1572832069

高级Go并发模式

本文翻译自《Advanced Go Concurrency Patterns》。

Sameer Ajmani

May 2013

1 高级Go并发模式

Sameer Ajmani

Google

2 视频

该演讲于2013年5月在Google I/O上发表。

在YouTube上观看演讲

3 做好准备

4 Go支持并发

在语言和运行时(runtime)。不是一个库。

这会改变你构建程序的方式。

5 协程(Goroutine)和通道(Channel)

Goroutine是在同一地址空间中独立执行的函数。

go f()
go g(1, 2)

Channel是允许goroutine同步和交换信息的类型值。

c := make(chan int)
go func() { c <- 3 }()
n := <-c

有关基础知识的更多信息,请观看Go并发模式 (Pike, 2012)

6 示例:乒乓球(ping-pong)

type Ball struct{ hits int }

func main() {
    table := make(chan *Ball)
    go player("ping", table)
    go player("pong", table)

    table <- new(Ball) // 开始游戏:投球
    time.Sleep(1 * time.Second)
    <-table // 结束游戏:抓住这个球
}

func player(name string, table chan *Ball) {
    for {
        ball := <-table
        ball.hits++
        fmt.Println(name, ball.hits)
        time.Sleep(100 * time.Millisecond)
        table <- ball
    }
}

7 死锁检测

type Ball struct{ hits int }

func main() {
    table := make(chan *Ball)
    go player("ping", table)
    go player("pong", table)

    // table <- new(Ball) // 开始游戏:投球
    time.Sleep(1 * time.Second)
    <-table // 结束游戏:抓住这个球
}

func player(name string, table chan *Ball) {
    for {
        ball := <-table
        ball.hits++
        fmt.Println(name, ball.hits)
        time.Sleep(100 * time.Millisecond)
        table <- ball
    }
}

8 使用panic输出堆栈信息

type Ball struct{ hits int }

func main() {
    table := make(chan *Ball)
    go player("ping", table)
    go player("pong", table)

    table <- new(Ball) // 开始游戏:投球
    time.Sleep(1 * time.Second)
    <-table // 结束游戏:抓住这个球

    panic("show me the stacks")
}

func player(name string, table chan *Ball) {
    for {
        ball := <-table
        ball.hits++
        fmt.Println(name, ball.hits)
        time.Sleep(100 * time.Millisecond)
        table <- ball
    }
}

9 走(go)很容易,但如何停下来?

长时间运行的程序需要清理。

让我们看看如何编写处理通信、周期性事件和取消的程序。

核心是Go的select语句:就像一个switch,但决定是基于通信能力做出的。

select {
case xc <- x:
    // 把x发送到xc通道
case y := <-yc:
    // 从yc通道接收y
}

10 示例:feed阅读器

我最喜欢的feed阅读器消失了。我需要一个新的。

为什么不写一个?

我们从哪里开始?

11 查找RSS客户端

godoc.org中搜索“rss”会出现几个命中,其中一个提供:

// Fetch获取uri的Items并返回应该尝试下一次获取的时刻。失败时,Fetch返回错误。
func Fetch(uri string) (items []Item, next time.Time, err error)

type Item struct{
    Title, Channel, GUID string // RSS字段的子集
}

但我想要一个流:

<-chan Item

我想要多个订阅。

12 这就是我们所拥有的

type Fetcher interface {
    Fetch() (items []Item, next time.Time, err error)
}

func Fetch(domain string) Fetcher {...} // 从域名获取条目

13 这就是我们想要的

type Subscription interface {
    Updates() <-chan Item // 条目流(stream of Items)
    Close() error         // 关闭流
}

func Subscribe(fetcher Fetcher) Subscription {...} // 把获取到的数据转换为一个流

func Merge(subs ...Subscription) Subscription {...} // 合并多个流

14 示例

func main() {
    // 订阅一些feed,并创建一个合并的更新流。
    merged := Merge(
        Subscribe(Fetch("blog.golang.org")),
        Subscribe(Fetch("googleblog.blogspot.com")),
        Subscribe(Fetch("googledevelopers.blogspot.com")))

    // 在一段时间后关闭所有订阅(subscription)。
    time.AfterFunc(3*time.Second, func() {
        fmt.Println("closed:", merged.Close())
    })

    // 输出流中数据。
    for it := range merged.Updates() {
        fmt.Println(it.Channel, it.Title)
    }

    panic("show me the stacks")
}

15 订阅(Subscribe)

Subscribe创建一个新的Subscription,它重复获取条目,直到调用Close。

func Subscribe(fetcher Fetcher) Subscription {
    s := &sub{
        fetcher: fetcher,
        updates: make(chan Item), // for Updates
    }
    go s.loop()
    return s
}

// sub实现了Subscription接口。
type sub struct {
    fetcher Fetcher   // 获取条目
    updates chan Item // 把条目分发给用户
}

// loop使用s.fetcher方法获取条目并使用s.updates方法发送它们。当调用s.Close方法时,退出loop。
func (s *sub) loop() {...}

16 实现Subscription

要实现Subscription接口,请定义Updates方法和Close方法。

func (s *sub) Updates() <-chan Item {
    return s.updates
}
func (s *sub) Close() error {
    // TODO: 让loop退出
    // TODO: 发现任何错误
    return err
}

17 loop有什么作用?

  • 定期调用Fetch
  • 在Updates通道上发送获取的条目
  • 调用Close时退出,并报告错误

18 幼稚的实现

    for {
        if s.closed {
            close(s.updates)
            return
        }
        items, next, err := s.fetcher.Fetch()
        if err != nil {
            s.err = err                 
            time.Sleep(10 * time.Second)
            continue
        }
        for _, item := range items {
            s.updates <- item
        }
        if now := time.Now(); next.After(now) {
            time.Sleep(next.Sub(now))
        }
     }

func (s *naiveSub) Close() error {
    s.closed = true
    return s.err   
}

19 Bug 1:对s.closed/s.err的非同步访问

    for {
        if s.closed {
            close(s.updates)
            return
        }
        items, next, err := s.fetcher.Fetch()
        if err != nil {
            s.err = err
            time.Sleep(10 * time.Second)
            continue
        }
        for _, item := range items {
            s.updates <- item
        }
        if now := time.Now(); next.After(now) {
            time.Sleep(next.Sub(now))
        }
    }

func (s *naiveSub) Close() error {
    s.closed = true
    return s.err
}

20 竞态(Race)检测器

go run -race naivemain.go
    for {
        if s.closed {
            close(s.updates)
            return
        }
        items, next, err := s.fetcher.Fetch()
        if err != nil {
            s.err = err

func (s *naiveSub) Close() error {
    s.closed = true
    return s.err
}

21 Bug 2:time.Sleep可能会保持循环运行

    for {
        if s.closed {
            close(s.updates)
            return
        }
        items, next, err := s.fetcher.Fetch()
        if err != nil {
            s.err = err                 
            time.Sleep(10 * time.Second)
            continue
        }
        for _, item := range items {
            s.updates <- item
        }
        if now := time.Now(); next.After(now) {
            time.Sleep(next.Sub(now))
        }
    }

22 Bug 3:循环可能会在s.updates上永远阻塞

    for {
        if s.closed {
            close(s.updates)
            return
        }
        items, next, err := s.fetcher.Fetch()
        if err != nil {
            s.err = err                 
            time.Sleep(10 * time.Second)
            continue
        }
        for _, item := range items {
            s.updates <- item
        }
        if now := time.Now(); next.After(now) {
            time.Sleep(next.Sub(now))
        }
    }

23 解决方案

将循环体更改为具有三种情况的select:

  • Close被调用时
  • 调用Fetch时
  • 在s.updates上发送条目

24 结构:for-select循环

循环在它自己的goroutine中运行。

select让循环避免在任何一种状态下无限期地阻塞。

func (s *sub) loop() {
    ... declare mutable state ...
    for {
        ... set up channels for cases ...
        select {
        case <-c1:
            ... read/write state ...
        case c2 <- x:
            ... read/write state ...
        case y := <-c3:
            ... read/write state ...
        }
    }
}

这些case通过循环中的本地状态进行交互。

25 case 1:Close

Close通过s.closure与循环通信。

type sub struct {
    closing chan chan error
}

服务(loop)在其通道(s.closure)上监听请求。

客户端(Close)在s.closure上发送请求:退出并回复错误

在这种情况下,请求中唯一的内容就是回复通道。

26 Case 1: Close

Close要求循环退出并等待响应。

func (s *sub) Close() error {
    errc := make(chan error)
    s.closing <- errc
    return <-errc
}

循环通过回复Fetch错误来处理Close并退出。

    var err error // 当Fetch失败时设置错误
    for {
        select {
        case errc := <-s.closing:
            errc <- err
            close(s.updates) // 告诉接收者我们做完了
            return
        }
    }

27 Case 2: Fetch

在延迟一段时间后安排下一次Fetch。

    var pending []Item // 被fetch追加;被send消费
    var next time.Time // 初始化值是January 1, year 0
    var err error
    for {
        var fetchDelay time.Duration // 初始化值是0(没有延迟)
        if now := time.Now(); next.After(now) {
            fetchDelay = next.Sub(now)
        }
        startFetch := time.After(fetchDelay)

        select {
        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            pending = append(pending, fetched...)
        }
    }

28 Case 3: Send

一次发送一个获取的条目。

var pending []Item // 被fetch追加;被send消费
for {
    select {
    case s.updates <- pending[0]:
        pending = pending[1:]
    }
}

哎呀。这崩溃了。

29 select和nil通道

在nil通道上发送和接收会被阻塞。

select从不选择阻塞的case。

func main() {
    a, b := make(chan string), make(chan string)
    go func() { a <- "a" }()
    go func() { b <- "b" }()
    if rand.Intn(2) == 0 {
        a = nil
        fmt.Println("nil a")
    } else {
        b = nil
        fmt.Println("nil b")
    }
    select {
    case s := <-a:
        fmt.Println("got", s)
    case s := <-b:
        fmt.Println("got", s)
    }
}

30 Case 3: Send (修复的)

仅在pending非空时启用send。

    var pending []Item // 被fetch追加;被send消费
    for {
        var first Item
        var updates chan Item
        if len(pending) > 0 {
            first = pending[0]
            updates = s.updates // 启用send case
        }

        select {
        case updates <- first:
            pending = pending[1:]
        }
     }

31 select

将三个case放在一起:

   select {
        case errc := <-s.closing:
            errc <- err
            close(s.updates)
            return
        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            pending = append(pending, fetched...)
        case updates <- first:
            pending = pending[1:]
      }

这些case通过err、next和pending交互。

没有锁,没有条件变量,没有回调函数。

32 已修复的Bug

  • Bug 1:对s.closed和s.err的非同步(unsynchronized)访问
  • Bug 2:time.Sleep可能会保持循环运行
  • Bug 3:循环可能会永远阻塞在发送s.updates上
        select {
        case errc := <-s.closing:
            errc <- err
            close(s.updates)
            return
        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            pending = append(pending, fetched...)
        case updates <- first:
            pending = pending[1:]
        }

33 我们可以进一步改进循环

34 问题:Fetch可能会返回重复项

    var pending []Item
    var next time.Time
    var err error

        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            pending = append(pending, fetched...)

35 修复:在添加到pending之前过滤条目

    var pending []Item
    var next time.Time
    var err error
    var seen = make(map[string]bool) // item.GUIDs集合

        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            for _, item := range fetched {
                if !seen[item.GUID] {
                    pending = append(pending, item)
                    seen[item.GUID] = true
                }
            }

36 问题:pending队列无限制地增长

        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            for _, item := range fetched {
                if !seen[item.GUID] {
                    pending = append(pending, item)
                    seen[item.GUID] = true
                }
            }

37 问题:pending队列无限制地增长

const maxPending = 10

        var fetchDelay time.Duration
        if now := time.Now(); next.After(now) {
            fetchDelay = next.Sub(now)
        }
        var startFetch <-chan time.Time
        if len(pending) < maxPending {
            startFetch = time.After(fetchDelay) // 启用fetch case
        }

可以改为从pending的头部删除较旧的条目。

38 问题:Fetch上的循环被阻塞

        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            for _, item := range fetched {
                if !seen[item.GUID] {
                    pending = append(pending, item)
                    seen[item.GUID] = true         
                }
            }

39 修复:异步运行Fetch

为fetchDone添加一个新的select case。

type fetchResult struct{ fetched []Item; next time.Time; err error }

    var fetchDone chan fetchResult // 如果不是nil,则表示Fetch正在运行

        var startFetch <-chan time.Time
        if fetchDone == nil && len(pending) < maxPending {
            startFetch = time.After(fetchDelay) // 启用fetch case
        }

        select {
        case <-startFetch:
            fetchDone = make(chan fetchResult, 1)
            go func() {
                fetched, next, err := s.fetcher.Fetch()
                fetchDone <- fetchResult{fetched, next, err}
            }()
        case result := <-fetchDone:
            fetchDone = nil
            // 使用result.fetched, result.next, result.err

40 实现订阅

响应式。干净。易于阅读和更改。

三种技术:

  • for-select循环
  • 服务通道,回复通道(chan chan err)
  • select case里的nil通道

更多详细信息在线,包括Merge。

41 总结

并发编程可能很棘手。

Go让它变得更容易:

  • 通道传送数据、定时器事件、取消信号
  • goroutines序列化对本地可变状态的访问
  • 堆栈跟踪和死锁检测器
  • 竞态检测器

42 链接

Go并发模式(2012)

go.dev/talks/2012/concurrency.slide

并发不是并行

golang.org/s/concurrency-is-not-parallelism

通过通信共享内存

golang.org/doc/codewalk/sharemem

Go Tour (在你的浏览器里学习Go)

tour.golang.org

43 致谢

Sameer Ajmani

Google

http://profiles.google.com/ajmani

@Sajma

http://golang.org

nil channel(零值通道)及其使用场景

在Go语言里,把nil赋值给通道变量c,c就成为了nil通道。在nil通道上发送或接收都会被阻塞。

nil通道的使用场景之一是,select的某个case的通道根据某个条件来启用或禁用,当条件为true时,启用通道,case块代码有机会被执行,当条件为false时,把nil赋值给该通道使其成为nil通道,以禁用该通道(一直阻塞直到条件变为true),case块代码不会被执行。

如何在CentOS 7上使用VSFTPD设置FTP服务器

本文翻译自《How to Setup FTP Server with VSFTPD on CentOS 7》。

FTP(File Transfer Protocol,文件传输协议)是一种标准的客户端-服务器(client-server)网络协议,允许用户在远程网络之间传输文件。

有几个可用于Linux的开源FTP服务器。最流行和使用最广泛的是PureFTPdProFTPDvsftpd

在本教程中,我们将在CentOS 7上安装vsftpd(非常安全的Ftp守护进程)。它是一个稳定、安全和快速的FTP服务器。我们还将向你展示如何配置vsftpd以将用户限制在他们的家目录中并使用SSL/TLS加密整个传输。

要获得更安全和更快的数据传输,请使用SCPSFTP

先决条件

在继续本教程之前,请确保你以具有sudo权限的用户身份登录。

在CentOS 7上安装vsftpd

vsftpd软件包在默认的CentOS存储库中可用。要安装它,请执行以下命令:

$ sudo yum install vsftpd

安装软件包后,启动vsftpd守护进程并使其在操作系统启动时自动启动:

$ sudo systemctl start vsftpd
$ sudo systemctl enable vsftpd

你可以通过打印其状态来验证vsftpd服务是否正在运行:

$ sudo systemctl status vsftpd

输出如下所示,表明vsftpd服务处于活动状态并正在运行:

● vsftpd.service - Vsftpd ftp daemon
   Loaded: loaded (/usr/lib/systemd/system/vsftpd.service; enabled; vendor preset: disabled)
   Active: active (running) since Thu 2018-11-22 09:42:37 UTC; 6s ago
 Main PID: 29612 (vsftpd)
   CGroup: /system.slice/vsftpd.service
           └─29612 /usr/sbin/vsftpd /etc/vsftpd/vsftpd.conf

配置vsftpd

配置vsftpd服务涉及编辑/etc/vsftpd/vsftpd.conf配置文件。大多数设置都在配置文件中详细记录。有关所有可用选项,请访问官方vsftpd页面

在以下部分中,我们将介绍配置安全的vsftpd安装所需的一些重要设置。

首先打开vsftpd配置文件:

$ sudo nano /etc/vsftpd/vsftpd.conf

1. FTP访问

我们将只允许本地用户访问FTP服务器,找到anonymous_enablelocal_enable指令并验证你的配置是否与以下行匹配:

anonymous_enable=NO
local_enable=YES

2. 启用上传

取消注释write_enable设置以允许更改文件系统,例如上传和删除文件。

write_enable=YES

3. Chroot监狱

通过取消注释chroot指令来防止FTP用户访问其家目录之外的任何文件。

chroot_local_user=YES

默认情况下,启用chroot时,如果用户锁定的目录是可写的,vsftpd将拒绝上传文件。这是为了防止安全漏洞。

启用chroot时,使用以下方法之一允许上传。

方法 1. – 允许上传的推荐方法是启用chroot并配置FTP目录。在本教程中,我们将在用户家目录中创建一个ftp目录,该目录将用作chroot和一个可写的上传目录,用于上传文件。

user_sub_token=$USER
local_root=/home/$USER/ftp

方法 2. – 另一种选择是在vsftpd配置文件中添加以下指令。如果你必须授予用户对其家目录的可写访问权限,请使用此选项。

allow_writeable_chroot=YES

4. 被动FTP连接

vsftpd可以使用任何端口进行被动FTP连接。我们将指定端口的最小和最大范围,然后在我们的防火墙中打开该范围。

将以下行添加到配置文件中:

pasv_min_port=30000
pasv_max_port=31000

5. 限制用户登录

要仅允许某些用户登录FTP服务器,请在userlist_enable=YES行之后添加以下行:

userlist_file=/etc/vsftpd/user_list
userlist_deny=NO

启用此选项后,你需要通过将用户名添加到/etc/vsftpd/user_list文件(每行一个用户名)来明确指定哪些用户能够登录。

6. 使用SSL/TLS保护传输

为了使用SSL/TLS加密FTP传输,你需要拥有SSL证书并配置FTP服务器以使用它。

你可以使用由受信任的证书颁发机构签名的现有的SSL证书或创建自签名证书。

如果你有一个指向FTP服务器IP地址的域名或子域名,你可以轻松生成免费的Let’s Encrypt SSL证书。

在本教程中,我们将使用openssl命令生成自签名的SSL证书

以下命令将创建一个有效期为10年的2048位私钥和自签名证书。私钥和证书都将保存在同一个文件中:

$ sudo openssl req -x509 -nodes -days 3650 -newkey rsa:2048 -keyout /etc/vsftpd/vsftpd.pem -out /etc/vsftpd/vsftpd.pem

创建SSL证书后,打开vsftpd配置文件:

$ sudo nano /etc/vsftpd/vsftpd.conf

找到rsa_cert_filersa_private_key_file指令,将它们的值更改为pem文件路径并将ssl_enable指令设置为YES

rsa_cert_file=/etc/vsftpd/vsftpd.pem
rsa_private_key_file=/etc/vsftpd/vsftpd.pem
ssl_enable=YES

如果没有另外指定,FTP服务器将只使用TLS来建立安全连接。

重启vsftpd服务

完成编辑后,vsftpd配置文件(不包括注释)应如下所示:

anonymous_enable=NO
local_enable=YES
write_enable=YES
local_umask=022
dirmessage_enable=YES
xferlog_enable=YES
connect_from_port_20=YES
xferlog_std_format=YES
chroot_local_user=YES
listen=NO
listen_ipv6=YES
pam_service_name=vsftpd
userlist_enable=YES
userlist_file=/etc/vsftpd/user_list
userlist_deny=NO
tcp_wrappers=YES
user_sub_token=$USER
local_root=/home/$USER/ftp
pasv_min_port=30000
pasv_max_port=31000
rsa_cert_file=/etc/vsftpd/vsftpd.pem
rsa_private_key_file=/etc/vsftpd/vsftpd.pem
ssl_enable=YES

保存文件并重新启动vsftpd服务以使更改生效:

$ sudo systemctl restart vsftpd

打开防火墙

如果你正在运行防火墙,则需要允许FTP流量。 要打开端口21(FTP命令端口)、端口20(FTP数据端口)和30000-31000(被动端口范围),请发出以下命令:

$ sudo firewall-cmd --permanent --add-port=20-21/tcp
$ sudo firewall-cmd --permanent --add-port=30000-31000/tcp

通过键入以下内容重新加载防火墙规则:

$ firewall-cmd --reload

创建一个FTP用户

为了测试我们的FTP服务器,我们将创建一个新用户。

  • 如果你已经有一个要授予FTP访问权限的用户,请跳过第1步。
  • 如果你在配置文件中设置了allow_writeable_chroot=YES,请跳过第3步。

1 创建一个名为newftpuser的新用户:

$ sudo adduser newftpuser

接下来,你需要设置用户密码

$ sudo passwd newftpuser

2 将用户添加到允许的FTP用户列表:

$ echo "newftpuser" | sudo tee -a /etc/vsftpd/user_list

3 创建FTP目录树并设置正确的权限

$ sudo mkdir -p /home/newftpuser/ftp/upload
$ sudo chmod 550 /home/newftpuser/ftp
$ sudo chmod 750 /home/newftpuser/ftp/upload
$ sudo chown -R newftpuser: /home/newftpuser/ftp

如上一节所述,用户将能够将其文件上传到ftp/upload目录。

此时,你的FTP服务器功能齐全,你应该能够使用任何可配置为使用TLS加密的FTP客户端(例如FileZilla)连接到你的服务器。

禁止访问shell

默认情况下,创建用户时,如果未明确指定,用户将拥有对服务器的SSH访问权限。

要禁止访问shell,我们将创建一个新的shell,它会简单地打印一条消息,告诉用户他们的账户仅限于FTP访问。

运行以下命令来创建/bin/ftponly shell并使其可执行:

$ echo -e '#!/bin/sh\necho "This account is limited to FTP access only."' | sudo tee -a  /bin/ftponly
$ sudo chmod a+x /bin/ftponly

将新的shell附加到/etc/shells文件中的有效shell列表中:

$ echo "/bin/ftponly" | sudo tee -a /etc/shells

将用户shell更改为/bin/ftponly

$ sudo usermod newftpuser -s /bin/ftponly

使用相同的命令为你希望仅授予FTP访问权限的其他用户更改shell。

总结

在本教程中,你学到了如何在CentOS 7系统上安装和配置安全快速的FTP服务器。

如果你有任何问题或反馈,请随时发表评论。

Go并发模式

本文翻译自《Go Concurrency Patterns》,是个PPT。

1 Go Concurrency Patterns

Rob Pike

Google

2 视频

这个演讲是在2012年6月在Google I/O大会上做的。

可在YouTube上观看这个演讲

3 介绍

4 Go中的并发特性

当Go语言首次发布时,人们似乎对Go的并发特性着迷。

问题:

  • 为什么支持并发?
  • 什么是并发?
  • 想法从何而来?
  • 到底有什么好处呢?
  • 我该如何使用它?

5 为什么?

看看你周围。你看到了什么?

你是否看到一个单步的世界一次只做一件事?

或者你是否看到了一个由相互作用、行为独立的片段组成的复杂世界?

这就是为什么。顺序处理(Sequential processing)本身并不能模拟世界的行为。

6 什么是并发?

并发是独立地执行计算的组合。

并发是一种构建软件的方式,特别是编写与现实世界交互良好的干净代码的方式。

它不是并行。

7 并发不是并行

并发不是并行,尽管它支持并行。

如果你只有一个处理器,你的程序仍然可以并发但不能并行。

另一方面,编写良好的并发程序可能会在多处理器上高效地并行运行。那个属性可能很重要…

有关该区别的更多信息,请参阅下面的链接。这里讨论的太多了。

golang.org/s/concurrency-is-not-parallelism

8 软件构建模型

容易理解。

便于使用。

容易推理。

你不需要成为专家!

(比处理并行的细节(线程、信号量、锁、屏障等)要好得多。)

9 历史

对许多人来说,Go的并发特性似乎很新。

但它们植根于悠久的历史,可以追溯到1978年Hoare的CSP甚至Dijkstra的守卫命令(1975)。

具有相似特征的语言:

  • Occam (May, 1983)
  • Erlang (Armstrong, 1986)
  • Newsqueak (Pike, 1988)
  • Concurrent ML (Reppy, 1993)
  • Alef (Winterbottom, 1995)
  • Limbo (Dorward, Pike, Winterbottom, 1996).

10 区别

Go是Newsqueak-Alef-Limbo分支的最新成员,以第一公民的通道而著称。

Erlang更接近于原始CSP,你可以通过名字而不是通过通道与进程通信。

这些模型是等效的,但表达方式不同。

粗略的类比:按名字写入文件(进程,Erlang)与写入文件描述符(通道,Go)。

11 基本例子

12 一个无聊的函数

我们需要一个例子来展示并发原语的有趣属性。

为了避免分心,我们把它作为一个无聊的例子。

func boring(msg string) {
    for i := 0; ; i++ {
        fmt.Println(msg, i)
        time.Sleep(time.Second)
    }
}

13 不那么无聊

使消息之间的间隔不可预测(仍不到一秒)。

func boring(msg string) {
    for i := 0; ; i++ {
        fmt.Println(msg, i)
        time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
}

14 运行它

无聊的功能永远运行,就像一个无聊的派对客人。

func main() {
    boring("boring!")
}

func boring(msg string) {
    for i := 0; ; i++ {
        fmt.Println(msg, i)
        time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
}

15 忽略它

go语句照常运行函数,但不会让调用者等待。

它启动一个goroutine。 该功能类似于shell命令末尾的&。

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    go boring("boring!")
}

16 少一点忽略

当main返回时,程序退出并带走boring函数。

我们可以闲逛一下,在路上显示main和已启动的goroutine都在运行。

func main() {
    go boring("boring!")
    fmt.Println("I'm listening.")
    time.Sleep(2 * time.Second)
    fmt.Println("You're boring; I'm leaving.")
}

17 协程(Goroutine)

什么是goroutine?它是一个独立执行的函数,由go语句启动。

它有自己的调用堆栈,可以根据需要增长和缩小。

它非常廉价。实践中,拥有数千甚至数十万个goroutine是很常见的。

这不是一个线程。

一个包含数千个goroutine的程序中可能只有一个线程。

相反,goroutines会根据需要动态地多路复用到线程上,以保持所有goroutines运行。

但是,如果你将其视为非常廉价的线程,那么你将不会走太远。

18 通信(Communication)

我们boring的例子被骗了:main函数看不到其他goroutine的输出。

它只是打印到屏幕上,我们假装看到了对话。

真正的对话需要通信。

19 通道(Channel)

Go中的通道提供了两个goroutine之间的连接,允许它们进行通信。

// 声明和初始化。
var c chan int
c = make(chan int)
// 或者
c := make(chan int)
// 在通道上发送数据。
c <- 1
// 从通道里取数据。
// 箭头指明了数据流的方向。
value = <-c

20 使用通道

通道连接main和boring的goroutine,以便它们可以通信。

func main() {
    c := make(chan string)
    go boring("boring!", c)
    for i := 0; i < 5; i++ {
        fmt.Printf("You say: %q\n", <-c) // 接收表达式仅是一个值。
    }
    fmt.Println("You're boring; I'm leaving.")
}

func boring(msg string, c chan string) {
    for i := 0; ; i++ {
        c <- fmt.Sprintf("%s %d", msg, i) // 将被发送的表达式可以是任意合适的值。
        time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
}

21 同步(Synchronization)

当main函数执行<–c时,它会等待一个值被发送。

同样,当boring函数执行c <– value时,它会等待接收者准备好,再把值发送到通道。

发送者和接收者都必须准备好在通信中发挥自己的作用。否则我们会等待直到他们准备好。 因此,通道既可以通信又可以同步。

22 关于带缓冲区的通道的旁白

专家提示:Go也可以创建带缓冲区的通道。

缓冲区消除了同步。

通道的缓冲区使它们更像Erlang的邮箱。

带缓冲区的通道对于某些问题可能很重要,但它们需要更加微妙的推理。

我们今天不需要它们。

23 Go的使用方式

不要通过共享内存来通信,而是通过通信来共享内存。

24 模式

25 生成器:返回一个通道的函数

通道是一等公民(first-class)的值,就像字符串或整数一样。

c := boring("boring!") // 返回一个通道的函数。
    for i := 0; i < 5; i++ {
        fmt.Printf("You say: %q\n", <-c)
    }
    fmt.Println("You're boring; I'm leaving.")

func boring(msg string) <-chan string { // 返回只能用来接收值(receive-only)的通道
    c := make(chan string)
    go func() { // 我们在函数内部开启一个goroutine。
        for i := 0; ; i++ {
            c <- fmt.Sprintf("%s %d", msg, i)
            time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
        }
    }()
    return c // 把通道返回给本函数的调用者。
}

26 通道作为服务的句柄

我们的boring函数返回一个通道,让我们可以与它提供的boring服务进行通信。

我们可以有该服务的更多实例。

func main() {
    joe := boring("Joe")
    ann := boring("Ann")
    for i := 0; i < 5; i++ {
        fmt.Println(<-joe)
        fmt.Println(<-ann)
    }
    fmt.Println("You're both boring; I'm leaving.")
}

27 多路复用

这些计划使Joe和Ann步调一致。

我们可以改为使用扇入功能让准备好的人说话。

func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() { for { c <- <-input1 } }()
    go func() { for { c <- <-input2 } }()
    return c
}
func main() {
    c := fanIn(boring("Joe"), boring("Ann"))
    for i := 0; i < 10; i++ {
        fmt.Println(<-c)
    }
    fmt.Println("You're both boring; I'm leaving.")
}

28 扇入(Fan-in)

29 恢复顺序

在一个通道上发送一个通道,让goroutine等待轮到它。

接收所有消息,然后在专用通道上发送,再次启用它们。 首先,我们定义一个包含回复用的通道的消息类型。

type Message struct {
    str string
    wait chan bool
}

30 恢复顺序

每个发言者都必须等待批准。

for i := 0; i < 5; i++ {
        msg1 := <-c; fmt.Println(msg1.str)
        msg2 := <-c; fmt.Println(msg2.str)
        msg1.wait <- true
        msg2.wait <- true
}

waitForIt := make(chan bool) // 所有消息实例共享的通道。

// (译者注:以下是每个发言者协程的关键代码)
    c <- Message{ fmt.Sprintf("%s: %d", msg, i), waitForIt }
    time.Sleep(time.Duration(rand.Intn(2e3)) * time.Millisecond)
    <-waitForIt

(译者注:上述程序实现每个发言者发言后,必须等待批准,才能继续运行下去。)

31 select

并发独有的控制结构。

和通道、goroutine一起内置在语言中。

32 select

select语句提供了另一种处理多个通道的方法。

这就像一个开关,但每个case都是一个通信:

– 评估所有通道。

– 阻塞选择,直到一个通信可以继续,然后它会继续。

– 如果有多个可以继续,伪随机选择其中一个。 – 如果没有一个通道准备好,则默认子句(如果存在)立即执行。

select {
    case v1 := <-c1:
        fmt.Printf("received %v from c1\n", v1)
    case v2 := <-c2:
        fmt.Printf("received %v from c2\n", v1)
    case c3 <- 23:
        fmt.Printf("sent %v to c3\n", 23)
    default:
        fmt.Printf("no one was ready to communicate\n")
    }

33 再次扇入

重写我们原来的fanIn函数。实际上只需要一个goroutine就能实现。旧的:

func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() { for { c <- <-input1 } }()
    go func() { for { c <- <-input2 } }()
    return c
}

34 使用select扇入

重写我们原来的fanIn函数。只需要一个goroutine就能实现。新的:

func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() {
        for {
            select {
            case s := <-input1:  c <- s
            case s := <-input2:  c <- s
            }
        }
    }()
    return c
}

35 使用select超时

time.After函数返回一个阻塞指定持续时间段的通道。

在该时间段过去之后,通道将传递当前时间一次。

func main() {
    c := boring("Joe")
    for {
        select {
        case s := <-c:
            fmt.Println(s)
        case <-time.After(1 * time.Second):
            fmt.Println("You're too slow.")
            return
        }
    }
}

36 使用select的整个对话超时

在循环外创建一次计时器,以使整个对话超时。

(在前面的程序中,我们对每条消息都有一个超时。)

func main() {
    c := boring("Joe")
    timeout := time.After(5 * time.Second)
    for {
        select {
        case s := <-c:
            fmt.Println(s)
        case <-timeout:
            fmt.Println("You talk too much.")
            return
        }
    }
}

37 退出通道

我们可以扭转这种局面,告诉Joe在我们听腻了他的时候停下来。

quit := make(chan bool)
    c := boring("Joe", quit)
    for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
quit <- true

select {
            case c <- fmt.Sprintf("%s: %d", msg, i):
                // do nothing
            case <-quit:
                return
            }

38 从quit通道接收

我们怎么知道它已经完成了?等待它告诉我们它已经完成:在退出通道接收

quit := make(chan string)
    c := boring("Joe", quit)
    for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
    quit <- "Bye!"
fmt.Printf("Joe says: %q\n", <-quit)

select {
            case c <- fmt.Sprintf("%s: %d", msg, i):
                // do nothing
            case <-quit:
                cleanup()
                quit <- "See you!"
                return
            }

39 通道菊花链

func f(left, right chan int) {
    left <- 1 + <-right
}

func main() {
    const n = 10000
    leftmost := make(chan int)
    right := leftmost
    left := leftmost
    for i := 0; i < n; i++ {
        right = make(chan int)
        go f(left, right)
        left = right
    }
    go func(c chan int) { c <- 1 }(right)
    fmt.Println(<-leftmost)
}

40 中国耳语,地鼠风格

41 系统软件

Go是为编写系统软件而设计的。

让我们看看并发特性是如何发挥作用的。

42 示例:谷歌搜索

Q:谷歌搜索做什么?

A:给定一个查询,返回一页搜索结果(和一些广告)。

Q:我们如何获得搜索结果?

A:将查询发送到Web搜索、图像搜索、YouTube、地图、新闻等。然后混合结果。

我们如何实现这一点?

43 谷歌搜索:一个虚假的框架

我们可以模拟搜索功能,就像我们之前模拟对话一样。

var (
    Web = fakeSearch("web")
    Image = fakeSearch("image")
    Video = fakeSearch("video")
)

type Search func(query string) Result

func fakeSearch(kind string) Search {
        return func(query string) Result {
              time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
              return Result(fmt.Sprintf("%s result for %q\n", kind, query))
        }
}

44 Google 搜索:测试这个框架

func main() {
    rand.Seed(time.Now().UnixNano())
    start := time.Now()
    results := Google("golang")
    elapsed := time.Since(start)
    fmt.Println(results)
    fmt.Println(elapsed)
}

45 谷歌搜索 1.0

Google函数接受查询并返回结果片段(只是字符串)。

Google会依次调用Web、图像和视频搜索,并将它们附加到结果切片中。

func Google(query string) (results []Result) {
    results = append(results, Web(query))
    results = append(results, Image(query))
    results = append(results, Video(query))
    return
}

46 谷歌搜索 2.0

同时运行Web、图像和视频搜索,并等待所有结果。

没有锁。没有条件变量。没有回调。

func Google(query string) (results []Result) {
    c := make(chan Result)
    go func() { c <- Web(query) } ()
    go func() { c <- Image(query) } ()
    go func() { c <- Video(query) } ()

    for i := 0; i < 3; i++ {
        result := <-c
        results = append(results, result)
    }
    return
}

47 谷歌搜索 2.1

不等待慢速服务器。没有锁。没有条件变量。没有回调。

    c := make(chan Result)
    go func() { c <- Web(query) } ()
    go func() { c <- Image(query) } ()
    go func() { c <- Video(query) } ()

    timeout := time.After(80 * time.Millisecond)
    for i := 0; i < 3; i++ {
        select {
        case result := <-c:
            results = append(results, result)
        case <-timeout:
            fmt.Println("timed out")
            return
        }
    }
    return

(译者注:增加了超时逻辑。)

48 避免超时

问:我们如何避免丢弃来自慢速服务器的结果?

答:复制服务器。向多个副本发送请求,并使用第一个响应。

func First(query string, replicas ...Search) Result {
    c := make(chan Result)
    searchReplica := func(i int) { c <- replicas[i](query) }
    for i := range replicas {
        go searchReplica(i)
    }
    return <-c
}

49 使用第一个函数

func main() {
    rand.Seed(time.Now().UnixNano())
    start := time.Now()
    result := First("golang",
        fakeSearch("replica 1"),
        fakeSearch("replica 2"))
    elapsed := time.Since(start)
    fmt.Println(result)
    fmt.Println(elapsed)
}

50 谷歌搜索 3.0

使用复制的搜索服务器减少尾部延迟。

c := make(chan Result)
    go func() { c <- First(query, Web1, Web2) } ()
    go func() { c <- First(query, Image1, Image2) } ()
    go func() { c <- First(query, Video1, Video2) } ()
    timeout := time.After(80 * time.Millisecond)
    for i := 0; i < 3; i++ {
        select {
        case result := <-c:
            results = append(results, result)
        case <-timeout:
            fmt.Println("timed out")
            return
        }
    }
    return

51 还是…

没有锁。没有条件变量。没有回调。

52 总结

在几个简单的变换示例中,我们使用Go的并发原语来转换

  • 缓慢的
  • 顺序的
  • 故障敏感的

程序为一个

  • 快速的
  • 并发的
  • 有副本的
  • 健壮的

程序。

53 更多派对技巧

使用这些工具有无穷无尽的方法,许多在别处都有介绍。

Chatroulette toy:

golang.org/s/chat-roulette

Load balancer:

golang.org/s/load-balancer

Concurrent prime sieve:

golang.org/s/prime-sieve

Concurrent power series (by McIlroy):

golang.org/s/power-series

54 不要过用

玩起来很有趣,但不要过度使用这些想法。

Goroutines和通道是大创意。它们是程序构建的工具。

但有时你只需要一个引用计数器。

Go有”sync”和”sync/atomic”包,它们提供互斥锁、条件变量等。它们为较小的问题提供了工具。

通常,这些东西会一起解决更大的问题。

始终为工作使用正确的工具。

55 结论

Goroutines和channels使得表达复杂的操作变得容易

  • 多个输入
  • 多个输出
  • 超时
  • 失败

而且它们使用起来很有趣。

56 链接

Go主页:

golang.org

Go Tour (在浏览器里学习Go):

tour.golang.org

包(Package)文档:

golang.org/pkg

Articles galore:

golang.org/doc

并发不是并行:

golang.org/s/concurrency-is-not-parallelism

57 致谢

Rob Pike

Google

http://golang.org/s/plusrob

@rob_pike

http://golang.org

Go并发模式:管道和取消

本文翻译自《Go Concurrency Patterns: Pipelines and cancellation》。

Sameer Ajmani

13 March 2014

介绍

Go的并发原语使构建数据流管道变得容易,从而有效地利用I/O和多个CPU。本文介绍了此类管道的示例,重点介绍了操作失败时出现的细微差别,并介绍了干净地处理故障的技术。

什么是管道?

Go中没有关于管道的正式定义。它只是多种并发程序中的一种。通俗地说,管道(pipeline)是由通道(channel)连接的一系列阶段(stage),其中每个阶段是一组运行相同功能的goroutine。在每个阶段,goroutines

  • 通过入口(inbound)通道从上游接收值
  • 对该数据执行某些功能,通常会产生新值
  • 通过出口(outbound)通道向下游发送值

每个阶段都有任意数量的入口和出口通道,除了第一个阶段和最后一个阶段,它们分别只有出口或入口通道。第一阶段有时称为源头或生产者;最后一个阶段,称为水池(sink)或消费者。(译者注:一个阶段在代码实现上就是一个函数,入口通道或出口通道作为该函数的参数或返回值,在函数体内启动一个或多个goroutines来处理这一阶段的业务逻辑。见下文的各个例子。)

我们将从一个简单的示例管道开始来解释这些想法和技术。稍后,我们将展示一个更现实的例子。

平方数

考虑一个具有三个阶段的管道。

第一阶段gen是一个函数,将一个整数列表转换为一个通道,发出列表中的整数。gen函数启动一个goroutine,在通道上发出整数,并在发送完所有值后关闭该通道:

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

第二阶段sq从通道接收整数并返回一个通道,该通道发出每个接收到的整数的平方值。在入口通道关闭并且此阶段已将所有值发送到下游后,它会关闭出口通道:

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

main函数设置管道并运行最后一个阶段:它从第二阶段接收值并打印输出每个值,直到通道关闭:

func main() {
    // 安装这个管道。
    c := gen(2, 3)
    out := sq(c)

    // 消费输出的值。
    fmt.Println(<-out) // 4
    fmt.Println(<-out) // 9
}

由于sq的入口通道和出口通道具有相同的类型,因此我们可以任意多次组合它。我们还可以将main重写为范围循环(译者注:即使用for-range语句),就像其他阶段一样:

func main() {
    // 安装管道并消费输出的值。
    for n := range sq(sq(gen(2, 3))) {
        fmt.Println(n) // 16 then 81
    }
}

扇出,扇入

多个函数可以从同一个通道读取,直到该通道关闭;这称为扇出(fan-out)。这提供了一种在一组工作者(workers)之间分配工作以并行化使用CPU和I/O的方法。

一个函数可以从多个输入中读取并继续执行,直到所有输入都关闭,方法是将输入通道多路复用到一个通道上,当所有输入都关闭时,该通道才关闭。这称为扇入(fan-in)

我们可以改变我们的管道来运行sq的两个实例,每个实例都从同一个输入通道读取。我们引入了一个新函数,merge,以扇入结果:

func main() {
    in := gen(2, 3)

    // 将sq工作分配给两个都从in通道读取的goroutine。
    c1 := sq(in)
    c2 := sq(in)

    // 消费来自c1和c2的合并输出。
    for n := range merge(c1, c2) {
        fmt.Println(n) // 4 then 9, or 9 then 4
    }
}

merge函数通过为每个入口通道启动一个goroutine,将值复制到唯一的出口通道,将通道列表转换为单个通道。一旦所有的输出goroutine都启动了,merge会再启动一个goroutine以在该通道上的所有发送完成后关闭出口通道。

往关闭的通道发送值将引发panic,因此在调用close之前确保所有发送都完成很重要。sync.WaitGroup类型提供了一种安排此同步的简单方法:

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // 为每个cs中的输入通道启动一个输出goroutine。output将值从c复制到out,直到c关闭,然后调用wg.Done。
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // 一旦所有输出goroutines都完成任务,就启动一个goroutine来关闭out通道。这必须在调用wg.Add之后开始。
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

止损

我们的管道函数有一个模式:

  • 当所有发送操作完成后,阶段关闭它们的出口通道。
  • 阶段不断从入口通道接收值,直到这些通道关闭。

这种模式允许将每个接收阶段编写为range循环,并确保一旦所有值都成功发送到下游,所有goroutines都会退出。

但在管道的实际使用中,并不总是接收所有入口值。有时这是设计使然:接收者可能只需要值的一个子集来取得进展。更常见的情况是,一个阶段提前退出,因为某个入口值表示前面阶段出现错误。在任何一种情况下,接收者都不应该等待剩余的值到达,我们希望前面阶段停止生产后面阶段不需要的值。

在我们的示例管道中,如果一个阶段未能消耗所有入口值,则尝试发送这些值的goroutine将无限期地阻塞:

    // 消耗输出中的第一个值。
    out := merge(c1, c2)
    fmt.Println(<-out) // 4 or 9
    return
    // 由于我们没有从out接收第二个值,因此其中一个输出值的goroutine被挂起,试图发送值。
}

这是资源泄漏:goroutine消耗内存和运行时资源,goroutine堆栈中的堆引用防止数据被垃圾回收。Goroutines不会被垃圾回收;他们必须自行退出。

即使下游阶段无法接收所有入口值,我们也需要安排管道的上游阶段退出。一种方法是将出口通道更改为具有缓冲区的通道。一个缓冲区可以保存固定数量的值;如果缓冲区中有空间,则发送操作立即完成:

c := make(chan int, 2) // 缓冲区大小是2
c <- 1  // 立即完成发生
c <- 2  // 立即完成发生
c <- 3  // 阻塞直到另一个goroutine执行<-c并且接收到1

当在创建通道时知道要发送的值的数量时,使用缓冲区可以简化代码。例如,我们可以重写gen以将整数列表复制到缓冲区通道中,并避免创建新的goroutine:

func gen(nums ...int) <-chan int {
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}

回到我们管道中被阻塞的goroutine,我们可以考虑在merge返回的出口通道中添加一个缓冲区:

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int, 1) // 为未读输入提供足够的空间
    // ...剩余代码不改变...

虽然这修复了该程序中被阻塞的goroutine,但这是错误的代码。这里设置缓冲区大小为1,因为我们知道merge将接收的值的数量以及下游阶段将消耗的值的数量。这很脆弱:如果我们向gen传递一个额外的值,或者如果下游阶段读取的值更少,我们将再次阻塞goroutine。

相反,我们需要为下游阶段提供一种方法来向发送者表明他们将停止接受输入。

显式取消

main决定不从out接收所有值就退出时,它必须告诉上游阶段的goroutine放弃它们试图发送的值。它通过在一个名为done的通道上发送值来实现。它发送两个值,因为可能有两个被阻塞的发送者:

func main() {
    in := gen(2, 3)

    // 将sq工作分配给两个都从in读取的goroutine。
    c1 := sq(in)
    c2 := sq(in)

    // 从输出消费第一个值。
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // 告诉剩余的发送者我们准备离开了。
    done <- struct{}{}
    done <- struct{}{}
}

发送值的goroutine用一个select语句替换它们的发送操作,该语句在值发送给out时或者当它们从done接收到值时继续运行。done的值的类型是空结构体,因为值无关紧要:它只是一个接收事件,用来指示应该放弃给out继续发送值。输出值的goroutine继续在其入口通道c上循环,因此上游阶段不会被阻塞。(我们稍后会讨论如何让这个循环提前返回。)

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // 为每个在cs中的输入通道启动一个output goroutine。output将值从c复制到out,直到c关闭或从done接收到值,然后output调用wg.Done。
    output := func(c <-chan int) {
        for n := range c {
            select {
            case out <- n:
            case <-done:
            }
        }
        wg.Done()
    }
    // ...剩余代码不改变...

这种方法有一个问题:每个下游接收者都需要知道可能被阻塞的上游发送者的数量,并安排在提前返回时向这些发送者发出信号。跟踪这些数量既繁琐又容易出错。

我们需要一种方法来告诉未知且没有限定数量的goroutine停止向下游发送它们的值。在Go中,我们可以通过关闭通道来做到这一点,因为关闭通道上的接收操作总是可以立即进行,产生元素类型的零值

这意味着main可以通过关闭done通道来解除所有发送者的阻塞。这种关闭实际上是对所有发送者的一个广播信号。我们扩展了我们的每个管道函数以接受done作为参数,并通过defer语句安排关闭的发生,因此来自main的所有返回路径都会发出让管道阶段退出的信号。

func main() {
    // 设置一个由整个管道共享的done通道,并在该管道退出时关闭该通道,作为我们开启的所有goroutines的退出信号。
    done := make(chan struct{})
    defer close(done)          

    in := gen(done, 2, 3)

    // 将sq工作分配给两个都从in读取的goroutine。
    c1 := sq(done, in)
    c2 := sq(done, in)

    // 从output消耗第一个值。
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // done将被推迟的close调用关闭。
}

现在,一旦done关闭,我们的管道的每个阶段都可以自由返回。merge中的output例程可以在其入口通道没有排完水(不消费完入口通道的所有值)的情况下返回,因为它知道上游发送方sq将在done关闭时停止发送。output确保通过defer语句在所有返回路径上调用wg.Done

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // 为每个cs中的输入通道启动一个output goroutine。output将值从c复制到out,直到c或done关闭,然后调用wg.Done。
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    // ...剩余代码不改变...

同样,sq可以在done关闭后立即返回。sq通过defer语句确保其输出通道在所有返回路径上都关闭:

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

(译者注:除了源头和水池,每个阶段都有一个或多个入口通道和一个或多个出口通道,用来接收或发送业务数据和错误信息,此外也可以有控制通道,用来接收或发送对通道的控制信息,这里的done就是控制通道。)

以下是管道建设的指导方针:

  • 当所有发送操作都完成后,阶段关闭它们的出口通道。
  • 阶段继续从入口通道接收值,直到这些通道关闭或发送者被解除阻塞。

管道通过确保所有发送的值有足够的缓冲区或者在接收者可能废弃通道时显式地向发送者发出信号来解除对发送者的阻塞。(译者注:本文通过done通道来控制阶段是否取消运行,是比较原始的方式,更加先进的方式是使用Go 1.14版本加入的context包提供的机制。)

计算一棵目录树里的所有文件的摘要值

让我们考虑一个更实际的管道。

MD5是一种消息摘要算法,可用作文件校验值。命令行实用程序md5sum打印文件列表的摘要值。

% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

我们的示例程序与md5sum类似,但将单个目录作为参数,并打印该目录下每个常规文件的摘要值,按路径名排序。

% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

我们程序的主函数调用一个辅助函数MD5All,它返回一个从路径名到摘要值的映射,然后对结果进行排序并打印:

func main() {
    // 计算指定目录下所有文件的MD5总和,然后按路径名排序打印结果。
    m, err := MD5All(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    var paths []string
    for path := range m {
        paths = append(paths, path)
    }
    sort.Strings(paths)
    for _, path := range paths {
        fmt.Printf("%x  %s\n", m[path], path)
    }
}

MD5All函数是我们讨论的重点。在serial.go中,不使用并发,并且在遍历树时简单地读取和汇总每个文件。

// MD5All读取以root为根目录的文件树中的所有文件,并返回从文件路径到文件内容的MD5值的映射。如果目录遍历失败或任何读取操作失败,MD5All将返回错误。
func MD5All(root string) (map[string][md5.Size]byte, error) {
    m := make(map[string][md5.Size]byte)
    err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if !info.Mode().IsRegular() {
            return nil
        }
        data, err := ioutil.ReadFile(path)
        if err != nil {
            return err
        }
        m[path] = md5.Sum(data)
        return nil
    })
    if err != nil {
        return nil, err
    }
    return m, nil
}

并行计算摘要值

parallel.go中,我们将MD5All拆分为两个阶段的管道。第一阶段sumFiles遍历目录树,在一个新的goroutine中计算每个文件的摘要值,并将结果发送到值类型为result的通道上:

type result struct {
    path string
    sum  [md5.Size]byte
    err  error
}

sumFiles返回两个通道:一个用于result,另一个用于filepath.Walk返回的错误。walk函数启动一个新的goroutine来处理每个常规文件,然后检查done。如果done已关闭,则立即停止遍历目录树:

func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    // 对于每个常规文件,启动一个对文件计算摘要值并将结果发送到c的 goroutine。在errc上发送walk的结果。
    c := make(chan result)
    errc := make(chan error, 1)
    go func() {
        var wg sync.WaitGroup
        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            wg.Add(1)
            go func() {
                data, err := ioutil.ReadFile(path)
                select {
                case c <- result{path, md5.Sum(data), err}:
                case <-done:
                }
                wg.Done()
            }()
            // 如果done关闭了就停止遍历。
            select {
            case <-done:
                return errors.New("walk canceled")
            default:
                return nil
            }
        })
        // Walk已返回,因此对wg.Add的所有调用都已完成。完成所有发送后,启动一个goroutine以关闭c。
        go func() {
            wg.Wait()
            close(c)
        }()
        // 这里不需要select,因为errc是有缓冲区的通道。
        errc <- err
    }()
    return c, errc
}

MD5Allc. MD5All接收摘要值。MD5All在出错时提前返回,通过defer关闭done

func MD5All(root string) (map[string][md5.Size]byte, error) {
    // MD5All返回时关闭done通道;它可以在接收到来自c和errc的所有值之前这样做。
    done := make(chan struct{})
    defer close(done)          

    c, errc := sumFiles(done, root)

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

有上界的并行数

parallel.go中的MD5All为每个文件启动一个新的goroutine。在包含许多大文件的目录中,这可能会分配比机器上可用的内存更多的内存。

我们可以通过限制并行读取的文件数量来限制内存分配。在bounded.go中,我们通过创建固定数量的goroutine来读取文件来做到这一点。我们的管道现在具有三个阶段:遍历目录树、读取和计算文件的摘要值以及收集摘要值。

第一阶段walkFiles发送目录树中常规文件的路径:

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
    paths := make(chan string)
    errc := make(chan error, 1)
    go func() {
        // Walk返回时关闭paths通道。
        defer close(paths)
        // 这里的发送无需select,因为errc是有缓冲区的通道。
        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            select {
            case paths <- path:
            case <-done:
                return errors.New("walk canceled")
            }
            return nil
        })
    }()
    return paths, errc
}

中间阶段启动固定数量的digester goroutine,它们从paths接收文件名并在通道c上发送结果:

func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
    for path := range paths {
        data, err := ioutil.ReadFile(path)
        select {
        case c <- result{path, md5.Sum(data), err}:
        case <-done:
            return
        }
    }
}

与我们之前的示例不同,digester不会关闭其输出通道,因为多个goroutine在共享通道上发送。相反,MD5All中的代码安排在所有digester完成后关闭通道:

 // 启动固定数量的goroutine来读取和计算文件的摘要值。
    c := make(chan result)
    var wg sync.WaitGroup
    const numDigesters = 20
    wg.Add(numDigesters)
    for i := 0; i < numDigesters; i++ {
        go func() {
            digester(done, paths, c)
            wg.Done()
        }()
    }
    go func() {
        wg.Wait()
        close(c)
    }()

我们可以让每个digester创建并返回它自己的输出通道,但是我们需要额外的goroutine来扇入结果。

最后一个阶段接收来自c的所有results,然后检查来自errc的错误。此检查不能更早发生,因为在此之前,walkFiles可能会阻止向下游发送值:

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    // 检查Walk是否失败。
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

总结

本文介绍了在Go中构建流数据管道的技术。处理此类管道中的故障很棘手,因为管道中的每个阶段都可能被阻塞而不能向下游发送值,并且下游阶段可能不再关心传入的数据值。我们展示了关闭通道如何向管道启动的所有goroutine广播“完成”信号,并定义了正确构建管道的准则。

进一步阅读:

Navicat出现Access violation at address 00000000. Read of address 00000000问题的解决方法

我的操作系统是Winodws 10,在使用Navicat Premium 16的模型或图表工作区时,弹出一个错误对话框说Access violation at address 00000000. Read of address 00000000。

出现这个问题的原因,应该是Winodws 10操作系统里安装的C++运行库与Navicat Premium 16所要求的版本不一致。Navicat Premium 16在Winodws 7操作系统里并不会出现这个问题。

因此这个问题的解决方法可以是,先在Winodws 10操作系统里安装VirtualBox,再安装Winodws 7虚拟机,然后在该虚拟机里安装和使用Navicat Premium 16。

为什么国外把邮件当微信一样发?

打个比喻,用微信和下属沟通,有点像皇帝和奴才太监的关系,随叫随到、不分场合、立马回应;用邮件沟通,是皇帝和大臣的关系,更正式、更有仪式感、且对上位者的限制更多。

真上过班的人,才会发现工作用邮件,比用微信幸福多了。邮件办公最大的好处,是不会期待对方“秒回”,所以收件方有足够的时间去整理信息,有更多的弹性时间,即便周末收到也可以等到周一上班再回。而且因为邮件“非即时性”的特点,发起邮件对话的一方(通常是老板、或者甲方)会对自己编写的信息更认真,会想清楚自己到底要布置什么任务才会发送, 不会像微信这种“即时性”软件一样,很多老板是突然兴起了,就给员工或者给工作群里丢一个信息,甚至连文字都没有就一段语音过来了,逻辑也不通顺、信息也不明了,搞的接收方一头雾水的,有时候还要猜,要是群里都不理还好,只要有一个卷的人兴致勃勃的回复,那大家都要跟着卷起来。

不知道有没有人体验过,一个大老板在群里转发一篇莫名其妙的文章后,一堆中层小领导排队写小作文感想的奇葩场景。这就是“移动办公无处不在”给劳动者带来的困境。

所以,客观上邮件是不方便的,但就是这种不方便反而利于劳动者,这是对劳动者私人空间的尊重,是对上/下班界限的明晰。

来源知乎用户@李小粥