Golang--协程和管道
1、概念
程序:
是为完成特定任务、用某种语言编写的一组指令的集合,是一段静态的代码。(程序是静态)
进程:
是程序的一次执行过程。正在运行的一个程序,进程作为资源分配的单位,在内存中会为每个进程分配不同的内存区域,是一个动的过程 ,进程有它自身的产生、存在和消亡的过程。(进程是动态的)
线程:
进程可进一步细化为线程,是一个程序内部的一个执行路径。若一个进程同一时间并执行多个线程,就是支持多线程的。
单核:并发执行
多核:并发执行和并行执行
协程:
又称为微线程,纤程,协程是一种用户态的轻量级线程
作用:在执行A函数的时候,可以随时中断,去执行B函数,然后中断继续执行A函数(可以自动切换),注意这一切换过程并不是函数调用(没有调用语句),过程很像多线程,然而协程中只有一个线程在执行(协程的本质是个单线程)对于单线程下,我们不可避免程序中出现io操作,但如果我们能在自己的程序中(即用户程序级别,而非操作系统级别)控制单线程下的多个任务能在一个任务遇到io阻塞时就将寄存器上下文和栈保存到某个其他地方,然后切换到另外一个任务去计算。在任务切回来的时候,恢复先前保存的寄存器上下文和栈,这样就保证了该线程能够最大限度地处于就绪态,即随时都可以被cpu执行的状态,相当于我们在用户程序级别将自己的io操作最大限度地隐藏起来,从而可以迷惑操作系统,让其看到:该线程好像是一直在计算,io比较少,从而会更多得将cpu执行权限分配给我们的线程(线程是cpu控制的,而协程是程序自身控制的,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级)
2、启动一个协程
案例:
编写一个程序,完成如下功能:
- 在主线程中,开启一个goroutine(协程),该goroutine每隔1秒输出"hello golang"
- 在主线程中也每隔一秒输出"hello world",输出10次后退出程序
- 要求主线程和goroutine同时执行
package main import ( "fmt" "strconv" "time" ) func test(){ for i := 1; i <= 10; i++{ fmt.Println("hello golang" + strconv.Itoa(i)) //阻塞一秒 time.Sleep(time.Second * 1)// 1s } } func main(){ //主线程 //开启一个协程 go test() for i := 1; i <= 10; i++{ fmt.Println("hello world" + strconv.Itoa(i)) //阻塞一秒 time.Sleep(time.Second * 1)// 1s } }
开启协程:使用go关键字,例如:go test()
执行流程:
注意:如果主线程退出,协程还没执行结束,协程也会提前结束(主死从随)
3、启动多个协程
package main
import (
"fmt"
"strconv"
"time"
)
func test1(){
for i := 1; i <= 10; i++{
fmt.Println("hello golang" + strconv.Itoa(i))
//阻塞一秒
time.Sleep(time.Second * 1)// 1s
}
}
func main(){ //主线程
//开启一个协程--使用匿名函数
//匿名函数 + 外部变量 = 闭包
for i := 1; i <= 5; i++{
go func(n int){
fmt.Println(n)
}(i)
}
//开启一个协程--使用普通函数
go test1()
for i := 1; i <= 10; i++{
fmt.Println("hello world" + strconv.Itoa(i))
//阻塞一秒
time.Sleep(time.Second * 1)// 1s
}
}
4、使用WaitGroup控制协程退出
WaitGroup用于等待一组线程的结束。父线程调用Add方法来设定应等待的线程的数量,每个被等待的线程在结束时应调用Done方法。同时主线程里可以调用Wait方法阻塞至所有线程结束。用于解决主线程在子协程结束后自动结束,防止主线程退出导致协程被迫退出,
package main
import (
"fmt"
"sync" // 并发包
"time"
"strconv"
)
var wg sync.WaitGroup // 并发包的变量, 只定义无需初始化--类似计数器
func test1(){
//协程执行完毕,协程数量-1
defer wg.Done()
for i := 1; i <= 10; i++{
fmt.Println("hello golang" + strconv.Itoa(i))
//阻塞一秒
time.Sleep(time.Second * 2)// 2s
}
// //协程执行完毕,协程数量-1
// wg.Done()
}
func main(){ //主线程
//开启一个协程--使用普通函数
wg.Add(1) // 协程数量+1
go test1() // 开启协程
//主线程一直阻塞,等待协程执行完毕--直到协程执行完毕才会继续执行主线程
wg.Wait()
for i := 1; i <= 10; i++{
fmt.Println("hello world" + strconv.Itoa(i))
//阻塞一秒
time.Sleep(time.Second * 1)// 1s
}
}
5、多个协程操纵同一数据案例(使用互斥锁同步协程)
错误案例:
package main import ( "fmt" "sync" // 并发包 ) var wg sync.WaitGroup // 并发包的变量, 只定义无需初始化 //定义一个变量: var cnt int func iAdd(){ defer wg.Done() for i := 1; i <= 10000; i++{ cnt = cnt + 1 } } func iSub(){ defer wg.Done() for i := 1; i <= 10000; i++{ cnt = cnt - 1 } } func main(){ //主线程 //开启一个协程--使用普通函数 wg.Add(2) // 协程数量+2 go iAdd() // 开启协程 go iSub() wg.Wait() fmt.Println(cnt) //结果:理论上cnt是0,但实际上不是 }
两个协程进行的顺序是不一定的,比如取cnt这个值时,两个协程进行的取值和运算操作顺序的关系,可能导致对原本正确的结果进行了覆盖,导致iAdd函数的偏移量和iSub函数的偏移量之和不等于0,实际结果是个不确定的值。
5.1 互斥锁
正确案例:使用互斥锁同步协程
解决上面问题的方案:
确保一个协程在执行逻辑的时候另外的协程不执行---->利用锁的机制---->互斥锁
互斥锁:
Mutex为互斥锁,Lock()加锁,Unlock()解锁,使用Lock()加锁后,便不能再次对其进行加锁,直到利用Unlock()解锁对其解锁后,才能再次加锁,适用于读写不确定场景,即读写次数没有明显的区别-----性能效率相对来说偏低package main import ( "fmt" "sync" // 并发包 ) var wg sync.WaitGroup // 并发包的变量, 只定义无需初始化 var lock sync.Mutex //加入互斥锁 //定义一个变量: var cnt int func iAdd(){ defer wg.Done() for i := 1; i <= 10000; i++{ lock.Lock() // 加锁 cnt = cnt + 1 lock.Unlock() // 解锁 } } func iSub(){ defer wg.Done() for i := 1; i <= 10000; i++{ lock.Lock() // 加锁 cnt = cnt - 1 lock.Unlock() // 解锁 } } func main(){ //主线程 wg.Add(2) go iAdd() go iSub() wg.Wait() fmt.Println(cnt) //结果:理论上cnt是0,但实际上不是 }
5.2 读写锁
读写锁:
RWMutex是一个读写锁,其经常用于读次数远远多于写次数的场景。
在读的时候,数据之间不产生影响,写和读之间才会产生影响package main import ( "fmt" "sync" // 并发包 "time" ) var wg sync.WaitGroup // 并发包的变量, 只定义无需初始化 var lock sync.RWMutex //加入读写锁 func read(){ defer wg.Done() //如果只是读取数据,那么这个锁不产生任何影响,但是如果是写入数据,那么就会产生阻塞 lock.RLock() fmt.Println("尝试读取数据中...") time.Sleep(time.Second) fmt.Println("读取数据成功!") lock.RUnlock() } func write(){ defer wg.Done() lock.Lock() fmt.Println("尝试修改数据中...") time.Sleep(time.Second * 2) fmt.Println("修改数据成功!") lock.Unlock() } func main(){ //主线程 //场景:读多写少 wg.Add(1) go write() for i := 1; i < 20; i++{ wg.Add(1) go read() } wg.Wait() }
在写的过程中,锁生效,但是读可以并发读,锁没有影响
6、管道
6.1 管道概念
管道:
- 管道本质就是一个数据结构--队列
- 数据特性:先进先出
- 自身线程安全,多协程访问时,不需要加锁,channel本身就是线程安全的
- 管道有类型的,如:一个string类型的管道只能存放string类型数据
6.2 管道的定义
定义:
var 变量名 chan 数据类型
- chan是管道关键字
- 数据类型指的是管道的类型,指里面放入数据的类型,int类型的管道只能写入整数int
- 管道是引用类型,必须初始化才能写入数据,即make后才能使用
package main import ( "fmt" ) func main(){ //定义管道、声明管道 ---> 定义一个int类型的管道 var intChan chan int //通过make初始化:管道可以存放3个int类型的数据 intChan = make(chan int,3) //证明管道是引用类型 fmt.Printf("intChan的值:%v\n",intChan) //intChan的值:0xc0000ac080 //向管道中写入数据 intChan <- 10 intChan <- 20 //输出管道的长度 fmt.Printf("管道的实际长度:%v,管道的容量:%v\n",len(intChan),cap(intChan)) // 管道的实际长度:2,管道的容量:3 //从管道中读取数据 num := <-intChan fmt.Println(num) // 10 //关闭管道---> 不能向关闭的管道中写入数据 close(intChan) // intChan <- 100 // 报错:panic: send on closed channel num,flag := <-intChan fmt.Println(num,flag) // 20 true }
6.3 管道的关闭
管道的关闭:
使用内置函数close可以关闭管道,当管道关闭后,就不能再向管道写数据了,但是仍然可以从该管道读取数据。
例子:上面代码的
6.4 管道的遍历
管道的遍历:
管道支持for-range的方式进行遍历,注意:
- 在遍历时,如果管道没有关闭,则会出现deadlock(死锁)的错误
- 在遍历时,如果管道已经关闭,则会正常遍历数据,遍历完后,就会退出遍历
package main import ( "fmt" ) func main(){ //定义管道、声明管道 ---> 定义一个int类型的管道 var intChan chan int = make(chan int,100) for i := 1; i <= 100; i++{ intChan <- i } //遍历前,如果没有关闭管道,那么会出现死锁 -- deadlock close(intChan) //for-range读取管道中的数据 for v := range intChan{ fmt.Printf("%v ",v) } }
6.5 协程和管道协同工作案例
案例需求:
请完成协程和管道协同工作的案例,具体要求:
- 开启一个writeData协程,向管道中写入50个整数
- 开启一个readData协程,从管道中读取writeData写入的数据
- 注意:writeData和readData操作的是同一个管道
- 主线程需要等待writeData和readData协程都完成工作才能退出
package main import ( "fmt" "sync" "time" ) var wg sync.WaitGroup // 并发包的变量, 只定义无需初始化 func writeData(intChan chan int){ defer wg.Done() // 协程执行完毕,协程数量-1 for i := 1; i <= 50; i++{ fmt.Printf("写入数据:%v\n",i) intChan<- i time.Sleep(time.Second * 1) } close(intChan) } func readData(intChan chan int){ defer wg.Done() // 协程执行完毕,协程数量-1 for v := range intChan{ fmt.Printf("读取数据:%v\n",v) time.Sleep(time.Second * 1) } } func main(){ //init var intChan chan int = make(chan int,100) //开启线程 wg.Add(2) go writeData(intChan) go readData(intChan) wg.Wait() }
6.6 声明只读只写管道
管道可以声明为只读或者只写性质:
package main import ( "fmt" ) func main(){ //默认情况下,管道是双向的 --> 可读可写 var intChan1 chan int = make(chan int, 10) intChan1 <- 10 fmt.Println(intChan1) //声明为只写 --> 只能写,不能读 var intChan2 chan <- int = make(chan int, 5) intChan2 <- 10 fmt.Println(intChan2) //fmt.Println(<- intChan2) // 报错:invalid operation: <-intChan2 (receive from send-only type chan<- int) //声明为只读--> 只能读,不能写 var intChan3 <- chan int = make(chan int, 5) fmt.Println(intChan3) //intChan3 <- 10 // 报错:invalid operation: intChan3 <- 10 (send to receive-only type <-chan int) }
6.7 管道的阻塞
阻塞的情况:
package main import ( "fmt" "sync" //"time" ) var wg sync.WaitGroup // 并发包的变量, 只定义无需初始化 func writeData(intChan chan int){ defer wg.Done() // 协程执行完毕,协程数量-1 for i := 1; i <= 10; i++{ fmt.Printf("写入数据:%v\n",i) intChan<- i //time.Sleep(time.Second * 1) } close(intChan) } func readData(intChan chan int){ defer wg.Done() // 协程执行完毕,协程数量-1 for v := range intChan{ fmt.Printf("读取数据:%v\n",v) //time.Sleep(time.Second * 1) } } func main(){ //init var intChan chan int = make(chan int,5) //开启线程 wg.Add(1) go writeData(intChan) //go readData(intChan) wg.Wait() }
- 在Go语言中,如果一个管道(channel)只被写入数据而没有被读取,那么最终会导致管道阻塞。这是因为Go语言中的管道是同步的,即写入操作会等待读取操作完成后才能继续。如果没有读取操作来接收数据,写入操作就会一直等待,从而导致程序阻塞。
- 在代码片段中,如果
writeData
函数一直向intChan
管道写入数据,而没有readData
函数来读取这些数据,那么writeData
函数中的intChan<- i
操作会在管道缓冲区满后阻塞。如果缓冲区大小是5,那么在写入5个数据后,writeData
函数就会阻塞,直到有读取操作来接收数据。- 如果程序中没有其他地方读取这个管道,那么
writeData
函数会一直阻塞,这可能会导致程序死锁。为了避免这种情况,通常应该确保在创建管道时,有相应的读取操作来处理写入的数据。- 在实际编程中,应该根据程序的逻辑和需求来设计管道的读写操作,确保数据能够被正确处理,避免出现阻塞或死锁的情况。如果确实需要一个只写而不读的管道,那么应该考虑使用其他机制来处理数据,或者在程序中添加适当的逻辑来处理这种情况。
确保有相应的读取操作(不管读写速度是否一致(例如读的速度小于写的速度),只要有读取操作就不会阻塞):
package main import ( "fmt" "sync" "time" ) var wg sync.WaitGroup // 并发包的变量, 只定义无需初始化 func writeData(intChan chan int){ defer wg.Done() // 协程执行完毕,协程数量-1 for i := 1; i <= 10; i++{ fmt.Printf("写入数据:%v\n",i) intChan<- i time.Sleep(time.Second * 1) } close(intChan) } func readData(intChan chan int){ defer wg.Done() // 协程执行完毕,协程数量-1 for v := range intChan{ fmt.Printf("读取数据:%v\n",v) time.Sleep(time.Second * 5) } } func main(){ //init var intChan chan int = make(chan int,5) //开启线程 wg.Add(2) go writeData(intChan) go readData(intChan) wg.Wait() }
6.8 select功能
select功能:在Go语言中,
select
语句用于处理多个通道(channel)的操作。它类似于switch
语句,但是专门用于处理通道的发送和接收操作。select
语句会监听多个通道的操作,一旦其中一个通道准备好进行发送或接收操作,select
就会执行相应的分支。
在这个语法中,每个case
分支对应一个通道操作。如果多个case
分支同时满足条件,Go语言会随机选择一个分支执行。如果没有任何分支满足条件,并且存在default
分支,那么就会执行default
分支。如果没有default
分支,select
语句会阻塞,直到有某个分支满足条件。
- case后面必须进行的是io操作,不能是等值,随机去选择一个io操作
- default防止select被阻塞住,加入default
package main import ( "fmt" "time" ) func main(){ //chan int var intChan chan int = make(chan int, 5) go func(){ time.Sleep(time.Second * 4) intChan <- 10 }() //chan string var strChan chan string = make(chan string, 5) go func(){ time.Sleep(time.Second * 2) strChan <- "hello world" }() //chan float32 var floChan chan float32 = make(chan float32, 5) go func(){ time.Sleep(time.Second * 1) floChan <- 1.1111 }() //select--> 多路复用 --> 哪个管道有数据就执行哪个管道 select{ case v := <- intChan: fmt.Printf("select:读取intChan数据:%v\n",v) case v := <- strChan: fmt.Printf("select:读取strChan数据:%v\n",v) case <- floChan: fmt.Printf("floChan管道") //执行 // default: // fmt.Println("防止select阻塞\n") } }
6.9 defer + recover机制处理错误
问题原因:多个协程工作,其中一个协程出现panic,导致程序奔溃
解决方法:利用defer + recover捕获panic进行处理,即使协程出现问题,主线程仍然不受影响可以继续执行package main import ( "fmt" "time" ) //输出数字 func printNum(){ for i := 1; i <= 10; i++{ fmt.Println(i) //time.Sleep(time.Second * 1) } } //除法 func devide(){ defer func(){ err := recover() if err != nil{ fmt.Println("devide()发生错误:",err) } }() num1 := 100000 num2 := 0 result := num1/num2 fmt.Println(result) } func main(){ go printNum() go devide() time.Sleep(time.Second * 5) }
原文地址:https://blog.csdn.net/cookies_s_/article/details/143595543
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!