【gRPC】clientPool 客户端连接池简单实现与go案例
什么是 gRPC 客户端连接池?
-
在 gRPC 中,创建和维护一个到服务器的连接是非常消耗资源的(比如 TCP 连接建立和 TLS 握手)。
-
而在高并发场景下,如果每次请求都创建新的连接,不仅会导致性能下降,还可能耗尽系统资源。
-
因此,客户端连接池的作用是复用一定数量的连接,提高资源利用率和性能。
gRPC 客户端连接池的原理
- 连接复用,池子里的连接使用时取出,用完放回
- 控制连接数,可以固定数量或动态调整,防止建太多连接
- 并发安全
先展示一个基于sync.pool创建的clientPool
- 实际上,企业不推荐使用sync包里的无锁机制,
- 因为sync包里的无锁设计适用于高并发,短暂资源的情况,
- 而gRPC本身设计初衷是客户端连接是长生命周期,需要稳定管理的资源,与sync.pool的特性不完全匹配
因此为了更好实现,可以自己加锁设计,或者使用第三方库这里举例github:go-grpc-pool
type ClientPool interface {
Get() *grpc.ClientConn
Put(conn *grpc.ClientConn)
}
type clientPool struct {
pool sync.Pool
}
func GetPool(target string, opts ...grpc.DialOption) (ClientPool, error) {
return &clientPool{
pool: sync.Pool{
New: func() any {
conn, err := grpc.Dial(target, opts...)
if err != nil {
log.Println(err)
return nil
}
return conn
},
},
}, nil
}
func (c *clientPool) Get() *grpc.ClientConn {
conn := c.pool.Get().(*grpc.ClientConn)
if conn.GetState() == connectivity.Shutdown || conn.GetState() == connectivity.TransientFailure {
conn.Close()
conn = c.pool.New().(*grpc.ClientConn)
}
return conn
}
func (c *clientPool) Put(conn *grpc.ClientConn) {
if conn.GetState() == connectivity.Shutdown || conn.GetState() == connectivity.TransientFailure {
conn.Close()
return
}
c.pool.Put(conn)
}
自己加锁设计
package main
import (
"log"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
)
// ClientPool 定义接口
type ClientPool interface {
Get() (*grpc.ClientConn, error)
Put(conn *grpc.ClientConn)
Close()
}
// clientPool 是 ClientPool 的实现
type clientPool struct {
mu sync.Mutex
connections chan *grpc.ClientConn
maxSize int
idleTimeout time.Duration
target string
opts []grpc.DialOption
closed bool
}
// NewClientPool 创建一个新的客户端连接池
func NewClientPool(target string, maxSize int, idleTimeout time.Duration, opts ...grpc.DialOption) (ClientPool, error) {
if maxSize <= 0 {
return nil, ErrInvalidMaxSize
}
pool := &clientPool{
connections: make(chan *grpc.ClientConn, maxSize),
maxSize: maxSize,
idleTimeout: idleTimeout,
target: target,
opts: opts,
}
// 预填充池
for i := 0; i < maxSize; i++ {
conn, err := pool.createConnection()
if err != nil {
return nil, err
}
pool.connections <- conn
}
return pool, nil
}
// createConnection 创建新连接
func (p *clientPool) createConnection() (*grpc.ClientConn, error) {
conn, err := grpc.Dial(p.target, p.opts...)
if err != nil {
return nil, err
}
return conn, nil
}
// Get 从连接池获取一个连接
func (p *clientPool) Get() (*grpc.ClientConn, error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.closed {
return nil, ErrPoolClosed
}
select {
case conn := <-p.connections:
// 检查连接状态
if conn.GetState() == connectivity.Shutdown || conn.GetState() == connectivity.TransientFailure {
conn.Close()
return p.createConnection()
}
return conn, nil
default:
// 如果没有空闲连接,尝试创建新的连接
return p.createConnection()
}
}
// Put 将连接放回池中
func (p *clientPool) Put(conn *grpc.ClientConn) {
if conn == nil {
return
}
// 检查连接状态
if conn.GetState() == connectivity.Shutdown || conn.GetState() == connectivity.TransientFailure {
conn.Close()
return
}
select {
case p.connections <- conn:
// 放回池中
default:
// 如果池已满,直接关闭连接
conn.Close()
}
}
// Close 关闭连接池
func (p *clientPool) Close() {
p.mu.Lock()
defer p.mu.Unlock()
if p.closed {
return
}
p.closed = true
close(p.connections)
for conn := range p.connections {
conn.Close()
}
}
// 错误定义
var (
ErrInvalidMaxSize = log.New("invalid max size")
ErrPoolClosed = log.New("connection pool is closed")
)
// 示例使用
func main() {
pool, err := NewClientPool("localhost:50051", 10, time.Minute, grpc.WithInsecure())
if err != nil {
log.Fatalf("Failed to create pool: %v", err)
}
conn, err := pool.Get()
if err != nil {
log.Fatalf("Failed to get connection: %v", err)
}
// 使用连接
// client := pb.NewYourServiceClient(conn)
// 放回连接
pool.Put(conn)
// 程序退出时关闭连接池
pool.Close()
}
原文地址:https://blog.csdn.net/m0_74282926/article/details/145130944
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!