自学内容网 自学内容网

Golang笔记——channel

大家好,这里是Good Note,关注 公主号:Goodnote,专栏文章私信限时Free。本文详细介绍Golang的数据类型channel,包括基本概念,源码,常见问题及其解决并发。

在这里插入图片描述

基本概念

定义

  • Channel 是 Go 语言中用于实现协程(goroutine)之间通信的核心机制。通过 channel,可以在协程之间通过数据传递实现同步。Go 的并发编程依赖 CSP(Communicating Sequential Processes)模型,强调“通过通信共享内存,而不是通过共享内存来通信”。

优势

Goroutine 解放了程序员,让我们更能贴近业务去思考问题。而不用考虑各种像线程库、线程开销、线程调度等等这些繁琐的底层问题,goroutine 天生替你解决好了。channel 的灵活性和扩展能力使其成为 Go 并发编程的核心工具,而非单纯的同步机制。

  • Channel 的组合能力
    channel 可以将多个 goroutine 的结果汇集到一个统一的 channel 中,主协程可以从这个 channel 中依次接收结果,从而实现数据聚合。

  • Channel 与其他机制的结合

    1. select:通过 select 实现多路复用,从多个 channel 中同时监听数据并随机选择一个可用的 channel 进行操作。
    2. cancel:通过 channel 实现协程的取消信号,通知其他协程终止工作。
    3. timeout:通过 select 和带超时的 channel,可以优雅地处理超时操作。
  • 对比 mutex

    • channel 的扩展性更强:它不仅可以传递数据,还能实现协程间的同步、取消、超时等功能。
    • mutex 的局限性mutex 只用于同步共享资源,无法传递数据,也不支持组合或超时等复杂功能。

尽量使用 channel: 通过 channel 实现协程之间的通信,避免使用共享内存。

常见操作符

  • 发送:ch <- value
  • 接收:value := <-ch
  • 关闭:close(ch)

Goroutine概要介绍

和进程和线程一样,Goroutine也是提高程序并发的一种手段,Goroutine就是代码中使用go关键词创建的执行单元,也是大家熟知的有“轻量级线程”之称的协程,协程是不为操作系统所知的,它由编程语言层面实现,上下文切换不需要经过内核态,再加上协程占用的内存空间极小,所以有着非常大的发展潜力。

那么协程间的通信方式是什么呢?这就引出了大名鼎鼎的channel,汉译“通道” 。


channel分类

使用make可以为channel分配缓存,使用new不行,也就是一开始问题提到的加数字,建立缓存信道。

通道(channel),协程通信方式,协程之间共享同一个线程的内存是真的共享内存吗?不是,可以通过go语言并发编程的座右铭理解:使用通信来共享内存,而不是通过共享内存来通信。channel可能是促使Go使用CSP(Communicating Sequential Processes)作为并发模型的一个很重要原因,毕竟CSP的核心观念也是使用通道将两个并发执行的实体连接起来。

channel是否带有缓存,是否会阻塞读写,可以通过make中参数进行判断。

无缓冲的通道

无缓冲的通道(unbuffered channel),没有能力保存任何值,必须读取双方同时做好准备,一方没有做好准备就会造成死锁,channel只做为流通的通道,该通道是同步的(协程同步方法之一)。

ch := make(chan string)

有缓冲的通道

有缓冲的通道(buffered channel),有一定能力存储数据,在channel没满之前可以一直接收,在channel没空之前可以一直取出,该通道是异步的。

ch := make(chan string,capacity)

可以使用close关闭channel,已经关闭的channel只能读不能再写数据,对于 nil 的 channel,无论收发都会被阻塞。当然也可以设置通道为只读或者只写。

设置单方向的 channel

默认情况下,通道是双向的,也就是可以同时进行写入和读取操作。

但是,我们经常见一个通道作为参数进行传递而值希望对方是单向使用的,要么只让它发送数据,要么只让它接收数据,这时候我们可以指定通道的方向。

单向 channel 变量的声明非常简单,如下:
单方向通道声明:

var ch1 chan int       // ch1 是一个正常的 channel,不是单向的
var send chan<- int    // send 是单向channel,只用于写数据
var recv <-chan int    // recv 是单向channel,只用于读数据

注意:不能将单向 channel 转换为普通 channel。以下操作会报错:


并发问题引入

先看下面的一个小问题:

func main() {
   ch := make(chan string)
   ch <- "lcc"
   fmt.Println(<-ch)
}
  • 结果:报错死锁;
  • 原因:make后面没有带数字,新建的是无缓冲信道,此类信道只能用来流通数据,并不存储数据,接收一个数据,一定要及时将数据读取出去,不然就会发现阻塞。

那么加一个数字即可建立缓冲信道:

func main() {
   ch := make(chan string,1)
   ch <- "lcc"
   fmt.Println(<-ch)
}
  • 结果:正常输出 lcc。
  • 原因:缓冲通道允许暂存数据,无需同步操作即可写入数据。

还有其他办法吗?
加一个协程,将数据读出去。

func main() {
   ch := make(chan string)
   ch <- "lcc"
   go func() {
      fmt.Println(<-ch)
   }()
}
  • 结果:还是报错
  • 原因:在执行到写入channel就已经发生了死锁,都等不到你去开新的协程,那就将开协程内容前移到写内容之前,让他先候着?
func main() {
   ch := make(chan string)
   go func() {
      fmt.Println(<-ch)
   }()
   ch <- "lcc"
}
  • 结果:不报错了,但是没有输出
  • 原因:协程结束的时间顺序,main也是一个协程,那么匿名函数的协程和main协程哪个先结束了呢?

如果main协程先结束,那么没有结果就很正常了啊,怎么办?先让main睡一会

func main() {
   ch := make(chan string)
   go func() {
      fmt.Println(<-ch)
   }()
   ch <- "lcc"
   time.Sleep(time.Millisecond)
}

有结果了,那么又有一个新问题,time.Sleep 是一种不确定的等待方式,time.Sleep()的参数,不好掌控。
有没有更好的方法?使用 sync.WaitGroup可以更加优雅地等待所有协程完成。

package main

import (
   "fmt"
   "sync"
)

func main() {
   var wg sync.WaitGroup
   ch := make(chan string)

   wg.Add(1)
   go func() {
      defer wg.Done()
      fmt.Println(<-ch)
   }()

   ch <- "lcc"
   wg.Wait() // 等待所有协程完成
}
  • 结果:正常输出 lcc
  • 原因
    • 使用 sync.WaitGroup 确保主协程在所有子协程完成之前不会退出。

源码实现

type hchan struct {
qcount   uint           // 当前队列中剩余元素个数
dataqsiz uint           // 环形队列长度,即可以存放的元素个数
buf      unsafe.Pointer // 环形队列指针
elemsize uint16         // 每个元素的大小
closed   uint32            // 标识关闭状态
elemtype *_type         // 元素类型
sendx    uint        // 队列下标,指示元素写入时存放到队列中的位置
recvx    uint           // 队列下标,指示元素从队列的该位置读出
recvq    waitq          // 等待读消息的goroutine队列
sendq    waitq          // 等待写消息的goroutine队列
lock mutex              // 互斥锁,chan不允许并发读写
}

channel储存的单位是一个指针指向的array,这个array的功能就是一个环形队列,也就是说是 index = index % length(array) 得出来的结果。而且最下面的

buf

  • 指向底层循环数组的指针。
  • 仅缓冲型通道存在该字段,无缓冲通道此值为 nil。

closed

  • 标志通道是否已关闭:
    • 0:未关闭。
    • 非 0:已关闭。

sendx,recvx

  • 均指向底层循环数组,表示当前可以发送和接收的元素位置索引值(相对于底层数组)。

sendq,recvq

  • 分别表示被阻塞的 goroutine,这些 goroutine 由于尝试读取 channel 或向 channel 发送数据而被阻塞。

waitq 是一个封装 goroutine 的双向链表:

type waitq struct {
    first *sudog // 队列的第一个节点
    last  *sudog // 队列的最后一个节点
}

lock
lock是一个互斥锁,通过互斥锁确保了 channel 的并发安全。channel 每次只能在一个 goruntine 中使用。所以使用channel通信当然不用加锁了,因为go底层已经加上了。

操作 channel 可能存在的panic/阻塞

操作行为
关闭的 channel 读操作可以读取数据,返回零值和 false(如果数据已被取完)。
关闭的 channel 写操作panic
未初始化的 channel 读写操作无论发送还是接收,都会永久阻塞。
已满的缓冲通道写操作阻塞,直到有空间可写入。
空缓冲通道读操作阻塞,直到有数据可读取。
无缓冲通道写操作阻塞,直到有协程准备接收数据。
无缓冲通道读操作阻塞,直到有协程准备发送数据。

阻塞不等于 panic

  • 阻塞是正常的程序行为,在并发编程中很常见。程序等待某个条件满足,暂停执行。
  • panic 是运行时错误,表示程序运行失败,需要修复。

关闭一个 nil 的 channel;重复关闭一个 channel,都会panic。


对已经关闭的的chan进行读写

答案:众所周知,已经关闭的channel只能读不能再写数据,具体如下:

  • 读已经关闭的 chan 能一直读到东西,但是读到的内容根据通道内关闭前是否有元素而不同。
    • 如果 chan 关闭前,buffer 内有元素还未读 , 会正确读到 chan 内的值,且返回的第二个 bool 值(是否读成功)为 true。
    • 如果 chan 关闭前,buffer 内有元素已经被读完,chan 内无值,接下来所有接收的值都会非阻塞直接成功,返回 channel 元素的零值,但是第二个 bool 值一直为 false。
  • 写已经关闭的 chan 会 panic

为什么写已经关闭的 chan 就会 panic 呢?
源码解读:

// 在 src/runtime/chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }   
    // 省略其他逻辑
}

c.closed != 0 时,表示通道已关闭,此时执行写操作,源码直接触发 panic,输出的内容为 send on closed channel


为什么读已关闭的 chan 会一直能读到值?

源码解读:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 省略部分逻辑
    lock(&c.lock)
    // 当chan被关闭且缓存为空时
    // ep 是指 val, ok := <-c 中 val 的地址
    if c.closed != 0 && c.qcount == 0 {
        if receenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        // 如果接收值的地址不空,接收值会获得该值类型的零值
        // typedmemclr 会根据类型清理相应内存
        if ep != null {
            typedmemclr(c.elemtype, ep)
        }
        // 返回两个参数 selected 和 received
        // 第二个参数即 val, ok := <-c 中的 ok
        return true, false
    }
}

总结:

  • c.closed != 0 && c.qcount == 0:通道关闭且缓存为空。
  • 如果接收值的地址 ep 不为空,则接收值会是该类型的零值。
  • typedmemclr 根据类型清理内存。这就解释了为什么关闭的通道会返回对应类型的零值,且第二个返回值始终为 false

对未初始化nilchan 进行读写

答案:读写未初始化的 chan 都会阻塞。

为什么写 nilchan 就会 阻塞 呢?
源码解读:

// 在 src/runtime/chan.go 中
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        // 不能阻塞,直接返回 false,表示未发送成功
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
    // 省略其他逻辑
}

当通道未初始化时,c == nil

  • 如果 chan 不能阻塞,直接返回 false,表示写入失败。
  • 如果 chan 能阻塞,则直接阻塞当前协程并抛出错误,错误信息为 chan send (nil chan)

为什么读 nilchan 就会 阻塞 呢?
源码解读:

// 在 src/runtime/chan.go 中
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    if c == nil {
        if !block {
            return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
    // 省略其他逻辑
}

未初始化的 chan 此时等于 nil

  • 如果 chan 不能阻塞,直接返回 false,表示读操作失败。
  • 如果 chan 能阻塞,则直接阻塞当前协程并抛出错误,错误信息为 chan receive (nil chan)

对已满的 channel 写操作,对空的 channel 读操作

答案:都会被阻塞。需区分以下两种情况:

  1. 缓冲通道
    • 写操作:当缓冲区已满时,写操作会阻塞,直到缓冲区有空间。
    • 读操作:当缓冲区为空时,读操作会阻塞,直到缓冲区有数据。
  2. 无缓冲通道
    • 写操作:写操作会始终阻塞,直到有协程准备好接收数据。
    • 读操作:读操作会始终阻塞,直到有协程准备好发送数据。

通道的多路复用 select

问题:从单个的channel取数据可以使用遍历,那么如何从多个channel存取出数据?
答案是:通道的多路复用select

从多个通道中读取数据:

package main

import (
"fmt"
)

func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go func() {
for {
select {
case c1 := <-ch1:
fmt.Println(c1)
case c2 := <-ch2:
fmt.Println(c2)
}
}
}()
ch1 <- 1
ch2 <- 2
}

注意点:

  • 每个 case 都必须是一个通信。

  • 如果有多个 case 都可以运行,select 会随机公平地选出一个执行,其他不会执行。

  • 如果多个 case 都不能运行,使用 default 分支

    • 若有 default 子句,则执行该语句,可以避免阻塞;
    • 若无default 子句,select 将阻塞,直到某个 case 可以运行。

如果通道没有数据发送,但 select 中有存在接收通道数据的语句;或者空的select,将发生死锁,如下片段1和片段2:
片段1:

package main

import "fmt"

type name interface{}

func main() {
lcc := make(chan int,3)
select {
case i:=<-lcc:
fmt.Println(i)
}
}

片段2:

package main

type name interface{}

func main() {
select {}
}

每个 case 表达式只求值一次,select 不会自动循环执行,需要在外部添加循环结构(如 for)以连续操作,并用break退出for循环。

package main

import "fmt"


func main() {
lcc := make(chan int,3)
lcc<-1
lcc<-2
lcc<-3
LOOP:
for  {
select {
case i:=<-lcc:
fmt.Println(i)
default:
break LOOP
}
}
}


Q&A

如何优雅地关闭 Channel?

channel 自身不提供查询关闭状态的功能。无法直接判断一个 channel 是否已关闭
关闭一个 channel 需要遵循特定原则,以避免 panic 和程序的不确定性。以下是基于不同场景如何优雅关闭 channel 的方法和注意事项。


关闭 Channel 的基本原则
  1. 不要从接收者(receiver)侧关闭 channel

    • 通常由发送者(sender)负责关闭 channel,因为发送者知道数据发送完成的时机。
  2. 不要在多个发送者情况下关闭 channel

    • 如果有多个发送者,则单个发送者无法确定是否其他发送者还需要继续发送数据,因此不能贸然关闭。
  3. 最本质的原则

    • 不要关闭已关闭的 channel。否则会异常:panic: close of closed channel。

    • 不要向已关闭的 channel 发送数据


优雅关闭的实现方法

根据发送者和接收者的数量,采取不同的策略。

  1. 一个 Sender,一个 Receiver

  2. 一个 Sender,多个 Receivers

  3. 多个 Senders,一个 Receiver

  4. 多个 Senders,多个 Receivers

  5. 单 Sender,单 Receiver

    • Sender 负责关闭数据 channel
    • Receiver 使用 range 循环读取数据。
  6. 单 Sender,多 Receiver

    • Sender 负责关闭数据 channel
    • 所有 Receiver 使用 range 循环读取,通道关闭后自动退出。
  7. 多 Sender,单 Receiver

    • 增加一个关闭信号 channel(如 stopCh)。
    • Receiver 通过 stopCh 通知 Senders 停止发送。
    • Senders 接收关闭信号后,停止发送数据,退出。
    • 关闭顺序
      1. Receiver 关闭 stopCh
      2. Sender 完成后退出。
      3. 主协程关闭 dataCh
  8. 多 Sender,多 Receiver

    • 同样使用关闭信号 channel
    • Receiver 通过 stopCh 通知所有协程停止工作。
    • Senders 和 Receivers 同时监听 stopCh,根据信号优雅退出。
    • 关闭顺序
      1. Receiver 完成任务后关闭 stopCh
      2. 所有协程(包括 Sender 和 Receiver)监听信号并退出。
      3. 主协程关闭 dataCh,释放资源。

未正确处理 channel 的关闭和阻塞导致Goroutine 泄漏

Channel 引发 Goroutine 泄漏的常见场景:

  1. 未关闭的 Channel

    • Sender 持续等待发送数据,而 Receiver 提前退出,导致 Sender 阻塞。
  2. 数据未消费完

    • Sender 持续发送数据到缓冲区满后阻塞,而 Receiver 没有及时消费所有数据。
  3. 永久阻塞的 select

    • select 监听的所有通道都不可用,导致 Goroutine 永久阻塞。

如何避免 Channel 引发 Goroutine 泄漏:

  1. 确保 Channel 被正确关闭

    • Sender 负责关闭 channel,在任务完成时及时关闭。
    • 避免重复关闭通道导致 panic
  2. 使用带缓冲的 Channel

    • channel 添加缓冲区,减少 Sender 和 Receiver 的压力。
  3. 使用退出信号机制

    • 增加一个退出信号 channel(如 stopCh),用于通知 Goroutine 停止工作。
  4. 避免未消费的数据

    • 确保 Receiver 能消费完 Sender 生成的所有数据;若无法消费完,通知 Sender 停止发送。
  5. 使用带 defaultselect

    • select 中添加 default 分支,防止所有通道都不可用时 Goroutine 永久阻塞。

避免关闭channel的panic

在 Go 中,关闭 channel 时发生 panic 通常是因为试图关闭一个已经关闭的 channel,或者向已关闭的 channel 发送数据。为了避免这些问题,可以遵循以下几个原则:

  1. 关闭前检查 channel 是否已关闭
    在某些场景下,可能需要检查 channel 是否已经关闭。可以利用 Go 的多值返回特性,来判断从 channel 接收到的数据是否为零值(表示 channel 已关闭)。

    v, ok := <-ch
    if !ok {
        // channel 已关闭,执行处理逻辑
    }
    
  2. 使用 sync.Once 来确保 channel 只关闭一次
    如果需要保证一个 goroutine 在程序中只关闭一次 channel,可以使用 sync.Once 来实现这一点:

    var once sync.Once
    var ch = make(chan int)
    
    once.Do(func() {
        close(ch)
    })
    

    sync.Once 确保 close(ch) 只会执行一次,不会出现重复关闭的情况。

总结来说,避免 panic 的关键是确保:

  • 只有一个 goroutine 负责关闭 channel。
  • 在关闭 channel 前进行适当的检查,避免重复关闭。
  • 在关闭后,不再向 channel 发送数据。

历史文章

MySQL数据库

MySQL数据库

Redis

Redis数据库笔记合集

Golang

  1. Golang笔记——语言基础知识
  2. Golang笔记——切片与数组
  3. Golang笔记——hashmap
  4. Golang笔记——rune和byte

原文地址:https://blog.csdn.net/haopingbiji/article/details/145089965

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!