var partner = make(chan io.ReadWriteCloser)
func match(c io.ReadWriteCloser) {
fmt.Fprint(c, "Waiting for a partner...")
select {
case partner <- c:
// 现在由另一个goroutine处理
case p := <-partner:
chat(p, c)
}
}
23 交谈
聊天功能向每个连接发送问候,然后将数据从一个连接复制到另一个,反之亦然。
请注意,它启动了另一个goroutine,以便复制操作可能同时发生。
func chat(a, b io.ReadWriteCloser) {
fmt.Fprintln(a, "Found one! Say hi.")
fmt.Fprintln(b, "Found one! Say hi.")
go io.Copy(a, b)
io.Copy(b, a)
}
type A struct{}
func (A) Hello() {
fmt.Println("Hello!")
}
type B struct {
A
}
// func (b B) Hello() { b.A.Hello() } // (隐式的!)
func main() {
var b B
b.Hello()
}
type socket struct {
io.ReadWriter
done chan bool
}
func (s socket) Close() error {
s.done <- true
return nil
}
func socketHandler(ws *websocket.Conn) {
s := socket{ws, make(chan bool)}
go match(s)
<-s.done
}
35 演示
36 缓解孤独
如果你连接了,但那里没有人怎么办?
如果我们能合成一个聊天伙伴不是很好吗?
我们开始做吧。
37 使用马尔可夫链生成文本
Source
"I am not a number! I am a free man!"
Prefix Suffix
"" "" "I"
"" "I" "am"
"I" "am" "a"
"I" "am" "not"
"a" "free" "man!"
"am" "a" "free"
"am" "not" "a"
"a" "number!" "I"
"number!" "I" "am"
"not" "a" "number!"
Generated sentences beginning with the prefix "I am"
"I am a free man!"
"I am not a number! I am a free man!"
"I am not a number! I am not a number! I am a free man!"
"I am not a number! I am not a number! I am not a number! I am a free man!"
func match(c io.ReadWriteCloser) {
fmt.Fprint(c, "Waiting for a partner...")
select {
case partner <- c:
// 现在由另一个goroutine处理
case p := <-partner:
chat(p, c)
case <-time.After(5 * time.Second):
chat(Bot(), c)
}
}
42 演示
43 还有一件事情
44 同时使用TCP和HTTP
func main() {
go netListen()
http.HandleFunc("/", rootHandler)
http.Handle("/socket", websocket.Handler(socketHandler))
err := http.ListenAndServe(listenAddr, nil)
if err != nil {
log.Fatal(err)
}
}
func netListen() {
l, err := net.Listen("tcp", "localhost:4001")
if err != nil {
log.Fatal(err)
}
for {
c, err := l.Accept()
if err != nil {
log.Fatal(err)
}
go match(c)
}
}
搞一段时间没有进展,团队里优秀的人会不断被人挖走。 第一个人被挖走的时候你觉得这哥们叛变革命了,多走几个人你就觉得革命叛变了自己,你就会质疑自己是方向选错了,还是行业选错了,还是做法有问题,还是自己能力不够,还是资源不够,还是投资人不行,会陷入自我否定。 此外帮人一起创业总有一个领头的,公司里领头的通常就是 CEO, CEO 平时要见投资人见媒体要招人,慢慢精力就不在业务上了,而 CTO 是实际管事的,业务发展方向是 CEO 定的,千了一段时间如果没有进展,实际干活的 CTO 就会受到很多职位的诱惑,并且会对业务的发展产生怀疑,如果 CEO 说没搞错大家接着干,CTO 会觉得 CEO 很难沟通,听不进团队意见反馈,可能自己不受认可和尊重,可能就离职了,如果 CEO 让 CTO 负责改版,CTO 改版通常不靠谱,这次改版可能把 CEO 原来的想法颠覆掉了,如果改版不成功,试个 2 次这个创业团队就会面临家里的压力,如果没有进展可能创业队就解散了,所以大部分创业团队会在第二年年底解散。
for {
if s.closed {
close(s.updates)
return
}
items, next, err := s.fetcher.Fetch()
if err != nil {
s.err = err
time.Sleep(10 * time.Second)
continue
}
for _, item := range items {
s.updates <- item
}
if now := time.Now(); next.After(now) {
time.Sleep(next.Sub(now))
}
}
func (s *naiveSub) Close() error {
s.closed = true
return s.err
}
19 Bug 1:对s.closed/s.err的非同步访问
for {
if s.closed {
close(s.updates)
return
}
items, next, err := s.fetcher.Fetch()
if err != nil {
s.err = err
time.Sleep(10 * time.Second)
continue
}
for _, item := range items {
s.updates <- item
}
if now := time.Now(); next.After(now) {
time.Sleep(next.Sub(now))
}
}
func (s *naiveSub) Close() error {
s.closed = true
return s.err
}
20 竞态(Race)检测器
go run -race naivemain.go
for {
if s.closed {
close(s.updates)
return
}
items, next, err := s.fetcher.Fetch()
if err != nil {
s.err = err
func (s *naiveSub) Close() error {
s.closed = true
return s.err
}
21 Bug 2:time.Sleep可能会保持循环运行
for {
if s.closed {
close(s.updates)
return
}
items, next, err := s.fetcher.Fetch()
if err != nil {
s.err = err
time.Sleep(10 * time.Second)
continue
}
for _, item := range items {
s.updates <- item
}
if now := time.Now(); next.After(now) {
time.Sleep(next.Sub(now))
}
}
22 Bug 3:循环可能会在s.updates上永远阻塞
for {
if s.closed {
close(s.updates)
return
}
items, next, err := s.fetcher.Fetch()
if err != nil {
s.err = err
time.Sleep(10 * time.Second)
continue
}
for _, item := range items {
s.updates <- item
}
if now := time.Now(); next.After(now) {
time.Sleep(next.Sub(now))
}
}
23 解决方案
将循环体更改为具有三种情况的select:
Close被调用时
调用Fetch时
在s.updates上发送条目
24 结构:for-select循环
循环在它自己的goroutine中运行。
select让循环避免在任何一种状态下无限期地阻塞。
func (s *sub) loop() {
... declare mutable state ...
for {
... set up channels for cases ...
select {
case <-c1:
... read/write state ...
case c2 <- x:
... read/write state ...
case y := <-c3:
... read/write state ...
}
}
}
var err error // 当Fetch失败时设置错误
for {
select {
case errc := <-s.closing:
errc <- err
close(s.updates) // 告诉接收者我们做完了
return
}
}
27 Case 2: Fetch
在延迟一段时间后安排下一次Fetch。
var pending []Item // 被fetch追加;被send消费
var next time.Time // 初始化值是January 1, year 0
var err error
for {
var fetchDelay time.Duration // 初始化值是0(没有延迟)
if now := time.Now(); next.After(now) {
fetchDelay = next.Sub(now)
}
startFetch := time.After(fetchDelay)
select {
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
pending = append(pending, fetched...)
}
}
28 Case 3: Send
一次发送一个获取的条目。
var pending []Item // 被fetch追加;被send消费
for {
select {
case s.updates <- pending[0]:
pending = pending[1:]
}
}
哎呀。这崩溃了。
29 select和nil通道
在nil通道上发送和接收会被阻塞。
select从不选择阻塞的case。
func main() {
a, b := make(chan string), make(chan string)
go func() { a <- "a" }()
go func() { b <- "b" }()
if rand.Intn(2) == 0 {
a = nil
fmt.Println("nil a")
} else {
b = nil
fmt.Println("nil b")
}
select {
case s := <-a:
fmt.Println("got", s)
case s := <-b:
fmt.Println("got", s)
}
}
30 Case 3: Send (修复的)
仅在pending非空时启用send。
var pending []Item // 被fetch追加;被send消费
for {
var first Item
var updates chan Item
if len(pending) > 0 {
first = pending[0]
updates = s.updates // 启用send case
}
select {
case updates <- first:
pending = pending[1:]
}
}
31 select
将三个case放在一起:
select {
case errc := <-s.closing:
errc <- err
close(s.updates)
return
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
pending = append(pending, fetched...)
case updates <- first:
pending = pending[1:]
}
这些case通过err、next和pending交互。
没有锁,没有条件变量,没有回调函数。
32 已修复的Bug
Bug 1:对s.closed和s.err的非同步(unsynchronized)访问
Bug 2:time.Sleep可能会保持循环运行
Bug 3:循环可能会永远阻塞在发送s.updates上
select {
case errc := <-s.closing:
errc <- err
close(s.updates)
return
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
pending = append(pending, fetched...)
case updates <- first:
pending = pending[1:]
}
33 我们可以进一步改进循环
34 问题:Fetch可能会返回重复项
var pending []Item
var next time.Time
var err error
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
pending = append(pending, fetched...)
35 修复:在添加到pending之前过滤条目
var pending []Item
var next time.Time
var err error
var seen = make(map[string]bool) // item.GUIDs集合
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
for _, item := range fetched {
if !seen[item.GUID] {
pending = append(pending, item)
seen[item.GUID] = true
}
}
36 问题:pending队列无限制地增长
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
for _, item := range fetched {
if !seen[item.GUID] {
pending = append(pending, item)
seen[item.GUID] = true
}
}
37 问题:pending队列无限制地增长
const maxPending = 10
var fetchDelay time.Duration
if now := time.Now(); next.After(now) {
fetchDelay = next.Sub(now)
}
var startFetch <-chan time.Time
if len(pending) < maxPending {
startFetch = time.After(fetchDelay) // 启用fetch case
}
可以改为从pending的头部删除较旧的条目。
38 问题:Fetch上的循环被阻塞
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
for _, item := range fetched {
if !seen[item.GUID] {
pending = append(pending, item)
seen[item.GUID] = true
}
}
39 修复:异步运行Fetch
为fetchDone添加一个新的select case。
type fetchResult struct{ fetched []Item; next time.Time; err error }
var fetchDone chan fetchResult // 如果不是nil,则表示Fetch正在运行
var startFetch <-chan time.Time
if fetchDone == nil && len(pending) < maxPending {
startFetch = time.After(fetchDelay) // 启用fetch case
}
select {
case <-startFetch:
fetchDone = make(chan fetchResult, 1)
go func() {
fetched, next, err := s.fetcher.Fetch()
fetchDone <- fetchResult{fetched, next, err}
}()
case result := <-fetchDone:
fetchDone = nil
// 使用result.fetched, result.next, result.err
c := boring("boring!") // 返回一个通道的函数。
for i := 0; i < 5; i++ {
fmt.Printf("You say: %q\n", <-c)
}
fmt.Println("You're boring; I'm leaving.")
func boring(msg string) <-chan string { // 返回只能用来接收值(receive-only)的通道
c := make(chan string)
go func() { // 我们在函数内部开启一个goroutine。
for i := 0; ; i++ {
c <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}()
return c // 把通道返回给本函数的调用者。
}
26 通道作为服务的句柄
我们的boring函数返回一个通道,让我们可以与它提供的boring服务进行通信。
我们可以有该服务的更多实例。
func main() {
joe := boring("Joe")
ann := boring("Ann")
for i := 0; i < 5; i++ {
fmt.Println(<-joe)
fmt.Println(<-ann)
}
fmt.Println("You're both boring; I'm leaving.")
}
27 多路复用
这些计划使Joe和Ann步调一致。
我们可以改为使用扇入功能让准备好的人说话。
func fanIn(input1, input2 <-chan string) <-chan string {
c := make(chan string)
go func() { for { c <- <-input1 } }()
go func() { for { c <- <-input2 } }()
return c
}
func main() {
c := fanIn(boring("Joe"), boring("Ann"))
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
fmt.Println("You're both boring; I'm leaving.")
}
select {
case v1 := <-c1:
fmt.Printf("received %v from c1\n", v1)
case v2 := <-c2:
fmt.Printf("received %v from c2\n", v1)
case c3 <- 23:
fmt.Printf("sent %v to c3\n", 23)
default:
fmt.Printf("no one was ready to communicate\n")
}
33 再次扇入
重写我们原来的fanIn函数。实际上只需要一个goroutine就能实现。旧的:
func fanIn(input1, input2 <-chan string) <-chan string {
c := make(chan string)
go func() { for { c <- <-input1 } }()
go func() { for { c <- <-input2 } }()
return c
}
34 使用select扇入
重写我们原来的fanIn函数。只需要一个goroutine就能实现。新的:
func fanIn(input1, input2 <-chan string) <-chan string {
c := make(chan string)
go func() {
for {
select {
case s := <-input1: c <- s
case s := <-input2: c <- s
}
}
}()
return c
}
35 使用select超时
time.After函数返回一个阻塞指定持续时间段的通道。
在该时间段过去之后,通道将传递当前时间一次。
func main() {
c := boring("Joe")
for {
select {
case s := <-c:
fmt.Println(s)
case <-time.After(1 * time.Second):
fmt.Println("You're too slow.")
return
}
}
}
36 使用select的整个对话超时
在循环外创建一次计时器,以使整个对话超时。
(在前面的程序中,我们对每条消息都有一个超时。)
func main() {
c := boring("Joe")
timeout := time.After(5 * time.Second)
for {
select {
case s := <-c:
fmt.Println(s)
case <-timeout:
fmt.Println("You talk too much.")
return
}
}
}
37 退出通道
我们可以扭转这种局面,告诉Joe在我们听腻了他的时候停下来。
quit := make(chan bool)
c := boring("Joe", quit)
for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
quit <- true
select {
case c <- fmt.Sprintf("%s: %d", msg, i):
// do nothing
case <-quit:
return
}
38 从quit通道接收
我们怎么知道它已经完成了?等待它告诉我们它已经完成:在退出通道接收
quit := make(chan string)
c := boring("Joe", quit)
for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
quit <- "Bye!"
fmt.Printf("Joe says: %q\n", <-quit)
select {
case c <- fmt.Sprintf("%s: %d", msg, i):
// do nothing
case <-quit:
cleanup()
quit <- "See you!"
return
}
39 通道菊花链
func f(left, right chan int) {
left <- 1 + <-right
}
func main() {
const n = 10000
leftmost := make(chan int)
right := leftmost
left := leftmost
for i := 0; i < n; i++ {
right = make(chan int)
go f(left, right)
left = right
}
go func(c chan int) { c <- 1 }(right)
fmt.Println(<-leftmost)
}
40 中国耳语,地鼠风格
41 系统软件
Go是为编写系统软件而设计的。
让我们看看并发特性是如何发挥作用的。
42 示例:谷歌搜索
Q:谷歌搜索做什么?
A:给定一个查询,返回一页搜索结果(和一些广告)。
Q:我们如何获得搜索结果?
A:将查询发送到Web搜索、图像搜索、YouTube、地图、新闻等。然后混合结果。
我们如何实现这一点?
43 谷歌搜索:一个虚假的框架
我们可以模拟搜索功能,就像我们之前模拟对话一样。
var (
Web = fakeSearch("web")
Image = fakeSearch("image")
Video = fakeSearch("video")
)
type Search func(query string) Result
func fakeSearch(kind string) Search {
return func(query string) Result {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return Result(fmt.Sprintf("%s result for %q\n", kind, query))
}
}
func Google(query string) (results []Result) {
c := make(chan Result)
go func() { c <- Web(query) } ()
go func() { c <- Image(query) } ()
go func() { c <- Video(query) } ()
for i := 0; i < 3; i++ {
result := <-c
results = append(results, result)
}
return
}
47 谷歌搜索 2.1
不等待慢速服务器。没有锁。没有条件变量。没有回调。
c := make(chan Result)
go func() { c <- Web(query) } ()
go func() { c <- Image(query) } ()
go func() { c <- Video(query) } ()
timeout := time.After(80 * time.Millisecond)
for i := 0; i < 3; i++ {
select {
case result := <-c:
results = append(results, result)
case <-timeout:
fmt.Println("timed out")
return
}
}
return
(译者注:增加了超时逻辑。)
48 避免超时
问:我们如何避免丢弃来自慢速服务器的结果?
答:复制服务器。向多个副本发送请求,并使用第一个响应。
func First(query string, replicas ...Search) Result {
c := make(chan Result)
searchReplica := func(i int) { c <- replicas[i](query) }
for i := range replicas {
go searchReplica(i)
}
return <-c
}