goroutine 是 Golang 裡面一個十分強大的功能,不過要如何在不同的 gorountines 之間溝通,就必須了解 channels 。

  • channel 的定義,以及什麼是 unbuffered channel 與 buffered channel
  • channel 的阻塞 (block) 以及 deadlock
  • 接收 channel 資料的方法
  • 關閉 channel

channel 的定義

channel 跟 map 一樣,必須透過 make 來宣告,未經宣告的 channel 是 nil (zero value),最後一個參數是容量,可以不宣告,不宣告的話就是 0 。

容量為 0 的 channel 又叫做 unbuffered channel,反之容量 > 0 就是 buffered channel

ci := make(chan int)            // unbuffered channel of integers
cj := make(chan int, 0)         // unbuffered channel of integers
cs := make(chan *os.File, 100)  // buffered channel of pointers to Files

channel 主要用途有兩個:

傳遞資料

xs := []string{"3wkhc whn", "dwslhfwmo", "dshkdkjdkjd", "33333222sddd"}
c := make(chan string)
// append strings  
go func(s []string) {
	var result string
	for _, v := range s {
		result = result+string(v)
	}
	c <- result
}(xs)
fmt.Println(<-c)
// 3wkhc whndwslhfwmodshkdkjdkjd33333222sddd

或是傳送信號

xs := []string{"3wkhc whn", "dwslhfwmo", "dshkdkjdkjd", "33333222sddd"}
c := make(chan int)  
// sort slice strings
go func(s *[]string) {
	sort.Strings(*s)
	c <- 1  
}(&xs)
doSomethingForAWhile()
<-c   
fmt.Println("finished sort")
fmt.Println(xs)
// finished sort
// [33333222sddd 3wkhc whn dshkdkjdkjd dwslhfwmo]

channel 有兩個角色: sender and receiver

沒有特別指定的 channel ,可以當作 sender 也可以當作 receiver

c := make(chan int) //sender and receiver
cs := make(chan<- int) //sender, write onlt 
cr := make(<-chan int) //recevier, read only

receiver 會一直被阻塞 (block),直到有資料進來

v := <-c // always block

如果是 unbuffered channel 的話 ,sender 也會被阻塞,直到有一個 receiver 接收了資料

c<- value // if unbuffered, blocked until receiver ready

這邊先來解釋一下 buffer 的概念,buffer 是 channel 的容量,channel 可以先把 sender 的資料暫存在 buffer 裏面,因此在 buffer 還沒有滿之前,sender 就可以一直傳資料進去,直到 buffer 滿了,就必須等待 receiver 接收資料,釋放出 buffer 的空間,才能再傳遞資料。

c := make(chan int, 100)

buffered channel 可以來做多線程處理的信號,比如說 handle request:

var sem = make(chan int, MaxOutstanding)

func handle(r *Request) {
    sem <- 1    // Wait for active queue to drain.
    process(r)  // May take a long time.
    <-sem       // Done; enable next request to run.
}

func Serve(queue chan *Request) {
    for {
        req := <-queue
        go handle(req)  // Don't wait for handle to finish.
    }
}

MaxOutstanding 是我們設定最大可以處理的 requests 數量,超過了就會先卡住,直到有一個 process 完成後釋出空間。

上面的設計還有一些問題,我們設定了 MaxOutstanding 作為處理的上限,但是卻可以接受無限數量的 request ,並且開啟無限數量的 goroutine,雖然我們限制了 process 的數量,還是有可能因為大量的 requests 而開啟大量的 goroutine 造成系統崩潰,怎麼改進可以去看看 effective-go


channel 的阻塞 (block) 以及 deadlock

前面有提到 block ,當 receiver 要接收資料,但沒有 sender 傳送資料,以及 sender 傳送資料但沒有 receiver 要接收資料的時候,都會產生 block ,當有 goroutines 產生 block,但是卻沒有其中一個可以處理掉 block 的時候,就會產生 deadlock,以下是一個很簡單的範例:

func main() {
	c := make(chan int)
	fmt.Println(<-c) 
}
// fatal error: all goroutines are asleep - deadlock!

上面的例子是在 main 的 goroutine 中要接受 c 的訊息,但是我們並沒有開啟另一個 goroutine 來傳送訊息,因此系統判斷 deadlock 產生,直接退出程式。

以下列幾個常見的 deadlock 產生的情況:

  1. 沒有開啟 goroutine 做溝通
  2. 沒有 receiver
  3. 沒有 sender(尤其是使用 for loop 的時候)

接收 channel 資料的方法

跟 receiver 比 sender 相對單純,在設計 receiver 的接收模式時,沒有考慮清楚,就會容易產生 deadlock 或是邏輯上的錯誤。

常見的 receiver 持續讀取方法有 for range 以及 for select 這兩種方式:

	c := make(chan int)
	go func(c chan int){
		for i:=0; i < 10; i++ {
			c<- i
		}
		close(c) 
	}(c)

	for v := range c {
		fmt.Println(v)
	}

for-loop 的用法比較簡單,但是如果沒有關閉 channel 的話,就會造成 for-loop 內的 receiver 一直在等待,進而造成 deadlock。除了 close channel 之外,也可以嘗試關閉 for-loop

	c := make(chan int)
	go func(c chan int){
		for i:=0; i < 10; i++ {
			c<- i
		}
	}(c)

	count := 0 // some condition
	for v := range c {
		fmt.Println(v)
		count++
		if count == 10 { // if achieve goal
			break
		}
	}

Select 比較複雜,但是比起 for range 來說,是更加有彈性的

  1. 監聽多個 channel,但是同一個 channel 滿足兩個 case 的話,則會隨機執行
  2. 設定 timeout 機制
  3. default case 持續監聽
Loop:
		for {
			select{
			case v:= <- ch1:
				// do something
				fmt.Println("case1:", v)
			case v:= <- ch2:
				// do something
				fmt.Println("case2:", v)
			case <-time.After(2 * time.Second):
				fmt.Println("times up")
				break Loop
			}
		}
/*
default
case1: 0
default
case1: 1
case2: 2
case1: 3
default
....
*/

比較特別的是 case <-time.After(2 * time.Second) 作為一個 timeout 機制,當兩秒過後就會把 Loop 這個一個標籤內的 for-loop 關掉。(Loop 的名稱可以任意,是用來標記要 break 的區塊,因為 for-loop 跟 select 都可以 break) 。


關閉 channel

接下來再談談關閉 close() ,如果關閉了一個 channel ,那麼當 sender 往這個 channel 送的時候,會造成 panic: send on closed channel。那如果是監聽 closed channel 會發生什麼事呢?

	c := make(chan int)
	close(c)
	// c <- 1 // panic

	Loop:
		for  {
			select {
			case v,ok := <- c:
				time.Sleep(500*time.Millisecond)
				fmt.Println(ok) // false
				fmt.Println(v) // 0, zero value
			case <- time.After(2*time.Second):
				fmt.Println("time out")
				break Loop
			}
		}

則是會拿到 zero value of type,要怎麼確定這個 channel 是否已經關閉,可以用 v, ok := ←chan

如果 channel 是關閉的,ok == false。

再來聊一個有趣的應用: nil channel,不管是 sender or receiver 都會是 block

var c chan int 
c <- 10   // deadlock
<- c      // deadlock

看起來沒什麼特別的,不過請看看以下的範例:

func merge (c1, c2 <-chan int, out chan<- int) {
	for {
		select {
		case v := <- c1:
			out <- v
		case v := <- c2:
			out <- v
		}
	}
}

這是一個很容易理解的 merge channel func ,把 c1, c2 的值,合併到 out 去

結果某天,我們發現了 out 不斷地收到 0 ,一大堆的 0! 為什麼呢?

v, ok := <-c // when channel closed, v = 0, ok = false

當 c1 or c2 其中一個 channel 被關閉的時候,這個 channel 的 receiver 將會立即收到 zero value

我們可以做以下的改良:

	// use nil channel to disable select cases
  for c1 != nil || c2 != nil {
		select {
		case v,ok := <- c1:
			if !ok {
				c1 = nil
				continue
			}
			out <- v
		case v, ok := <- c2:
			if !ok {
				c2 = nil
				continue
			}
			out <- v
		}
	}
	close(out)

channel 的使用先簡單就介紹到這邊,未來有想到什麼有趣的主題再來深入討論。

Reference: