自学内容网 自学内容网

Go语言并发控制

channel

// cancelFn 数据通道关闭通知退出
func cancelFn(dataChan chan int) {
for {
select {
case val, ok := <-dataChan:
// 关闭data通道时,通知退出
// 一个可选是判断data=指定值时退出
if !ok {
fmt.Printf("Channel closed !!!")
return
}
fmt.Printf("Receive data from dataChan %d\n", val)
}
}
}

func main() {
channels := make([]chan int, 10)
for i := 0; i < 10; i++ {
channels[i] = make(chan int)
go cancelFn(channels[i])
channels[i] <- 1 // 向管道写数据
fmt.Println(i, "quit")

}
}

watitGroup

var wg sync.WaitGroup

func main() {
ch := make(chan int)
wg.Add(1) //设置计数器 表示goroutine个数加1
go func() {
v, ok := <-ch
if ok {
fmt.Println("value", v)
}
wg.Done() //执行结束之后 , goroutine个数减1
}()
wg.Add(1)
go func() {
ch <- 4
wg.Done()
}()
wg.Wait() //主goroutine阻塞,等待计数器变为0
}

WaitGroup原理

type WaitGroup struct {
    statel [3]uint32
    /*
        长度为3的数组包含两个计数器和一个信号量
        counter : 当前还未执行的结束的goroutine计数器
        waiter count : 等待goroutine-group结束的goroutine数量
        semaphore: 信号量
    */
}

WaitGroup对外提供了三个接口

  • Add(delta int) : 将delta值加到counter中
  • Wait(): waiter递增加1 , 并阻塞等待信号量semaphore
  • Done(): counter递减1 , 按照waiter数值释放相应次数的信号量

Add(delta int)

Add() 做了两件事 , 一是把delta值累加到counter中,因为delta可以为负值.所以说当counter变为0时,根据waiter数值释放等量的信号量 , 把等待的goroutine全部唤醒,如果couner变为负值,则触发panic.

Wait()

Wait()方法一个是要累加waiter , 二是阻塞等待信号量.

Done()

Done 只做一件事,把counter减少1,其实Done里面调用的就是Add(-1)

context原理

Context实际上只定义了接口,凡是实现该接口的类都能称为Context.

type Context interface {
    Deadline() (deadline time.Time , ok bool)
    Done() <-chan struct{}
    Err() error
    value(key interface{}) interface{}
}

Deadline()

该方法返回一个deadline和标识是否已设置deadline的bool值,如果没有设置deadline , 则ok为false,此时deadline为一个初始值的time.Time值.

Done()

该方法返回一个用于探测context是否取消的channel,当context取消时,会自动将该channel关闭. 对于不支持取消的context(如:context.Backgroud) , 该方法可能会返回nil.

Err()

该方法描述context关闭的原因.关闭原因由context实现控制.

value()

有一种context,它不是用于控制呈树状分布的goroutine , 而是用于在树状分布的goroutine之间传递信息.Value()方法就是此种类型的context,根据key查询map集合中的value.

空context

context包中定义了一个公用的emptyCtx全局变量 , 名为backgroud,可以使用context.Backgroud()获取它.context包中提供了四个方法创建不同类型的context , 使用这四个方法如果没有父context,则都需要传入background , 即将background作为父节点:

  • WithCancel();
  • WithDeadline();
  • WithTimeout();
  • WithValue();

context包中实现Context接口的struct,除了emptyCtx , 还有cancelCtx , timerCtx 和 valueCtx三种.

cancelCtx

type cancelCtx struct {
    Context
    mu sync.Mutex
    done chan struct{}
    children map[canceler]struct{}
    err error
}

children 中记录了由此context 派生的所有child , 此context被"cancel"时,会把其中所有的child都cancel掉.cancelCtx与deadline和value无关 , 所以只需要实现Done() 和 Err() 外露接口即可.

Cancel()接口的实现

cancel()内部方法时理解cancelCtx的关键cancelCtx.children的map中,其中key值即后代对象,value值并没有意义.

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    c.mu.Lock()
    c.err = err                       //设置一个error,说明关闭原因
    close(c.done)                     //将channel关闭,以此通知派生的context

    for child := range c.children {   //遍历所有children,逐个调用cancel方法
        child.cancel(false, err)
    }
    c.children = nil
    c.mu.Unlock()
    if removeFromParent {            //正常情况下,需要将自己从parent删除
        removeChild(c.Context, c)
    }
}

WithCancel()方法的使用案例

func HandelRequest(ctx context.Context) {
go WriteRedis(ctx)
go WriteDatabase(ctx)
for {
select {
case <-ctx.Done():
fmt.Println("HandelRequest Done.")
return
default:
fmt.Println("HandelRequest running")
time.Sleep(2 * time.Second)
}
}
}
func WriteRedis(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("WriteRedis Done.")
return
default:
fmt.Println("WriteRedis running")
time.Sleep(2 * time.Second)
}
}
}
func WriteDatabase(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("WriteDatabase Done.")
return
default:
fmt.Println("WriteDatabase running")
time.Sleep(2 * time.Second)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
go HandelRequest(ctx)

time.Sleep(5 * time.Second)
fmt.Println("It's time to stop all sub goroutines!")
cancel()

//Just for test whether sub goroutines exit or not
time.Sleep(5 * time.Second)
}

HandelRequest()用于处理某个请求 , 其又会创建两个协程 , main协程可以在适当时机cancel掉所有自子协程

timeCtx

type timerCtx struct {
    cancelCtx
    timer *time.Timer 
    deadline time.Time
}

timerCtx 在cancelCtx的基础上,增加了deadline用于标示自动cancel的最终时间,而timer就是一个触发自动cancel的定时器.由此衍生出了WithDeadline()和WithTimeout().

  • deadline:指定最后期限.
  • timeout: 指定最长存活时间.
package main

import (
    "fmt"
    "time"
    "context"
)

func HandelRequest(ctx context.Context) {
    go WriteRedis(ctx)
    go WriteDatabase(ctx)
    for {
        select {
        case <-ctx.Done():
            fmt.Println("HandelRequest Done.")
            return
        default:
            fmt.Println("HandelRequest running")
            time.Sleep(2 * time.Second)
        }
    }
}

func WriteRedis(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("WriteRedis Done.")
            return
        default:
            fmt.Println("WriteRedis running")
            time.Sleep(2 * time.Second)
        }
    }
}

func WriteDatabase(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("WriteDatabase Done.")
            return
        default:
            fmt.Println("WriteDatabase running")
            time.Sleep(2 * time.Second)
        }
    }
}

func main() {
    ctx, _ := context.WithTimeout(context.Background(), 5 * time.Second)
    go HandelRequest(ctx)

    time.Sleep(10 * time.Second)
}

valueCtx

type valueCtx struct {
    Context
    key, val interface{}
}

valueCtx 只是在Context基础上增加了一个key-value对,用于在各级协程之间传递数据.因此只需要实现Value()接口.

func HandelRequest(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("HandelRequest Done.")
            return
        default:
            fmt.Println("HandelRequest running, parameter: ", ctx.Value("parameter"))
            time.Sleep(2 * time.Second)
        }
    }
}

func main() {
    ctx := context.WithValue(context.Background(), "parameter", "1")
    go HandelRequest(ctx)

    time.Sleep(10 * time.Second)
}

子协程可以读到context的key-value


原文地址:https://blog.csdn.net/c0210g/article/details/137987454

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!