首页 > 技术文章 > Go语言学习:04-协程与通道

psbec 2021-02-28 20:22 原文

协程与通讯

协程

概念

在 Go 中,应用程序并发处理的部分被称作协程 (goroutines),它可以进行更有效的并发运算。协程和操作系统线程之间并无一对一的关系,协程由Go的协程调度器进行调度,调度器会将协程调度到操作系统线程上运行。

协程工作在相同的地址空间中,所以共享内存的方式一定是同步的,这个可以使用 sync 包来实现,不过我们很不鼓励这样做:Go 使用 channels 来同步协程

协程是轻量的,比线程更轻,使用 4K 的栈内存就可以在堆中创建它们,协程的栈会根据需要进行伸缩,不出现栈溢出。

使用GOMAXPROCS

GOMAXPROCS用于设置需要参与运算的CPU核数,例如一个8核的系统中,将GOMAXPROCS设置为8后,Go调度器会将协程调度到8个核心上执行。

设置GOMAXPROCS的方法:

runtime.GOMAXPROCS(numCores)

创建协程

创建协程的方法非常简单:

go somefunc()

通道

Go提供了通道(channel)用于协程之间通讯,通过通道进行通信的方式保证了同步性。通道就像一个可以用于发送类型化数据的管道:在任何给定时间,一个数据被设计为只有一个协程可以对其访问,所以不会发生数据竞争。 数据的所有权(可以读写数据的能力)也因此被传递。

通道定义(一个通道只能传输一种类型的数据):

var chName chan datatype

通道需要使用make创建,未初始化的通道的值是nil,范例:

var ch1 chan string = make(chan string)
ch2 := make(chan string)
ch3 := make(chan int)

通讯操作符<-

此操作符体现了通道的方向:

  • 流向通道(发送):ch <- int1表示:用通道 ch 发送变量 int1;
  • 从通道流出(接收),有几种方式:
  1. int2 = <- ch:变量 int2 从通道 ch接收数据;
  2. <- ch:获取通道中的当前值,例如使用if判断:if <- ch != 1000

通道缓冲

默认情况下,通信是同步且无缓冲的:在有接受者接收数据之前,发送是阻塞的;同样对于接收者来说,再收到数据之前,接收是阻塞的。

我们可以给通道设置缓冲区,在缓冲器满了之前,发送是不会阻塞的,这样就变成了异步通讯。
方法:make通道时指定缓冲区大小:

ch :=make(chan type, bufferSize)
  • bufferSize == 0 -> synchronous, unbuffered (阻塞)
  • bufferSize > 0 -> asynchronous, buffered(非阻塞)取决于value元素

通道的方向

通道本身是双向的,但是作为参数传递时,可以增加注解标识通道的方向:

var send_only chan<- int 		// channel can only send data
var recv_only <-chan int		// channel can only receive data

实际应用:

var c = make(chan int) // 双向通道
go source(c)
go sink(c)

func source(ch chan<- int){	// 这个函数限制通道为发送
	for { ch <- 1 }
}

func sink(ch <-chan int) {	// 限制通道为接收
	for { <-ch }
}

关闭通道

发送端可以关闭通道,接收端永远不需要关闭通道。

一般可以通过defer来关闭通道:

ch := make(chan float64)
defer close(ch)

通过下面的语句可以实现检测通道是否关闭:

if v, ok := <-ch; ok {	// ok为true时表示通道未关闭
  process(v)
}

还有一个更简单的办法,使用 for-range 语句来读取通道是更好的办法,因为这会自动检测通道是否关闭:

for input := range ch {
  	process(input)
}

使用Select选择通道

select监听所有case指定的通道,当有数据到达时,就调用对应的case下的语句:

select {
    case u:= <- ch1:
            ...
    case v:= <- ch2:
            ...
            ...
    default: // no value ready to be received
            ...
}

需要注意的是,select不支持fallthrough,任何case中执行break或者return将会导致select函数结束。

select 做的就是:选择处理列出的多个通信情况中的一个。

  • 如果都阻塞了,会等待直到其中一个可以处理
  • 如果多个可以处理,随机选择一个
  • 如果没有通道操作可以处理并且写了 default 语句,它就会执行:default 永远是可运行的(这就是准备好了,可以执行)。

在 select 中使用发送操作并且有 default 可以确保发送不被阻塞!如果没有 default,select 就会一直阻塞。

通道定时器

time.Ticker 结构体,这个对象以指定的时间间隔重复的向通道 C 发送时间值:

type Ticker struct {
    C <-chan Time // the channel on which the ticks are delivered.
    ...
}

创建time.Ticker的函数:

func NewTicker(dur) *Ticker // `time.Ticker`的时间间隔的单位是 ns

使用范例:

ticker := time.NewTicker(updateInterval)
defer ticker.Stop()
...
select {
    case u:= <-ch1:
        ...
    case v:= <-ch2:
        ...
    case <-ticker.C:
        logState(status) // call some logging function logState
    default: // no value ready to be received
        ...
}

select会定时收到ticker.C通道的数据,相当于周期性唤醒。
如果仅需要一次,那么就不需要创建Ticker变量了,直接使用time.After

ch := make(chan error, 1)
select {
    case resp := <-ch
        // use resp and reply
    case <-time.After(timeoutNs):
        // call timed out
        break
}

我们也可以通过此定时器实现sleep功能:

chRate := time.Tick(1e9) // 1秒
...
<- chRate // Sleep 1秒

并行计算范例

计算两个矩阵的逆的乘积,我们可以同时运行2个矩阵的逆,然后再做矩阵的乘积,代码如下:

func InverseProduct(a Matrix, b Matrix) {
    a_inv_future := InverseFuture(a)   // start as a goroutine
    b_inv_future := InverseFuture(b)   // start as a goroutine
    a_inv := <-a_inv_future			// 等待a的结果
    b_inv := <-b_inv_future			// 等待b的结果
    return Product(a_inv, b_inv)	// 计算a、b的乘积
}

InverseFuture函数是这里的重点,go启动的是一个闭包函数,该函数将future通道封闭进去了,所以上面的函数可以正常使用通道。

func InverseFuture(a Matrix) chan Matrix {
    future := make(chan Matrix)
    go func() {
        future <- Inverse(a)
    }()
    return future
}

推荐阅读