自学内容网 自学内容网

Go语言中的并发编程

Go语言中的并发编程

Go语言中的并发编程主要依赖于两个核心概念:goroutine 和 channel。

1. Goroutine

Goroutine 是Go语言中实现并发的基本单位。

它是一种轻量级的线程,由Go运行时(runtime)管理,
而不是由操作系统直接管理。Goroutine的开销非常小,因此可以在一个程序中创建成千上万个Goroutine。

goroutine的概念类似于线程,但goroutine是由Go的运行时(runtime)调度和管理的。

Go程序会智能地将 goroutine 中的任务合理地分配给每个CPU。

Go语言之所以被称为现代化的编程语言,就是因为它在语言层面已经内置了调度和上下文切换的机制。

Go语言中使用goroutine非常简单,只需要在调用函数的时候在前面加上go关键字,就可以为一个函数创建一个goroutine

一个goroutine必定对应一个函数,可以创建多个goroutine去执行相同的函数。


package main

import (
"fmt"
"time"
)

func sayHello() {
for i := 0; i < 5; i++ {
fmt.Println("Hello")
time.Sleep(100 * time.Millisecond)
}
}

func main() {
go sayHello() // 启动一个新的Goroutine

for i := 0; i < 5; i++ {
fmt.Println("World")
time.Sleep(100 * time.Millisecond)
}
}
在这个例子中,sayHello 函数在一个新的Goroutine中运行,
而 main 函数在另一个Goroutine中运行。两个Goroutine并发执行,因此你会看到 "Hello""World" 交替打印。

在程序启动时,Go程序就会为main()函数创建一个默认的goroutine

当main()函数返回的时候该goroutine就结束了,所有在main()函数中启动的goroutine会一同结束

goroutine 的特点

轻量级:goroutine 使用非常少的内存,大约 2KB 的初始栈空间,且栈会根据需要动态增长和缩小。

并发执行:众多 goroutine 可以并发地执行,使得 Go 语言非常适合处理 I/O 密集型任务。

调度:Go 运行时负责管理 goroutine 的调度,可以在多个操作系统线程之间动态地分配 goroutine。

结束 goroutine

goroutine 的执行并不是无限的,它会在函数返回后结束。如果你希望主程序等待某个 goroutine 执行完毕,可以通过使用 Channel 来实现通信。

package main

import (
    "fmt"
    "time"
)

func main() {
    done := make(chan bool)

    go func() {
        fmt.Println("Goroutine 正在执行...")
        time.Sleep(2 * time.Second) // 模拟耗时任务
        done <- true // 发送信号通知主程序
    }()

    <-done // 阻塞,等待 goroutine 完成
    主程序会阻塞在这一行,直到从 done channel 接收到数据(即等待 goroutine 的通知)。这样可以确保在 goroutine 完成任务后,主程序才会继续执行。
    fmt.Println("Goroutine 完成.")
}

2. Channel

Channel 是Go语言中用于在Goroutine之间传递数据的通信机制。

Channel可以看作是一个管道,一个Goroutine可以将数据发送到Channel,另一个Goroutine可以从Channel接收数据。

创建 Channel


可以使用 make 函数来创建一个 channel。 channel 的类型是在声明时指定的,表明可以传递的数据类型。
ch := make(chan int) // 创建一个传递 int 类型数据的 channel

发送和接收数据

一旦 channel 被创建,就可以使用 <- 操作符发送和接收数据。

发送数据: 使用 ch <- value 将数据发送到 channel 中。

接收数据: 使用 value := <-ch 从 channel 中接收数据。

package main

import (
    "fmt"
)

func main() {
    ch := make(chan string)

    go func() {
        ch <- "Hello, Channel!" // 发送数据到 channel
    }()

    msg := <-ch // 从 channel 接收数据
    fmt.Println(msg) // 输出:Hello, Channel!
}

Channel 的类型

Channel 可以是无缓冲(unbuffered)或有缓冲(buffered):

无缓冲 Channel: 默认情况下,channel 是无缓冲的,发送和接收操作会阻塞,直到另一端准备好。

有缓冲 Channel: 可以创建一个有缓冲的 channel,指定缓冲区的大小。发送操作将在缓冲区满时阻塞,而接收操作将在缓冲区为空时阻塞。

ch := make(chan int, 3) // 创建一个缓冲区大小为 3 的 channel

ch <- 1 // 发送数据
ch <- 2
ch <- 3 // 发送数据不会阻塞,直到缓冲区满

// ch <- 4 // 这将导致阻塞,因为缓冲区已满

fmt.Println(<-ch) // 接收数据

使用 select 语句

select 语句用于等待多个 channel 操作,它会监听多个 channel,并在其中任意一个 channel 可用时执行对应的操作。

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)

    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "来自通道 1"
    }()

    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "来自通道 2"
    }()

    select {
    case msg1 := <-ch1:
        fmt.Println(msg1)
    case msg2 := <-ch2:
        fmt.Println(msg2)
    }
}
在这个示例中,select 语句阻塞,直到从 ch1 或 ch2 接收到数据。一旦某个 channel 可用,select 将执行相应的代码块。

简单的多个 goroutine

并发地启动多个 goroutine,每个 goroutine 输出一个计数。
package main

import (
    "fmt"
    "time"
)

func count(id int) {
    for i := 0; i < 5; i++ {
        fmt.Printf("Goroutine %d: %d\n", id, i)
        time.Sleep(100 * time.Millisecond) // 模拟耗时操作
    }
}

func main() {
    for i := 0; i < 3; i++ { // 启动 3 个 goroutine
        go count(i) // 每个 goroutine 执行 count 函数
    }

    time.Sleep(1 * time.Second) // 等待 goroutine 完成
    fmt.Println("主 goroutine 结束.")
}

使用 WaitGroup 等待所有 goroutine 完成

通常,直接使用 time.Sleep 来等待 goroutine 完成不是一个好的实践。

可以使用 sync.WaitGroup 来等待多个 goroutine 完成。

在这个示例中,使用 sync.WaitGroup 实现了更优雅的等待。

通过 wg.Add(1) 增加等待计数,

通过 wg.Done() 在每个 goroutine 完成时减少计数,

最后通过 wg.Wait() 等待所有计数归零,表示所有 goroutine 已完成。

package main

import (
    "fmt"
    "sync"
    "time"
)

func count(id int, wg *sync.WaitGroup) {
    defer wg.Done() // 在函数结束时调用 Done(),表示这个 goroutine 完成了
    for i := 0; i < 5; i++ {
        fmt.Printf("Goroutine %d: %d\n", id, i)
        time.Sleep(100 * time.Millisecond) // 模拟耗时操作
    }
}

func main() {
    var wg sync.WaitGroup

    for i := 0; i < 3; i++ {
        wg.Add(1) // 增加等待计数
        go count(i, &wg) // 启动 goroutine
    }

    wg.Wait() // 等待所有 goroutine 完成
    fmt.Println("所有 goroutine 完成,主 goroutine 结束.")
}

goroutine与线程

可增长的栈

OS线程(操作系统线程)一般都有固定的栈内存(通常为2MB),

一个goroutine的栈在其生命周期开始时只有很小的栈(典型情况下2KB),

goroutine的栈不是固定的,他可以按需增大和缩小,goroutine的栈大小限制可以达到1GB,

虽然极少会用到这么大。所以在Go语言中一次创建十万左右的goroutine也是可以的。

GPM

GPM(Goroutine, OS Thread, and M:即 Goroutine、操作系统线程和调度器)是 Go 语言的调度模型,主要负责管理 goroutine 的执行。

1. G(Goroutine)

   G 代表一个 goroutine,是 Go 程序中的一个轻量级线程。
   每个 goroutine 被分配少量的内存(通常为 2KB 的初始栈空间),并且这个栈空间是动态增长的。
   通过 go 关键字创建的 goroutine 是并发执行的,多个 goroutine 可以同时处于工作状态,使得程序可以有效利用 CPU 资源。

2. M(Operating System Thread)

   M 代表操作系统线程。 Go 运行时系统管理着多个 M,每个 M 都是一个操作系统线程。
   调度器通过将 goroutine 分配到 M 上执行,使得 goroutine 的执行不受限于固定的操作系统线程数量。
   M 的数量可以根据系统的实际情况进行控制,开发者可以使用 runtime.GOMAXPROCS(n) 来设置可运行的 M 的最大数量。

3. P(Processor)

   P 代表处理器,是 Go 运行时的一个概念,表示调度器用来管理 goroutine 的资源。
   每个 P 有一个本地运行队列(local run queue),用于存放可以运行的 goroutine。
   P 还维护与 M 的关联,使得每个 M 在执行时能访问一个 P 及其本地队列。
   P 的数量通常与 GOMAXPROCS 的值相等,决定了可以并发执行的 goroutine 的数量。

GPM 模型的工作流程

创建 goroutine:开发者通过 go 关键字启动一个新的 goroutine。Go 运行时会在内部创建一个相应的 G 实例。

调度和执行:
调度器会将 G 放入适当的 P 的本地队列中。
M 会从 P 的本地队列中获取 G 并执行。每个 M 可以访问其绑定的 P 的 goroutine。

解决阻塞:
当 goroutine 进行 I/O 操作或其他阻塞操作时,调度器会将该 G 标记为阻塞状态,
同时将其他可运行的 G 移入 M 的任务队列,确保 CPU 资源的高效利用。

调度抢占:
时间片调度:调度器会为每个 goroutine 分配一个时间片(通常为 10 毫秒)。如果 goroutine 在这段时间内没有主动让出 CPU 时间,调度器会强制切换到其他 goroutine。

阻塞和唤醒:当 goroutine 执行阻塞操作(如 I/O 操作、channel 发送/接收等)时,调度器会将其挂起并将 CPU 资源分配给其他可运行的 goroutine。

回收和清理:
当 goroutine 完成其任务后,其相关的 G 实例将被清理,释放相关资源。

P与M一般也是一一对应的。他们关系是:

P管理着一组G挂载在M上运行。当一个G长久阻塞在一个M上时,runtime会新建一个M,阻塞G所在的P会把其他的G 挂载在新建的M上。当旧的G阻塞完成或者认为其已经死掉时 回收旧的M。

P的个数是通过runtime.GOMAXPROCS设定(最大256),Go1.5版本之后默认为物理线程数。 在并发量大的时候会增加一些P和M,但不会太多,切换太频繁的话得不偿失。

运行队列

Go 的调度器维护了多个运行队列(run queue),其中包括:

全局队列:所有的 goroutine 的全局运行队列。

本地队列:每个线程都有自己的本地队列,用于存储该线程上正在运行的 goroutine。

调度器会在这些队列中移动 goroutine,以确保每个线程能够尽快获取需要执行的 goroutine。

单从线程调度讲,Go语言相比起其他语言的优势在于OS线程是由OS内核来调度的,

goroutine则是由Go运行时(runtime)自己的调度器调度的,

这个调度器使用一个称为m:n调度的技术(复用/调度m个goroutine到n个OS线程)。

其一大特点是goroutine的调度是在用户态下完成的, 不涉及内核态与用户态之间的频繁切换,

包括内存的分配与释放,都是在用户态维护着一块大的内存池,

不直接调用系统的malloc函数(除非内存池需要改变),成本比调度OS线程低很多。

Go 运行时维护着一块大的内存池,减少了对操作系统的频繁 malloc 和 free 调用,降低了内存分配与释放的成本

另一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上,

再加上本身goroutine的超轻量,以上种种保证了go调度方面的性能。

协作式与抢占式

虽然 Go 的调度器支持抢占式调度,但它也鼓励协作式调度。

开发者可以通过在 goroutine 中添加 runtime.Gosched() 来主动让出 CPU 控制权,

允许其他 goroutine 执行。这种方式可以帮助提高 CPU 的利用率。

在这个示例中,我们通过设置 GOMAXPROCS 来限制可以并发运行的 goroutine 数量。

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d is starting\n", id)
    time.Sleep(2 * time.Second) // 模拟耗时工作
    fmt.Printf("Worker %d is done\n", id)
}

func main() {
    // 设置 GOMAXPROCS 为 2
    runtime.GOMAXPROCS(2)

    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }

    wg.Wait() // 等待所有 goroutine 完成
    fmt.Println("所有工作完成")
}

channel

单纯地将函数并发执行是没有意义的。函数与函数间需要交换数据才能体现并发执行函数的意义。

虽然可以使用共享内存进行数据交换,但是共享内存在不同的goroutine中容易发生竞态问题。为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。

Go语言的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信

如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。

Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。

channel类型

var 变量 chan 元素类型

var ch1 chan int // 声明一个传递整型的通道

var ch2 chan bool // 声明一个传递布尔型的通道

var ch3 chan []int // 声明一个传递int切片的通道

创建channel

通道是引用类型,通道类型的空值是nil

var ch chan int
fmt.Println(ch) // <nil>

声明的通道后需要使用make函数初始化之后才能使用。

创建channel的格式如下:

make(chan 元素类型, 容量(缓冲区大小 可选))

channel操作

通道有发送(send)、接收(receive)和关闭(close)三种操作。

发送和接收都使用<-符号。

现在我们先使用以下语句定义一个通道:

ch := make(chan int)

发送

将一个值发送到通道中。

ch <- 10 // 把10发送到ch中

接收

从一个通道中接收值。

x := <- ch // 从ch中接收值并赋值给变量x
<-ch       // 从ch中接收值,忽略结果

关闭

我们通过调用内置的close函数来关闭通道。

close(ch)

关于关闭通道需要注意的事情是,只有在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道。

通道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。

关闭后的通道有以下特点:

  1. 对一个关闭的通道再发送值就会导致panic。
  2. 对一个关闭的通道进行接收会一直获取值直到通道为空。
  3. 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
  4. 关闭一个已经关闭的通道会导致panic。

无缓冲的通道

无缓冲通道(也称为阻塞通道)是一种在发送和接收操作之间建立直接同步的机制。

无缓冲的通道只有在有人接收值的时候才能发送值。

package main

import (
    "fmt"
)

func main() {
    ch := make(chan string) // 创建一个无缓冲的通道

    go func() {
        fmt.Println("Goroutine 正在等待发送消息...")
        ch <- "Hello, Channel!" // 发送消息,这里会阻塞,直到有接收者
        fmt.Println("消息已发送")
    }()

    msg := <-ch // 在此阻塞,等待接收消息
    fmt.Println("接收到的消息:", msg)
}

特点

阻塞特性:无缓冲通道的发送和接收操作都是阻塞的,在没有接收者时,发送操作会等待接收者。在接收到数据之后,发送操作才会继续,反之亦然。这使得发送和接收操作之间形成一种直接的同步机制。

适用于协作:无缓冲通道非常适合用于 goroutine 之间的协作与同步,例如在任务执行完成后通知主程序。

有缓冲的通道

它与无缓冲通道的主要区别在于,有缓冲通道可以在不阻塞的情况下存储一定数量的消息,直到缓冲区满。这种机制使得 goroutine 的发送和接收操作更加灵活。

创建有缓冲的通道
可以在创建通道时指定缓冲区的大小,示例如下:

ch := make(chan string, 3) // 创建一个容量为 3 的有缓冲通道
package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string, 3) // 创建一个容量为 3 的有缓冲通道

    go func() {
        for i := 0; i < 5; i++ {
            msg := fmt.Sprintf("消息 %d", i)
            fmt.Printf("发送: %s\n", msg)
            ch <- msg // 发送消息
            time.Sleep(100 * time.Millisecond) // 模拟耗时操作
        }
        close(ch) // 发送完毕后关闭通道
    }()

    // 接收消息
    for msg := range ch { // 会一直接收直到通道关闭
        fmt.Printf("接收到: %s\n", msg)
    }
}

特点

非阻塞的发送:当有缓冲区空间时,发送操作不会阻塞。如果缓冲区已满,则发送操作会阻塞,直到有接收者取走数据。

提高了并发效率:通过有缓冲通道,可以提高并发程序的效率,允许发送者和接收者在不同速度下处理数据。

可以在多个 goroutine 之间传递数据:多个 goroutine 可以同时向同一个有缓冲通道发送数据,提高了数据传递的灵活性。

for range从通道循环取值

当向通道中发送完数据时,我们可以通过close函数来关闭通道。

当通道被关闭时,再往该通道发送值会引发panic,从该通道取值的操作会先取完通道中的值,再然后取到的值一直都是对应类型的零值。

那如何判断一个通道是否被关闭了

多重赋值读通道

使用多重赋值从通道读取数据时,可以检测到通道的关闭。具体方法是从通道读取两个值:一个是通道中的数据,另一个是一个布尔值,表示通道是否被关闭。

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string)

    go func() {
        messages := []string{"消息1", "消息2", "消息3"}
        for _, msg := range messages {
            ch <- msg // 发送消息
            time.Sleep(100 * time.Millisecond)
        }
        close(ch) // 关闭通道
    }()

    // 从通道接收数据
    for {
        msg, ok := <-ch // 尝试从通道接收数据
        if !ok { // ok 为 false 说明通道已关闭
            fmt.Println("通道已关闭,退出循环")
            break
        }
        fmt.Println("接收到:", msg)
    }
}

单向通道

单向通道是 Go 语言中的一种通道形式,其限制某个通道只能用于发送或接收数据,而不能同时进行。这种特性有助于提高代码的可读性和安全性,避免意外的错误使用。

单向通道在函数参数中尤其常用,可以通过 chan<- 和 <-chan 来分别定义。

1. 定义单向通道

发送通道:可以使用 chan<- 来表示一个只用于发送数据的通道。
接收通道:可以使用 <-chan 来表示一个只用于接收数据的通道。

package main

import (
    "fmt"
)

// 定义一个函数,接受一个发送通道作为参数 在这个函数中,ch 被定义为一个发送通道,意味着这个函数只能向 ch 发送数据,无法从中接收数据
func sendData(ch chan<- string) {
    messages := []string{"消息1", "消息2", "消息3"}
    for _, msg := range messages {
        ch <- msg // 发送消息
    }
    close(ch) // 关闭通道
}

// 定义一个函数,接受一个接收通道作为参数 这个函数的参数是一个接收通道,只能从 ch 接收数据,无法发送数据。
func receiveData(ch <-chan string) {
    for msg := range ch {
        fmt.Println("接收到:", msg) // 接收并打印消息
    }
}

func main() {
    ch := make(chan string) // 创建通道

    go sendData(ch)   // 启动发送 goroutine
    receiveData(ch)   // 启动接收 goroutine,阻塞等待接收数据
}

单向通道的优点

类型安全:通过限制通道的方向,可以防止函数之间的意外数据发送或接收错误,提升代码的安全性。

明确的设计:使用单向通道可以清晰地表达数据流向,使得程序逻辑更易于理解。

worker pool(goroutine池)

Worker Pool(工作池或 goroutine 池)是一种设计模式,用于管理大量的 goroutine,

通过限制并发执行的 goroutine 数量来提高程序的性能和资源使用效率。

这种模式特别适用于需要处理大量相似任务的场景,比如处理 HTTP 请求、计算任务等。

Worker Pool 模式的核心 idea 是将任务分配给一组 worker(工作者 goroutine),这些 worker 会从一个共享的任务队列中获取任务并执行。

通过控制 worker 的数量,可以防止过多的 goroutine 同时运行,导致系统资源耗尽。

实现步骤

创建任务队列:使用通道来存放待处理的任务。

创建 Worker:Worker 从任务队列中接受任务并执行。

启动 Workers:预先启动一定数量的 worker,以处理任务。

发布任务:将任务发送到任务队列中,供 worker 处理

package main

import (
    "fmt"
    "sync"
    "time"
)

// 1. 定义任务结构体
type Task struct {
    id int // 任务标识
}

// 2. 定义 Worker 函数,处理任务
func worker(id int, tasks <-chan Task, wg *sync.WaitGroup) {
    defer wg.Done() // 标记当前 worker 完成
    for task := range tasks { // 从任务通道中接收任务
        fmt.Printf("Worker %d 正在处理任务 %d\n", id, task.id)
        time.Sleep(1 * time.Second) // 模拟耗时操作
        fmt.Printf("Worker %d 完成任务 %d\n", id, task.id)
    }
}

func main() {
    // 3. 设置 worker 数量
    const numWorkers = 3 // 设定工作池的 worker 数量
    tasks := make(chan Task, 10) // 创建任务通道(容量为 10)
    var wg sync.WaitGroup // 创建 WaitGroup 用于等待所有 worker 完成

    // 4. 启动 workers
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1) // 将 WaitGroup 的计数器加 1
        go worker(i, tasks, &wg) // 启动一个 worker goroutine
    }

    // 5. 发布任务
    for i := 1; i <= 10; i++ {
        tasks <- Task{id: i} // 将任务发送到任务通道
    }
    close(tasks) // 关闭任务通道,表示不再添加任务

    // 6. 等待所有 worker 完成
    wg.Wait() // 阻塞直到所有 worker 完成
    fmt.Println("所有任务完成.")
}


select多路复用

多路复用通常指的是通过 select 语句来同时监控多个通道(channels),以便在多个 goroutine 之间进行有效的通信。

这种机制允许程序同时处理多个 I/O 操作或 goroutine,极大地提高了并发程序的灵活性和效率。

多路复用的主要概念

select 语句: select 语句是 Go 提供的控制结构,可以在多个通道操作之间进行选择。它会阻塞,直到其中一个通道准备好发送或接收数据。

非阻塞操作: 通过使用 select,可以在多个通道之间进行非阻塞的选择,这样可以避免由于单个操作阻塞而导致整个程序停滞的情况。

超时处理: 使用 select 可以很容易地实现超时机制,从而在某个操作未完成时采取相应措施。

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)

    // 启动第一个 goroutine
    go func() {
        time.Sleep(2 * time.Second) // 模拟耗时操作
        ch1 <- "来自通道 1"
    }()

    // 启动第二个 goroutine
    go func() {
        time.Sleep(1 * time.Second) // 模拟耗时操作
        ch2 <- "来自通道 2"
    }()

    // 使用 select 语句处理多个通道
    for i := 0; i < 2; i++ { // 等待两个结果
        select {
        case msg1 := <-ch1:
            fmt.Println("接收到:", msg1)
        case msg2 := <-ch2:
            fmt.Println("接收到:", msg2)
        }
    }
    
    fmt.Println("所有消息处理完毕.")
}

并发安全和锁

在并发编程中,多个 goroutine 可能会同时访问共享数据,导致数据竞争和不一致的状态。

因此,确保并发安全是一个重要的考虑。

在 Go 语言中,主要通过使用锁和其他同步机制来实现并发安全。

并发安全

并发安全是确保多个 goroutine 在并行执行时,访问共享资源(如变量、数据结构)时不会导致不一致或错误的状态。为了实现并发安全,通常需要对共享资源进行适当的同步。

锁的类型

互斥锁(Mutex):

Go 语言提供的 sync.Mutex 类型可以实现互斥锁,用于保护共享数据的访问。
当一个 goroutine 获取锁时,其他试图获取同一锁的 goroutine 将被阻塞,直到锁被释放。

读写锁(RWMutex):
sync.RWMutex 允许有多个读者或一个写者访问共享资源。
读者可以并发访问,当一个写者想要写数据时,所有的读者会被阻塞。

channel:
Go 语言的通道(channel)也可以用作同步机制,通过通道的发送和接收来实现数据的安全交换。

互斥锁

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    balance int           // 共享变量
    mu      sync.Mutex   // 互斥锁
)

// 存款函数
func deposit(amount int, wg *sync.WaitGroup) {
    defer wg.Done() // 确保完成后减少 WaitGroup 的计数
    mu.Lock() // 获取锁
    fmt.Printf("正在存款: %d\n", amount)
    balance += amount // 更新共享变量
    mu.Unlock() // 释放锁
}

// 取款函数
func withdraw(amount int, wg *sync.WaitGroup) {
    defer wg.Done()
    mu.Lock() // 获取锁
    fmt.Printf("正在取款: %d\n", amount)
    balance -= amount // 更新共享变量
    mu.Unlock() // 释放锁
}

func main() {
    var wg sync.WaitGroup

    wg.Add(2) // 增加等待的 goroutine 数量
    go deposit(100, &wg)
    go withdraw(50, &wg)

    wg.Wait() // 等待所有 goroutine 完成

    fmt.Printf("账户余额: %d\n", balance) // 输出最终余额
}

读写互斥锁

读写互斥锁的重要概念

多个读者:RWMutex 允许多个 goroutine 同时读取共享资源,这在读操作远多于写操作的场景下,可以显著提高性能。

独占写者:当一个 goroutine 写数据时,其他所有的读者和写者都会被阻塞,直到写操作完成。这确保了写操作的安全性和数据的一致性。

性能优势:在读操作频繁的情况下,读写互斥锁通常比普通的互斥锁(sync.Mutex)性能更好,因为它减少了读操作的阻塞。

package main

import (
    "fmt"
    "sync"
    "time"
)

// 定义一个共享数据结构
type Data struct {
    value int
    mu    sync.RWMutex // 读写互斥锁
}

// 读取数据的函数
func (d *Data) Read() int {
    d.mu.RLock() // 获取读锁
    defer d.mu.RUnlock() // 确保在函数结束时释放读锁
    return d.value // 返回共享数据
}

// 写入数据的函数
func (d *Data) Write(value int) {
    d.mu.Lock() // 获取写锁
    defer d.mu.Unlock() // 确保在函数结束时释放写锁
    d.value = value // 更新共享数据
}

func main() {
    data := Data{}

    // 启动多个写操作
    go func() {
        for i := 0; i < 5; i++ {
            data.Write(i) // 写入数据
            fmt.Printf("写入: %d\n", i)
            time.Sleep(500 * time.Millisecond)
        }
    }()

    // 启动多个读操作
    for i := 0; i < 5; i++ {
        go func() {
            time.Sleep(200 * time.Millisecond) // 确保读取时写入已经发生
            value := data.Read() // 读取数据
            fmt.Printf("读取: %d\n", value)
        }()
    }

    // 睡眠一段时间以等待所有 goroutine 完成
    time.Sleep(3 * time.Second)
}

Cond(条件变量)

sync.Cond 用于在多个 goroutine 之间进行复杂的同步。

当一个条件不满足时,可以阻塞当前 goroutine,

并在条件满足时通知等待的 goroutine 继续执行。

主要概念

阻塞和唤醒:使用条件变量,goroutine 可以在某个特定条件未满足时等待。当某些条件被更改(满足)时,可以通过条件变量唤醒等待的 goroutine。

互斥锁:条件变量通常和互斥锁(sync.Mutex 或 sync.RWMutex)结合使用,以确保在检查条件和修改条件之间的原子性。

多生产者与消费者:条件变量经常用于多生产者和多消费者模式,以协调生产和消费过程。

package main

import (
    "fmt"
    "sync"
    "time"
)

type BoundedQueue struct {
    items    []int// 队列元素
    capacity int// 队列容量
    mu       sync.Mutex// 互斥锁
    cond     *sync.Cond // 条件变量
}

// 创建一个新的 BoundedQueue
func NewBoundedQueue(capacity int) *BoundedQueue {
    q := &BoundedQueue{
        items:    make([]int, 0, capacity),
        capacity: capacity,
    }
    q.cond = sync.NewCond(&q.mu)
    return q
}

// 向队列中添加一个元素
func (q *BoundedQueue) Enqueue(item int) {
    q.mu.Lock() // 加锁以保护共享数据
    defer q.mu.Unlock()

    for len(q.items) == q.capacity {
        q.cond.Wait() // 队列满时,等待条件变量
    }

    // 添加元素
    q.items = append(q.items, item)
    fmt.Printf("生产者: 生产 %d\n", item)
    q.cond.Signal() // 唤醒等待的消费者
}

// 从队列中取出一个元素
func (q *BoundedQueue) Dequeue() int {
    q.mu.Lock() // 加锁以保护共享数据
    defer q.mu.Unlock()

    for len(q.items) == 0 {
        q.cond.Wait() // 队列空时,等待条件变量
    }

    // 删除并返回第一个元素
    item := q.items[0]
    q.items = q.items[1:]
    fmt.Printf("消费者: 消费 %d\n", item)
    q.cond.Signal() // 唤醒等待的生产者
    return item
}

func main() {
    queue := NewBoundedQueue(5) // 创建一个容量为 5 的队列
    var wg sync.WaitGroup

    // 启动多个生产者
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 5; j++ {
                queue.Enqueue(j + id*100) // 生产不同的产品
                time.Sleep(time.Second) // 模拟生产时间
            }
        }(i)
    }

    // 启动多个消费者
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 5; j++ {
                queue.Dequeue() // 消费产品
                time.Sleep(2 * time.Second) // 模拟消费时间
            }
        }()
    }

    wg.Wait() // 等待所有 goroutine 完成
    fmt.Println("所有生产和消费任务完成.")
}

Once(一次性)

在 Go 语言中,sync.Once 是一个用于确保某个操作只被执行一次的同步机制。

它用于对资源进行单次初始化或设置,避免多次初始化可能导致的数据不一致和资源浪费。

常见的使用场景包括单例模式和全局变量的初始化。

主要概念:

确保执行一次:使用 sync.Once 的 Do 方法可以确保给定的函数由于多次调用而只执行一次,

无论有多少 goroutine 并发调用它。

线程安全:Once 是线程安全的,适合在多个 goroutine 中使用,例如在初始化全局变量时。

package main

import (
    "fmt"
    "sync"
)

// 1. 定义全局变量
var (
    instance *Singleton     // 存储单例实例的指针
    once     sync.Once      // 用于确保操作只执行一次
)

// 2. 定义单例结构体
type Singleton struct {
    value int // 单例的属性,存储的值
}

// 3. 获取单例实例的函数
func GetInstance() *Singleton {
    // 4. 确保传入的函数只会执行一次
    once.Do(func() {
        fmt.Println("初始化单例...") // 仅在第一次被调用时打印
        instance = &Singleton{value: 42} // 实例化单例,赋予 value 值为 42
    })
    return instance // 返回单例实例
}

func main() {
    var wg sync.WaitGroup // 创建一个 WaitGroup,用于等待 goroutine 完成

    // 5. 启动多个 goroutine
    for i := 0; i < 5; i++ {
        wg.Add(1) // 增加 WaitGroup 的计数
        go func(id int) {
            defer wg.Done() // 在 goroutine 结束时减少 WaitGroup 的计数
            inst := GetInstance() // 获取单例实例
            fmt.Printf("Goroutine %d: 单例值为 %d\n", id, inst.value) // 输出 goroutine ID 及单例的值
        }(i) // 向 goroutine 传递当前 ID
    }

    wg.Wait() // 等待所有 goroutine 完成
}

.WaitGroup(等待组)

主要概念

计数器:WaitGroup 维护一个计数器,表示正在运行的 goroutine 数量。

Add:通过 Add(n) 方法增加 goroutine 数量,n 通常为 1,表示启动一个新的 goroutine。

Done:在 goroutine 执行完毕后,需要调用 Done() 方法来减少计数器的值。

Wait:主程序调用 Wait() 方法,该方法会阻塞,直到计数器的值变为 0,表示所有的 goroutine 都已完成。

sync.Map

sync.Map 是 Go 1.9 版本引入的一种并发安全的、无锁的、键值对映射。

sync.Map 是一种并发安全的映射类型,旨在提供高效的多 goroutine 读写操作。

它是标准库 sync 包中的一部分,特别设计用于替代传统的 map 类型,

以解决在并发情况下通常需要手动管理的同步问题。

主要特点

并发安全:sync.Map 在并发环境中安全,不需要额外的锁来管理对它的读写操作。

读优化:它采用了读优化策略,适合读操作远多于写操作的场景。

结合原生 map 使用:针对不常变动的值,可以直接使用原生的 map,而对于需要并发读写的场景使用 sync.Map。

内置接口:sync.Map 提供了一些内置方法,简化了对映射的操作。

常用方法

Store(key, value):将键-值对存储到 sync.Map 中。

Load(key):获取指定键的值,如果存在返回值和 true,否则返回 nil 和 false。

LoadOrStore(key, value):如果指定键存在,返回该键的值;否则,存储键-值对并返回新值。

Delete(key):删除指定键及其对应的值。

Range(f func(key, value interface{}) bool):遍历 sync.Map 中的所有键值对。

package main

import (
    "fmt"
    "sync"
)

func main() {
    var m sync.Map // 创建一个 sync.Map

    // 存储键值对
    m.Store("a", 1)
    m.Store("b", 2)

    // 启动多个 goroutine 进行并发读写
    var wg sync.WaitGroup
    wg.Add(3)

    // 读取数据
    go func() {
        defer wg.Done()
        if value, ok := m.Load("a"); ok {
            fmt.Printf("读取键 'a': %v\n", value)
        }
    }()

    // 读取数据
    go func() {
        defer wg.Done()
        if value, ok := m.Load("b"); ok {
            fmt.Printf("读取键 'b': %v\n", value)
        }
    }()

    // 存储新值
    go func() {
        defer wg.Done()
        m.Store("c", 3)
        fmt.Println("存储键 'c': 3")
    }()

    wg.Wait() // 等待所有 goroutine 完成

    // 遍历 Map
    m.Range(func(key, value interface{}) bool {
        fmt.Printf("遍历 - 键: %v, 值: %v\n", key, value)
        return true // 返回 true 继续遍历
    })
}

原子操作

在 Go 语言中,原子操作是指在执行某个操作时不会被其他 goroutine 中断或干扰的一种操作。

原子操作通常用于处理共享变量的并发访问,以确保数据的一致性和完整性。

Go 提供了一些原子操作的支持,主要集中在 sync/atomic 包中。

原子操作在 Go 语言中是处理并发共享数据的一种高效方法,

通过 sync/atomic 包提供的工具,可以在不使用显式锁的情况下安全地更新和读取共享变量。

这特别适用于需要频繁更新的场景,如计数器、状态标志等。

原子操作的优势

高效性:原子操作避免了使用 mutex(互斥锁)带来的额外开销,通常速度更快。

避免死锁:由于不涉及锁的获取和释放,原子操作可以减少死锁的风险。

简单性:可以在多 goroutine 中访问同一变量而不需要进行显式的锁机制。

atomic 包中的主要函数

sync/atomic 包提供了针对整数和指针类型的原子操作,常用的函数包括:

AddInt32 / AddInt64:对 32 位或 64 位整数执行加算操作。

LoadInt32 / LoadInt64:加载 32 位或 64 位整数的当前值。

StoreInt32 / StoreInt64:将 32 位或 64 位整数的值存储为一个新的值。

CompareAndSwapInt32 / CompareAndSwapInt64:比较并交换操作,如果当前值与预期值相等,则将其替换为新值。

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

func main() {
    var counter int64 // 定义一个共享变量

    var wg sync.WaitGroup // 用于等待所有 goroutine 完成

    // 启动多个 goroutine 进行并发增加
    for i := 0; i < 5; i++ {
        wg.Add(1) // 增加 WaitGroup 的计数
        go func() {
            defer wg.Done() // 在 goroutine 完成时减少计数
            for j := 0; j < 1000; j++ {
                atomic.AddInt64(&counter, 1) // 原子操作增加计数
            }
        }()
    }

    wg.Wait() // 等待所有 goroutine 完成

    fmt.Printf("最终计数: %d\n", atomic.LoadInt64(&counter)) // 获取最终计数并输出
}


原文地址:https://blog.csdn.net/gopher9511/article/details/142437670

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!