自学内容网 自学内容网

【go每日一题】结合channel 实现工作池

题目描述

使用go语言实现的一个工作池,工作池中的工作带有超时控制,

工作池是一种常用的并发设计模式,固定数量的 goroutine 来处理一组任务任务可以被异步地添加到工作池中,等待可用的 worker goroutine 来处理。当没有更多的任务需要处理时,worker goroutine 将会保持空闲状态,等待新的任务到来。

核心思路

  1. 要满足:任务可以被异步地添加到工作池中,等待可用的 worker goroutine 来处理,那么工作池中的woker就需要监听一个队列,只要这个队列里有新的任务,就被空闲的worker拿去执行。这部分的需求,最好的解决方案就是使用一个channel队列来实现。
  2. worker只需要range channel,拿到一个任务,那就去执行(耗时操作),执行完毕后,又回到range channel的下一个阻塞中,一旦channel有新的任务,那就去执行。

代码思路

  1. task是具体的工作,可以是一个函数类型

  2. workPool是一个池子,这个池子肯定有:

    • 池子的大小(能执行的工作数量)
    • 阻塞队列(实现工作池的最核心部分,chan的类型就是task)
  3. workPool这个结构体,肯定需要有很多成员方法:

    • 对外提供创建接口
    • 开启这个工作池(开worker数量个协程,每个协程监听阻塞队列,监听到了就执行)
    • 应该有waitGroup,用于关闭时等待其中的任务执行完毕
    • 提供向其阻塞队列添加任务的对外暴露方法
  4. 注意资源释放:提供关闭工作池的方法,关键在于对阻塞队列的关闭,并等待woker处理完毕当前的任务(waitGroup)

代码

package test

import (
"context"
"fmt"
"sync"
"testing"
"time"
)

type Task func(ctx context.Context) error

type WorkPool struct {
//如果这些变量只应该在WorkerPool结构体所属的包内部使用,就应该将它们定义为小写
workerNum int
taskQueue chan Task // 有缓冲的chan
wg        sync.WaitGroup
}

func NewWorkPool(workerNum int) *WorkPool {
return &WorkPool{
workerNum: workerNum,
taskQueue: make(chan Task, workerNum), // 个数如何确定比较好?
// wg:        sync.WaitGroup{}, 不需要初始化,零值具备它的初始状态特征
}
}

// exeTime 每个worker的执行耗时
func (wp *WorkPool) Start(exeTime time.Duration) error {
// 开启workerNum个协程数 去监控队列
for i := 0; i < wp.workerNum; i++ {
wp.wg.Add(1)
go func(workerId int) {
defer wp.wg.Done()
for task := range wp.taskQueue { // 阻塞,如果channel被关闭,就会退出
ctx, _ := context.WithTimeout(context.Background(), exeTime)
fmt.Printf("协程workerId=(%d)正在执行任务\n", workerId)
err := task(ctx)
if err != nil {
fmt.Errorf("err : (%w)", err)
}
// 处理结果,如果处理可以提供管道对外暴露
}
}(i)
}
return nil
}

func (wp *WorkPool) AddTask(task Task) error {
wp.taskQueue <- task // 阻塞
return nil
}

func (wp *WorkPool) StopWorkerPool() error {
close(wp.taskQueue)
wp.wg.Wait()
fmt.Println("工作池关闭成功")
return nil
}

func TestWorkPool(t *testing.T) {

tasks := []Task{
func(ctx context.Context) error {
select {
case <-ctx.Done():
fmt.Println("func 1 开始执行")
fmt.Println("ctx 已经终止")
return ctx.Err()
default:
time.Sleep(time.Millisecond * 5)
for i := 0; i < 5; i++ {
fmt.Println(i)
}
return nil
}
},
func(ctx context.Context) error {
select {
case <-ctx.Done():
fmt.Println("ctx 已经终止")
return ctx.Err()
default:
fmt.Println("func 2 开始执行")
time.Sleep(time.Millisecond * 5)
for i := 5; i < 10; i++ {
fmt.Println(i)
}
return nil
}
},
func(ctx context.Context) error {
select {
case <-ctx.Done():
fmt.Println("ctx 已经终止")
return ctx.Err()
default:
fmt.Println("func 3 开始执行")
time.Sleep(time.Millisecond * 5)
for i := 10; i < 15; i++ {
fmt.Println(i)
}
return nil
}
},
}
wp := NewWorkPool(2)
wp.Start(time.Second * 30)

for _, task := range tasks {
wp.AddTask(task)
}

wp.StopWorkerPool()
}


原文地址:https://blog.csdn.net/YiGeiGiaoGiao/article/details/144259973

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!