在Go语言里,把nil赋值给通道变量c,c就成为了nil通道。在nil通道上发送或接收都会被阻塞。
nil通道的使用场景之一是,select的某个case的通道根据某个条件来启用或禁用,当条件为true时,启用通道,case块代码有机会被执行,当条件为false时,把nil赋值给该通道使其成为nil通道,以禁用该通道(一直阻塞直到条件变为true),case块代码不会被执行。
在Go语言里,把nil赋值给通道变量c,c就成为了nil通道。在nil通道上发送或接收都会被阻塞。
nil通道的使用场景之一是,select的某个case的通道根据某个条件来启用或禁用,当条件为true时,启用通道,case块代码有机会被执行,当条件为false时,把nil赋值给该通道使其成为nil通道,以禁用该通道(一直阻塞直到条件变为true),case块代码不会被执行。
本文翻译自《How to Setup FTP Server with VSFTPD on CentOS 7》。
FTP(File Transfer Protocol,文件传输协议)是一种标准的客户端-服务器(client-server)网络协议,允许用户在远程网络之间传输文件。
有几个可用于Linux的开源FTP服务器。最流行和使用最广泛的是PureFTPd、ProFTPD和vsftpd。
在本教程中,我们将在CentOS 7上安装vsftpd(非常安全的Ftp守护进程)。它是一个稳定、安全和快速的FTP服务器。我们还将向你展示如何配置vsftpd以将用户限制在他们的家目录中并使用SSL/TLS加密整个传输。
先决条件
在继续本教程之前,请确保你以具有sudo权限的用户身份登录。
在CentOS 7上安装vsftpd
vsftpd软件包在默认的CentOS存储库中可用。要安装它,请执行以下命令:
$ sudo yum install vsftpd
安装软件包后,启动vsftpd守护进程并使其在操作系统启动时自动启动:
$ sudo systemctl start vsftpd
$ sudo systemctl enable vsftpd
你可以通过打印其状态来验证vsftpd服务是否正在运行:
$ sudo systemctl status vsftpd
输出如下所示,表明vsftpd服务处于活动状态并正在运行:
● vsftpd.service - Vsftpd ftp daemon
Loaded: loaded (/usr/lib/systemd/system/vsftpd.service; enabled; vendor preset: disabled)
Active: active (running) since Thu 2018-11-22 09:42:37 UTC; 6s ago
Main PID: 29612 (vsftpd)
CGroup: /system.slice/vsftpd.service
└─29612 /usr/sbin/vsftpd /etc/vsftpd/vsftpd.conf
配置vsftpd
配置vsftpd服务涉及编辑/etc/vsftpd/vsftpd.conf
配置文件。大多数设置都在配置文件中详细记录。有关所有可用选项,请访问官方vsftpd页面。
在以下部分中,我们将介绍配置安全的vsftpd安装所需的一些重要设置。
首先打开vsftpd配置文件:
$ sudo nano /etc/vsftpd/vsftpd.conf
1. FTP访问
我们将只允许本地用户访问FTP服务器,找到anonymous_enable
和local_enable
指令并验证你的配置是否与以下行匹配:
anonymous_enable=NO
local_enable=YES
2. 启用上传
取消注释write_enable
设置以允许更改文件系统,例如上传和删除文件。
write_enable=YES
3. Chroot监狱
通过取消注释chroot指令来防止FTP用户访问其家目录之外的任何文件。
chroot_local_user=YES
默认情况下,启用chroot时,如果用户锁定的目录是可写的,vsftpd将拒绝上传文件。这是为了防止安全漏洞。
启用chroot时,使用以下方法之一允许上传。
方法 1. – 允许上传的推荐方法是启用chroot并配置FTP目录。在本教程中,我们将在用户家目录中创建一个ftp目录,该目录将用作chroot和一个可写的上传目录,用于上传文件。
user_sub_token=$USER
local_root=/home/$USER/ftp
方法 2. – 另一种选择是在vsftpd配置文件中添加以下指令。如果你必须授予用户对其家目录的可写访问权限,请使用此选项。
allow_writeable_chroot=YES
4. 被动FTP连接
vsftpd可以使用任何端口进行被动FTP连接。我们将指定端口的最小和最大范围,然后在我们的防火墙中打开该范围。
将以下行添加到配置文件中:
pasv_min_port=30000
pasv_max_port=31000
5. 限制用户登录
要仅允许某些用户登录FTP服务器,请在userlist_enable=YES
行之后添加以下行:
userlist_file=/etc/vsftpd/user_list
userlist_deny=NO
启用此选项后,你需要通过将用户名添加到/etc/vsftpd/user_list
文件(每行一个用户名)来明确指定哪些用户能够登录。
6. 使用SSL/TLS保护传输
为了使用SSL/TLS加密FTP传输,你需要拥有SSL证书并配置FTP服务器以使用它。
你可以使用由受信任的证书颁发机构签名的现有的SSL证书或创建自签名证书。
如果你有一个指向FTP服务器IP地址的域名或子域名,你可以轻松生成免费的Let’s Encrypt SSL证书。
在本教程中,我们将使用openssl
命令生成自签名的SSL证书。
以下命令将创建一个有效期为10年的2048位私钥和自签名证书。私钥和证书都将保存在同一个文件中:
$ sudo openssl req -x509 -nodes -days 3650 -newkey rsa:2048 -keyout /etc/vsftpd/vsftpd.pem -out /etc/vsftpd/vsftpd.pem
创建SSL证书后,打开vsftpd配置文件:
$ sudo nano /etc/vsftpd/vsftpd.conf
找到rsa_cert_file
和rsa_private_key_file
指令,将它们的值更改为pem文件路径并将ssl_enable
指令设置为YES
:
rsa_cert_file=/etc/vsftpd/vsftpd.pem
rsa_private_key_file=/etc/vsftpd/vsftpd.pem
ssl_enable=YES
如果没有另外指定,FTP服务器将只使用TLS来建立安全连接。
重启vsftpd服务
完成编辑后,vsftpd配置文件(不包括注释)应如下所示:
anonymous_enable=NO
local_enable=YES
write_enable=YES
local_umask=022
dirmessage_enable=YES
xferlog_enable=YES
connect_from_port_20=YES
xferlog_std_format=YES
chroot_local_user=YES
listen=NO
listen_ipv6=YES
pam_service_name=vsftpd
userlist_enable=YES
userlist_file=/etc/vsftpd/user_list
userlist_deny=NO
tcp_wrappers=YES
user_sub_token=$USER
local_root=/home/$USER/ftp
pasv_min_port=30000
pasv_max_port=31000
rsa_cert_file=/etc/vsftpd/vsftpd.pem
rsa_private_key_file=/etc/vsftpd/vsftpd.pem
ssl_enable=YES
保存文件并重新启动vsftpd服务以使更改生效:
$ sudo systemctl restart vsftpd
打开防火墙
如果你正在运行防火墙,则需要允许FTP流量。 要打开端口21
(FTP命令端口)、端口20
(FTP数据端口)和30000-31000
(被动端口范围),请发出以下命令:
$ sudo firewall-cmd --permanent --add-port=20-21/tcp
$ sudo firewall-cmd --permanent --add-port=30000-31000/tcp
通过键入以下内容重新加载防火墙规则:
$ firewall-cmd --reload
创建一个FTP用户
为了测试我们的FTP服务器,我们将创建一个新用户。
allow_writeable_chroot=YES
,请跳过第3步。1 创建一个名为newftpuser
的新用户:
$ sudo adduser newftpuser
接下来,你需要设置用户密码:
$ sudo passwd newftpuser
2 将用户添加到允许的FTP用户列表:
$ echo "newftpuser" | sudo tee -a /etc/vsftpd/user_list
3 创建FTP目录树并设置正确的权限:
$ sudo mkdir -p /home/newftpuser/ftp/upload
$ sudo chmod 550 /home/newftpuser/ftp
$ sudo chmod 750 /home/newftpuser/ftp/upload
$ sudo chown -R newftpuser: /home/newftpuser/ftp
如上一节所述,用户将能够将其文件上传到ftp/upload
目录。
此时,你的FTP服务器功能齐全,你应该能够使用任何可配置为使用TLS加密的FTP客户端(例如FileZilla)连接到你的服务器。
禁止访问shell
默认情况下,创建用户时,如果未明确指定,用户将拥有对服务器的SSH访问权限。
要禁止访问shell,我们将创建一个新的shell,它会简单地打印一条消息,告诉用户他们的账户仅限于FTP访问。
运行以下命令来创建/bin/ftponly
shell并使其可执行:
$ echo -e '#!/bin/sh\necho "This account is limited to FTP access only."' | sudo tee -a /bin/ftponly
$ sudo chmod a+x /bin/ftponly
将新的shell附加到/etc/shells
文件中的有效shell列表中:
$ echo "/bin/ftponly" | sudo tee -a /etc/shells
将用户shell更改为/bin/ftponly
:
$ sudo usermod newftpuser -s /bin/ftponly
使用相同的命令为你希望仅授予FTP访问权限的其他用户更改shell。
总结
在本教程中,你学到了如何在CentOS 7系统上安装和配置安全快速的FTP服务器。
如果你有任何问题或反馈,请随时发表评论。
本文翻译自《Go Concurrency Patterns》,是个PPT。
1 Go Concurrency Patterns
Rob Pike
2 视频
这个演讲是在2012年6月在Google I/O大会上做的。
3 介绍
4 Go中的并发特性
当Go语言首次发布时,人们似乎对Go的并发特性着迷。
问题:
5 为什么?
看看你周围。你看到了什么?
你是否看到一个单步的世界一次只做一件事?
或者你是否看到了一个由相互作用、行为独立的片段组成的复杂世界?
这就是为什么。顺序处理(Sequential processing)本身并不能模拟世界的行为。
6 什么是并发?
并发是独立地执行计算的组合。
并发是一种构建软件的方式,特别是编写与现实世界交互良好的干净代码的方式。
它不是并行。
7 并发不是并行
并发不是并行,尽管它支持并行。
如果你只有一个处理器,你的程序仍然可以并发但不能并行。
另一方面,编写良好的并发程序可能会在多处理器上高效地并行运行。那个属性可能很重要…
有关该区别的更多信息,请参阅下面的链接。这里讨论的太多了。
golang.org/s/concurrency-is-not-parallelism
8 软件构建模型
容易理解。
便于使用。
容易推理。
你不需要成为专家!
(比处理并行的细节(线程、信号量、锁、屏障等)要好得多。)
9 历史
对许多人来说,Go的并发特性似乎很新。
但它们植根于悠久的历史,可以追溯到1978年Hoare的CSP甚至Dijkstra的守卫命令(1975)。
具有相似特征的语言:
10 区别
Go是Newsqueak-Alef-Limbo分支的最新成员,以第一公民的通道而著称。
Erlang更接近于原始CSP,你可以通过名字而不是通过通道与进程通信。
这些模型是等效的,但表达方式不同。
粗略的类比:按名字写入文件(进程,Erlang)与写入文件描述符(通道,Go)。
11 基本例子
12 一个无聊的函数
我们需要一个例子来展示并发原语的有趣属性。
为了避免分心,我们把它作为一个无聊的例子。
func boring(msg string) {
for i := 0; ; i++ {
fmt.Println(msg, i)
time.Sleep(time.Second)
}
}
13 不那么无聊
使消息之间的间隔不可预测(仍不到一秒)。
func boring(msg string) {
for i := 0; ; i++ {
fmt.Println(msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}
14 运行它
无聊的功能永远运行,就像一个无聊的派对客人。
func main() {
boring("boring!")
}
func boring(msg string) {
for i := 0; ; i++ {
fmt.Println(msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}
15 忽略它
go语句照常运行函数,但不会让调用者等待。
它启动一个goroutine。 该功能类似于shell命令末尾的&。
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
go boring("boring!")
}
16 少一点忽略
当main返回时,程序退出并带走boring函数。
我们可以闲逛一下,在路上显示main和已启动的goroutine都在运行。
func main() {
go boring("boring!")
fmt.Println("I'm listening.")
time.Sleep(2 * time.Second)
fmt.Println("You're boring; I'm leaving.")
}
17 协程(Goroutine)
什么是goroutine?它是一个独立执行的函数,由go语句启动。
它有自己的调用堆栈,可以根据需要增长和缩小。
它非常廉价。实践中,拥有数千甚至数十万个goroutine是很常见的。
这不是一个线程。
一个包含数千个goroutine的程序中可能只有一个线程。
相反,goroutines会根据需要动态地多路复用到线程上,以保持所有goroutines运行。
但是,如果你将其视为非常廉价的线程,那么你将不会走太远。
18 通信(Communication)
我们boring的例子被骗了:main函数看不到其他goroutine的输出。
它只是打印到屏幕上,我们假装看到了对话。
真正的对话需要通信。
19 通道(Channel)
Go中的通道提供了两个goroutine之间的连接,允许它们进行通信。
// 声明和初始化。
var c chan int
c = make(chan int)
// 或者
c := make(chan int)
// 在通道上发送数据。
c <- 1
// 从通道里取数据。
// 箭头指明了数据流的方向。
value = <-c
20 使用通道
通道连接main和boring的goroutine,以便它们可以通信。
func main() {
c := make(chan string)
go boring("boring!", c)
for i := 0; i < 5; i++ {
fmt.Printf("You say: %q\n", <-c) // 接收表达式仅是一个值。
}
fmt.Println("You're boring; I'm leaving.")
}
func boring(msg string, c chan string) {
for i := 0; ; i++ {
c <- fmt.Sprintf("%s %d", msg, i) // 将被发送的表达式可以是任意合适的值。
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}
21 同步(Synchronization)
当main函数执行<–c时,它会等待一个值被发送。
同样,当boring函数执行c <– value时,它会等待接收者准备好,再把值发送到通道。
发送者和接收者都必须准备好在通信中发挥自己的作用。否则我们会等待直到他们准备好。 因此,通道既可以通信又可以同步。
22 关于带缓冲区的通道的旁白
专家提示:Go也可以创建带缓冲区的通道。
缓冲区消除了同步。
通道的缓冲区使它们更像Erlang的邮箱。
带缓冲区的通道对于某些问题可能很重要,但它们需要更加微妙的推理。
我们今天不需要它们。
23 Go的使用方式
不要通过共享内存来通信,而是通过通信来共享内存。
24 模式
25 生成器:返回一个通道的函数
通道是一等公民(first-class)的值,就像字符串或整数一样。
c := boring("boring!") // 返回一个通道的函数。
for i := 0; i < 5; i++ {
fmt.Printf("You say: %q\n", <-c)
}
fmt.Println("You're boring; I'm leaving.")
func boring(msg string) <-chan string { // 返回只能用来接收值(receive-only)的通道
c := make(chan string)
go func() { // 我们在函数内部开启一个goroutine。
for i := 0; ; i++ {
c <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}()
return c // 把通道返回给本函数的调用者。
}
26 通道作为服务的句柄
我们的boring函数返回一个通道,让我们可以与它提供的boring服务进行通信。
我们可以有该服务的更多实例。
func main() {
joe := boring("Joe")
ann := boring("Ann")
for i := 0; i < 5; i++ {
fmt.Println(<-joe)
fmt.Println(<-ann)
}
fmt.Println("You're both boring; I'm leaving.")
}
27 多路复用
这些计划使Joe和Ann步调一致。
我们可以改为使用扇入功能让准备好的人说话。
func fanIn(input1, input2 <-chan string) <-chan string {
c := make(chan string)
go func() { for { c <- <-input1 } }()
go func() { for { c <- <-input2 } }()
return c
}
func main() {
c := fanIn(boring("Joe"), boring("Ann"))
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
fmt.Println("You're both boring; I'm leaving.")
}
28 扇入(Fan-in)
29 恢复顺序
在一个通道上发送一个通道,让goroutine等待轮到它。
接收所有消息,然后在专用通道上发送,再次启用它们。 首先,我们定义一个包含回复用的通道的消息类型。
type Message struct {
str string
wait chan bool
}
30 恢复顺序
每个发言者都必须等待批准。
for i := 0; i < 5; i++ {
msg1 := <-c; fmt.Println(msg1.str)
msg2 := <-c; fmt.Println(msg2.str)
msg1.wait <- true
msg2.wait <- true
}
waitForIt := make(chan bool) // 所有消息实例共享的通道。
// (译者注:以下是每个发言者协程的关键代码)
c <- Message{ fmt.Sprintf("%s: %d", msg, i), waitForIt }
time.Sleep(time.Duration(rand.Intn(2e3)) * time.Millisecond)
<-waitForIt
(译者注:上述程序实现每个发言者发言后,必须等待批准,才能继续运行下去。)
31 select
并发独有的控制结构。
和通道、goroutine一起内置在语言中。
32 select
select语句提供了另一种处理多个通道的方法。
这就像一个开关,但每个case都是一个通信:
– 评估所有通道。
– 阻塞选择,直到一个通信可以继续,然后它会继续。
– 如果有多个可以继续,伪随机选择其中一个。 – 如果没有一个通道准备好,则默认子句(如果存在)立即执行。
select {
case v1 := <-c1:
fmt.Printf("received %v from c1\n", v1)
case v2 := <-c2:
fmt.Printf("received %v from c2\n", v1)
case c3 <- 23:
fmt.Printf("sent %v to c3\n", 23)
default:
fmt.Printf("no one was ready to communicate\n")
}
33 再次扇入
重写我们原来的fanIn函数。实际上只需要一个goroutine就能实现。旧的:
func fanIn(input1, input2 <-chan string) <-chan string {
c := make(chan string)
go func() { for { c <- <-input1 } }()
go func() { for { c <- <-input2 } }()
return c
}
34 使用select扇入
重写我们原来的fanIn函数。只需要一个goroutine就能实现。新的:
func fanIn(input1, input2 <-chan string) <-chan string {
c := make(chan string)
go func() {
for {
select {
case s := <-input1: c <- s
case s := <-input2: c <- s
}
}
}()
return c
}
35 使用select超时
time.After函数返回一个阻塞指定持续时间段的通道。
在该时间段过去之后,通道将传递当前时间一次。
func main() {
c := boring("Joe")
for {
select {
case s := <-c:
fmt.Println(s)
case <-time.After(1 * time.Second):
fmt.Println("You're too slow.")
return
}
}
}
36 使用select的整个对话超时
在循环外创建一次计时器,以使整个对话超时。
(在前面的程序中,我们对每条消息都有一个超时。)
func main() {
c := boring("Joe")
timeout := time.After(5 * time.Second)
for {
select {
case s := <-c:
fmt.Println(s)
case <-timeout:
fmt.Println("You talk too much.")
return
}
}
}
37 退出通道
我们可以扭转这种局面,告诉Joe在我们听腻了他的时候停下来。
quit := make(chan bool)
c := boring("Joe", quit)
for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
quit <- true
select {
case c <- fmt.Sprintf("%s: %d", msg, i):
// do nothing
case <-quit:
return
}
38 从quit通道接收
我们怎么知道它已经完成了?等待它告诉我们它已经完成:在退出通道接收
quit := make(chan string)
c := boring("Joe", quit)
for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
quit <- "Bye!"
fmt.Printf("Joe says: %q\n", <-quit)
select {
case c <- fmt.Sprintf("%s: %d", msg, i):
// do nothing
case <-quit:
cleanup()
quit <- "See you!"
return
}
39 通道菊花链
func f(left, right chan int) {
left <- 1 + <-right
}
func main() {
const n = 10000
leftmost := make(chan int)
right := leftmost
left := leftmost
for i := 0; i < n; i++ {
right = make(chan int)
go f(left, right)
left = right
}
go func(c chan int) { c <- 1 }(right)
fmt.Println(<-leftmost)
}
40 中国耳语,地鼠风格
41 系统软件
Go是为编写系统软件而设计的。
让我们看看并发特性是如何发挥作用的。
42 示例:谷歌搜索
Q:谷歌搜索做什么?
A:给定一个查询,返回一页搜索结果(和一些广告)。
Q:我们如何获得搜索结果?
A:将查询发送到Web搜索、图像搜索、YouTube、地图、新闻等。然后混合结果。
我们如何实现这一点?
43 谷歌搜索:一个虚假的框架
我们可以模拟搜索功能,就像我们之前模拟对话一样。
var (
Web = fakeSearch("web")
Image = fakeSearch("image")
Video = fakeSearch("video")
)
type Search func(query string) Result
func fakeSearch(kind string) Search {
return func(query string) Result {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return Result(fmt.Sprintf("%s result for %q\n", kind, query))
}
}
44 Google 搜索:测试这个框架
func main() {
rand.Seed(time.Now().UnixNano())
start := time.Now()
results := Google("golang")
elapsed := time.Since(start)
fmt.Println(results)
fmt.Println(elapsed)
}
45 谷歌搜索 1.0
Google函数接受查询并返回结果片段(只是字符串)。
Google会依次调用Web、图像和视频搜索,并将它们附加到结果切片中。
func Google(query string) (results []Result) {
results = append(results, Web(query))
results = append(results, Image(query))
results = append(results, Video(query))
return
}
46 谷歌搜索 2.0
同时运行Web、图像和视频搜索,并等待所有结果。
没有锁。没有条件变量。没有回调。
func Google(query string) (results []Result) {
c := make(chan Result)
go func() { c <- Web(query) } ()
go func() { c <- Image(query) } ()
go func() { c <- Video(query) } ()
for i := 0; i < 3; i++ {
result := <-c
results = append(results, result)
}
return
}
47 谷歌搜索 2.1
不等待慢速服务器。没有锁。没有条件变量。没有回调。
c := make(chan Result)
go func() { c <- Web(query) } ()
go func() { c <- Image(query) } ()
go func() { c <- Video(query) } ()
timeout := time.After(80 * time.Millisecond)
for i := 0; i < 3; i++ {
select {
case result := <-c:
results = append(results, result)
case <-timeout:
fmt.Println("timed out")
return
}
}
return
(译者注:增加了超时逻辑。)
48 避免超时
问:我们如何避免丢弃来自慢速服务器的结果?
答:复制服务器。向多个副本发送请求,并使用第一个响应。
func First(query string, replicas ...Search) Result {
c := make(chan Result)
searchReplica := func(i int) { c <- replicas[i](query) }
for i := range replicas {
go searchReplica(i)
}
return <-c
}
49 使用第一个函数
func main() {
rand.Seed(time.Now().UnixNano())
start := time.Now()
result := First("golang",
fakeSearch("replica 1"),
fakeSearch("replica 2"))
elapsed := time.Since(start)
fmt.Println(result)
fmt.Println(elapsed)
}
50 谷歌搜索 3.0
使用复制的搜索服务器减少尾部延迟。
c := make(chan Result)
go func() { c <- First(query, Web1, Web2) } ()
go func() { c <- First(query, Image1, Image2) } ()
go func() { c <- First(query, Video1, Video2) } ()
timeout := time.After(80 * time.Millisecond)
for i := 0; i < 3; i++ {
select {
case result := <-c:
results = append(results, result)
case <-timeout:
fmt.Println("timed out")
return
}
}
return
51 还是…
没有锁。没有条件变量。没有回调。
52 总结
在几个简单的变换示例中,我们使用Go的并发原语来转换
程序为一个
程序。
53 更多派对技巧
使用这些工具有无穷无尽的方法,许多在别处都有介绍。
Chatroulette toy:
Load balancer:
Concurrent prime sieve:
Concurrent power series (by McIlroy):
54 不要过用
玩起来很有趣,但不要过度使用这些想法。
Goroutines和通道是大创意。它们是程序构建的工具。
但有时你只需要一个引用计数器。
Go有”sync”和”sync/atomic”包,它们提供互斥锁、条件变量等。它们为较小的问题提供了工具。
通常,这些东西会一起解决更大的问题。
始终为工作使用正确的工具。
55 结论
Goroutines和channels使得表达复杂的操作变得容易
而且它们使用起来很有趣。
56 链接
Go主页:
Go Tour (在浏览器里学习Go):
包(Package)文档:
Articles galore:
并发不是并行:
golang.org/s/concurrency-is-not-parallelism
57 致谢
Rob Pike
@rob_pike
本文翻译自《Go Concurrency Patterns: Pipelines and cancellation》。
Sameer Ajmani
13 March 2014
介绍
Go的并发原语使构建数据流管道变得容易,从而有效地利用I/O和多个CPU。本文介绍了此类管道的示例,重点介绍了操作失败时出现的细微差别,并介绍了干净地处理故障的技术。
什么是管道?
Go中没有关于管道的正式定义。它只是多种并发程序中的一种。通俗地说,管道(pipeline)是由通道(channel)连接的一系列阶段(stage),其中每个阶段是一组运行相同功能的goroutine。在每个阶段,goroutines
每个阶段都有任意数量的入口和出口通道,除了第一个阶段和最后一个阶段,它们分别只有出口或入口通道。第一阶段有时称为源头或生产者;最后一个阶段,称为水池(sink)或消费者。(译者注:一个阶段在代码实现上就是一个函数,入口通道或出口通道作为该函数的参数或返回值,在函数体内启动一个或多个goroutines来处理这一阶段的业务逻辑。见下文的各个例子。)
我们将从一个简单的示例管道开始来解释这些想法和技术。稍后,我们将展示一个更现实的例子。
平方数
考虑一个具有三个阶段的管道。
第一阶段gen
是一个函数,将一个整数列表转换为一个通道,发出列表中的整数。gen
函数启动一个goroutine,在通道上发出整数,并在发送完所有值后关闭该通道:
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
第二阶段sq
从通道接收整数并返回一个通道,该通道发出每个接收到的整数的平方值。在入口通道关闭并且此阶段已将所有值发送到下游后,它会关闭出口通道:
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
main
函数设置管道并运行最后一个阶段:它从第二阶段接收值并打印输出每个值,直到通道关闭:
func main() {
// 安装这个管道。
c := gen(2, 3)
out := sq(c)
// 消费输出的值。
fmt.Println(<-out) // 4
fmt.Println(<-out) // 9
}
由于sq
的入口通道和出口通道具有相同的类型,因此我们可以任意多次组合它。我们还可以将main
重写为范围循环(译者注:即使用for-range语句),就像其他阶段一样:
func main() {
// 安装管道并消费输出的值。
for n := range sq(sq(gen(2, 3))) {
fmt.Println(n) // 16 then 81
}
}
扇出,扇入
多个函数可以从同一个通道读取,直到该通道关闭;这称为扇出(fan-out)。这提供了一种在一组工作者(workers)之间分配工作以并行化使用CPU和I/O的方法。
一个函数可以从多个输入中读取并继续执行,直到所有输入都关闭,方法是将输入通道多路复用到一个通道上,当所有输入都关闭时,该通道才关闭。这称为扇入(fan-in)。
我们可以改变我们的管道来运行sq
的两个实例,每个实例都从同一个输入通道读取。我们引入了一个新函数,merge
,以扇入结果:
func main() {
in := gen(2, 3)
// 将sq工作分配给两个都从in通道读取的goroutine。
c1 := sq(in)
c2 := sq(in)
// 消费来自c1和c2的合并输出。
for n := range merge(c1, c2) {
fmt.Println(n) // 4 then 9, or 9 then 4
}
}
merge
函数通过为每个入口通道启动一个goroutine,将值复制到唯一的出口通道,将通道列表转换为单个通道。一旦所有的输出goroutine都启动了,merge
会再启动一个goroutine以在该通道上的所有发送完成后关闭出口通道。
往关闭的通道发送值将引发panic,因此在调用close之前确保所有发送都完成很重要。sync.WaitGroup
类型提供了一种安排此同步的简单方法:
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 为每个cs中的输入通道启动一个输出goroutine。output将值从c复制到out,直到c关闭,然后调用wg.Done。
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// 一旦所有输出goroutines都完成任务,就启动一个goroutine来关闭out通道。这必须在调用wg.Add之后开始。
go func() {
wg.Wait()
close(out)
}()
return out
}
止损
我们的管道函数有一个模式:
这种模式允许将每个接收阶段编写为range
循环,并确保一旦所有值都成功发送到下游,所有goroutines都会退出。
但在管道的实际使用中,并不总是接收所有入口值。有时这是设计使然:接收者可能只需要值的一个子集来取得进展。更常见的情况是,一个阶段提前退出,因为某个入口值表示前面阶段出现错误。在任何一种情况下,接收者都不应该等待剩余的值到达,我们希望前面阶段停止生产后面阶段不需要的值。
在我们的示例管道中,如果一个阶段未能消耗所有入口值,则尝试发送这些值的goroutine将无限期地阻塞:
// 消耗输出中的第一个值。
out := merge(c1, c2)
fmt.Println(<-out) // 4 or 9
return
// 由于我们没有从out接收第二个值,因此其中一个输出值的goroutine被挂起,试图发送值。
}
这是资源泄漏:goroutine消耗内存和运行时资源,goroutine堆栈中的堆引用防止数据被垃圾回收。Goroutines不会被垃圾回收;他们必须自行退出。
即使下游阶段无法接收所有入口值,我们也需要安排管道的上游阶段退出。一种方法是将出口通道更改为具有缓冲区的通道。一个缓冲区可以保存固定数量的值;如果缓冲区中有空间,则发送操作立即完成:
c := make(chan int, 2) // 缓冲区大小是2
c <- 1 // 立即完成发生
c <- 2 // 立即完成发生
c <- 3 // 阻塞直到另一个goroutine执行<-c并且接收到1
当在创建通道时知道要发送的值的数量时,使用缓冲区可以简化代码。例如,我们可以重写gen
以将整数列表复制到缓冲区通道中,并避免创建新的goroutine:
func gen(nums ...int) <-chan int {
out := make(chan int, len(nums))
for _, n := range nums {
out <- n
}
close(out)
return out
}
回到我们管道中被阻塞的goroutine,我们可以考虑在merge
返回的出口通道中添加一个缓冲区:
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int, 1) // 为未读输入提供足够的空间
// ...剩余代码不改变...
虽然这修复了该程序中被阻塞的goroutine,但这是错误的代码。这里设置缓冲区大小为1,因为我们知道merge
将接收的值的数量以及下游阶段将消耗的值的数量。这很脆弱:如果我们向gen
传递一个额外的值,或者如果下游阶段读取的值更少,我们将再次阻塞goroutine。
相反,我们需要为下游阶段提供一种方法来向发送者表明他们将停止接受输入。
显式取消
当main
决定不从out
接收所有值就退出时,它必须告诉上游阶段的goroutine放弃它们试图发送的值。它通过在一个名为done
的通道上发送值来实现。它发送两个值,因为可能有两个被阻塞的发送者:
func main() {
in := gen(2, 3)
// 将sq工作分配给两个都从in读取的goroutine。
c1 := sq(in)
c2 := sq(in)
// 从输出消费第一个值。
done := make(chan struct{}, 2)
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
// 告诉剩余的发送者我们准备离开了。
done <- struct{}{}
done <- struct{}{}
}
发送值的goroutine用一个select
语句替换它们的发送操作,该语句在值发送给out
时或者当它们从done
接收到值时继续运行。done
的值的类型是空结构体,因为值无关紧要:它只是一个接收事件,用来指示应该放弃给out
继续发送值。输出值的goroutine继续在其入口通道c上循环,因此上游阶段不会被阻塞。(我们稍后会讨论如何让这个循环提前返回。)
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 为每个在cs中的输入通道启动一个output goroutine。output将值从c复制到out,直到c关闭或从done接收到值,然后output调用wg.Done。
output := func(c <-chan int) {
for n := range c {
select {
case out <- n:
case <-done:
}
}
wg.Done()
}
// ...剩余代码不改变...
这种方法有一个问题:每个下游接收者都需要知道可能被阻塞的上游发送者的数量,并安排在提前返回时向这些发送者发出信号。跟踪这些数量既繁琐又容易出错。
我们需要一种方法来告诉未知且没有限定数量的goroutine停止向下游发送它们的值。在Go中,我们可以通过关闭通道来做到这一点,因为关闭通道上的接收操作总是可以立即进行,产生元素类型的零值。
这意味着main
可以通过关闭done
通道来解除所有发送者的阻塞。这种关闭实际上是对所有发送者的一个广播信号。我们扩展了我们的每个管道函数以接受done
作为参数,并通过defer
语句安排关闭的发生,因此来自main
的所有返回路径都会发出让管道阶段退出的信号。
func main() {
// 设置一个由整个管道共享的done通道,并在该管道退出时关闭该通道,作为我们开启的所有goroutines的退出信号。
done := make(chan struct{})
defer close(done)
in := gen(done, 2, 3)
// 将sq工作分配给两个都从in读取的goroutine。
c1 := sq(done, in)
c2 := sq(done, in)
// 从output消耗第一个值。
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
// done将被推迟的close调用关闭。
}
现在,一旦done
关闭,我们的管道的每个阶段都可以自由返回。merge
中的output
例程可以在其入口通道没有排完水(不消费完入口通道的所有值)的情况下返回,因为它知道上游发送方sq
将在done
关闭时停止发送。output
确保通过defer
语句在所有返回路径上调用wg.Done
:
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 为每个cs中的输入通道启动一个output goroutine。output将值从c复制到out,直到c或done关闭,然后调用wg.Done。
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-done:
return
}
}
}
// ...剩余代码不改变...
同样,sq
可以在done
关闭后立即返回。sq
通过defer
语句确保其输出通道在所有返回路径上都关闭:
func sq(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
return
}
}
}()
return out
}
(译者注:除了源头和水池,每个阶段都有一个或多个入口通道和一个或多个出口通道,用来接收或发送业务数据和错误信息,此外也可以有控制通道,用来接收或发送对通道的控制信息,这里的done
就是控制通道。)
以下是管道建设的指导方针:
管道通过确保所有发送的值有足够的缓冲区或者在接收者可能废弃通道时显式地向发送者发出信号来解除对发送者的阻塞。(译者注:本文通过done
通道来控制阶段是否取消运行,是比较原始的方式,更加先进的方式是使用Go 1.14版本加入的context
包提供的机制。)
计算一棵目录树里的所有文件的摘要值
让我们考虑一个更实际的管道。
MD5是一种消息摘要算法,可用作文件校验值。命令行实用程序md5sum打印文件列表的摘要值。
% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
我们的示例程序与md5sum
类似,但将单个目录作为参数,并打印该目录下每个常规文件的摘要值,按路径名排序。
% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
我们程序的主函数调用一个辅助函数MD5All
,它返回一个从路径名到摘要值的映射,然后对结果进行排序并打印:
func main() {
// 计算指定目录下所有文件的MD5总和,然后按路径名排序打印结果。
m, err := MD5All(os.Args[1])
if err != nil {
fmt.Println(err)
return
}
var paths []string
for path := range m {
paths = append(paths, path)
}
sort.Strings(paths)
for _, path := range paths {
fmt.Printf("%x %s\n", m[path], path)
}
}
MD5All
函数是我们讨论的重点。在serial.go中,不使用并发,并且在遍历树时简单地读取和汇总每个文件。
// MD5All读取以root为根目录的文件树中的所有文件,并返回从文件路径到文件内容的MD5值的映射。如果目录遍历失败或任何读取操作失败,MD5All将返回错误。
func MD5All(root string) (map[string][md5.Size]byte, error) {
m := make(map[string][md5.Size]byte)
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
data, err := ioutil.ReadFile(path)
if err != nil {
return err
}
m[path] = md5.Sum(data)
return nil
})
if err != nil {
return nil, err
}
return m, nil
}
并行计算摘要值
在parallel.go中,我们将MD5All
拆分为两个阶段的管道。第一阶段sumFiles
遍历目录树,在一个新的goroutine中计算每个文件的摘要值,并将结果发送到值类型为result
的通道上:
type result struct {
path string
sum [md5.Size]byte
err error
}
sumFiles
返回两个通道:一个用于result
,另一个用于filepath.Walk
返回的错误。walk
函数启动一个新的goroutine来处理每个常规文件,然后检查done
。如果done
已关闭,则立即停止遍历目录树:
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
// 对于每个常规文件,启动一个对文件计算摘要值并将结果发送到c的 goroutine。在errc上发送walk的结果。
c := make(chan result)
errc := make(chan error, 1)
go func() {
var wg sync.WaitGroup
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
wg.Add(1)
go func() {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
}
wg.Done()
}()
// 如果done关闭了就停止遍历。
select {
case <-done:
return errors.New("walk canceled")
default:
return nil
}
})
// Walk已返回,因此对wg.Add的所有调用都已完成。完成所有发送后,启动一个goroutine以关闭c。
go func() {
wg.Wait()
close(c)
}()
// 这里不需要select,因为errc是有缓冲区的通道。
errc <- err
}()
return c, errc
}
MD5All
从c. MD5All
接收摘要值。MD5All
在出错时提前返回,通过defer
关闭done
:
func MD5All(root string) (map[string][md5.Size]byte, error) {
// MD5All返回时关闭done通道;它可以在接收到来自c和errc的所有值之前这样做。
done := make(chan struct{})
defer close(done)
c, errc := sumFiles(done, root)
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
有上界的并行数
parallel.go中的MD5All
为每个文件启动一个新的goroutine。在包含许多大文件的目录中,这可能会分配比机器上可用的内存更多的内存。
我们可以通过限制并行读取的文件数量来限制内存分配。在bounded.go中,我们通过创建固定数量的goroutine来读取文件来做到这一点。我们的管道现在具有三个阶段:遍历目录树、读取和计算文件的摘要值以及收集摘要值。
第一阶段walkFiles
发送目录树中常规文件的路径:
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
paths := make(chan string)
errc := make(chan error, 1)
go func() {
// Walk返回时关闭paths通道。
defer close(paths)
// 这里的发送无需select,因为errc是有缓冲区的通道。
errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
select {
case paths <- path:
case <-done:
return errors.New("walk canceled")
}
return nil
})
}()
return paths, errc
}
中间阶段启动固定数量的digester
goroutine,它们从paths
接收文件名并在通道c
上发送结果:
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
for path := range paths {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
return
}
}
}
与我们之前的示例不同,digester
不会关闭其输出通道,因为多个goroutine在共享通道上发送。相反,MD5All
中的代码安排在所有digester
完成后关闭通道:
// 启动固定数量的goroutine来读取和计算文件的摘要值。
c := make(chan result)
var wg sync.WaitGroup
const numDigesters = 20
wg.Add(numDigesters)
for i := 0; i < numDigesters; i++ {
go func() {
digester(done, paths, c)
wg.Done()
}()
}
go func() {
wg.Wait()
close(c)
}()
我们可以让每个digester
创建并返回它自己的输出通道,但是我们需要额外的goroutine来扇入结果。
最后一个阶段接收来自c
的所有results
,然后检查来自errc
的错误。此检查不能更早发生,因为在此之前,walkFiles
可能会阻止向下游发送值:
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
// 检查Walk是否失败。
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
总结
本文介绍了在Go中构建流数据管道的技术。处理此类管道中的故障很棘手,因为管道中的每个阶段都可能被阻塞而不能向下游发送值,并且下游阶段可能不再关心传入的数据值。我们展示了关闭通道如何向管道启动的所有goroutine广播“完成”信号,并定义了正确构建管道的准则。
进一步阅读:
select
。我的操作系统是Winodws 10,在使用Navicat Premium 16的模型或图表工作区时,弹出一个错误对话框说Access violation at address 00000000. Read of address 00000000。
出现这个问题的原因,应该是Winodws 10操作系统里安装的C++运行库与Navicat Premium 16所要求的版本不一致。Navicat Premium 16在Winodws 7操作系统里并不会出现这个问题。
因此这个问题的解决方法可以是,先在Winodws 10操作系统里安装VirtualBox,再安装Winodws 7虚拟机,然后在该虚拟机里安装和使用Navicat Premium 16。
打个比喻,用微信和下属沟通,有点像皇帝和奴才太监的关系,随叫随到、不分场合、立马回应;用邮件沟通,是皇帝和大臣的关系,更正式、更有仪式感、且对上位者的限制更多。
真上过班的人,才会发现工作用邮件,比用微信幸福多了。邮件办公最大的好处,是不会期待对方“秒回”,所以收件方有足够的时间去整理信息,有更多的弹性时间,即便周末收到也可以等到周一上班再回。而且因为邮件“非即时性”的特点,发起邮件对话的一方(通常是老板、或者甲方)会对自己编写的信息更认真,会想清楚自己到底要布置什么任务才会发送, 不会像微信这种“即时性”软件一样,很多老板是突然兴起了,就给员工或者给工作群里丢一个信息,甚至连文字都没有就一段语音过来了,逻辑也不通顺、信息也不明了,搞的接收方一头雾水的,有时候还要猜,要是群里都不理还好,只要有一个卷的人兴致勃勃的回复,那大家都要跟着卷起来。
不知道有没有人体验过,一个大老板在群里转发一篇莫名其妙的文章后,一堆中层小领导排队写小作文感想的奇葩场景。这就是“移动办公无处不在”给劳动者带来的困境。
所以,客观上邮件是不方便的,但就是这种不方便反而利于劳动者,这是对劳动者私人空间的尊重,是对上/下班界限的明晰。
来源知乎用户@李小粥
由于PowerBuilder 12.5是一个32位应用程序,因此SQL Anywhere也需要安装32位版本。如果二者位数不一致,那么PowerBuilder将无法创建SQL Anywhere数据库,报错“ODBC驱动管理器 连接未打开”,并且PowerBuilder通过SQL Anywhere ODBC连接到数据源时,会出现错误:SQLSTATE = IM003由于系统错误193,无法加载指定的驱动程序:(SQL Anywhere 12,C:\Programs\SQL Anywhere 12\Bin64\dbodbc12.dll)。
参考
RFC文档说根域名不支持CNAME映射到其他域名,否则和MX记录有冲突。即配置为CNAME zuotijia.me www.zuotijia.me,Cloudflare DNS CNAME拉平操作会把根域名zuotijia.me直接解析到IP地址,因此默认情况下,在浏览器输入zuotijia.me访问不到www.zuotijia.me。
要在浏览器输入根域名zuotijia.me访问到https://www.zuotijia.me,可以使用Nginx重定向。新建一个Nginx配置块/etc/nginx/conf.d/zuotijia.me.conf,输入以下内容:
server {
listen 80;
listen [::]:80;
server_name zuotijia.me;
return 301 https://www.zuotijia.me$request_uri;
}
server {
listen 443;
listen [::]:443;
server_name zuotijia.me;
return 301 https://www.zuotijia.me$request_uri;
}
重启Nginx使配置生效:
$ sudo systemctl restart nginx
然后以下网址都能重定向到https://www.zuotijia.me了:
https://zuotijia.me
http://zuotijia.me
即使在URL后面加了参数也能连带参数重定向。
VirtualBox启动CentOS 7虚拟系统时,遇到Kernel Panic – not syncing: VFS: Unable to mount root fs on unknown-block(0,0)问题,无法启动系统。
引起该问题的原因是缺少该内核的初始化文件。
解决方法是从启动界面的GRUB菜单中选择另一个内核来启动系统。进入系统后运行sudo update-initramfs -u -k version为version生成初始化文件(将version替换为内核版本字符串,例如4.15.0-36-generic),然后运行sudo update-grub更新GRUB。
参考
https://askubuntu.com/questions/41930/kernel-panic-not-syncing-vfs-unable-to-mount-root-fs-on-unknown-block0-0
关于时间管理,大家都听说过,先做紧急的事情,再做重要的事情,不紧急也不重要的事情可做可不做。紧急的事情大多是突发情况,不得不做。重要的事情往往有很多件,有些很难,我们不愿意甚至抵触去做;有些很简单,让人感到轻松愉快。人们普遍选择先做简单的事情,再做困难事情,结果越做越没有动力,产生了挫败感,甚至被惰性缠绕,只想躺平不动。
以心理学操作性反射的原则为基础,对于人类的行为方式进行观察后,心理学家提出一种改进方式,以纠正惰性生活方式,并由这种惰性生活方式的结束而带来整个人生的良性改变。这种改进方式叫做普瑞马法则。具体做法如下:
1 行为记录
先可以用一天到两天时间给自己做一个行为记录,把你通常每天要做的事情记下来,包括记录你所有的生活活动。这样,即使粗粗地记,大约也会有几十件。
2 按兴趣排列
然后把其中一些吃饭穿衣等必须完成的事情剔除。此后,你把剩余下来的几十件事情按照你的兴趣排列,把你最不喜欢做的事情放在第一位,把你最喜欢做的事情放在最后一位。
3 开始行动
每天一早起来,从你最不喜欢的事情开始做起,并且坚持做完第一件事情,再做第二件事情……一直做到最后一件你喜欢的事情。
4 坚持进行下去
在整个过程中,你开始会稍觉得困难,但你只要花很少的力气稍稍坚持,你就能顺利进行下去。千万不要在中途跳过那些你不喜欢做的事情。
5 强化作用、正反馈
这种方式是一种强化作用的方式———先处理困难的事情,再处理稍困难的事情,是一种对于前面行动的强化,然后继续,强化的效果会越来越大,一直大到你觉得你有力量来完成任何事情!
当你做到最后最喜欢做的事情时,你会觉得很放松很愉悦,这个时候你也没有烦人的事需要继续面对了,于是你的心情必然是很好的,你在这一天的结束将拥有一个很好的睡眠,第二天早起后一定会精力充沛。
你遵循这个法则,并且多一些坚持,你将发现,生活和工作将变得多么轻松有趣啊!