Envoy 源码解析(一):Envoy 整体架构、Envoy 的初始化
本文基于 Envoy 1.31.0 版本进行源码学习
1、Envoy 整体架构
1)、核心组件
Envoy 包含以下四个核心组件:
- Listener(监听器):定义了 Envoy 如何处理入站请求。一旦连接建立,请求会被传递给一组过滤器进行处理
- Filter(过滤器):作为处理入站和出站流量的链式结构的一部分。可以通过集成特定功能的过滤器来实现额外的功能,例如,使用 Gzip 过滤器来压缩发送到客户端的数据
- Route(路由):用于将流量转发到具体的目标实例,这些目标实例在 Envoy 中被定义为集群
- Cluster(集群):定义了流量的目标端点,同时还可以包含其他的可选配置,比如负载均衡策略等
当 Envoy 收到来自下游 (Downstream) 的请求时,首先会经过 FilterChain,利用各种 L3/L4/L7 Filter 处理请求,然后将请求路由到指定的集群,并通过负载均衡选择一个目标地址,最终将请求转发至上游 (Upstream)
2)、Envoy 过滤器架构
Envoy 的过滤器分为三种:
- 监听过滤器(Listener Filters):当接收到新连接时,会根据监听过滤器链的顺序依次调用监听过滤器的回调方法。此阶段的监听过滤器可判断传输类型、处理 SSL 连接验证、恢复连接上的原始目标地址等,用于连接创建前的准备工作
- L4 网络过滤器(Network Filters):当从网络接收到原始数据报文时,会根据 L4 网络过滤器链中的 L4 网络过滤器顺序一次性调用 L4 网络过滤器的回调方法。此阶段的 L4 网络过滤器可以根据目标地址等信息进行 TCP 转发、限流、鉴权、快速响应测试等,还可以作为 L7 协议过滤器解析的入口,将数据报文转成请求对象并继续处理
- L7 协议过滤器(HTTP Filters):当由 L4 网络过滤器 HttpConnectionManager 作为入口传入应用请求时,经过 HTTP 编解码器解码后,此阶段的 L7 协议过滤器对每个 HTTP 请求都进行过滤,比如七层限流、七层鉴权、修改 HTTP 数据包、执行 Lua 及 Wasm 扩展、失败注入、路由等。其中,路由处理比较复杂且必须作为最后一个 L7 协议过滤器,它在处理完成后将执行 HTTP 负载均衡、上游连接池创建等操作
过滤器的回调方法:
-
监听过滤器:在监听过滤器创建的回调方法对象中通过 addAcceptFilter 插入用于处理新连接的监听过滤器链。在接收到新的网络连接时,Envoy 将依次调用该过滤器链中每个监听过滤器的 onAccept 方法,对传入的网络连接描述符 fd 进行处理。并且在过滤器处理方法中,可以将网络连接相关的处理结果保存到连接的 Socket 信息中
-
L4 网络过滤器分为读过滤器、写过滤器、读写过滤器
- 读过滤器通过 addReadFilter 方法插入过滤器链中,用于处理在 L4 网络连接上接收到的网络数据;当新连接建立时,将触发 onNewConnection 回调方法初始化网络过滤器,而且在有网络数据到达时,触发 onData 回调方法解析网络数据
- 写过滤器通过 addWriteFilter 方法插入过滤器链中,用于在向网络连接发送数据前处理待发送数据,当有数据要发送到网络连接时,将触发 onWrite 回调方法
- 读写过滤器通过 addFilter 方法插入过滤器链中,同时包含读过滤器和写过滤器,因此可同时处理 onNewConnection、onData、onWrite 回调方法
-
L7 协议过滤器分为解码过滤器、编码过滤器、编解码过滤器。解码过滤器用于处理 L4 网络过滤器解码后的 HTTP 对象。编码过滤器用于处理 L4 网络过滤器编码前的 HTTP 对象。编解码过滤器同时包含解码过滤器和编码过滤器
工作线程在应用请求处理开始前,通过 addStreamDecoderFilter、addStreamEncoderFilter、addStreamFilter 方法将 L7 协议过滤器添加到 L7 协议过滤器链中,并按照 L7 协议过滤器的添加顺序分别执行每个 L7 协议过滤器的回调方法来处理 HTTP 请求的不同数据部分
在解码过滤器中,decodeHeaders 处理 HTTP 头部,decodeData 处理 HTTP 数据体,decodeTrailers 处理 HTTP 消息解码结束位置等。在编码过滤器中,encodeHeaders 处理 HTTP 头部,encodeData 处理 HTTP 数据体,encodeTrailers 处理 HTTP 消息编码结束位置等
3)、Envoy 对请求处理过程
Envoy 对请求处理过程分为三个阶段:
-
Downstream 请求处理阶段:应用流量首先依次经过监听过滤器、L4 网络过滤器、解码器得到原始的应用请求对象,然后经过 L7 协议过滤器进行限流、故障注入、原始请求内容修改等操作
-
路由及负载均衡阶段:完成 Downstream 处理后的应用请求对象需要根据配置的路由规则决定如何寻找到 Cluster,以及选择此 Cluster 实例时使用的负载均衡策略。当找到最合适的目标地址时,将请求对象交给上游处理
-
Upstream 处理阶段:此时已经确定上游目标地址,这里将请求通过新的 TCP 连接发送给目标。在将请求交给上游处理的过程中,还需要关注上游连接容量的问题,比如:
- HTTP/1 协议的每个上游连接同时只能处理一个请求及其响应,因此需要等待连接内当前应用请求响应处理完成后,才能处理下一个请求
- HTTP/2 协议基于自身 Frame 帧结构的特点,可以同时使用一个上游连接处理多个不同用户的请求
- 当所有上游连接都被占用时,新的请求需要在上游连接池关联的请求队列中等待空闲的上游连接
2、Envoy 的初始化
1)、创建 MainCommon
Envoy 的初始化入口位于 source/exec/main.cc
的 main 方法中。该方法是对 source/exec/main_common.cc
中静态方法 Envoy::MainCommon::main
的封装,并且可以传入 PostServerHook 函数回调,在创建完 Server 后会调用。在 main 方法中创建了 main_common 对象,代码如下:
// source/exe/main.cc
int main(int argc, char** argv) {
...
return Envoy::MainCommon::main(argc, argv);
}
在 MainCommon 的构造方法中,通过初始化列表执行了成员 options_
和 base_
:
// source/exe/main_common.cc
MainCommon::MainCommon(int argc, const char* const* argv)
: options_(argc, argv, &MainCommon::hotRestartVersion, spdlog::level::info),
base_(options_, real_time_system_, default_listener_hooks_, prod_component_factory_,
std::make_unique<PlatformImpl>(), std::make_unique<Random::RandomGeneratorImpl>(),
nullptr) {}
options_
的构造方法在 source/server/options_impl.cc
中,使用开源的 tclap 解析库。OptionsImpl 支持许多参数配置,具体的参数配置参考 operation/cli。其中 concurrency 参数代表 Envoy 运行的工作线程数,如果不指定,则默认为系统当前的 CPU 核数。Envoy 在启动后不支持动态调整工作线程数
base_
的构造方法 MainCommonBase(const Server::Options& options,...)
代码如下:
StrippedMainBase::CreateInstanceFunction createFunction() {
return
[](Init::Manager& init_manager, const Server::Options& options,
Event::TimeSystem& time_system, ListenerHooks& hooks, Server::HotRestart& restarter,
Stats::StoreRoot& store, Thread::BasicLockable& access_log_lock,
Server::ComponentFactory& component_factory, Random::RandomGeneratorPtr&& random_generator,
ThreadLocal::Instance& tls, Thread::ThreadFactory& thread_factory,
Filesystem::Instance& file_system, std::unique_ptr<ProcessContext> process_context,
Buffer::WatermarkFactorySharedPtr watermark_factory) {
auto local_address = Network::Utility::getLocalAddress(options.localAddressIpVersion());
// 创建 Server::InstanceImpl 并调用 initialize 方法进行初始化
auto server = std::make_unique<Server::InstanceImpl>(
init_manager, options, time_system, hooks, restarter, store, access_log_lock,
std::move(random_generator), tls, thread_factory, file_system,
std::move(process_context), watermark_factory);
server->initialize(local_address, component_factory);
return server;
};
}
MainCommonBase::MainCommonBase(const Server::Options& options, Event::TimeSystem& time_system,
ListenerHooks& listener_hooks,
Server::ComponentFactory& component_factory,
std::unique_ptr<Server::Platform> platform_impl,
std::unique_ptr<Random::RandomGenerator>&& random_generator,
std::unique_ptr<ProcessContext> process_context)
: StrippedMainBase(options, time_system, listener_hooks, component_factory,
std::move(platform_impl), std::move(random_generator),
// 这里传入 createFunction() 函数,在 StrippedMainBase 构造函数中会调用
std::move(process_context), createFunction())
这里会调用 StrippedMainBase 的构造函数,这里最重要的是传入 createFunction()
函数,在 StrippedMainBase 构造函数中会调用,该函数创建 Server::InstanceImpl
并调用 initialize 方法进行初始化
StrippedMainBase 构造函数逻辑如下:
- 执行 configureHotRestarter 方法处理重启后新老 Envoy 进程间的热替换问题
- 创建
ThreadLocal::InstanceImpl
对象,作为线程局部存储空间,并创建stats_store_
用来为 stats 分配存储空间 - 调用传入的 createFunction() 函数,创建
Server::InstanceImpl
并调用 initialize 方法进行初始化
2)、服务 InstanceImpl 初始化
InstanceImpl 类继承自 InstanceBase,而 InstanceBase 的具体实现位于 source/server/server.cc
中。在 InstanceBase::initialize()
方法中,它会调用 InstanceBase::initializeOrThrow
方法来完成初始化过程。InstanceBase::initializeOrThrow
方法是初始化流程的核心入口。该方法的具体实现如下:
// source/server/server.cc
absl::Status InstanceBase::initializeOrThrow(Network::Address::InstanceConstSharedPtr local_address,
ComponentFactory& component_factory) {
...
// Handle configuration that needs to take place prior to the main configuration load.
// 调用 InstanceUtil::loadBootstrapConfig 初始化启动参数 bootstrap
RETURN_IF_NOT_OK(InstanceUtil::loadBootstrapConfig(
bootstrap_, options_, messageValidationContext().staticValidationVisitor(), *api_));
...
// 遍历 FactoryCategoryRegistry::registeredFactories 中的所有过滤器,并将相关信息添加到 bootstrap_ 对象的 node 中的 extensions 列表
for (const auto& ext : Envoy::Registry::FactoryCategoryRegistry::registeredFactories()) {
auto registered_types = ext.second->registeredTypes();
for (const auto& name : ext.second->allRegisteredNames()) {
auto* extension = bootstrap_.mutable_node()->add_extensions();
extension->set_name(std::string(name));
extension->set_category(ext.first);
auto const version = ext.second->getFactoryVersion(name);
if (version) {
*extension->mutable_version() = version.value();
}
extension->set_disabled(ext.second->isFactoryDisabled(name));
auto it = registered_types.find(name);
if (it != registered_types.end()) {
std::sort(it->second.begin(), it->second.end());
for (const auto& type_url : it->second) {
extension->add_type_urls(type_url);
}
}
}
}
...
ListenerManagerFactory* listener_manager_factory = nullptr;
if (bootstrap_.has_listener_manager()) {
listener_manager_factory = Config::Utility::getAndCheckFactory<ListenerManagerFactory>(
bootstrap_.listener_manager(), false);
} else {
listener_manager_factory = &Config::Utility::getAndCheckFactoryByName<ListenerManagerFactory>(
Config::ServerExtensionValues::get().DEFAULT_LISTENER);
}
// Workers get created first so they register for thread local updates.
// 创建 ListenerManager 并初始化 Worker
listener_manager_ = listener_manager_factory->createListenerManager(
*this, nullptr, worker_factory_, bootstrap_.enable_dispatcher_stats(), quic_stat_names_);
...
cluster_manager_factory_ = std::make_unique<Upstream::ProdClusterManagerFactory>(
serverFactoryContext(), stats_store_, thread_local_, http_context_,
[this]() -> Network::DnsResolverSharedPtr { return this->getOrCreateDnsResolver(); },
*ssl_context_manager_, *secret_manager_, quic_stat_names_, *this);
// Now the configuration gets parsed. The configuration may start setting
// thread local data per above. See MainImpl::initialize() for why ConfigImpl
// is constructed as part of the InstanceBase and then populated once
// cluster_manager_factory_ is available.
// 创建 ClusterManager 并初始化 CDS
RETURN_IF_NOT_OK(config_.initialize(bootstrap_, *this, *cluster_manager_factory_));
// Instruct the listener manager to create the LDS provider if needed. This must be done later
// because various items do not yet exist when the listener manager is created.
if (bootstrap_.dynamic_resources().has_lds_config() ||
!bootstrap_.dynamic_resources().lds_resources_locator().empty()) {
std::unique_ptr<xds::core::v3::ResourceLocator> lds_resources_locator;
if (!bootstrap_.dynamic_resources().lds_resources_locator().empty()) {
lds_resources_locator = std::make_unique<xds::core::v3::ResourceLocator>(
THROW_OR_RETURN_VALUE(Config::XdsResourceIdentifier::decodeUrl(
bootstrap_.dynamic_resources().lds_resources_locator()),
xds::core::v3::ResourceLocator));
}
// 加载启动文件里的 LDS 配置,调用父类 ListenerManagerImpl 创建对 LDS 配置的订阅
listener_manager_->createLdsApi(bootstrap_.dynamic_resources().lds_config(),
lds_resources_locator.get());
}
...
}
InstanceImpl 初始化的核心流程中函数调用顺序如下:
InstanceBase::initialize()
-> InstanceBase::initializeOrThrow()
-> InstanceUtil::loadBootstrapConfig() // 初始化启动参数 bootstrap
-> FactoryCategoryRegistry::registeredFactories() // 过滤器信息补齐
-> config_.initialize(bootstrap_, *this, *cluster_manager_factory_) // 创建 ListenerManager 并初始化 Worker
-> listener_manager_->createLdsApi() // 加载启动文件里的 LDS 配置,调用父类 ListenerManagerImpl 创建对 LDS 配置的订阅
3)、初始化启动参数 bootstrap
启动参数 bootstrap(详细配置参考 bootstrap 配置)的初始化是通过 InstanceUtil::loadBootstrapConfig
方法实现的
Envoy 在启动时使用 --config-path
和 --config-yaml
指定启动配置文件,通过 MessageUtil::loadFromFile
和 MessageUtil::loadFromYaml
分别加载配置文件,并通过 bootstrap.MergeFrom
进行配置合并,最终赋值到 instance 的成员变量 envoy::config::bootstrap::v3::Bootstrap
// source/server/server.cc
absl::Status InstanceUtil::loadBootstrapConfig(
envoy::config::bootstrap::v3::Bootstrap& bootstrap, const Options& options,
ProtobufMessage::ValidationVisitor& validation_visitor, Api::Api& api) {
const std::string& config_path = options.configPath();
const std::string& config_yaml = options.configYaml();
const envoy::config::bootstrap::v3::Bootstrap& config_proto = options.configProto();
// One of config_path and config_yaml or bootstrap should be specified.
if (config_path.empty() && config_yaml.empty() && config_proto.ByteSizeLong() == 0) {
return absl::InvalidArgumentError(
"At least one of --config-path or --config-yaml or Options::configProto() "
"should be non-empty");
}
if (!config_path.empty()) {
MessageUtil::loadFromFile(config_path, bootstrap, validation_visitor, api);
}
if (!config_yaml.empty()) {
envoy::config::bootstrap::v3::Bootstrap bootstrap_override;
#ifdef ENVOY_ENABLE_YAML
MessageUtil::loadFromYaml(config_yaml, bootstrap_override, validation_visitor);
// TODO(snowp): The fact that we do a merge here doesn't seem to be covered under test.
#else
// Treat the yaml as proto
Protobuf::TextFormat::ParseFromString(config_yaml, &bootstrap_override);
#endif
bootstrap.MergeFrom(bootstrap_override);
}
if (config_proto.ByteSizeLong() != 0) {
bootstrap.MergeFrom(config_proto);
}
MessageUtil::validate(bootstrap, validation_visitor);
return absl::OkStatus();
}
4)、过滤器注册及信息补齐
Envoy 过滤器的注册是在每个过滤器的静态变量初始化阶段完成的,并且在 InstanceBase::initializeOrThrow
方法中会将这些可用的过滤器类型补充到 bootstrap 配置文件中。过滤器的静态注册过程如下:
如上图所示,Envoy 中的过滤器分为两个级别。第一个级别为 category 分类,这里对应 Envoy 中的三种过滤器类型:监听过滤器对应 NamedListenerFilterConfigFactory、L4 过滤器对应 NamedNetworkFilterConfigFactory、L7 过滤器对应 NamedHttpFilterConfigFactory。每个分类内都可以保存若干过滤器,并通过过滤器名称与过滤器工厂实例建立映射关系
以 L7 本地限流过滤器 LocalRateLimitFilterConfig 为例,其代码如下:
// source/extensions/filters/http/local_ratelimit/config.h
class LocalRateLimitFilterConfig
: public Common::FactoryBase<
envoy::extensions::filters::http::local_ratelimit::v3::LocalRateLimit> {
public:
LocalRateLimitFilterConfig() : FactoryBase("envoy.filters.http.local_ratelimit") {}
LocalRateLimitFilterConfig 继承自 Common::FactoryBase
,而 FactoryBase 继承自 Server::Configuration::NamedHttpFilterConfigFactory
。后面代码中的 Base 指的是 NamedHttpFilterConfigFactory,而 T 指的是 LocalRateLimitFilterConfig 类型
LocalRateLimitFilter 的注册代码如下:
// source/extensions/filters/http/local_ratelimit/config.cc
LEGACY_REGISTER_FACTORY(LocalRateLimitFilterConfig,
Server::Configuration::NamedHttpFilterConfigFactory,
"envoy.local_rate_limit");
LEGACY_REGISTER_FACTORY 是一个 C++ 宏定义,它将创建一个静态全局变量 FACTORY##_registered
。在这里,它将 REGISTER_FACTORY 扩展为 LocalRateLimitFilterConfig,并调用 Envoy::Registry::RegisterFactory
构造方法对传入的过滤器进行初始化:
// envoy/registry/registry.h
#define REGISTER_FACTORY(FACTORY, BASE) \
ABSL_ATTRIBUTE_UNUSED void forceRegister##FACTORY() {} \
static Envoy::Registry::RegisterFactory</* NOLINT(fuchsia-statically-constructed-objects) */ \
FACTORY, BASE> \
FACTORY##_registered
在每个过滤器的静态全局变量 RegisterFactory 内,保存了过滤器工厂实例对象 instance_
,用于后面对此过滤器执行实例化操作:
// envoy/registry/registry.h
template <class T, class Base> class RegisterFactory {
public:
RegisterFactory() {
ASSERT(!instance_.name().empty());
FactoryRegistry<Base>::registerFactory(instance_, instance_.name());
if (!FactoryCategoryRegistry::isRegistered(instance_.category())) {
FactoryCategoryRegistry::registerCategory(instance_.category(),
new FactoryRegistryProxyImpl<Base>());
}
}
...
private:
...
// 过滤器工厂实例
T instance_{};
};
如上所示,构造方法中的调用 FactoryRegistry<Base>::registerFactory(instance_, instance_.name())
在每个 Category 中将注册过滤器工厂名与过滤器工厂实例建立映射关系:
// envoy/registry/registry.h
static void registerFactory(Base& factory, absl::string_view name,
const envoy::config::core::v3::BuildVersion& version,
absl::string_view instead_value = "") {
auto result = factories().emplace(std::make_pair(name, &factory));
if (!result.second) {
ExceptionUtil::throwEnvoyException(
fmt::format("Double registration for name: '{}'", factory.name()));
}
versionedFactories().emplace(std::make_pair(name, version));
if (!instead_value.empty()) {
deprecatedFactoryNames().emplace(std::make_pair(name, instead_value));
}
}
如果发现此 Category 尚未注册,则执行 FactoryCategoryRegistry::registerCategory(instance_.category(), new FactoryRegistryProxyImpl<Base>())
方法注册 Category,并建立 Category 与该 FactoryRegistryProxyImpl<Base>
实例的映射关系:
// envoy/registry/registry.h
static void registerCategory(const std::string& category, FactoryRegistryProxy* factory_names) {
auto result = factories().emplace(std::make_pair(category, factory_names));
RELEASE_ASSERT(result.second == true,
fmt::format("Double registration for category: '{}'", category));
}
在创建过滤器时,需要根据 Category、过滤器名称等条件查找过滤器创建工厂实例。例如,创建监听过滤器的方法 ProdListenerComponentFactory::createListenerFilterFactoryListImpl
,该方法根据监听器的配置信息返回监听器工厂实例。代码如下:
// source/common/listener_manager/listener_manager_impl.cc
Filter::ListenerFilterFactoriesList
ProdListenerComponentFactory::createListenerFilterFactoryListImpl(
const Protobuf::RepeatedPtrField<envoy::config::listener::v3::ListenerFilter>& filters,
Configuration::ListenerFactoryContext& context,
Filter::TcpListenerFilterConfigProviderManagerImpl& config_provider_manager) {
...
// 获取过滤器工厂实例
auto& factory =
Config::Utility::getAndCheckFactory<Configuration::NamedListenerFilterConfigFactory>(
proto_config);
...
}
首先根据入参中的 filters 配置信息 Category 调用 getAndCheckFactory 模板方法来获取类型为 NamedListenerFilterConfigFactory 的过滤器工厂实例:
// source/common/config/utility.h
template <class Factory, class ProtoMessage>
static Factory* getAndCheckFactory(const ProtoMessage& message, bool is_optional) {
...
return Utility::getAndCheckFactoryByName<Factory>(message.name(), is_optional);
}
在 getAndCheckFactoryByName 内调用 Registry::FactoryRegistry<Factory>::getFactory(name)
方法,在已注册的指定 Category 中根据过滤器名称查找工厂实例并返回
5)、创建 ListenerManager 并初始化 Worker
创建 ListenerManagerImpl 用于管理监听,因为 Downstream 要访问 Upstream 的时候,Envoy 会进行监听,Downstream 会连接监听的端口
ListenerManager 拥有一个或多个工作线程,每个工作线程会去处理一个给定的 Downstream 的 TCP 连接。ListenerManager 负责创建这些工作线程,工作线程创建好了以后 ListenerManager 就创建完成了
在 InstanceBase::initializeOrThrow
方法中创建 ListenerManager 对象,代码如下:
// source/server/server.cc
absl::Status InstanceBase::initializeOrThrow(Network::Address::InstanceConstSharedPtr local_address,
ComponentFactory& component_factory) {
...
// Workers get created first so they register for thread local updates.
// 创建 ListenerManager 并初始化 Worker
listener_manager_ = listener_manager_factory->createListenerManager(
*this, nullptr, worker_factory_, bootstrap_.enable_dispatcher_stats(), quic_stat_names_);
...
}
创建 ListenerManager 及 Worker 工作线程的初始化流程如下:
在 ListenerManagerImpl 的构造方法中,创建了很多的 worker。Envoy 采用 libevent 监听 socket 的事件,当有一个新的连接来的时候,会将任务分配给某个 worker 进行处理,从而实现异步的处理
根据配置的工作线程数创建工作线程,并通过来创建新的工作线程对象,工作线程名以 worker_
为前缀(此处创建的是 worker 对象,并没有创建启动系统线程,在后续的 Envoy 运行环节才会有)
// source/common/listener_manager/listener_manager_impl.cc
ListenerManagerImpl::ListenerManagerImpl(Instance& server,
std::unique_ptr<ListenerComponentFactory>&& factory,
WorkerFactory& worker_factory,
bool enable_dispatcher_stats,
Quic::QuicStatNames& quic_stat_names)
: server_(server), factory_(std::move(factory)),
scope_(server.stats().createScope("listener_manager.")), stats_(generateStats(*scope_)),
enable_dispatcher_stats_(enable_dispatcher_stats), quic_stat_names_(quic_stat_names) {
...
// 根据配置的工作线程数创建工作线程,并通过 ProdWorkerFactory 来创建新的工作线程对象,工作线程名称以 worker_ 为前缀
for (uint32_t i = 0; i < server.options().concurrency(); i++) {
workers_.emplace_back(worker_factory.createWorker(
i, server.overloadManager(), server.nullOverloadManager(), absl::StrCat("worker_", i)));
}
}
调用 ProdWorkerFactory 的 createWorker 方法创建 WorkerImpl 实例
// source/server/worker_impl.cc
WorkerPtr ProdWorkerFactory::createWorker(uint32_t index, OverloadManager& overload_manager,
OverloadManager& null_overload_manager,
const std::string& worker_name) {
Event::DispatcherPtr dispatcher(
api_.allocateDispatcher(worker_name, overload_manager.scaledTimerFactory()));
auto conn_handler = getHandler(*dispatcher, index, overload_manager, null_overload_manager);
return std::make_unique<WorkerImpl>(tls_, hooks_, std::move(dispatcher), std::move(conn_handler),
overload_manager, api_, stat_names_);
}
createWorker 方法调用 api_.allocateDispatcher
方法,进而创建调度器 DispatcherImpl,用来封装 libevent 的事件分发
// source/common/api/api_impl.cc
Event::DispatcherPtr
Impl::allocateDispatcher(const std::string& name,
const Event::ScaledRangeTimerManagerFactory& scaled_timer_factory) {
return std::make_unique<Event::DispatcherImpl>(name, *this, time_system_, scaled_timer_factory,
watermark_factory_);
}
调度器 DispatcherImpl 的作用:
- DispatcherImpl 作为网络事件及其他内部事件调度器,会创建 WatermarkBufferFactory 工厂实例,用于创建分配网络请求内存的 WatermarkBuffer 实例。WatermarkBuffer 可以监控请求内已分配内存的大小是否超出设置的阈值,如果是,则触发 L4 ConnectionImpl 连接对象上的 onWriteBufferLowWatermark/onWriteBufferHighWatermark 方法,暂停接收新请求来保护 Envoy 进程运行
- DispatcherImpl 对象会负责生命周期较短对象内存的延迟释放,解决这类对象由于被其他生命周期较长对象访问时出现的野指针问题
- 同时,在 DispatcherImpl 执行 event_base_new 方法来创建与底层 libevent 库的通信
在 WorkerImpl 构造方法中,将创建的工作线程注册到 registered_threads_
列表内,主线程可以通过 runOnAllThreads 让每个工作线程执行一个指定的外部回调方法
// source/server/worker_impl.cc
WorkerImpl::WorkerImpl(ThreadLocal::Instance& tls, ListenerHooks& hooks,
Event::DispatcherPtr&& dispatcher, Network::ConnectionHandlerPtr handler,
OverloadManager& overload_manager, Api::Api& api,
WorkerStatNames& stat_names)
: tls_(tls), hooks_(hooks), dispatcher_(std::move(dispatcher)), handler_(std::move(handler)),
api_(api), reset_streams_counter_(
api_.rootScope().counterFromStatName(stat_names.reset_high_memory_stream_)) {
// 在构造方法中注册工作线程对象
tls_.registerThread(*dispatcher_, false);
...
}
// source/common/thread_local/thread_local_impl.cc
void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_thread) {
ASSERT_IS_MAIN_OR_TEST_THREAD();
ASSERT(!shutdown_);
if (main_thread) {
// 主线程单独记录
main_thread_dispatcher_ = &dispatcher;
thread_local_data_.dispatcher_ = &dispatcher;
} else {
ASSERT(!containsReference(registered_threads_, dispatcher));
// 将新工作线程注册到列表中
registered_threads_.push_back(dispatcher);
// 主线程轮询每个工作线程发送 post 任务
dispatcher.post([&dispatcher] { thread_local_data_.dispatcher_ = &dispatcher; });
}
}
6)、Dispatcher 内存延迟析构
Envoy 中的延迟析构功能主要是为了在较长生命周期对象引用较短生命周期对象时,解决有可能出现的较短生命周期对象已经析构所引发的野指针问题。解决这类问题时不能将被引用对象的生命周期保留得太长,否则会导致大量小对象无法被及时释放,从而引发内存快速增长的问题
举例如下:
// source/common/network/tcp_listener_impl.cc
TcpListenerImpl::TcpListenerImpl(Event::Dispatcher& dispatcher, Random::RandomGenerator& random,
Runtime::Loader& runtime, SocketSharedPtr socket,
TcpListenerCallbacks& cb, bool bind_to_port,
bool ignore_global_conn_limit, bool bypass_overload_manager,
uint32_t max_connections_to_accept_per_socket_event,
Server::ThreadLocalOverloadStateOptRef overload_state)
: BaseListenerImpl(dispatcher, std::move(socket)), cb_(cb), random_(random), runtime_(runtime),
bind_to_port_(bind_to_port), reject_fraction_(0.0),
...) {
if (bind_to_port) {
// Use level triggered mode to avoid potential loss of the trigger due to
// transient accept errors or early termination due to accepting
// max_connections_to_accept_per_socket_event connections.
// 创建网络监听,并设置收到新连接的回调方法为 onSocketEvent
socket_->ioHandle().initializeFileEvent(
dispatcher,
[this](uint32_t events) {
onSocketEvent(events);
return absl::OkStatus();
},
Event::FileTriggerType::Level, Event::FileReadyType::Read);
}
}
TcpListenerImpl 在监听器 bind_to_port 标记为 true 时创建网络监听,并设置收到新连接的回调方法为 onSocketEvent。该方法为 TcpListenerImpl 的成员方法,需要通过 this 指针调用。initializeFileEvent 方法通过 Dispatcher 向 libevent 库传递监听 fd_
及网络事件的回调方法 onSocketEvent:
// source/common/network/io_socket_handle_impl.cc
void IoSocketHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb,
Event::FileTriggerType trigger, uint32_t events) {
ASSERT(file_event_ == nullptr, "Attempting to initialize two `file_event_` for the same "
"file descriptor. This is not allowed.");
// 通过 Dispatcher 向 libevent 库传递监听 fd_ 及网络事件的回调方法 onSocketEvent
file_event_ = dispatcher.createFileEvent(fd_, cb, trigger, events);
}
initializeFileEvent 方法进而通过 dispatcher.createFileEvent
方法创建 FileEventImpl 实例,用于保存文件描述符 fd_
及回调方法的映射关系:
// source/common/event/dispatcher_impl.cc
FileEventPtr DispatcherImpl::createFileEvent(os_fd_t fd, FileReadyCb cb, FileTriggerType trigger,
uint32_t events) {
ASSERT(isThreadSafe());
return FileEventPtr{new FileEventImpl(
*this, fd,
[this, cb](uint32_t events) {
touchWatchdog();
return cb(events);
},
trigger, events)};
}
FileEventImpl 构造方法将调用 event_assign 方法,用于将事件注册到 libevent 库中:
// source/common/event/file_event_impl.cc
void FileEventImpl::assignEvents(uint32_t events, event_base* base) {
ASSERT(dispatcher_.isThreadSafe());
ASSERT(base != nullptr);
enabled_events_ = events;
event_assign(
&raw_event_, base, fd_,
EV_PERSIST | (trigger_ == FileTriggerType::Edge ? EV_ET : 0) |
(events & FileReadyType::Read ? EV_READ : 0) |
(events & FileReadyType::Write ? EV_WRITE : 0) |
(events & FileReadyType::Closed ? EV_CLOSED : 0),
[](evutil_socket_t, short what, void* arg) -> void {
auto* event = static_cast<FileEventImpl*>(arg);
uint32_t events = 0;
if (what & EV_READ) {
events |= FileReadyType::Read;
}
if (what & EV_WRITE) {
events |= FileReadyType::Write;
}
if (what & EV_CLOSED) {
events |= FileReadyType::Closed;
}
ASSERT(events != 0);
event->mergeInjectedEventsAndRunCb(events);
},
this);
}
当新连接到达时,将通过 FileEventImpl 回调 TcpListenerImpl 实例方法 onSocketEvent。如果在回调执行中 TcpListenerImpl 对象已经析构,则将出现野指针问题。但这是不可能的,因为 Dispatcher 是顺序执行的,主要保证在回调方法执行完毕前 TcpListenerImpl 对象存在,就不会出现野指针问题。而如果采用 C++ 智能指针 shared_ptr 延长对象生命周期来解决此问题,则需要在回调方法内添加额外的处理逻辑,这样容易出错
Dispatcher 内存延迟析构如下图所示:
可以被延迟析构的对象都继承自 DeferredDeletable 接口,如 Connection 连接对象、ActiveStream 下游请求对象、CodecClient 上游请求对象等
ActiveStream 对象代表下游请求的完整生命周期,在下游请求对应的上游响应完成接收和处理后,ActiveStream 对象所在的工作线程调用 ConnectionManagerImpl::doDeferredStreamDestroy
方法对 ActiveStream 对象进行延迟析构:
// source/common/http/conn_manager_impl.cc
void ConnectionManagerImpl::doDeferredStreamDestroy(ActiveStream& stream) {
...
// 对 ActiveStream 对象进行延迟析构
dispatcher_->deferredDelete(stream.removeFromList(streams_));
...
}
deferredDelete 方法在线程 Dispatcher 延迟清理列表的尾部添加待删除对象,并在第一个对象加入时插入一个清理任务
// source/common/event/dispatcher_impl.cc
void DispatcherImpl::deferredDelete(DeferredDeletablePtr&& to_delete) {
ASSERT(isThreadSafe());
if (to_delete != nullptr) {
to_delete->deleteIsPending();
// Dispatcher 延迟清理列表的尾部添加待删除对象
current_to_delete_->emplace_back(std::move(to_delete));
ENVOY_LOG(trace, "item added to deferred deletion list (size={})", current_to_delete_->size());
if (current_to_delete_->size() == 1) {
// 在第一个对象加入时插入一个清理任务
deferred_delete_cb_->scheduleCallbackCurrentIteration();
}
}
}
这样,在当前工作线程的 Dispatcher 执行完一轮任务后,可能已经由于多次调用 deferredDelete 方法累积了多个待延迟删除对象。新的一轮任务执行时,Dispatcher 将有机会运行 deferred_delete_cb_
回调处理器中的 clearDeferredDeleteList 方法对延迟删除对象进行清理:
// source/common/event/dispatcher_impl.cc
DispatcherImpl::DispatcherImpl(const std::string& name, Thread::ThreadFactory& thread_factory,
TimeSource& time_source, Filesystem::Instance& file_system,
Event::TimeSystem& time_system,
const ScaledRangeTimerManagerFactory& scaled_timer_factory,
const Buffer::WatermarkFactorySharedPtr& watermark_factory)
: ...
deferred_delete_cb_(base_scheduler_.createSchedulableCallback(
[this]() -> void { clearDeferredDeleteList(); })),
...
}
clearDeferredDeleteList 方法代码如下:
// source/common/event/dispatcher_impl.cc
void DispatcherImpl::clearDeferredDeleteList() {
ASSERT(isThreadSafe());
std::vector<DeferredDeletablePtr>* to_delete = current_to_delete_;
size_t num_to_delete = to_delete->size();
if (deferred_deleting_ || !num_to_delete) {
return;
}
ENVOY_LOG(trace, "clearing deferred deletion list (size={})", num_to_delete);
// Swap the current deletion vector so that if we do deferred delete while we are deleting, we
// use the other vector. We will get another callback to delete that vector.
if (current_to_delete_ == &to_delete_1_) {
// 切换为 to_delete_2_
current_to_delete_ = &to_delete_2_;
} else {
current_to_delete_ = &to_delete_1_;
}
touchWatchdog();
// 防止其他 clearDeferredDeleteList 同时执行
deferred_deleting_ = true;
// Calling clear() on the vector does not specify which order destructors run in. We want to
// destroy in FIFO order so just do it manually. This required 2 passes over the vector which is
// not optimal but can be cleaned up later if needed.
for (size_t i = 0; i < num_to_delete; i++) {
// 删除执行各个对象的析构方法
(*to_delete)[i].reset();
}
to_delete->clear();
// 允许新的 clearDeferredDeleteList 执行
deferred_deleting_ = false;
}
clearDeferredDeleteList 方法执行时会首先判断 deferred_deleting_
标记是否已设置。由于每次执行时将在 DispatcherImpl 内的两个延迟删除列表 to_delete_1_
、to_delete_2_
间切换,采用两个待删除列表的目的是使每次清理的对象数量不会太大,因此导致一次 clearDeferredDeleteList 方法执行很久才能结束
举例来说,如果当前要删除的链表为 to_delete_1_
,在删除过程中待删除对象的析构方法中可能又会调用 deferredDelete 方法删除其他对象。此时这个 deferredDelete 方法将会把新的待删除对象放入 to_delete_2_
。这样不会导致删除后一个待删除列表所有对象的总删除时间过长,阻塞其他 Dispatcher 上事件的处理
7)、创建 ClusterManager 并初始化 CDS
InstanceBase::initializeOrThrow
方法中执行了 Cluster 服务发现的初始化,代码如下:
// source/server/server.cc
absl::Status InstanceBase::initializeOrThrow(Network::Address::InstanceConstSharedPtr local_address,
ComponentFactory& component_factory) {
...
RETURN_IF_NOT_OK(config_.initialize(bootstrap_, *this, *cluster_manager_factory_));
...
}
创建 ClusterManager 并初始化 CDS 流程如下:
config_.initialize
方法中创建 Cluster 管理器,并且根据静态配置创建 Listener,并添加到 active_listeners_
或 warming_listeners_
中,代码实现如下:
// source/server/configuration_impl.cc
absl::Status MainImpl::initialize(const envoy::config::bootstrap::v3::Bootstrap& bootstrap,
Instance& server,
Upstream::ClusterManagerFactory& cluster_manager_factory) {
...
// clusterManagerFromProto() and init() have to be called consecutively.
// 创建并初始化 ClusterManager
cluster_manager_ = cluster_manager_factory.clusterManagerFromProto(bootstrap);
status = cluster_manager_->initialize(bootstrap);
RETURN_IF_NOT_OK(status);
// 根据静态配置创建 Listener
const auto& listeners = bootstrap.static_resources().listeners();
ENVOY_LOG(info, "loading {} listener(s)", listeners.size());
for (ssize_t i = 0; i < listeners.size(); i++) {
ENVOY_LOG(debug, "listener #{}:", i);
absl::StatusOr<bool> update_or_error =
server.listenerManager().addOrUpdateListener(listeners[i], "", false);
RETURN_IF_STATUS_NOT_OK(update_or_error);
}
...
}
addOrUpdateListener 方法中调用 addOrUpdateListenerInternal 方法,代码实现如下:
// source/common/listener_manager/listener_manager_impl.cc
bool ListenerManagerImpl::addOrUpdateListenerInternal(
const envoy::config::listener::v3::Listener& config, const std::string& version_info,
bool added_via_api, const std::string& name) {
...
// 创建新的监听器对象
new_listener = std::make_unique<ListenerImpl>(config, version_info, *this, name, added_via_api,
workers_started_, hash);
...
// 判断工作线程是否已经启动
if (workers_started_) {
new_listener->debugLog("add warming listener");
// 如果工作线程已经启动,则新添加的监听器处于 warming 状态,此时还需要获取路由的配置及 Cluster 的配置后才能为工作线程提供服务
warming_listeners_.emplace_back(std::move(new_listener));
} else {
new_listener->debugLog("add active listener");
// 如果工作线程还未启动,则此时 ClusterManager、ListenerManager 将通过 xDS 获取监听器相关的 RDS 及 CDS 配置,
// 这样在监听器关联的工作线程启动后,这些监听器将被设置为 active 状态,表示可以立即提供服务
active_listeners_.emplace_back(std::move(new_listener));
}
...
}
cluster_manager_factory.clusterManagerFromProto
调用 ProdClusterManagerFactory 的 clusterManagerFromProto 方法创建 ClusterManagerImpl 对象。在 ClusterManagerImpl 的 initialize 方法中,针对每个线程创建 ThreadLocalClusterManagerImpl,解决多个工作线程访问 Cluster 配置的锁问题
// source/common/upstream/cluster_manager_impl.cc
absl::Status
ClusterManagerImpl::initialize(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
...
// Once the initial set of static bootstrap clusters are created (including the local cluster),
// we can instantiate the thread local cluster manager.
// 针对每个线程创建 ThreadLocalClusterManagerImpl,解决多个工作线程访问 Cluster 配置的锁问题
tls_.set([this, local_cluster_params](Event::Dispatcher& dispatcher) {
return std::make_shared<ThreadLocalClusterManagerImpl>(*this, dispatcher, local_cluster_params);
});
...
}
在主线程创建 ClusterManagerImpl 对象后,会立即调用 initialize 方法进行初始化。如果在启动文件中配置了 CDS,那么这里会通过 create 方法创建 CDS
// source/common/upstream/cluster_manager_impl.cc
absl::Status
ClusterManagerImpl::initialize(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
...
cds_api_ = factory_.createCds(dyn_resources.cds_config(), cds_resources_locator.get(), *this);
init_helper_.setCds(cds_api_.get());
...
}
createCds 方法的返回值是 CdsApiImpl 类型的指针,用于处理 CDS 配置监听:
// source/common/upstream/cluster_manager_impl.cc
CdsApiPtr
ProdClusterManagerFactory::createCds(const envoy::config::core::v3::ConfigSource& cds_config,
const xds::core::v3::ResourceLocator* cds_resources_locator,
ClusterManager& cm) {
return CdsApiImpl::create(cds_config, cds_resources_locator, cm, *stats_.rootScope(),
context_.messageValidationContext().dynamicValidationVisitor());
}
通过 xDS 获取 CDS 订阅是在 CdsApiImpl 构造函数中实现的,这里会注册对 CDS 资源的订阅 subscription:
// source/common/upstream/cds_api_impl.cc
CdsApiImpl::CdsApiImpl(const envoy::config::core::v3::ConfigSource& cds_config,
const xds::core::v3::ResourceLocator* cds_resources_locator,
ClusterManager& cm, Stats::Scope& scope,
ProtobufMessage::ValidationVisitor& validation_visitor)
: Envoy::Config::SubscriptionBase<envoy::config::cluster::v3::Cluster>(validation_visitor,
"name"),
helper_(cm, "cds"), cm_(cm), scope_(scope.createScope("cluster_manager.cds.")) {
const auto resource_name = getResourceName();
if (cds_resources_locator == nullptr) {
// 注册对 CDS 资源的订阅 subscription
// 每当有 CDS 配置事件发生变化时,都通过 SubscriptionCallbacks 注册的回调方法执行 CdsApiImpl::onConfigUpdate 方法,
// 然后执行 ClusterManager 中的 addOrUpdateCluster 或 removeCluster 方法添加或删除 Cluster
subscription_ = THROW_OR_RETURN_VALUE(cm_.subscriptionFactory().subscriptionFromConfigSource(
cds_config, Grpc::Common::typeUrl(resource_name),
*scope_, *this, resource_decoder_, {}),
Config::SubscriptionPtr);
} else {
subscription_ = THROW_OR_RETURN_VALUE(
cm.subscriptionFactory().collectionSubscriptionFromUrl(
*cds_resources_locator, cds_config, resource_name, *scope_, *this, resource_decoder_),
Config::SubscriptionPtr);
}
}
每当有 CDS 配置事件发生变化时,都会通过 SubscriptionCallbacks 注册的回调方法执行 CdsApiImpl::onConfigUpdate
方法,然后执行 ClusterManager 中的 addOrUpdateCluster 或 removeCluster 方法添加或删除 Cluster
8)、LDS 的初始化
LDS 的初始化发生在 CDS 初始化之后。首先加载启动文件里的 LDS 配置,调用 ProdListenerComponentFactory 的 createLdsApi 方法创建新的 LdsApiImpl 对象
// source/server/server.cc
absl::Status InstanceBase::initializeOrThrow(Network::Address::InstanceConstSharedPtr local_address,
ComponentFactory& component_factory) {
...
// 加载启动文件里的 LDS 配置,调用父类 ListenerManagerImpl 创建对 LDS 配置的订阅
listener_manager_->createLdsApi(bootstrap_.dynamic_resources().lds_config(),
lds_resources_locator.get());
...
}
在 LdsApiImpl 构造方法中注册 LDS 订阅 subscription:
// source/common/listener_manager/lds_api.cc
LdsApiImpl::LdsApiImpl(const envoy::config::core::v3::ConfigSource& lds_config,
const xds::core::v3::ResourceLocator* lds_resources_locator,
Upstream::ClusterManager& cm, Init::Manager& init_manager,
Stats::Scope& scope, ListenerManager& lm,
ProtobufMessage::ValidationVisitor& validation_visitor)
: Envoy::Config::SubscriptionBase<envoy::config::listener::v3::Listener>(validation_visitor,
"name"),
listener_manager_(lm), scope_(scope.createScope("listener_manager.lds.")), cm_(cm),
init_target_("LDS", [this]() { subscription_->start({}); }) {
const auto resource_name = getResourceName();
if (lds_resources_locator == nullptr) {
// 注册 LDS 订阅 subscription
// 当有 LDS 变更事件到来时,通过 SubscriptionCallbacks 回调方法进入 Envoy,然后主线程调用 LdsApiImpl::onConfigUpdate 方法
// 执行 ListenerManager 的 addOrUpdateListener 或 removeListener 方法来添加或删除监听器
subscription_ = THROW_OR_RETURN_VALUE(cm.subscriptionFactory().subscriptionFromConfigSource(
lds_config, Grpc::Common::typeUrl(resource_name),
*scope_, *this, resource_decoder_, {}),
Config::SubscriptionPtr);
}
...
}
当有 LDS 变更事件到来时,通过 SubscriptionCallbacks 回调方法进入 Envoy,然后主线程调用 LdsApiImpl::onConfigUpdate
方法执行 ListenerManager 的 addOrUpdateListener 或 removeListener 方法来添加或删除监听器
9)、小结
Envoy 的初始化流程如下图:
Envoy 监听器的创建流程如下图:
- Envoy 在初始化阶段创建 ListenerManager
- 调用 ListenerManager 的 createLdsApi 方法创建与控制面建立了 LDS 资源订阅关系的 LdsApi
- 在 LdsApiImpl 构造方法中注册 LDS 订阅 subscription
- Envoy 在初始化流程中创建工作线程并启动线程的入口方法
- 当 Envoy 主线程收到 LDS 变更时,LdsApi 调用 ListenerManager 的 addOrUpdateListener 或 removeListener 方法来添加、更新或删除监听器
- ListenerManager 根据 LDS 的配置创建新的监听器配置对象
- 在监听器配置对象的创建流程中,ListenerManager 创建 L4 网络过滤器链管理器 FilterChainManager,用于管理 L4 过滤器链,并将在当前 L4 网络过滤器链中配置的 L4 网络过滤器通过 buildFilterChains 注册到连接创建回调中。当应用连接被接收时,按照顺序创建 L4 网络过滤器,每个网络过滤器在创建的同时都将自己添加到 FilterChainManager 中,比如:作为 L7 协议过滤器链入口的 L4 网络过滤器 ConnectionManager 就是在这里被注册的
- 在每个监听器的配置中都可能包含单独的路由配置信息,因此还需要创建 HttpConnectionManagerConfig 来创建其内部 RDS 的订阅
- 每个不同监听器相关的路由配置都被保存在单独的 RouteConfig 中,用于描述连接中的目标虚拟主机 VirtualHost 及请求的 URL 被匹配后最终访问的 Cluster 的映射关系。此时每个 RdsRouteConfigProvider 都保存从 RDS 中获取的与当前监听器关联的路由映射,在 RdsRouteConfigProvider 内部通过线程局部存储 TLS 的方式来保存路由映射项
- RdsRouteConfigProvider 负责创建 RDS 订阅及监听
- 当 Envoy 主线程收到 RDS 变更时,通过 onConfigUpdate 方法通知关联的 RdsRouteConfigProvider 对象路由配置发生变更。由于不同路由项有不同的 RdsRouteConfigProvider 负责订阅,因此只有与当前 RDS 相关的监听器上的 RdsRouteConfigProvider 才能收到配置变更的内容。此时 RdsRouteConfigProvider 采用异步通知方法通知所有线程都保存新的路由配置。此后一旦与路由配置相关的监听器获取到新连接并进行 L7 请求处理,则将通过 RdsRouteConfigProvider 对象从当前线程局部存储的 TLS 上虎丘关联的路由配置,并用于与请求进行匹配计算来得到 Cluster
参考:
《Istio权威指南(下) 云原生服务网格Istio架构与源码》
原文地址:https://blog.csdn.net/qq_40378034/article/details/144049325
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!