本文共 3061 字,大约阅读时间需要 10 分钟。
目录
我们先写一个数据生产者和数据消费者的程序,数据生产者不断生成数据,消费者不断消费生产者生产的数据。
package channel_closeimport ( "fmt" "sync" "testing")//数据生产者func dataProducer(ch chan int, wg *sync.WaitGroup) chan int { wg.Add(1) go func() { for i := 0; i < 10; i++ { ch <- i } wg.Done() }() return ch}//数据消费者func dataConsumer(ch chan int, wg *sync.WaitGroup) { wg.Add(1) go func() { for i := 0; i < 10; i++ { data := <-ch fmt.Println(data) } wg.Done() }()}//数据消费者func dataConsumer2(ch chan int, wg *sync.WaitGroup) { wg.Add(1) go func() { for i := 0; i < 10; i++ { data := <-ch fmt.Println(data) } wg.Done() }()}//channel还未关闭的场景func TestChannelNotClosed(t *testing.T) { ch := make(chan int) var wg sync.WaitGroup dataProducer(ch, &wg) dataConsumer(ch, &wg) wg.Wait()}
一旦我们生产的数据和消费的数据不一致时,比如生产者可以生成 11 个数,消费者仍然只消费 10 个数,或者生产者生成 10 个数,而消费者去消费 11 个数时,就会报下面的错误:
fatal error: all goroutines are asleep - deadlock!
为了解决这种问题,Go 急需 channel 具有关闭功能,且关闭后会广播所有的订阅者。
//关闭 channelclose(channelName)//ok=true表示正常接收,false表示通道关闭if val, ok := <-ch; ok { //other code}
当 channel 已正常关闭,数据接收者还继续接收数据,则接收的数据为 channel 对应数据的默认值。
package channel_closeimport ( "fmt" "sync" "testing")//数据生产者func dataProducer(ch chan int, wg *sync.WaitGroup) chan int { wg.Add(1) go func() { for i := 0; i < 10; i++ { ch <- i } //关闭 channel close(ch) //向关闭的 channel 发送消息,会报 panic: send on closed channel //ch <- 11 wg.Done() }() return ch}//数据消费者func dataReceiver(ch chan int, wg *sync.WaitGroup) { wg.Add(1) go func() { //我们这里多接收一个数据,看看拿到的值是什么 for i := 0; i < 11; i++ { data := <-ch fmt.Println(data) } wg.Done() }()}//关闭掉 channelfunc TestCloseChannel(t *testing.T) { ch := make(chan int) var wg sync.WaitGroup dataProducer(ch, &wg) dataReceiver(ch, &wg) wg.Wait()}/*=== RUN TestCloseChannel01234567890 --- PASS: TestCloseChannel (0.00s)PASS*/
我们会发现,当 channel 已关闭后,我们多接收了一个值,由于我们 channel 定义的数据类型为 int,则拿到的数据类型讲师 int 型的默认值 0。
package channel_closeimport ( "fmt" "sync" "testing")//数据生产者func dataProducer(ch chan int, wg *sync.WaitGroup) chan int { wg.Add(1) go func() { for i := 0; i < 10; i++ { ch <- i } //关闭 channel close(ch) wg.Done() }() return ch}//数据消费者func dataConsumer(ch chan int, wg *sync.WaitGroup) { wg.Add(1) go func() { for { if data, ok := <-ch; ok { fmt.Println(data) } else { break; } } wg.Done() }()}//数据消费者func dataReceiver(ch chan int, wg *sync.WaitGroup) { wg.Add(1) go func() { for { if data, ok := <-ch; ok { fmt.Println(data) } else { //通道关闭后就退出 break; } } wg.Done() }()}//关闭掉 channelfunc TestCloseChannel(t *testing.T) { ch := make(chan int) var wg sync.WaitGroup //1个数据生成者 dataProducer(ch, &wg) //多个数据消费者 dataReceiver(ch, &wg) dataConsumer(ch, &wg) wg.Wait()}/*=== RUN TestCloseChannel0123456789--- PASS: TestCloseChannel (0.00s)PASS*/
注:这篇博文是我学习中的总结,如有转载请注明出处:
上一篇:
下一篇: