自学内容网 自学内容网

Polaris系列-07.启动分析六

本篇分析 配置中心模块 启动流程:

先看启动配置参数:
在这里插入图片描述
进入方法:
在这里插入图片描述
先看看配置中心服务数据模型:初始化也是围绕着下面各个属性赋值...

// Server 配置中心核心服务
type Server struct {
cfg *Config

storage           store.Store
fileCache         cachetypes.ConfigFileCache
groupCache        cachetypes.ConfigGroupCache
grayCache         cachetypes.GrayCache
caches            cachetypes.CacheManager
watchCenter       *watchCenter
namespaceOperator namespace.NamespaceOperateServer
initialized       bool

history       plugin.History
cryptoManager plugin.CryptoManager
hooks         []ResourceHook

// chains
chains *ConfigChains

sequence int64
}

初始化有幂等性,保证初始化一次;
和服务发现模块一样,也有服务代理proxySvr,也保存了原始的服务originServer实例对象:

// Initialize 初始化配置中心模块
func Initialize(ctx context.Context, config Config, s store.Store, cacheMgr cachetypes.CacheManager,
namespaceOperator namespace.NamespaceOperateServer) error {
if originServer.initialized {
return nil
}
proxySvr, originSvr, err := doInitialize(ctx, config, s, cacheMgr, namespaceOperator)
if err != nil {
return err
}
originServer = originSvr
server = proxySvr
return nil
}

所以我们去看看是如何doInitialize(...)的:
在这里插入图片描述

再看上图中的第118行:originSvr.initialize(...): 点进去:
在这里插入图片描述
在这里插入图片描述

1.看看watchCenter结构体:

在这里插入图片描述

type SubscribtionContext struct {
subID  string
cancel context.CancelFunc
}

WatchContext interface {
// ClientID .
ClientID() string
// ClientLabels .
ClientLabels() map[string]string
// AppendInterest .
AppendInterest(item *apiconfig.ClientConfigFileInfo)
// RemoveInterest .
RemoveInterest(item *apiconfig.ClientConfigFileInfo)
// ShouldNotify .
ShouldNotify(event *model.SimpleConfigFileRelease) bool
// Reply .
Reply(rsp *apiconfig.ConfigClientResponse)
// Close .
Close() error
// ShouldExpire .
ShouldExpire(now time.Time) bool
// ListWatchFiles
ListWatchFiles() []*apiconfig.ClientConfigFileInfo
// IsOnce
IsOnce() bool
}

WatchContext接口有以下3个具体实现:long poll 它,出现了……
在这里插入图片描述
我们先只关注第2项:polaris下的watch.go(不考虑nacos)

type LongPollWatchContext struct {
clientId         string
labels           map[string]string
once             sync.Once
finishTime       time.Time
finishChan       chan *apiconfig.ConfigClientResponse
watchConfigFiles map[string]*apiconfig.ClientConfigFileInfo
betaMatcher      BetaReleaseMatcher
}

2.再看 eventhub.Subscribe(eventhub.ConfigFilePublishTopic, wc, eventhub.WithQueueSize(QueueSize))
方法本身在上篇已经介绍:发布订阅模式,再贴出流程图:
在这里插入图片描述
这里我们先看看入参的值:见名知意:
在这里插入图片描述
第3个参数:队列长度:10240,还有那30秒,正是我们在Polaris系列-00.前言 篇中谈到“入坑”的缘由之一……
在这里插入图片描述
最后看第2个参数:wc,它实现了Handler接口:
在这里插入图片描述
在这里插入图片描述
所以依据上述流程,我们先看watchcenter监听到文件内容改变后是如何处理的:
先看消息数据模型:event.Message
在这里插入图片描述

type ConfigFileReleaseKey struct {
Id          uint64
Name        string
Namespace   string
Group       string
FileName    string
ReleaseType ReleaseType
}

type ReleaseType string

const (
// ReleaseTypeFull 全量类型
ReleaseTypeFull = ""
// ReleaseTypeGray 灰度类型
ReleaseTypeGray = "gray"
)

清楚了数据结构之后,我们再看实现:wc.notifyToWatchers(event.Message)
FYI: 省略了例外判断逻辑:

func (wc *watchCenter) notifyToWatchers(publishConfigFile *model.SimpleConfigFileRelease) {
// namespace + group + filename 拼接起来
watchFileId := utils.GenFileId(publishConfigFile.Namespace, publishConfigFile.Group, publishConfigFile.FileName)
// watchers是一个map: fileId -> []clientId
clientIds, _ := wc.watchers.Load(watchFileId)

// 包装通知客户端入参模型:见下面解释说明1
changeNotifyRequest := publishConfigFile.ToSpecNotifyClientRequest()
// 配置变更客户端响应对象
response := api.NewConfigClientResponse(apimodel.Code_ExecuteSuccess, changeNotifyRequest)

notifyCnt := 0
// 获取clientIds此刻的一个数据快照,再遍历之:
clientIds.Range(func(clientId string) {
// clientId -> watchContext
watchCtx, _ := wc.clients.Load(clientId)
// 是否要通知:见下面解释2
if watchCtx.ShouldNotify(publishConfigFile) {
// 解释3
watchCtx.Reply(response)
notifyCnt++
// 只能用一次,通知完就要立马清理掉这个 WatchContext
// 只有StreamWatchContext的IsOnce()实现才是false
if watchCtx.IsOnce() {
// 疑问4
wc.RemoveAllWatcher(watchCtx.ClientID())
}
}
})
// 日志输出
}

解释说明1:
在这里插入图片描述
解释说明2: 后面新版本console中配置文件变更时会弹出框,让我们填写版本,防止我们无脑地随便更新……
在这里插入图片描述

func (c ConfigFileReleaseKey) FileKey() string {
return fmt.Sprintf("%v@%v@%v", c.Namespace, c.Group, c.FileName)
}

解释3: 往finishChan chan中投递数据,看来早有人在守着此chan,等待读取里面数据:

func (c *LongPollWatchContext) Reply(rsp *apiconfig.ConfigClientResponse) {
c.once.Do(func() {
c.finishChan <- rsp
close(c.finishChan)
})
}

我们看看谁调用下面这2个方法(之一):
在这里插入图片描述
果然有客户端(而且只有一处调用)在监听等待结果。我们先不在这里去深入研究,只是确定一件事情:有人在监听此chan,获取配置变更最新结果。而上图另一个GetNotifieResultWithTime(...)只出现在client_test.go文件中,长舒一口气…
在这里插入图片描述
疑问, 从上述代码中:wc.RemoveAllWatcher(watchCtx.ClientID())
用了一次watchCtx后就删除了,这……那后续文件再变更怎么办?我们接着看…

3.go wc.startHandleTimeoutRequestWorker(ctx):每秒检测,移除过期的订阅者:
在这里插入图片描述

func (c *LongPollWatchContext) ShouldExpire(now time.Time) bool {
return now.After(c.finishTime)
}

type LongPollWatchContext struct {
clientId         string
labels           map[string]string
once             sync.Once
finishTime       time.Time
finishChan       chan *apiconfig.ConfigClientResponse
watchConfigFiles map[string]*apiconfig.ClientConfigFileInfo
betaMatcher      BetaReleaseMatcher
}

看来我们丢失了一些环节,比如s.clients 是什么时候/如何 关联上值的?
还有上面提出的那个疑问:后续再有文件变更又该如何,毕竟你都移除了watchCtx

我们先把整体梳理完,再尝试去找答案…

初始化配置中心核心服务 Server数据模型后,往下:
在这里插入图片描述
在这里插入图片描述
进入第一个代理:上上图的第132行:
在这里插入图片描述
在这里插入图片描述
第二层代理:
在这里插入图片描述
在这里插入图片描述
至此,配置中心初始化大概流程介绍完毕。下一篇从下面蓝行开始:
在这里插入图片描述


我们回头看看上面留的两个问题:

  1. s.clients 是什么时候/如何 关联上值的?
  2. 后续再有文件变更又该如何,毕竟你都移除了watchCtx

先看2: 既然有删除,那肯定有添加:
在这里插入图片描述
查找调用点:nacos我们不看,只看第2个红框:
在这里插入图片描述
在这里插入图片描述
此方法实现的接口:
在这里插入图片描述
我们看看上上图的第113行:
1.先看第3个参数:默认30秒超时,正是对应hold住请求30秒:
在这里插入图片描述
进入AddWatcher(...):
在这里插入图片描述
上面的超时30秒 结合上方中的 3.go wc.startHandleTimeoutRequestWorker(ctx):每秒检测,移除过期的订阅者方法 正是 官方wiki文档 Polaris系列00篇说到的 默认hold住请求30秒…

我们本着尝试解答疑问2,没想到把疑问1也解答了。。。

最后我们再看看谁调用了LongPullWatchFile(...): 两种实现: http + grpc:
在这里插入图片描述
在这里插入图片描述
最后,我还是有些说服不了自己,文件变更后,移除了watchCtx, 那是不是客户端就主动再发起监听呢,这样咱们Polaris- server服务器才能在配置文件变更时再次推送给客户端?只能理解成这就是一个http/grpc请求,既然在这一次请求中收到配置文件内容变更,那就算是完成了本次监听的使命,后续要监听的话,客户端只能主动再发起请求监听。如果知情者看到此疑问,请不吝解惑,谢谢。

今天周六,加班的感觉。。。有点顶… 状态不佳,最后祝大家周末愉快…


原文地址:https://blog.csdn.net/csdnfanguyinheng/article/details/140568146

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!