Gopl 第八章 goroutine和通道

说明

本文为GOPL第八章学习笔记

最简单的goroutine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// main runs two goroutines concurrently: the main goroutine computes a slow
// Fibonacci number while a spinner goroutine animates the terminal. When main
// returns, the program exits and the spinner goroutine is terminated with it.
//
// n is 45 (as in GOPL): fib(n) overflows int for n >= 93, and the naive
// recursion already needs on the order of 10^20 calls for n = 100, so a larger
// n would never print a correct result.
func main() {
	go spinner(100 * time.Millisecond)
	const n = 45
	fibN := fib(n) // slow
	fmt.Printf("\rFibonacci(%d) = %d\n", n, fibN)
}

// spinner redraws the digits 0-9 in place (using a carriage return) forever,
// pausing delay between frames. It never returns; it stops only when the
// program exits.
func spinner(delay time.Duration) {
	const frames = `0123456789`
	for {
		for _, frame := range frames {
			fmt.Printf("\r%c", frame)
			time.Sleep(delay)
		}
	}
}

// fib returns the x-th Fibonacci number using the naive doubly recursive
// definition (exponential time, deliberately slow). Values below 2 are
// returned unchanged.
func fib(x int) int {
	if x >= 2 {
		return fib(x-2) + fib(x-1)
	}
	return x
}

并发时钟服务器

时钟服务器为每个连接提供一个goroutine

服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// main listens on localhost:8008 and serves every accepted connection in its
// own goroutine, so several clients are handled concurrently.
func main() {
	ln, err := net.Listen("tcp", "localhost:8008")
	if err != nil {
		log.Fatal(err)
	}
	for {
		conn, err := ln.Accept()
		if err != nil {
			// Log the failure and keep accepting other clients.
			log.Print(err)
			continue
		}
		go handleConn(conn) // one goroutine per client
	}
}
// handleConn writes the local time (HH:MM:SS, one line) to c every second
// until a write fails (for example because the client went away), then
// closes c.
func handleConn(c net.Conn) {
	defer c.Close()
	for {
		stamp := time.Now().Format("15:04:05\n")
		if _, err := io.WriteString(c, stamp); err != nil {
			return
		}
		time.Sleep(1 * time.Second)
	}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

// main connects to the clock server on localhost:8008 and copies everything
// the server sends to standard output.
func main() {
	conn, err := net.Dial("tcp", "localhost:8008")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close() // release the connection when main returns
	mustCopy(os.Stdout, conn)
}
// mustCopy copies r to w until EOF and terminates the program through
// log.Fatal if the copy fails.
func mustCopy(w io.Writer, r io.Reader) {
	if _, err := io.Copy(w, r); err != nil {
		log.Fatal(err)
	}
}

并发回声服务器

并发回声服务器中对每个连接都采用多个goroutine处理。服务器并发处理每个客户端的连接且对每个连接的处理本身也是并发的

服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// main accepts connections on localhost:8008 and hands each one to
// handleConn1 in a new goroutine.
func main() {
	listener, err := net.Listen("tcp", "localhost:8008")
	if err != nil {
		log.Fatal(err)
	}
	for {
		conn, acceptErr := listener.Accept()
		if acceptErr != nil {
			log.Print(acceptErr)
			continue
		}
		go handleConn1(conn)
	}
}
// handleConn1 reads c line by line and starts an echo goroutine for every
// line, so that several shouts on the same connection are echoed
// concurrently.
//
// When the client stops sending (EOF or a read error), handleConn1 waits for
// every echo it started to finish before closing c. Closing earlier would make
// the still-running echoes write to a closed connection, and their output
// would be silently lost.
func handleConn1(c net.Conn) {
	defer c.Close()

	input := bufio.NewScanner(c)
	finished := make(chan struct{}) // receives one value per completed echo
	pending := 0                    // echoes started but not yet finished
	for input.Scan() {
		pending++
		go func(text string) {
			echo(c, text, 1*time.Second)
			finished <- struct{}{}
		}(input.Text())
	}
	// NOTE: input.Err() is ignored; a read error simply ends the session.

	for ; pending > 0; pending-- {
		<-finished
	}
}
// echo writes source back to c three times: upper-cased, unchanged, then
// lower-cased. It pauses delay between writes to imitate a fading echo. Each
// line goes through fmt.Fprintln(c, "\t", ...), so it is prefixed with "\t ".
// Write errors are ignored.
func echo(c net.Conn, source string, delay time.Duration) {
	variants := []string{strings.ToUpper(source), source, strings.ToLower(source)}
	for i, v := range variants {
		if i > 0 {
			time.Sleep(delay)
		}
		fmt.Fprintln(c, "\t", v)
	}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// main connects to the echo server. It copies server output to stdout in a
// background goroutine and copies stdin to the server in the foreground. The
// program exits as soon as stdin reaches EOF, without waiting for the
// background goroutine.
func main() {
	conn, err := net.Dial("tcp", "localhost:8008")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	go mustCopy(os.Stdout, conn) // server -> stdout, concurrently
	mustCopy(conn, os.Stdin)     // stdin -> server; returns at EOF
}
// mustCopy streams r into w until EOF, exiting the program through log.Fatal
// on any copy error.
func mustCopy(w io.Writer, r io.Reader) {
	_, copyErr := io.Copy(w, r)
	if copyErr == nil {
		return
	}
	log.Fatal(copyErr)
}

无缓冲通道

对于上述并发回声服务器的客户端而言,当用户在标准输入键入结束符(Ctrl+D,即 EOF)时,mustCopy(conn,os.Stdin) 返回,main 函数随即结束,程序退出,而不管后台 goroutine 是否还在运行(服务端尚未回显的内容会丢失)。因此加入无缓冲通道进行同步。
客户端

初始版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// main is the echo client synchronised with an unbuffered channel. After stdin
// reaches EOF it closes the connection, then waits until the background
// reader goroutine has signalled before exiting.
func main() {
	conn, err := net.Dial("tcp", "localhost:8008")
	if err != nil {
		log.Fatal(err)
	}

	readerDone := make(chan struct{}) // unbuffered, used purely as a signal
	go func() {
		io.Copy(os.Stdout, conn) // NOTE: ignoring errors
		log.Println("done")
		readerDone <- struct{}{} // tell main the reader has finished
	}()

	mustCopy(conn, os.Stdin)
	conn.Close()  // unblocks the reader's pending Read, ending its io.Copy
	<-readerDone // wait for the background goroutine
}
// mustCopy copies everything from r to w; a copy error is fatal.
func mustCopy(w io.Writer, r io.Reader) {
	n, err := io.Copy(w, r)
	_ = n // byte count is not needed
	if err != nil {
		log.Fatal(err)
	}
}

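流程:若客户端输入结束符(Ctrl+D):mustCopy(conn,os.Stdin) 结束 -> main 调用 conn.Close() 关闭整个连接(服务端因此读到 EOF)-> 后台 goroutine 中阻塞的 io.Copy 因连接已关闭而返回,输出“done”并向 done 通道发送信号 -> 主 goroutine 从 <-done 收到信号后结束运行。注意:由于连接的读端也一并被关闭,服务端尚未发回的回显会丢失;改进版本使用 CloseWrite 只关闭写端来解决这一问题。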

改进版本

读写分离

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// mustCopy copies r into w until EOF and aborts the program via log.Fatal if
// the copy returns an error.
func mustCopy(w io.Writer, r io.Reader) {
	if _, err := io.Copy(w, r); err == nil {
		return
	} else {
		log.Fatal(err)
	}
}
// main is the echo client using a *net.TCPConn, so the two directions of the
// connection can be shut down independently:
//
//  1. stdin is copied to the server until EOF;
//  2. the write half is closed (CloseWrite): the server sees EOF, but the
//     read half stays open so its remaining echoes can still arrive;
//  3. main waits until the reader goroutine has copied everything the server
//     sends (its io.Copy returns when the server closes its side);
//  4. the connection is closed when main returns.
//
// Only the reader needs a completion signal. Sending on a second unbuffered
// channel with no receiver would block main forever.
func main() {
	tcpAddr, err := net.ResolveTCPAddr("tcp", "localhost:8008")
	if err != nil {
		log.Fatal(err)
	}
	conn, err := net.DialTCP("tcp", nil, tcpAddr)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	readDone := make(chan struct{})
	go func() {
		// Copy server output to stdout until the server closes its side.
		io.Copy(os.Stdout, conn) // NOTE: ignoring errors
		log.Println("readDone")
		close(readDone)
	}()

	// Copy stdin to the server until EOF.
	mustCopy(conn, os.Stdin)

	// Half-close: the server gets EOF but can still send its pending echoes.
	if err := conn.CloseWrite(); err != nil {
		log.Print(err)
	}
	<-readDone // everything the server sent has been printed
}

并发web爬虫

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// crawl prints url and returns the links extracted from that page. An
// extraction error is only logged; whatever links.Extract returned (possibly
// nil) is still returned, so crawling continues with other URLs.
func crawl(url string) []string {
	fmt.Println(url)
	found, err := links.Extract(url)
	if err != nil {
		log.Print(err)
	}
	return found
}
// tokens is a counting semaphore: at most 20 calls to links.Extract may be in
// progress at the same time.
var tokens = make(chan struct{}, 20)

// crawl2 is crawl with bounded parallelism. It holds a token from tokens for
// the duration of links.Extract, so at most cap(tokens) extractions run
// concurrently.
func crawl2(url string) []string {
	fmt.Println(url)

	tokens <- struct{}{} // acquire a token
	found, err := links.Extract(url)
	<-tokens // release the token

	if err != nil {
		log.Print(err)
	}
	return found
}
// main0 is the first, naive version of the concurrent crawler. Driven by
// crawl, it has two drawbacks:
//
//  1. Parallelism is unbounded: too many network connections are opened at
//     once, exceeding the process's open-file limit, so DNS lookups and
//     net.Dial start to fail. (As written it calls crawl2, which already
//     bounds parallelism with the tokens semaphore; call crawl instead to
//     observe this problem.)
//  2. It never terminates: worklist is never closed, so the range loop over
//     it never ends.
func main0() {
	// worklist carries batches of URLs waiting to be crawled.
	worklist := make(chan []string)
	go func() {
		// worklist <- os.Args[1:]
		worklist <- []string{"http://baidu.com"}
	}()

	// Crawl concurrently, visiting each URL at most once.
	seen := make(map[string]bool)
	for batch := range worklist {
		for _, link := range batch {
			if seen[link] {
				continue
			}
			seen[link] = true
			go func(u string) {
				worklist <- crawl2(u)
			}(link)
		}
	}
}

// main is the crawler that terminates. n counts the sends to worklist that are
// still outstanding: one for the seed plus one for every crawl goroutine
// started. Each iteration receives one batch and decrements n. When n reaches
// zero, nothing more can arrive and main returns.
func main() {
	worklist := make(chan []string)

	var n int // outstanding sends to worklist
	n++       // the seed below
	go func() {
		// Seed URL (the command-line version is commented out).
		// worklist <- os.Args[1:]
		worklist <- []string{"http://hao123.com"}
	}()

	// Crawl concurrently, visiting each URL at most once.
	seen := make(map[string]bool)
	for ; n > 0; n-- {
		batch := <-worklist
		for _, link := range batch {
			if seen[link] {
				continue
			}
			seen[link] = true
			n++
			go func(u string) {
				worklist <- crawl2(u)
			}(link)
		}
	}
}

select 多路复用

select 类似 switch,每个 case 指定一次通信操作(在通道上发送或接收)。select 会阻塞,直到某个 case 的通信可以进行,然后执行该通信及其对应的语句,其他 case 的通信不会发生;若有多个 case 同时就绪,则随机选择其中一个执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// main counts down from 10, one step per second, and then calls launch. The
// countdown is aborted as soon as a one-byte read from stdin returns (for
// example when the user presses return).
func main() {
	abort := make(chan struct{})
	go func() {
		// Block until the read of a single byte from stdin returns.
		os.Stdin.Read(make([]byte, 1))
		abort <- struct{}{}
	}()

	fmt.Println("Commencing countdown. Press return to abort.")
	tick := time.Tick(1 * time.Second) // delivers a value every second
	for remaining := 10; remaining > 0; remaining-- {
		fmt.Println(remaining)
		select {
		case <-abort:
			fmt.Println("Launch aborted!")
			return
		case <-tick:
			// One second elapsed; continue the countdown.
		}
	}
	launch()
}

// launch is called once the countdown finishes without being aborted.
func launch() {
	fmt.Println("Lift off!")
}

并发目录遍历

不限制并发数的目录遍历

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package main

import (
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
)

// main reports how many files lie under the directories named on the command
// line (default ".") and their total size. A single goroutine walks the roots
// one after another and sends each file size on fileSizes. main sums the
// sizes until the channel is closed.
func main() {
	flag.Parse()
	roots := flag.Args()
	if len(roots) == 0 {
		roots = []string{"."} // no arguments: use the current directory
	}

	fileSizes := make(chan int64)
	go func() {
		for _, root := range roots {
			walkDir(root, fileSizes)
		}
		close(fileSizes) // tells main that the walk is complete
	}()

	var fileCount, byteCount int64
	for size := range fileSizes { // ends once fileSizes is closed
		fileCount++
		byteCount += size
	}
	printDiskUsage(fileCount, byteCount)
}

// printDiskUsage prints the number of files and their total size in
// kilobytes (1 KB = 1000 bytes), with one decimal place.
func printDiskUsage(nfiles, nbytes int64) {
	kb := float64(nbytes) / 1e3
	fmt.Printf("%d files %.1f KB\n", nfiles, kb)
}

// walkDir walks the file tree rooted at dir depth-first and sends the size of
// every non-directory entry on fileSizes.
func walkDir(dir string, fileSizes chan<- int64) {
	for _, entry := range dirents(dir) {
		if !entry.IsDir() {
			fileSizes <- entry.Size()
			continue
		}
		walkDir(filepath.Join(dir, entry.Name()), fileSizes)
	}
}

/*
//os.FileInfo 接口的结构
type FileInfo interface {
Name() string // base name of the file
Size() int64 // length in bytes for regular files; system-dependent for others
Mode() FileMode // file mode bits
ModTime() time.Time // modification time
IsDir() bool // abbreviation for Mode().IsDir()
Sys() interface{} // underlying data source (can return nil)
}
*/

// dirents returns the entries of directory dir sorted by file name (as
// ioutil.ReadDir returns them). On error it reports to stderr and returns nil.
func dirents(dir string) []os.FileInfo {
	entries, err := ioutil.ReadDir(dir)
	if err == nil {
		return entries
	}
	fmt.Fprintf(os.Stderr, "du1: %v\n", err)
	return nil
}

限制并发数的目录遍历

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package main
//使用计数型号量来避免同时打开太多文件
import (
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
"time"
)

var vFlag = flag.Bool("v", false, "show verbose progress messages")

// main walks every root directory concurrently (walkDir starts one goroutine
// per directory), totals the file sizes it receives and prints the result.
// With -v it also prints the running totals every 500ms.
func main() {
	flag.Parse()

	// Directories to scan; default to the current one.
	roots := flag.Args()
	if len(roots) == 0 {
		roots = []string{"."}
	}

	// pending counts walkDir goroutines that have not finished yet.
	fileSizes := make(chan int64)
	var pending sync.WaitGroup
	for _, root := range roots {
		pending.Add(1)
		go walkDir(root, &pending, fileSizes)
	}
	// Close fileSizes once every walker is done, ending the loop below.
	go func() {
		pending.Wait()
		close(fileSizes)
	}()

	// Without -v, tick stays nil. A receive from a nil channel never
	// proceeds, so that select case is never chosen.
	var tick <-chan time.Time
	if *vFlag {
		tick = time.Tick(500 * time.Millisecond)
	}

	var fileCount, byteCount int64
	// The labelled break exits the for loop, not merely the select.
loop:
	for {
		select {
		case size, ok := <-fileSizes:
			if !ok {
				break loop // fileSizes closed: traversal finished
			}
			fileCount++
			byteCount += size
		case <-tick:
			printDiskUsage(fileCount, byteCount)
		}
	}

	printDiskUsage(fileCount, byteCount) // final totals
}

//!-

// printDiskUsage prints the file count and total size in gigabytes
// (1 GB = 1e9 bytes), with one decimal place.
func printDiskUsage(nfiles, nbytes int64) {
	gb := float64(nbytes) / 1e9
	fmt.Printf("%d files %.1f GB\n", nfiles, gb)
}

// walkDir walks the tree rooted at dir. It starts a new goroutine (tracked by
// n) for every subdirectory and sends the size of every other entry on
// fileSizes. The caller must call n.Add(1) before starting walkDir; walkDir
// calls n.Done when it returns.
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
	defer n.Done()
	for _, entry := range dirents(dir) {
		if !entry.IsDir() {
			fileSizes <- entry.Size()
			continue
		}
		n.Add(1) // register before starting, so n.Wait cannot return early
		go walkDir(filepath.Join(dir, entry.Name()), n, fileSizes)
	}
}

// sema is a counting semaphore that limits how many dirents calls may read a
// directory at the same time (at most 20).
var sema = make(chan struct{}, 20)

// dirents returns the entries of dir sorted by name, or nil after printing
// the error to stderr. It holds a sema token while reading, so at most
// cap(sema) directories are read concurrently.
func dirents(dir string) []os.FileInfo {
	sema <- struct{}{}        // acquire token
	defer func() { <-sema }() // release token

	entries, err := ioutil.ReadDir(dir)
	if err == nil {
		return entries
	}
	fmt.Fprintf(os.Stderr, "du: %v\n", err)
	return nil
}

增加取消功能的目录遍历

关于通道的关闭

关闭通道后,不能再对其进行发送操作(会引发 panic),但仍可以进行接收操作:先依次接收通道中已缓冲的剩余值;当通道已经为空时,接收操作不会阻塞,而是立即返回该通道元素类型的零值(对 int 类型为 0,对 string 类型为空字符串),并且可以重复接收任意多次。可以用 v, ok := <-ch 的形式加以区分:通道已关闭且为空时,ok 为 false。

1
2
3
4
5
6
7
// main shows that receiving from a closed, empty channel never blocks: every
// receive immediately yields the element type's zero value (0 for int).
func main() {
	ch := make(chan int)
	// <-ch // before close, this would block forever (no sender): deadlock
	close(ch)
	for i := 0; i < 2; i++ {
		fmt.Println(<-ch) // prints 0
	}
}

取消

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package main

//在原来基础上增加取消功能
import (
"fmt"
"os"
"path/filepath"
"sync"
"time"
)


// done is the cancellation channel. Nothing is ever sent on it; closing it
// makes every receive proceed at once, which broadcasts cancellation to all
// goroutines.
var done = make(chan struct{})

// cancelled reports, without blocking, whether done has been closed.
func cancelled() bool {
	closed := false
	select {
	case <-done: // ready only after done is closed
		closed = true
	default:
	}
	return closed
}

// main walks the directories given as arguments (default ".") in parallel and
// prints running totals every 500ms. Any input on stdin cancels the walk: done
// is closed, the walkers stop early, and main drains fileSizes, then returns
// without printing final totals.
func main() {
	roots := os.Args[1:]
	if len(roots) == 0 {
		roots = []string{"."}
	}

	// Cancel the traversal as soon as a read from stdin returns.
	go func() {
		os.Stdin.Read(make([]byte, 1))
		close(done)
	}()

	fileSizes := make(chan int64)
	var walkers sync.WaitGroup
	for _, root := range roots {
		walkers.Add(1)
		go walkDir(root, &walkers, fileSizes)
	}
	go func() {
		walkers.Wait()
		close(fileSizes)
	}()

	tick := time.Tick(500 * time.Millisecond)
	var nfiles, nbytes int64
	for {
		select {
		case <-done:
			// Drain fileSizes so walkers blocked on a send can finish
			// before the main goroutine exits.
			for range fileSizes {
				// Discard.
			}
			return
		case size, ok := <-fileSizes:
			if !ok {
				printDiskUsage(nfiles, nbytes) // final totals
				return
			}
			nfiles++
			nbytes += size
		case <-tick:
			printDiskUsage(nfiles, nbytes)
		}
	}
}

// printDiskUsage prints the running or final totals: the file count and the
// size in gigabytes (1e9 bytes) to one decimal place.
func printDiskUsage(nfiles, nbytes int64) {
	sizeGB := float64(nbytes) / 1e9
	fmt.Printf("%d files %.1f GB\n", nfiles, sizeGB)
}


// walkDir is the cancellable walker. It returns immediately if done has been
// closed. Otherwise it starts a goroutine (tracked by n) for each
// subdirectory of dir and sends the size of every other entry on fileSizes.
// It calls n.Done when it returns.
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
	defer n.Done()
	if cancelled() {
		return
	}
	for _, entry := range dirents(dir) {
		if !entry.IsDir() {
			fileSizes <- entry.Size()
			continue
		}
		n.Add(1)
		go walkDir(filepath.Join(dir, entry.Name()), n, fileSizes)
	}
}

// sema bounds the number of directories being read concurrently to 20.
var sema = make(chan struct{}, 20)

// dirents returns the entries of dir in directory order (unsorted), or nil.
// Waiting for a sema token is abandoned if done is closed first, so cancelled
// walkers do not queue behind the semaphore. Errors are printed to stderr. If
// Readdir fails part-way, the entries read so far are still returned.
func dirents(dir string) []os.FileInfo {
	select {
	case <-done:
		return nil // cancelled
	case sema <- struct{}{}: // acquire token
	}
	defer func() { <-sema }() // release token

	f, err := os.Open(dir)
	if err != nil {
		fmt.Fprintf(os.Stderr, "du: %v\n", err)
		return nil
	}
	defer f.Close()

	entries, err := f.Readdir(0) // 0 => no limit; read all entries
	if err != nil {
		// Don't return early: Readdir may return partial results.
		fmt.Fprintf(os.Stderr, "du: %v\n", err)
	}
	return entries
}

聊天服务器

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package main

import (
"bufio"
"fmt"
"log"
"net"
)

//!+broadcaster
// client is the send-only channel through which one connected client
// receives outgoing messages.
type client chan<- string

var (
	entering = make(chan client) // newly connected clients
	leaving  = make(chan client) // disconnected clients
	messages = make(chan string) // every incoming client message
)

// broadcaster owns the set of connected clients and runs forever. It adds
// clients received on entering, removes and closes clients received on
// leaving, and forwards every message from messages to all current clients.
// With unbuffered client channels (as handleConn creates), each forward blocks
// until that client's writer takes the message.
func broadcaster() {
	clients := make(map[client]bool) // all connected clients
	for {
		select {
		case cli := <-entering:
			clients[cli] = true
		case cli := <-leaving:
			delete(clients, cli)
			close(cli)
		case msg := <-messages:
			// Broadcast the message to every client's outgoing channel.
			for cli := range clients {
				cli <- msg
			}
		}
	}
}

//!-broadcaster

//!+handleConn
// handleConn serves one chat client. It starts clientWriter for the client's
// outgoing channel, greets the client, announces its arrival and registers it
// with broadcaster. It then relays every line the client sends to messages.
// When input ends, it unregisters the client, announces its departure and
// closes conn.
func handleConn(conn net.Conn) {
	out := make(chan string) // outgoing client messages
	go clientWriter(conn, out)

	// Identify the client by its remote (peer) network address.
	who := conn.RemoteAddr().String()
	out <- "You are " + who
	messages <- who + " has arrived"
	entering <- out

	input := bufio.NewScanner(conn)
	for input.Scan() {
		messages <- who + ": " + input.Text()
	}
	// NOTE: ignoring potential errors from input.Err()

	leaving <- out // broadcaster closes out, which stops clientWriter
	messages <- who + " has left"
	conn.Close()
}

// clientWriter writes each message received on ch to conn, one per line,
// until ch is closed.
func clientWriter(conn net.Conn, ch <-chan string) {
	for {
		msg, ok := <-ch
		if !ok {
			return
		}
		fmt.Fprintln(conn, msg) // NOTE: ignoring network errors
	}
}

//!-handleConn

//!+main
// main starts the broadcaster, then accepts chat clients on localhost:8000 and
// serves each one in its own goroutine.
func main() {
	ln, err := net.Listen("tcp", "localhost:8000")
	if err != nil {
		log.Fatal(err)
	}

	go broadcaster()
	for {
		conn, err := ln.Accept()
		if err != nil {
			log.Print(err) // keep serving other clients
			continue
		}
		go handleConn(conn)
	}
}

//!-main

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"io"
"log"
"net"
"os"
)

//!+
// main is the chat client. A background goroutine copies server output to
// stdout while main copies stdin to the server. After stdin reaches EOF, main
// closes the connection and waits for the background goroutine before
// exiting.
func main() {
	conn, err := net.Dial("tcp", "localhost:8000")
	if err != nil {
		log.Fatal(err)
	}

	finished := make(chan struct{})
	go func() {
		io.Copy(os.Stdout, conn) // NOTE: ignoring errors
		log.Println("done")
		finished <- struct{}{} // signal the main goroutine
	}()

	mustCopy(conn, os.Stdin)
	conn.Close()
	<-finished // wait for background goroutine to finish
}

// mustCopy copies src into dst until EOF; any copy error is fatal.
func mustCopy(dst io.Writer, src io.Reader) {
	_, err := io.Copy(dst, src)
	if err != nil {
		log.Fatal(err)
	}
}