自学内容网 自学内容网

【gRPC】clientPool 客户端连接池简单实现与go案例

什么是 gRPC 客户端连接池?

  • 在 gRPC 中,创建和维护一个到服务器的连接是非常消耗资源的(比如 TCP 连接建立和 TLS 握手)。

  • 而在高并发场景下,如果每次请求都创建新的连接,不仅会导致性能下降,还可能耗尽系统资源。

  • 因此,客户端连接池的作用是复用一定数量的连接,提高资源利用率和性能。


gRPC 客户端连接池的原理

  1. 连接复用,池子里的连接使用时取出,用完放回
  2. 控制连接数,可以固定数量或动态调整,防止建太多连接
  3. 并发安全

先展示一个基于sync.pool创建的clientPool

  • 实际上,企业不推荐使用sync包里的无锁机制,
  • 因为sync包里的无锁设计适用于高并发,短暂资源的情况,
  • 而gRPC本身设计初衷是客户端连接是长生命周期,需要稳定管理的资源,与sync.pool的特性不完全匹配
因此为了更好实现,可以自己加锁设计,或者使用第三方库这里举例github:go-grpc-pool

type ClientPool interface {
Get() *grpc.ClientConn
Put(conn *grpc.ClientConn)
}

type clientPool struct {
pool sync.Pool
}

func GetPool(target string, opts ...grpc.DialOption) (ClientPool, error) {
return &clientPool{
pool: sync.Pool{
New: func() any {
conn, err := grpc.Dial(target, opts...)
if err != nil {
log.Println(err)
return nil
}
return conn
},
},
}, nil
}

func (c *clientPool) Get() *grpc.ClientConn {
conn := c.pool.Get().(*grpc.ClientConn)
if conn.GetState() == connectivity.Shutdown || conn.GetState() == connectivity.TransientFailure {
conn.Close()
conn = c.pool.New().(*grpc.ClientConn)
}
return conn
}

func (c *clientPool) Put(conn *grpc.ClientConn) {
if conn.GetState() == connectivity.Shutdown || conn.GetState() == connectivity.TransientFailure {
conn.Close()
return
}
c.pool.Put(conn)
}

自己加锁设计

package main

import (
"log"
"sync"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
)

// ClientPool 定义接口
type ClientPool interface {
Get() (*grpc.ClientConn, error)
Put(conn *grpc.ClientConn)
Close()
}

// clientPool 是 ClientPool 的实现
type clientPool struct {
mu          sync.Mutex
connections chan *grpc.ClientConn
maxSize     int
idleTimeout time.Duration
target      string
opts        []grpc.DialOption
closed      bool
}

// NewClientPool 创建一个新的客户端连接池
func NewClientPool(target string, maxSize int, idleTimeout time.Duration, opts ...grpc.DialOption) (ClientPool, error) {
if maxSize <= 0 {
return nil, ErrInvalidMaxSize
}

pool := &clientPool{
connections: make(chan *grpc.ClientConn, maxSize),
maxSize:     maxSize,
idleTimeout: idleTimeout,
target:      target,
opts:        opts,
}

// 预填充池
for i := 0; i < maxSize; i++ {
conn, err := pool.createConnection()
if err != nil {
return nil, err
}
pool.connections <- conn
}

return pool, nil
}

// createConnection 创建新连接
func (p *clientPool) createConnection() (*grpc.ClientConn, error) {
conn, err := grpc.Dial(p.target, p.opts...)
if err != nil {
return nil, err
}
return conn, nil
}

// Get 从连接池获取一个连接
func (p *clientPool) Get() (*grpc.ClientConn, error) {
p.mu.Lock()
defer p.mu.Unlock()

if p.closed {
return nil, ErrPoolClosed
}

select {
case conn := <-p.connections:
// 检查连接状态
if conn.GetState() == connectivity.Shutdown || conn.GetState() == connectivity.TransientFailure {
conn.Close()
return p.createConnection()
}
return conn, nil
default:
// 如果没有空闲连接,尝试创建新的连接
return p.createConnection()
}
}

// Put 将连接放回池中
func (p *clientPool) Put(conn *grpc.ClientConn) {
if conn == nil {
return
}

// 检查连接状态
if conn.GetState() == connectivity.Shutdown || conn.GetState() == connectivity.TransientFailure {
conn.Close()
return
}

select {
case p.connections <- conn:
// 放回池中
default:
// 如果池已满,直接关闭连接
conn.Close()
}
}

// Close 关闭连接池
func (p *clientPool) Close() {
p.mu.Lock()
defer p.mu.Unlock()

if p.closed {
return
}

p.closed = true
close(p.connections)

for conn := range p.connections {
conn.Close()
}
}

// 错误定义
var (
ErrInvalidMaxSize = log.New("invalid max size")
ErrPoolClosed     = log.New("connection pool is closed")
)

// 示例使用
func main() {
pool, err := NewClientPool("localhost:50051", 10, time.Minute, grpc.WithInsecure())
if err != nil {
log.Fatalf("Failed to create pool: %v", err)
}

conn, err := pool.Get()
if err != nil {
log.Fatalf("Failed to get connection: %v", err)
}

// 使用连接
// client := pb.NewYourServiceClient(conn)

// 放回连接
pool.Put(conn)

// 程序退出时关闭连接池
pool.Close()
}

原文地址:https://blog.csdn.net/m0_74282926/article/details/145130944

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!