golang学习笔记二:concurrency

Go在我个人理解里最独特的东西,对并发支持有着重大的作用。

线程,协程和goroutine

  • 进程:拥有自己独立的堆和栈,既不共享堆,亦不共享栈,进程由操作系统调度。
  • 线程:拥有自己独立的栈和共享的堆,共享堆,不共享栈,线程亦由操作系统调度。
  • 协程 :和线程一样共享堆,不共享栈,协程由程序员在协程的代码里显示调度。

进程懒得说了,线程按照道理应该在JAVA说的,协程按照道理应该在swoole里面说的(又欠了点东西要学)

简单说一下就是,协程全程都是用户态的,操作系统对于协程没有任何的感知,它所有的调度都由用户完成。goroutine其实就是不需要用户把所有的协程调度都写下来,而是用Go语言在runtime,系统调用等多方面对协程调度进行了封装和处理。

goroutine

其实就是个effective go的学习笔记(中文翻译):

https://golang.org/doc/effective_go.html#concurrency

https://github.com/golang/go/wiki/LearnConcurrency

中心思想:

Do not communicate by sharing memory; instead, share memory by communicating.

CSP

Communicating Sequential Processes (CSP)

Channel

channel是 goroutine 之间通信的一种方式,可以类比成 Unix 中的进程的通信方式管道。

使用

channel 使用内置的 make 函数创建,下面声明了一个 chan int 类型的 channel:

1
ch := make(chan int)

golang 提供了内置的 close 函数对 channel 进行关闭操作。

1
2
3
ch := make(chan int)

close(ch)

有关 channel 的关闭,你需要注意以下事项(抄的):

  • 关闭一个未初始化(nil) 的 channel 会产生 panic
  • 重复关闭同一个 channel 会产生 panic
  • 向一个已关闭的 channel 中发送消息会产生 panic
  • 从已关闭的 channel 读取消息不会产生 panic,且能读出 channel 中还未被读取的消息,若消息均已读出,则会读到类型的零值。从一个已关闭的 channel 中读取消息永远不会阻塞,并且会返回一个为 false 的 ok-idiom,可以用它来判断 channel 是否关闭
  • 关闭 channel 会产生一个广播机制,所有向 channel 读取消息的 goroutine 都会收到消息

buffer

简单来说,无缓存的channel像多线程的synchronize,向空channel读取消息或者向有数据的channel里发送数据都会阻塞,有缓存的 channel,当缓存未满时,向 channel 中发送消息时不会阻塞,当缓存满时,发送操作将被阻塞,直到有其他 goroutine 从中读取消息;相应的,当 channel 中消息不为空时,读取消息不会出现阻塞,当 channel 为空时,读取操作会造成阻塞,直到有 goroutine 向 channel 中写入消息。

有缓存的channel有点像semaphore,像下面的代码,sem buffer的大小决定了同时有几个协程在执行handle。

1
2
3
4
5
6
7
8
9
10
11
12
13
var sem = make(chan int, MaxOutstanding)

func handle(r *Request) {
sem <- 1 // Wait for active queue to drain.
process(r) // May take a long time.
<-sem // Done; enable next request to run.
}
func Serve(queue chan *Request) {
for {
req := <-queue
go handle(req) // Don't wait for handle to finish.
}
}

当然还有另一种不需要用channel缓存去限制的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
func handle(queue chan *Request) {
for r := range queue {
process(r)
}
}

func Serve(clientRequests chan *Request, quit chan bool) {
// Start handlers
for i := 0; i < MaxOutstanding; i++ {
go handle(clientRequests)
}
<-quit // Wait to be told to exit.
}

可以应用的算法

channel嵌套

来自effective go,但其实没有太理解书上说的 safe, parallel demultiplexing 是什么意思。

简单解释一下,代码接上面的最后一段代码,当解决多个不同返回值的request问题的时候,利用结构体和result channel可以让handle变成一个通用函数。

1
2
3
4
5
type Request struct {
args []int
f func([]int) int
resultChan chan int
}

由client自己提供需要解决的函数和它所有的参数,并且定一个result channel用来接收返回值,这样每个request都有属于自己的response路径。

1
2
3
4
5
6
7
8
9
10
11
12
func sum(a []int) (s int) {
for _, v := range a {
s += v
}
return
}

request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
// Send request
clientRequests <- request
// Wait for response.
fmt.Printf("answer: %d\n", <-request.resultChan)

当request都定义好之后,handler就只要这样写就可以了而完全不需要针对每个不同的request进行修改。

1
2
3
4
5
func handle(queue chan *Request) {
for req := range queue {
req.resultChan <- req.f(req.args)
}
}
并行算法

这个和mpi几乎一模一样不想写了

leaky buffer

这个的是基于漏桶作出的设计,漏桶是在通信拥塞控制(congestion control)中使用的算法。(congestion control 就比如之前学的慢启动,快速恢复等等。)像漏斗一样,无论流入的流量多大,经过漏斗之后流出的流量速率都是稳定的。

为什么叫leaky是因为当漏斗满了之后的数据就会被丢弃,然后经过其他的处理。

effective go里写的漏桶算法不太清楚我找了另一个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package leakybuf

type LeakyBuf struct {
bufSize int // size of each buffer
freeList chan []byte
}

// NewLeakyBuf creates a leaky buffer which can hold at most n buffer, each
// with bufSize bytes.
func NewLeakyBuf(n, bufSize int) *LeakyBuf {
return &LeakyBuf{
bufSize: bufSize,
freeList: make(chan []byte, n),
}
}

// Get returns a buffer from the leaky buffer or create a new buffer.
func (lb *LeakyBuf) Get() (b []byte) {
select {
case b = <-lb.freeList:
default:
b = make([]byte, lb.bufSize)
}
return
}

// Put add the buffer into the free buffer pool for reuse. Panic if the buffer
// size is not the same with the leaky buffer's. This is intended to expose
// error usage of leaky buffer.
func (lb *LeakyBuf) Put(b []byte) {
if len(b) != lb.bufSize {
panic("invalid buffer size that's put into leaky buffer")
}
select {
case lb.freeList <- b:
default:
}
return
}

简单来说就是用带buffer的channel freeList来做这个漏斗,当漏斗空了的时候get函数会创建一个新的buffer(具体干什么我也不知道),而对put来说如果漏斗满了就会进入default(也就是抛弃当前buffer),这个算法是想证明利用channel来实现一些算法会更加简单易懂。

一些注意事项

协程参数调用问题

首先回到最前面buffer的功能里提到的可以当semaphore用的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var sem = make(chan int, MaxOutstanding)

func handle(r *Request) {
sem <- 1 // Wait for active queue to drain.
process(r) // May take a long time.
<-sem // Done; enable next request to run.
}

func Serve(queue chan *Request) {
for {
req := <-queue
go handle(req) // Don't wait for handle to finish.
}
}

当然这个代码有问题,虽然同时只有 MaxOutstanding个协程会执行,但是在高并发场景下会同时创建非常非常多个协程。为了避免这样的问题做了一定的改进:

1
2
3
4
5
6
7
8
9
func Serve(queue chan *Request) {
for req := range queue {
sem <- 1
go func() {
process(req) // Buggy; see explanation below.
<-sem
}()
}
}

把sem这个channel用来控制协程的创建,这样解决了上面的问题,但是这就涉及到标题的参数调用问题。

其实和多线程是一样的,上面创建的所有协程的process(req)指向的req都是同一个地址,在循环的过程中req不断地变化,就会导致process的req并不一定是想执行的那一个,就多线程的经验来说循环次数少的话很有可能第一个协程创建好的时候req已经循环到了最后一个。(多线程踩过的坑再踩一遍)

所以又多了第二次改进:

1
2
3
4
5
6
7
8
9
func Serve(queue chan *Request) {
for req := range queue {
sem <- 1
go func(req *Request) {
process(req)
<-sem
}(req)
}
}

把req作为参数传进去就没问题了,按照c的经验会给它在本地复制一块地址空间的。当然也可以在循环一开始就:req := req,但是看着很奇怪。