Go 入门 (五) -- 协程(goroutine)

1. Go 程

Go 程(goroutine)是由 Go 运行时管理的轻量级线程。

1
go f(x, y, z)

会启动一个新的 Go 程并执行

1
f(x, y, z)

f, x, y 和 z 的求值发生在当前的 Go 程中,而 f 的执行发生在新的 Go 程中。

Go 程在相同的地址空间中运行,因此在访问共享的内存时必须进行同步。sync 包提供了这种能力,不过在 Go 中并不经常用到,因为还有其它的办法(见下一页)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
"fmt"
"time"
)

func say(s string) {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}

func main() {
go say("world")
say("hello ")
}

输出结果:
hello
world
hello
world
hello
world
hello
world
hello

2. 信道

2.1. 创建一个信道

信道是带有类型的管道,你可以通过它用信道操作符 <- 来发送或者接收值。

1
2
ch <- v    // 将 v 发送至信道 ch。
v := <-ch // 从 ch 接收值并赋予 v。

(“箭头”就是数据流的方向。)

和映射与切片一样,信道在使用前必须创建:

1
ch := make(chan int)

默认情况下,发送和接收操作在另一端准备好之前都会阻塞。这使得 Go 程可以在没有显式的锁或竞态变量的情况下进行同步。

以下示例对切片中的数进行求和,将任务分配给两个 Go 程。一旦两个 Go 程完成了它们的计算,它就能算出最终的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import "fmt"

func sum(data []int, c chan int) {
sum := 0
for _, value := range data {
sum += value
}
c <- sum // 将和送入 c
}

func main() {
data := []int{1, 2, 3, 4, 5, 6, 7, 8, 9}

c := make(chan int) // 定义一个信道,存放的是 int 类型

go sum(data[:len(data)/2], c)
go sum(data[len(data)/2:], c)

x, y := <-c, <-c

fmt.Println("sum of data is ", x+y)
}

输出结果:
sum of data is 45

2.2. 带缓冲的信道

信道可以是 带缓冲的。将缓冲长度作为第二个参数提供给 make 来初始化一个带缓冲的信道:

1
ch := make(chan int, 100)

仅当信道的缓冲区填满后,向其发送数据时才会阻塞。当缓冲区为空时,接受方会阻塞。
修改示例填满缓冲区,然后看看会发生什么。

1
2
3
4
5
6
7
8
9
10
11
12
package main

import "fmt"

func main() {
c := make(chan int,1) // 修改为 1 和大于等于 2 两种情况。
c<- 1
c<- 2
fmt.Println("sum of data is ", <-c)
fmt.Println("sum of data is ", <-c)
}
输出结果:

缓冲区为1时:

1
2
3
4
5
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
/Users/MoMo/go/src/momo-learn/main.go:9 +0x81

缓冲区为 大于等于 2 时:

1
2
sum of data is  1
sum of data is 2

2.3. range 和 close

发送者可通过 close 关闭一个信道来表示没有需要发送的值了。接收者可以通过为接收表达式分配第二个参数来测试信道是否被关闭:若没有值可以接收且信道已被关闭,那么在执行完

1
v, ok := <-ch

之后 ok 会被设置为 false。

循环 for i := range c 会不断从信道接收值,直到它被关闭。

注意: 只有发送者才能关闭信道,而接收者不能。向一个已经关闭的信道发送数据会引发程序恐慌(panic)。

还要注意: 信道与文件不同,通常情况下无需关闭它们。只有在必须告诉接收者不再有需要发送的值时才有必要关闭,例如终止一个 range 循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import "fmt"

func fibonacci(n int, c chan int) {
x, y := 0, 1

for i := 0; i < n; i++ {
c <- x
x, y = y, x+y
}
close(c)
}

func main() {
c := make(chan int,10)
go fibonacci(cap(c),c)

for value := range c {
fmt.Println(value)
}
}

输出结果:
0
1
1
2
3
5
8
13
21
34

3. select 语句

select 语句使一个 Go 程可以等待多个通信操作。

select 会阻塞到某个分支可以继续执行为止,这时就会执行该分支。当多个分支都准备好时会随机选择一个执行。

1
2
3
4
5
6
7
8
9
10
11
select {
case v1 := <-c1:
fmt.Printf("received %v from c1\n", v1)
case v2 := <-c2:
fmt.Printf("received %v from c2\n", v1)
case c3 <- 23:
fmt.Printf("sent %v to c3\n", 23)
default:
// 所有 case 都评估不通过时,执行
fmt.Printf("no one was ready to communicate\n")
}

上面这段代码中,select 语句有四个 case 子语句,前两个是 receive 操作,第三个是 send 操作,最后一个是默认操作。代码执行到 select 时,case 语句会按照源代码的顺序被评估,且只评估一次,评估的结果会出现下面这几种情况:

  1. 除 default 外,如果只有一个 case 语句评估通过,那么就执行这个 case 里的语句;
  2. 除 default 外,如果有多个 case 语句评估通过,那么通过伪随机的方式随机选一个;
  3. 如果 default 外的 case 语句都没有通过评估,那么执行 default 里的语句;
  4. 如果没有 default,那么代码块会被阻塞,直到有一个 case 通过评估;否则一直阻塞
  5. 如果 case 语句中 的 receive 操作的对象是 nil channel,那么也会阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
"fmt"
"time"
)

func selectTest(c1 <-chan time.Time, c3 <-chan time.Time) {
for { // 每隔一秒执行一次 select
select {
case it := <-c1:
println("Received from c1", it.String())
case it3 := <-c3:
println("Received from c3", it3.String())
return
default:
println("Nothing received !!!!")
time.Sleep(50 * time.Millisecond)
}
}
}

func main() {
fmt.Println("start....")
tick := time.Tick(100 * time.Millisecond)
boom := time.After(200 * time.Millisecond)
selectTest(tick, boom)
println("finish")
}


输出结果:
start....
Nothing received !!!!
Nothing received !!!!
Received from c1 2019-08-08 17:23:56.60859 +0800 CST m=+0.101829665
Nothing received !!!!
Nothing received !!!!
Received from c3 2019-08-08 17:23:56.709775 +0800 CST m=+0.203014953
finish

4. 练习:等价二叉查找树

  1. 实现 Walk 函数。

  2. 测试 Walk 函数。

函数 tree.New(k) 用于构造一个随机结构的已排序二叉查找树,它保存了值 k, 2k, 3k, …, 10k。

创建一个新的信道 ch 并且对其进行步进:

1
go Walk(tree.New(1), ch)

然后从信道中读取并打印 10 个值。应当是数字 1, 2, 3, …, 10。

  1. 用 Walk 实现 Same 函数来检测 t1 和 t2 是否存储了相同的值。

  2. 测试 Same 函数。

Same(tree.New(1), tree.New(1)) 应当返回 true,而 Same(tree.New(1), tree.New(2)) 应当返回 false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65

package main

import (
"fmt"
"golang.org/x/tour/tree"
)

// Walk 步进 tree t 将所有的值从 tree 发送到 channel ch。
func Walk(t *tree.Tree, ch chan int) {
walk(t,ch)
close(ch)
}

//中序遍历
func walk(t *tree.Tree, ch chan int){
if t != nil {
walk(t.Left, ch)
ch <- t.Value
walk(t.Right, ch)
}
}

// Same 检测树 t1 和 t2 是否含有相同的值。
func Same(t1, t2 *tree.Tree) bool {
tChan1, tChan2 := make(chan int), make(chan int)

go Walk(t1, tChan1)
go Walk(t2, tChan2)

for i1 := range tChan1 {
if i1 != <- tChan2 {
return false
}
}
return true;
}

func main() {
treeChan := make(chan int)
go Walk(tree.New(1), treeChan)

for i := 0; i < 10; i++ {
fmt.Println(<-treeChan)
}
fmt.Println()
//对tree1和tree2进行比较
fmt.Println("tree 1 == tree 1:", Same(tree.New(1), tree.New(1)))
fmt.Println("tree 1 == tree 2:", Same(tree.New(1), tree.New(2)))
}

输出结果:
1
2
3
4
5
6
7
8
9
10

tree 1 == tree 1: true
tree 1 == tree 2: false

5. sync.Mutex

我们已经看到信道非常适合在各个 Go 程间进行通信。

但是如果我们并不需要通信呢?比如说,若我们只是想保证每次只有一个 Go 程能够访问一个共享的变量,从而避免冲突?

这里涉及的概念叫做 互斥(mutualexclusion)* ,我们通常使用 互斥锁(Mutex) 这一数据结构来提供这种机制。

Go 标准库中提供了 sync.Mutex 互斥锁类型及其两个方法:

1
2
Lock
Unlock

我们可以通过在代码前调用 Lock 方法,在代码后调用 Unlock 方法来保证一段代码的互斥执行。参见 Inc 方法。

我们也可以用 defer 语句来保证互斥锁一定会被解锁。参见 Value 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
"fmt"
"sync"
"time"
)

// SafeCounter 的并发使用是安全的。
type SafeCounter struct {
value map[string]int
mux sync.Mutex
}

// 传递指针是为了保证 lock 是同一个对象
// Inc 增加给定 key 的计数器的值。
func (counter *SafeCounter) Inc(key string){
counter.mux.Lock()
counter.value[key] ++
counter.mux.Unlock()
}

// Value 返回给定 key 的计数器的当前值。
func (counter *SafeCounter) Value(key string) int{
// Lock 之后同一时刻只有一个 goroutine 能访问 c.v
counter.mux.Lock()
defer counter.mux.Unlock()
return counter.value[key]
}

func main() {
counter := SafeCounter{value:make(map[string]int)}

for i:=0; i<1000 ;i++ {
go counter.Inc("somekey")
}
time.Sleep(time.Second);
fmt.Println(counter.Value("somekey"))
}

输出结果:
1000

6. 练习:Web 爬虫

在这个练习中,我们将会使用 Go 的并发特性来并行化一个 Web 爬虫。

修改 Crawl 函数来并行地抓取 URL,并且保证不重复。

提示:你可以用一个 map 来缓存已经获取的 URL,但是要注意 map 本身并不是并发安全的!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package main

import (
"fmt"
"sync"
)

type Fetcher interface {
// Fetch 返回 URL 的 body 内容,并且将在这个页面上找到的 URL 放到一个 slice 中。
Fetch(url string) (body string, urls []string, err error)
}

// 定义一个保存爬虫进度的类型
type FetchedUrlStruct struct {
// 已经爬过的 url
fetchedUrl map[string]bool
// 当前正在并发爬的协程数量
crawlingCount int
// 锁
lock sync.Mutex
}

// 爬虫进度的实例
var Fetched = FetchedUrlStruct{fetchedUrl: make(map[string]bool)}

// 修改当前正在并发爬的协程数量
func (Fetched *FetchedUrlStruct) modify(count int) {
Fetched.lock.Lock()
Fetched.crawlingCount += count
Fetched.lock.Unlock()
}

// 获取当前正在并发爬的协程数量
func (Fetched *FetchedUrlStruct) value() int {
Fetched.lock.Lock()
defer Fetched.lock.Unlock()
return Fetched.crawlingCount
}

// 判断一个 url 是否已经爬过了
func (Fetched *FetchedUrlStruct) isCrawled(url string) bool {
Fetched.lock.Lock()
defer Fetched.lock.Unlock()
_, exists := Fetched.fetchedUrl[url]
if exists {
return true
}
Fetched.fetchedUrl[url] = true
return false;
}

// Crawl 使用 fetcher 从某个 URL 开始递归的爬取页面,直到达到最大深度。
func Crawl(url string, depth int, fetcher Fetcher) {
//fmt.Printf("start to crawl from url : %s \n", url)

// 已经到了最大深度
if depth <= 0 {
Fetched.modify(-1)
return
}

// TODO: 不重复抓取页面。
if Fetched.isCrawled(url) {
Fetched.modify(-1)
return
}
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
Fetched.modify(-1)
return
}
fmt.Printf("found: %s %q\n", url, body)
for _, u := range urls {
// TODO: 并行的抓取 URL。
Fetched.modify(+1)
go Crawl(u, depth-1, fetcher)
}
Fetched.modify(-1)
return
}

func main() {
Fetched.modify(+1)
Crawl("https://golang.org/", 4, fetcher)

for {
if Fetched.value() == 0 {
break;
}
}
}

// fakeFetcher 是返回若干结果的 Fetcher。
type fakeFetcher map[string]*fakeResult

type fakeResult struct {
body string
urls []string
}

func (f fakeFetcher) Fetch(url string) (string, []string, error) {
if res, ok := f[url]; ok {
return res.body, res.urls, nil
}
return "", nil, fmt.Errorf("not found: %s", url)
}

// fetcher 是填充后的 fakeFetcher。
var fetcher = fakeFetcher{
"https://golang.org/": &fakeResult{
"The Go Programming Language",
[]string{
"https://golang.org/pkg/",
"https://golang.org/cmd/",
},
},
"https://golang.org/pkg/": &fakeResult{
"Packages",
[]string{
"https://golang.org/",
"https://golang.org/cmd/",
"https://golang.org/pkg/fmt/",
"https://golang.org/pkg/os/",
},
},
"https://golang.org/pkg/fmt/": &fakeResult{
"Package fmt",
[]string{
"https://golang.org/",
"https://golang.org/pkg/",
},
},
"https://golang.org/pkg/os/": &fakeResult{
"Package os",
[]string{
"https://golang.org/",
"https://golang.org/pkg/",
},
},
}

输出结果:
found: https://golang.org/ "The Go Programming Language"
found: https://golang.org/pkg/ "Packages"
not found: https://golang.org/cmd/
found: https://golang.org/pkg/os/ "Package os"
found: https://golang.org/pkg/fmt/ "Package fmt"
Just for my love !!