自学内容网 自学内容网

Dubbo学习笔记

Dubbo

简介

Apache Dubbo是一款高性能的Java RPC框架。其前身是阿里巴巴公司开源的一个高性能、轻量级的开源Java RPC框架,可以和Spring框架无缝集成。 其中文官网:https://dubbo.gitbooks.io/dubbo-user-book/content/

特性和用法

架构

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

节点角色说明

节点角色说明
Provider暴露服务的服务提供方
Consumer调用远程服务的服务消费方
Registry服务注册与发现的注册中心
Monitor统计服务的调用次数和调用时间的监控中心
Container服务运行容器

调用关系说明

  1. 服务容器负责启动,加载,运行服务提供者。
  2. 服务提供者在启动时,向注册中心注册自己提供的服务。
  3. 服务消费者在启动时,向注册中心订阅自己所需的服务。
  4. 注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。
  5. 服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行调用,如果调用失败,再选另一台调用。
  6. 服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心。

Dubbo 架构具有以下几个特点,分别是连通性、健壮性、伸缩性、以及向未来架构的升级性。

连通性

  • 注册中心负责服务地址的注册与查找,相当于目录服务,服务提供者和消费者只在启动时与注册中心交互,注册中心不转发请求,压力较小
  • 监控中心负责统计各服务调用次数,调用时间等,统计先在内存汇总后每分钟一次发送到监控中心服务器,并以报表展示
  • 服务提供者向注册中心注册其提供的服务,并汇报调用时间到监控中心,此时间不包含网络开销
  • 服务消费者向注册中心获取服务提供者地址列表,并根据负载算法直接调用提供者,同时汇报调用时间到监控中心,此时间包含网络开销
  • 注册中心,服务提供者,服务消费者三者之间均为长连接,监控中心除外
  • 注册中心通过长连接感知服务提供者的存在,服务提供者宕机,注册中心将立即推送事件通知消费者
  • 注册中心和监控中心全部宕机,不影响已运行的提供者和消费者,消费者在本地缓存了提供者列表
  • 注册中心和监控中心都是可选的,服务消费者可以直连服务提供者

健状性

  • 监控中心宕掉不影响使用,只是丢失部分采样数据
  • 数据库宕掉后,注册中心仍能通过缓存提供服务列表查询,但不能注册新服务
  • 注册中心对等集群,任意一台宕掉后,将自动切换到另一台
  • 注册中心全部宕掉后,服务提供者和服务消费者仍能通过本地缓存通讯
  • 服务提供者无状态,任意一台宕掉后,不影响使用
  • 服务提供者全部宕掉后,服务消费者应用将无法使用,并无限次重连等待服务提供者恢复

伸缩性

  • 注册中心为对等集群,可动态增加机器部署实例,所有客户端将自动发现新的注册中心
  • 服务提供者无状态,可动态增加机器部署实例,注册中心将推送新的服务提供者信息给消费者

升级性

当服务集群规模进一步扩大,带动IT治理结构进一步升级,需要实现动态部署,进行流动计算,现有分布式服务架构不会带来阻力。下图是未来可能的一种架构:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

节点角色说明

节点角色说明
Deployer自动部署服务的本地代理
Repository仓库用于存储服务应用发布包
Scheduler调度中心基于访问压力自动增减服务提供者
Admin统一管理控制台
Registry服务注册与发现的注册中心
Monitor统计服务的调用次数和调用时间的监控中心

注册中心

Registry(服务注册中心)在Dubbo起着至关重要的作用。Dubbo官方推荐使用Zookeeper作为服务注册中心。Zookeeper 是 Apache Hadoop 的子项目.也可以选择不使用注册中心进行直连(不推荐)

配置

配置方式

  • 注解: 基于注解可以快速的将程序配置,无需多余的配置信息,包含提供者和消费者。但是这种方式有一个弊端,有些时候配置信息并不是特别好找,无法快速定位。

  • XML: 一般这种方式我们会和Spring做结合,相关的Service和Reference均使用Spring集成后的。通过这样的方式可以很方便的通过几个文件进行管理整个集群配置。可以快速定位也可以快速更改。

  • API: 基于代码方式的对配置进行配置。这个使用的比较少,这种方式更适用于自己公司对其框架与Dubbo做深度集成时才会使用。

XML配置说明

dubbo:service

服务提供者暴露服务配置。对应的配置类:com.alibaba.dubbo.config.ServiceConfig

属性对应URL参数类型是否必填缺省值作用描述兼容性
interfaceclass必填服务发现服务接口名1.0.0以上版本
refobject必填服务发现服务对象实现引用1.0.0以上版本
versionversionstring可选0.0.0服务发现服务版本,建议使用两位数字版本,如:1.0,通常在接口不兼容时版本号才需要升级1.0.0以上版本
groupgroupstring可选服务发现服务分组,当一个接口有多个实现,可以用分组区分1.0.7以上版本
path string可选缺省为接口名服务发现服务路径 (注意:1.0不支持自定义路径,总是使用接口名,如果有1.0调2.0,配置服务路径可能不兼容)1.0.12以上版本
delaydelayint可选0性能调优延迟注册服务时间(毫秒) ,设为-1时,表示延迟到Spring容器初始化完成时暴露服务1.0.14以上版本
timeouttimeoutint可选1000性能调优远程服务调用超时时间(毫秒)2.0.0以上版本
retriesretriesint可选2性能调优远程服务调用重试次数,不包括第一次调用,不需要重试请设为02.0.0以上版本
connectionsconnectionsint可选100性能调优对每个提供者的最大连接数,rmi、http、hessian等短连接协议表示限制连接数,dubbo等长连接协表示建立的长连接个数2.0.0以上版本
loadbalanceloadbalancestring可选random性能调优负载均衡策略,可选值:random,roundrobin,leastactive,分别表示:随机,轮循,最少活跃调用2.0.0以上版本
asyncasyncboolean可选false性能调优是否缺省异步执行,不可靠异步,只是忽略返回值,不阻塞执行线程2.0.0以上版本
stubstubclass/boolean可选false服务治理设为true,表示使用缺省代理类名,即:接口名 + Local后缀,服务接口客户端本地代理类名,用于在客户端执行本地逻辑,如本地缓存等,该本地代理类的构造函数必须允许传入远程代理对象,构造函数如:public XxxServiceLocal(XxxService xxxService)2.0.0以上版本
mockmockclass/boolean可选false服务治理设为true,表示使用缺省Mock类名,即:接口名 + Mock后缀,服务接口调用失败Mock实现类,该Mock类必须有一个无参构造函数,与Local的区别在于,Local总是被执行,而Mock只在出现非业务异常(比如超时,网络异常等)时执行,Local在远程调用之前执行,Mock在远程调用后执行。2.0.0以上版本
tokentokenstring/boolean可选false服务治理令牌验证,为空表示不开启,如果为true,表示随机生成动态令牌,否则使用静态令牌,令牌的作用是防止消费者绕过注册中心直接访问,保证注册中心的授权功能有效,如果使用点对点调用,需关闭令牌功能2.0.0以上版本
registrystring可选缺省向所有registry注册配置关联向指定注册中心注册,在多个注册中心时使用,值为dubbo:registry的id属性,多个注册中心ID用逗号分隔,如果不想将该服务注册到任何registry,可将值设为N/A2.0.0以上版本
providerstring可选缺使用第一个provider配置配置关联指定provider,值为dubbo:provider的id属性2.0.0以上版本
deprecateddeprecatedboolean可选false服务治理服务是否过时,如果设为true,消费方引用时将打印服务过时警告error日志2.0.5以上版本
dynamicdynamicboolean可选true服务治理服务是否动态注册,如果设为false,注册后将显示后disable状态,需人工启用,并且服务提供者停止时,也不会自动取消册,需人工禁用。2.0.5以上版本
accesslogaccesslogstring/boolean可选false服务治理设为true,将向logger中输出访问日志,也可填写访问日志文件路径,直接把访问日志输出到指定文件2.0.5以上版本
ownerownerstring可选服务治理服务负责人,用于服务治理,请填写负责人公司邮箱前缀2.0.5以上版本
documentdocumentstring可选服务治理服务文档URL2.0.5以上版本
weightweightint可选性能调优服务权重2.0.5以上版本
executesexecutesint可选0性能调优服务提供者每服务每方法最大可并行执行请求数2.0.5以上版本
proxyproxystring可选javassist性能调优生成动态代理方式,可选:jdk/javassist2.0.5以上版本
clusterclusterstring可选failover性能调优集群方式,可选:failover/failfast/failsafe/failback/forking2.0.5以上版本
filterservice.filterstring可选default性能调优服务提供方远程调用过程拦截器名称,多个名称用逗号分隔2.0.5以上版本
listenerexporter.listenerstring可选default性能调优服务提供方导出服务监听器名称,多个名称用逗号分隔
protocolstring可选配置关联使用指定的协议暴露服务,在多协议时使用,值为dubbo:protocol的id属性,多个协议ID用逗号分隔2.0.5以上版本
layerlayerstring可选服务治理服务提供者所在的分层。如:biz、dao、intl:web、china:acton。2.0.7以上版本
registerregisterboolean可选true服务治理该协议的服务是否注册到注册中心2.0.8以上版本
activesactivesint可选0性能调优每服务消费者每服务每方法最大并发调用数2.0.5以上版本
dubbo:reference

服务消费者引用服务配置。对应的配置类: com.alibaba.dubbo.config.ReferenceConfig

属性对应URL参数类型是否必填缺省值作用描述兼容性
idstring必填配置关联服务引用BeanId1.0.0以上版本
interfaceclass必填服务发现服务接口名1.0.0以上版本
versionversionstring可选服务发现服务版本,与服务提供者的版本一致1.0.0以上版本
groupgroupstring可选服务发现服务分组,当一个接口有多个实现,可以用分组区分,必需和服务提供方一致1.0.7以上版本
timeouttimeoutlong可选缺省使用dubbo:consumer的timeout性能调优服务方法调用超时时间(毫秒)1.0.5以上版本
retriesretriesint可选缺省使用dubbo:consumer的retries性能调优远程服务调用重试次数,不包括第一次调用,不需要重试请设为02.0.0以上版本
connectionsconnectionsint可选缺省使用dubbo:consumer的connections性能调优对每个提供者的最大连接数,rmi、http、hessian等短连接协议表示限制连接数,dubbo等长连接协表示建立的长连接个数2.0.0以上版本
loadbalanceloadbalancestring可选缺省使用dubbo:consumer的loadbalance性能调优负载均衡策略,可选值:random,roundrobin,leastactive,分别表示:随机,轮循,最少活跃调用2.0.0以上版本
asyncasyncboolean可选缺省使用dubbo:consumer的async性能调优是否异步执行,不可靠异步,只是忽略返回值,不阻塞执行线程2.0.0以上版本
genericgenericboolean可选缺省使用dubbo:consumer的generic服务治理是否缺省泛化接口,如果为泛化接口,将返回GenericService2.0.0以上版本
checkcheckboolean可选缺省使用dubbo:consumer的check服务治理启动时检查提供者是否存在,true报错,false忽略2.0.0以上版本
urlurlstring可选服务治理点对点直连服务提供者地址,将绕过注册中心1.0.6以上版本
stubstubclass/boolean可选服务治理服务接口客户端本地代理类名,用于在客户端执行本地逻辑,如本地缓存等,该本地代理类的构造函数必须允许传入远程代理对象,构造函数如:public XxxServiceLocal(XxxService xxxService)2.0.0以上版本
mockmockclass/boolean可选服务治理服务接口调用失败Mock实现类名,该Mock类必须有一个无参构造函数,与Local的区别在于,Local总是被执行,而Mock只在出现非业务异常(比如超时,网络异常等)时执行,Local在远程调用之前执行,Mock在远程调用后执行。Dubbo1.0.13及其以上版本支持
cachecachestring/boolean可选服务治理以调用参数为key,缓存返回结果,可选:lru, threadlocal, jcache等Dubbo2.1.0及其以上版本支持
validationvalidationboolean可选服务治理是否启用JSR303标准注解验证,如果启用,将对方法参数上的注解进行校验Dubbo2.1.0及其以上版本支持
proxyproxyboolean可选javassist性能调优选择动态代理实现策略,可选:javassist, jdk2.0.2以上版本
clientclientstring可选性能调优客户端传输类型设置,如Dubbo协议的netty或mina。Dubbo2.0.0以上版本支持
registrystring可选缺省将从所有注册中心获服务列表后合并结果配置关联从指定注册中心注册获取服务列表,在多个注册中心时使用,值为dubbo:registry的id属性,多个注册中心ID用逗号分隔2.0.0以上版本
ownerownerstring可选服务治理调用服务负责人,用于服务治理,请填写负责人公司邮箱前缀2.0.5以上版本
activesactivesint可选0性能调优每服务消费者每服务每方法最大并发调用数2.0.5以上版本
clusterclusterstring可选failover性能调优集群方式,可选:failover/failfast/failsafe/failback/forking2.0.5以上版本
filterreference.filterstring可选default性能调优服务消费方远程调用过程拦截器名称,多个名称用逗号分隔2.0.5以上版本
listenerinvoker.listenerstring可选default性能调优服务消费方引用服务监听器名称,多个名称用逗号分隔2.0.5以上版本
layerlayerstring可选服务治理服务调用者所在的分层。如:biz、dao、intl:web、china:acton。2.0.7以上版本
initinitboolean可选false性能调优是否在afterPropertiesSet()时饥饿初始化引用,否则等到有人注入或引用该实例时再初始化。2.0.10以上版本
protocolprotocolstring可选服力治理只调用指定协议的服务提供方,其它协议忽略。2.2.0以上版本
dubbo:protocol

服务提供者协议配置。对应的配置类: com.alibaba.dubbo.config.ProtocolConfig。同时,如果需要支持多协议,可以声明多个 <dubbo:protocol> 标签,并在 <dubbo:service> 中通过 protocol 属性指定使用的协议。

属性对应URL参数类型是否必填缺省值作用描述兼容性
idstring可选dubbo配置关联协议BeanId,可以在<dubbo:service protocol=“”>中引用此ID,如果ID不填,缺省和name属性值一样,重复则在name后加序号。2.0.5以上版本
namestring必填dubbo性能调优协议名称2.0.5以上版本
portint可选dubbo协议缺省端口为20880,rmi协议缺省端口为1099,http和hessian协议缺省端口为80;如果配置为**-1** 或者 没有配置port,则会分配一个没有被占用的端口。Dubbo 2.4.0+,分配的端口在协议缺省端口的基础上增长,确保端口段可控。服务发现服务端口2.0.5以上版本
hoststring可选自动查找本机IP服务发现-服务主机名,多网卡选择或指定VIP及域名时使用,为空则自动查找本机IP,-建议不要配置,让Dubbo自动获取本机IP2.0.5以上版本
threadpoolthreadpoolstring可选fixed性能调优线程池类型,可选:fixed/cached2.0.5以上版本
threadsthreadsint可选100性能调优服务线程池大小(固定大小)2.0.5以上版本
iothreadsthreadsint可选cpu个数+1性能调优io线程池大小(固定大小)2.0.5以上版本
acceptsacceptsint可选0性能调优服务提供方最大可接受连接数2.0.5以上版本
payloadpayloadint可选88388608(=8M)性能调优请求及响应数据包大小限制,单位:字节2.0.5以上版本
codeccodecstring可选dubbo性能调优协议编码方式2.0.5以上版本
serializationserializationstring可选dubbo协议缺省为hessian2,rmi协议缺省为java,http协议缺省为json性能调优协议序列化方式,当协议支持多种序列化方式时使用,比如:dubbo协议的dubbo,hessian2,java,compactedjava,以及http协议的json等2.0.5以上版本
accesslogaccesslogstring/boolean可选服务治理设为true,将向logger中输出访问日志,也可填写访问日志文件路径,直接把访问日志输出到指定文件2.0.5以上版本
path string可选服务发现提供者上下文路径,为服务path的前缀2.0.5以上版本
transportertransporterstring可选dubbo协议缺省为netty性能调优协议的服务端和客户端实现类型,比如:dubbo协议的mina,netty等,可以分拆为server和client配置2.0.5以上版本
serverserverstring可选dubbo协议缺省为netty,http协议缺省为servlet性能调优协议的服务器端实现类型,比如:dubbo协议的mina,netty等,http协议的jetty,servlet等2.0.5以上版本
clientclientstring可选dubbo协议缺省为netty性能调优协议的客户端实现类型,比如:dubbo协议的mina,netty等2.0.5以上版本
dispatcherdispatcherstring可选dubbo协议缺省为all性能调优协议的消息派发方式,用于指定线程模型,比如:dubbo协议的all, direct, message, execution, connection等2.1.0以上版本
queuesqueuesint可选0性能调优线程池队列大小,当线程池满时,排队等待执行的队列大小,建议不要设置,当线程程池时应立即失败,重试其它服务提供机器,而不是排队,除非有特殊需求。2.0.5以上版本
charsetcharsetstring可选UTF-8性能调优序列化编码2.0.5以上版本
bufferbufferint可选8192性能调优网络读写缓冲区大小2.0.5以上版本
heartbeatheartbeatint可选0性能调优心跳间隔,对于长连接,当物理层断开时,比如拔网线,TCP的FIN消息来不及发送,对方收不到断开事件,此时需要心跳来帮助检查连接是否已断开2.0.10以上版本
telnettelnetstring可选服务治理所支持的telnet命令,多个命令用逗号分隔2.0.5以上版本
registerregisterboolean可选true服务治理该协议的服务是否注册到注册中心2.0.8以上版本
contextpathcontextpathString可选缺省为空串服务治理2.0.6以上版本
dubbo:registry

注册中心配置。对应的配置类: com.alibaba.dubbo.config.RegistryConfig。同时如果有多个不同的注册中心,可以声明多个 <dubbo:registry> 标签,并在 <dubbo:service><dubbo:reference>registry 属性指定使用的注册中心。

属性对应URL参数类型是否必填缺省值作用描述兼容性
idstring可选配置关联注册中心引用BeanId,可以在<dubbo:service registry=“”>或<dubbo:reference registry=“”>中引用此ID1.0.16以上版本
addresshost:portstring必填服务发现注册中心服务器地址,如果地址没有端口缺省为9090,同一集群内的多个地址用逗号分隔,如:ip:port,ip:port,不同集群的注册中心,请配置多个dubbo:registry标签1.0.16以上版本
protocolstring可选dubbo服务发现注同中心地址协议,支持dubbo, http, local三种协议,分别表示,dubbo地址,http地址,本地注册中心2.0.0以上版本
portint可选9090服务发现注册中心缺省端口,当address没有带端口时使用此端口做为缺省值2.0.0以上版本
usernamestring可选服务治理登录注册中心用户名,如果注册中心不需要验证可不填2.0.0以上版本
passwordstring可选服务治理登录注册中心密码,如果注册中心不需要验证可不填2.0.0以上版本
transportregistry.transporterstring可选netty性能调优网络传输方式,可选mina,netty2.0.0以上版本
timeoutregistry.timeoutint可选5000性能调优注册中心请求超时时间(毫秒)2.0.0以上版本
sessionregistry.sessionint可选60000性能调优注册中心会话超时时间(毫秒),用于检测提供者非正常断线后的脏数据,比如用心跳检测的实现,此时间就是心跳间隔,不同注册中心实现不一样。2.1.0以上版本
fileregistry.filestring可选服务治理使用文件缓存注册中心地址列表及服务提供者列表,应用重启时将基于此文件恢复,注意:两个注册中心不能使用同一文件存储2.0.0以上版本
waitregistry.waitint可选0性能调优停止时等待通知完成时间(毫秒)2.0.0以上版本
checkcheckboolean可选true服务治理注册中心不存在时,是否报错2.0.0以上版本
registerregisterboolean可选true服务治理是否向此注册中心注册服务,如果设为false,将只订阅,不注册2.0.5以上版本
subscribesubscribeboolean可选true服务治理是否向此注册中心订阅服务,如果设为false,将只注册,不订阅2.0.5以上版本
dynamicdynamicboolean可选true服务治理服务是否动态注册,如果设为false,注册后将显示后disable状态,需人工启用,并且服务提供者停止时,也不会自动取消册,需人工禁用。2.0.5以上版本
dubbo:monitor

监控中心配置。对应的配置类: com.alibaba.dubbo.config.MonitorConfig

属性对应URL参数类型是否必填缺省值作用描述兼容性
protocolprotocolstring可选dubbo服务治理监控中心协议,如果为protocol=“registry”,表示从注册中心发现监控中心地址,否则直连监控中心。2.0.9以上版本
addressstring可选N/A服务治理直连监控中心服务器地址,address=“10.20.130.230:12080”1.0.16以上版本
dubbo:application

应用信息配置。对应的配置类:com.alibaba.dubbo.config.ApplicationConfig

属性对应URL参数类型是否必填缺省值作用描述兼容性
nameapplicationstring必填服务治理当前应用名称,用于注册中心计算应用间依赖关系,注意:消费者和提供者应用名不要一样,此参数不是匹配条件,你当前项目叫什么名字就填什么,和提供者消费者角色无关,比如:kylin应用调用了morgan应用的服务,则kylin项目配成kylin,morgan项目配成morgan,可能kylin也提供其它服务给别人使用,但kylin项目永远配成kylin,这样注册中心将显示kylin依赖于morgan1.0.16以上版本
versionapplication.versionstring可选服务治理当前应用的版本2.2.0以上版本
ownerownerstring可选服务治理应用负责人,用于服务治理,请填写负责人公司邮箱前缀2.0.5以上版本
organizationorganizationstring可选服务治理组织名称(BU或部门),用于注册中心区分服务来源,此配置项建议不要使用autoconfig,直接写死在配置中,比如china,intl,itu,crm,asc,dw,aliexpress等2.0.0以上版本
architecturearchitecturestring可选服务治理用于服务分层对应的架构。如,intl、china。不同的架构使用不同的分层。2.0.7以上版本
environmentenvironmentstring可选服务治理应用环境,如:develop/test/product,不同环境使用不同的缺省值,以及作为只用于开发测试功能的限制条件2.0.0以上版本
compilercompilerstring可选javassist性能优化Java字节码编译器,用于动态类的生成,可选:jdk或javassist2.1.0以上版本
loggerloggerstring可选slf4j性能优化日志输出方式,可选:slf4j,jcl,log4j,jdk2.2.0以上版本
dubbo:module

模块信息配置。对应的配置类 com.alibaba.dubbo.config.ModuleConfig

属性对应URL参数类型是否必填缺省值作用描述兼容性
namemodulestring必填服务治理当前模块名称,用于注册中心计算模块间依赖关系2.2.0以上版本
versionmodule.versionstring可选服务治理当前模块的版本2.2.0以上版本
ownerownerstring可选服务治理模块负责人,用于服务治理,请填写负责人公司邮箱前缀2.2.0以上版本
organizationorganizationstring可选服务治理组织名称(BU或部门),用于注册中心区分服务来源,此配置项建议不要使用autoconfig,直接写死在配置中,比如china,intl,itu,crm,asc,dw,aliexpress等2.2.0以上版本
dubbo:provider

服务提供者缺省值配置。对应的配置类: com.alibaba.dubbo.config.ProviderConfig。同时该标签为 <dubbo:service><dubbo:protocol> 标签的缺省值设置。

属性对应URL参数类型是否必填缺省值作用描述兼容性
idstring可选dubbo配置关联协议BeanId,可以在<dubbo:service proivder=“”>中引用此ID1.0.16以上版本
protocolstring可选dubbo性能调优协议名称1.0.16以上版本
hoststring可选自动查找本机IP服务发现服务主机名,多网卡选择或指定VIP及域名时使用,为空则自动查找本机IP,建议不要配置,让Dubbo自动获取本机IP1.0.16以上版本
threadsthreadsint可选100性能调优服务线程池大小(固定大小)1.0.16以上版本
payloadpayloadint可选88388608(=8M)性能调优请求及响应数据包大小限制,单位:字节2.0.0以上版本
path string可选服务发现提供者上下文路径,为服务path的前缀2.0.0以上版本
serverserverstring可选dubbo协议缺省为netty,http协议缺省为servlet性能调优协议的服务器端实现类型,比如:dubbo协议的mina,netty等,http协议的jetty,servlet等2.0.0以上版本
clientclientstring可选dubbo协议缺省为netty性能调优协议的客户端实现类型,比如:dubbo协议的mina,netty等2.0.0以上版本
codeccodecstring可选dubbo性能调优协议编码方式2.0.0以上版本
serializationserializationstring可选dubbo协议缺省为hessian2,rmi协议缺省为java,http协议缺省为json性能调优协议序列化方式,当协议支持多种序列化方式时使用,比如:dubbo协议的dubbo,hessian2,java,compactedjava,以及http协议的json,xml等2.0.5以上版本
defaultboolean可选false配置关联是否为缺省协议,用于多协议1.0.16以上版本
filterservice.filterstring可选性能调优服务提供方远程调用过程拦截器名称,多个名称用逗号分隔2.0.5以上版本
listenerexporter.listenerstring可选性能调优服务提供方导出服务监听器名称,多个名称用逗号分隔2.0.5以上版本
threadpoolthreadpoolstring可选fixed性能调优线程池类型,可选:fixed/cached2.0.5以上版本
acceptsacceptsint可选0性能调优服务提供者最大可接受连接数2.0.5以上版本
versionversionstring可选0.0.0服务发现服务版本,建议使用两位数字版本,如:1.0,通常在接口不兼容时版本号才需要升级2.0.5以上版本
groupgroupstring可选服务发现服务分组,当一个接口有多个实现,可以用分组区分2.0.5以上版本
delaydelayint可选0性能调优延迟注册服务时间(毫秒)- ,设为-1时,表示延迟到Spring容器初始化完成时暴露服务2.0.5以上版本
timeoutdefault.timeoutint可选1000性能调优远程服务调用超时时间(毫秒)2.0.5以上版本
retriesdefault.retriesint可选2性能调优远程服务调用重试次数,不包括第一次调用,不需要重试请设为02.0.5以上版本
connectionsdefault.connectionsint可选0性能调优对每个提供者的最大连接数,rmi、http、hessian等短连接协议表示限制连接数,dubbo等长连接协表示建立的长连接个数2.0.5以上版本
loadbalancedefault.loadbalancestring可选random性能调优负载均衡策略,可选值:random,roundrobin,leastactive,分别表示:随机,轮循,最少活跃调用2.0.5以上版本
asyncdefault.asyncboolean可选false性能调优是否缺省异步执行,不可靠异步,只是忽略返回值,不阻塞执行线程2.0.5以上版本
stubstubboolean可选false服务治理设为true,表示使用缺省代理类名,即:接口名 + Local后缀。2.0.5以上版本
mockmockboolean可选false服务治理设为true,表示使用缺省Mock类名,即:接口名 + Mock后缀。2.0.5以上版本
tokentokenboolean可选false服务治理令牌验证,为空表示不开启,如果为true,表示随机生成动态令牌2.0.5以上版本
registryregistrystring可选缺省向所有registry注册配置关联向指定注册中心注册,在多个注册中心时使用,值为dubbo:registry的id属性,多个注册中心ID用逗号分隔,如果不想将该服务注册到任何registry,可将值设为N/A2.0.5以上版本
dynamicdynamicboolean可选true服务治理服务是否动态注册,如果设为false,注册后将显示后disable状态,需人工启用,并且服务提供者停止时,也不会自动取消册,需人工禁用。2.0.5以上版本
accesslogaccesslogstring/boolean可选false服务治理设为true,将向logger中输出访问日志,也可填写访问日志文件路径,直接把访问日志输出到指定文件2.0.5以上版本
ownerownerstring可选服务治理服务负责人,用于服务治理,请填写负责人公司邮箱前缀2.0.5以上版本
documentdocumentstring可选服务治理服务文档URL2.0.5以上版本
weightweightint可选性能调优服务权重2.0.5以上版本
executesexecutesint可选0性能调优服务提供者每服务每方法最大可并行执行请求数2.0.5以上版本
activesdefault.activesint可选0性能调优每服务消费者每服务每方法最大并发调用数2.0.5以上版本
proxyproxystring可选javassist性能调优生成动态代理方式,可选:jdk/javassist2.0.5以上版本
clusterdefault.clusterstring可选failover性能调优集群方式,可选:failover/failfast/failsafe/failback/forking2.0.5以上版本
deprecateddeprecatedboolean可选false服务治理服务是否过时,如果设为true,消费方引用时将打印服务过时警告error日志2.0.5以上版本
queuesqueuesint可选0性能调优线程池队列大小,当线程池满时,排队等待执行的队列大小,建议不要设置,当线程程池时应立即失败,重试其它服务提供机器,而不是排队,除非有特殊需求。2.0.5以上版本
charsetcharsetstring可选UTF-8性能调优序列化编码2.0.5以上版本
bufferbufferint可选8192性能调优网络读写缓冲区大小2.0.5以上版本
iothreadsiothreadsint可选CPU + 1性能调优IO线程池,接收网络读写中断,以及序列化和反序列化,不处理业务,业务线程池参见threads配置,此线程池和CPU相关,不建议配置。2.0.5以上版本
telnettelnetstring可选服务治理所支持的telnet命令,多个命令用逗号分隔2.0.5以上版本
dubbo:servicecontextpathcontextpathString可选缺省为空串服务治理2.0.6以上版本
layerlayerstring可选服务治理服务提供者所在的分层。如:biz、dao、intl:web、china:acton。2.0.7以上版本
dubbo:consumer

服务消费者缺省值配置。配置类: com.alibaba.dubbo.config.ConsumerConfig 。同时该标签为 <dubbo:reference> 标签的缺省值设置。

属性对应URL参数类型是否必填缺省值作用描述兼容性
timeoutdefault.timeoutint可选1000性能调优远程服务调用超时时间(毫秒)1.0.16以上版本
retriesdefault.retriesint可选2性能调优远程服务调用重试次数,不包括第一次调用,不需要重试请设为01.0.16以上版本
loadbalancedefault.loadbalancestring可选random性能调优负载均衡策略,可选值:random,roundrobin,leastactive,分别表示:随机,轮循,最少活跃调用1.0.16以上版本
asyncdefault.asyncboolean可选false性能调优是否缺省异步执行,不可靠异步,只是忽略返回值,不阻塞执行线程2.0.0以上版本
connectionsdefault.connectionsint可选100性能调优每个服务对每个提供者的最大连接数,rmi、http、hessian等短连接协议支持此配置,dubbo协议长连接不支持此配置1.0.16以上版本
genericgenericboolean可选false服务治理是否缺省泛化接口,如果为泛化接口,将返回GenericService2.0.0以上版本
checkcheckboolean可选true服务治理启动时检查提供者是否存在,true报错,false忽略1.0.16以上版本
proxyproxystring可选javassist性能调优生成动态代理方式,可选:jdk/javassist2.0.5以上版本
ownerownerstring可选服务治理调用服务负责人,用于服务治理,请填写负责人公司邮箱前缀2.0.5以上版本
activesdefault.activesint可选0性能调优每服务消费者每服务每方法最大并发调用数2.0.5以上版本
clusterdefault.clusterstring可选failover性能调优集群方式,可选:failover/failfast/failsafe/failback/forking2.0.5以上版本
filterreference.filterstring可选性能调优服务消费方远程调用过程拦截器名称,多个名称用逗号分隔2.0.5以上版本
listenerinvoker.listenerstring可选性能调优服务消费方引用服务监听器名称,多个名称用逗号分隔2.0.5以上版本
registrystring可选缺省向所有registry注册配置关联向指定注册中心注册,在多个注册中心时使用,值为dubbo:registry的id属性,多个注册中心ID用逗号分隔,如果不想将该服务注册到任何registry,可将值设为N/A2.0.5以上版本
layerlayerstring可选服务治理服务调用者所在的分层。如:biz、dao、intl:web、china:acton。2.0.7以上版本
initinitboolean可选false性能调优是否在afterPropertiesSet()时饥饿初始化引用,否则等到有人注入或引用该实例时再初始化。2.0.10以上版本
cachecachestring/boolean可选服务治理以调用参数为key,缓存返回结果,可选:lru, threadlocal, jcache等Dubbo2.1.0及其以上版本支持
validationvalidationboolean可选服务治理是否启用JSR303标准注解验证,如果启用,将对方法参数上的注解进行校验Dubbo2.1.0及其以上版本支持
dubbo:method

方法级配置。对应的配置类: com.alibaba.dubbo.config.MethodConfig。同时该标签为 <dubbo:service><dubbo:reference> 的子标签,用于控制到方法级。

属性对应URL参数类型是否必填缺省值作用描述兼容性
namestring必填标识方法名1.0.8以上版本
timeout.timeoutint可选缺省为的timeout性能调优方法调用超时时间(毫秒)1.0.8以上版本
retries.retriesint可选缺省为dubbo:reference的retries性能调优远程服务调用重试次数,不包括第一次调用,不需要重试请设为02.0.0以上版本
loadbalance.loadbalancestring可选缺省为的loadbalance性能调优负载均衡策略,可选值:random,roundrobin,leastactive,分别表示:随机,轮循,最少活跃调用2.0.0以上版本
async.asyncboolean可选缺省为dubbo:reference的async性能调优是否异步执行,不可靠异步,只是忽略返回值,不阻塞执行线程1.0.9以上版本
sent.sentboolean可选true性能调优异步调用时,标记sent=true时,表示网络已发出数据2.0.6以上版本
actives.activesint可选0性能调优每服务消费者最大并发调用限制2.0.5以上版本
executes.executesint可选0性能调优每服务每方法最大使用线程数限制- -,此属性只在dubbo:method作为dubbo:service子标签时有效2.0.5以上版本
deprecated.deprecatedboolean可选false服务治理服务方法是否过时,此属性只在dubbo:method作为dubbo:service子标签时有效2.0.5以上版本
sticky.stickyboolean可选false服务治理设置true 该接口上的所有方法使用同一个provider.如果需要更复杂的规则,请使用用路由2.0.6以上版本
return.returnboolean可选true性能调优方法调用是否需要返回值,async设置为true时才生效,如果设置为true,则返回future,或回调onreturn等方法,如果设置为false,则请求发送成功后直接返回Null2.0.6以上版本
oninvokeattribute属性,不在URL中体现String可选性能调优方法执行前拦截2.0.6以上版本
onreturnattribute属性,不在URL中体现String可选性能调优方法执行返回后拦截2.0.6以上版本
onthrowattribute属性,不在URL中体现String可选性能调优方法执行有异常拦截2.0.6以上版本
cache.cachestring/boolean可选服务治理以调用参数为key,缓存返回结果,可选:lru, threadlocal, jcache等Dubbo2.1.0及其以上版本支持
validation.validationboolean可选服务治理是否启用JSR303标准注解验证,如果启用,将对方法参数上的注解进行校验Dubbo2.1.0及其以上版本支持

比如:

<dubbo:reference interface="com.xxx.XxxService">
    <dubbo:method name="findXxx" timeout="3000" retries="2" />
</dubbo:reference>
dubbo:argument

方法参数配置。对应的配置类: com.alibaba.dubbo.config.ArgumentConfig。该标签为 <dubbo:method> 的子标签,用于方法参数的特征描述,比如:

<dubbo:method name="findXxx" timeout="3000" retries="2">
    <dubbo:argument index="0" callback="true" />
</dubbo:method>
属性对应URL参数类型是否必填缺省值作用描述兼容性
indexint必填标识方法名2.0.6以上版本
typeString与index二选一标识通过参数类型查找参数的index2.0.6以上版本
callback.retriesboolean可选服务治理参数是否为callback接口,如果为callback,服务提供方将生成反向代理,可以从服务提供方反向调用消费方,通常用于事件推送.2.0.6以上版本
dubbo:parameter

选项参数配置。对应的配置类:java.util.Map。同时该标签为<dubbo:protocol><dubbo:service><dubbo:provider><dubbo:reference><dubbo:consumer>的子标签,用于配置自定义参数,该配置项将作为扩展点设置自定义参数使用。

属性对应URL参数类型是否必填缺省值作用描述兼容性
keykeystring必填服务治理路由参数键2.0.0以上版本
valuevaluestring必填服务治理路由参数值2.0.0以上版本

比如:

<dubbo:protocol name="napoli">
    <dubbo:parameter key="http://10.20.160.198/wiki/display/dubbo/napoli.queue.name" value="xxx" />
</dubbo:protocol>

也可以:

<dubbo:protocol name="jms" p:queue="xxx" />

属性配置说明

官方相关文档

如果公共配置很简单,没有多注册中心,多协议等情况,或者想多个 Spring 容器想共享配置,可以使用 dubbo.properties 作为缺省配置。

Dubbo 将自动加载 classpath 根目录下的 dubbo.properties,可以通过JVM启动参数 -Ddubbo.properties.file=xxx.properties 改变缺省配置位置。

映射规则

官方相关文档

将 XML 配置的标签名,加属性名,用点分隔,多个属性拆成多行

  • 比如:dubbo.application.name=foo等价于<dubbo:application name="foo" />
  • 比如:dubbo.registry.address=10.20.153.10:9090等价于<dubbo:registry address="10.20.153.10:9090" />

如果 XML 有多行同名标签配置,可用 id 号区分,如果没有 id 号将对所有同名标签生效

  • 比如:dubbo.protocol.rmi.port=1234等价于<dubbo:protocol id="rmi" name="rmi" port="1234" />
  • 比如:dubbo.registry.china.address=10.20.153.10:9090等价于<dubbo:registry id="china" address="10.20.153.10:9090" />

下面是 dubbo.properties 的一个典型配置:

dubbo.application.name=foo
dubbo.application.owner=bar
dubbo.registry.address=10.20.153.10:9090

覆盖策略

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

JVM 启动 -D 参数优先,这样可以使用户在部署和启动时进行参数重写,比如在启动时需改变协议的端口。

XML 次之,如果在 XML 中有配置,则 dubbo.properties 中的相应配置项无效。

Properties 最后,相当于缺省值,只有 XML 没有配置时,dubbo.properties 的相应配置项才会生效,通常用于共享公共配置,比如应用名。

  1. 如果 classpath 根目录下存在多个 dubbo.properties,比如多个 jar 包中有 dubbo.properties,Dubbo 会任意加载,并打印 Error 日志,后续可能改为抛异常。
  1. 协议的 id 没配时,缺省使用协议名作为 id

源码简介

源码下载和编译

dubbo的项目在github中的地址为: https://github.com/apache/dubbo,下载后切到相应的分支然后进入dubbo项目 cd dubbo , 进行编译操作 mvn clean install -DskipTests 最后使用IDE引入项目

整体调用和设计

调用链路

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

上半部分代表服务生产者的范围,下半部分代表了服务消费者的范围

红色箭头代表了调用的方向

整体链路调用的流程:

  • 消费者通过Interface进行方法调用 ,统一交由消费者端的 Proxy 通过ProxyFactory 来进行代理对象的创建 使用到了 jdk javassist技术

  • Filter,这个模块 做一个统一的过滤请求

  • Invoker,最主要的调用逻辑,

    • 通过Directory 去配置中新读取信息 最终通过list方法获取所有的Invoker
    • 通过Cluster模块根据选择的具体路由规则来选取Invoker列表
    • 通过LoadBalance模块根据负载均衡策略选择一个具体的Invoker 来处理我们的请求
    • 如果执行中出现错误 并且Consumer阶段配置了重试机制 则会重新尝试执行
  • 继续经过Filter进行执行功能的前后封装 Invoker 选择具体的执行协议

  • 客户端 进行编码和序列化然后发送数据

  • 服务提供者收到数据包,会使用Codec处理协议头及一些半包、 粘包等。

  • 处理完成后再对完整的数据报文做反序列化处理

  • 随后, 这个Request会被分配到线程池(ThreadPool)中进行处理

  • Server会从线程池中处理这些Request,根据请求查找对应的Exporter (它内部持有了 Invoker)

  • 经过一层一层的Filter过滤后到达Invoker 执行器

  • 通过Invoker 调用接口的具体实现 然后返回

整体设计

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图例说明

  • 图中左边淡蓝背景的为服务消费方使用的接口,右边淡绿色背景的为服务提供方使用的接口,位于中轴线上的为双方都用到的接口。
  • 图中从下至上分为十层,各层均为单向依赖,右边的黑色箭头代表层之间的依赖关系,每一层都可以剥离上层被复用,其中,Service 和 Config 层为 API,其它各层均为 SPI。
  • 图中绿色小块的为扩展接口,蓝色小块为实现类,图中只显示用于关联各层的实现类。
  • 图中蓝色虚线为初始化过程,即启动时组装链,红色实线为方法调用过程,即运行时调时链,紫色三角箭头为继承,可以把子类看作父类的同一个节点,线上的文字为调用的方法。

Dubbo源码整体设计与调用链路十分相似。只不过这里可以看到接口的一些具体实现以及左侧也有更为详细的层次划分

分层介绍:

Business 业务逻辑层

  • service业务层.包括我们的业务代码 比如 接口 实现类 直接面向开发者

RPC层 远程过程调用层

  • config 配置层.对外提供配置 以ServiceConfig ReferenceConfig 为核心 可以直接初始化配置类 也可以解析配置文件生成
  • proxy 服务代理层.无论是生产者 还是消费者 框架都会产生一个代理类 整个过程对上层透明 就是业务层对远程调用无感
  • registry 注册中心层. 封装服务地址的注册与发现 以服务的URL为中心
  • cluster 路由层 (集群容错层). 提供了多个提供者的路由和负载均衡 并且它桥接注册中心 以Invoker为核心
  • monitor 监控层. RPC调用相关的信息 如 调用次数 成功失败的情况 调用时间等 在这一层完成
  • protocol 远程调用层. 封装RPC调用 无论是服务的暴露 还是 服务的引用 都是在Protocol中作为主功能入口 负责Invoker的整个生命周期 Dubbo中所有的模型都向Invoker靠拢

Remoting层 远程数据传输层

  • exchange 信息交换层. 封装请求和响应的模式 如把请求由同步 转换成异步
  • transport 网络传输层. 统一网络传输的接口 比如 netty 和 mina 统一为一个网络传输接口
  • serialize 数据序列化层. 负责管理整个框架中的数据传输的序列化和反序列化

SPI

ExtensionLoader

Dubbo中SPI的入口是ExtensionLoader.其中关键的接口有

  • getExtensionLoader:获取扩展点加载器 并加载所对应的所有的扩展点实现

  • getAdaptiveExtension:获取被@Adaptive标注的实现

  • getExtension:根据name 获取扩展的指定实现

  • getSupportedExtensions:获取所有扩展点

getExtensionLoader

调用getExtensionLoader获取某个接口的ExtensionLoader步骤

  • 判断接口类型是否合法
  • 从缓存中读取数据
  • 如果缓存中没有就进行初始化(接口加载器)
    • 获取ExtensionFactory对应的ExtensionLoader(加载器工厂的加载器)
    • 调用加载器的getAdaptiveExtension方法获取一个具体的加载器工厂,默认是AdaptiveExtensionFactory
    • 将加载器工厂赋值给接口加载器的objectFactory
  • 将接口加载器放入缓存
  • 从缓存中读取接口加载器
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
    if (type == null) {
        throw new IllegalArgumentException("Extension type == null");
    }
    if (!type.isInterface()) {
        throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
    }
    if (!withExtensionAnnotation(type)) {
        throw new IllegalArgumentException("Extension type (" + type +
                ") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");
    }
    //尝试从缓存中加载
    ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    if (loader == null) {
        //缓存中没有进行加载 并放入缓存
        EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
        loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    }
    return loader;
}
private ExtensionLoader(Class<?> type) {
        this.type = type;
        objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
    }
getAdaptiveExtension
  • 从缓存中加载
  • 如果缓存中没有就创建
    • 加载所有的扩展类信息
    • 如果有类被标注为@Adaptive就返回
    • 如果没有类被标注就创建一个代理类后返回
  • 将创建的代理对象放入缓存中
public T getAdaptiveExtension() {
        Object instance = cachedAdaptiveInstance.get();// 进行Holder和加锁的方式来保证只会被创建一次
        if (instance == null) {
            if (createAdaptiveInstanceError != null) {// 如果直接已经有创建并且错误的情况,则直接返回错误信息,防止重复没必要的创建
                throw new IllegalStateException("Failed to create adaptive instance: " +
                        createAdaptiveInstanceError.toString(),
                        createAdaptiveInstanceError);
            }

            synchronized (cachedAdaptiveInstance) {
                instance = cachedAdaptiveInstance.get();
                if (instance == null) {
                    try {
                        instance = createAdaptiveExtension();// 创建Adaptive实现的代理对象
                        cachedAdaptiveInstance.set(instance);//放入缓存
                    } catch (Throwable t) {
                        createAdaptiveInstanceError = t;
                        throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
                    }
                }
            }
        }

        return (T) instance;
    }
private T createAdaptiveExtension() {
        try {
            return injectExtension((T) getAdaptiveExtensionClass().newInstance());// 使用`getAdaptiveExtensionClass`方法进行构建类并且执行实例化 然后进行扩展
        } catch (Exception e) {
            throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
        }
    }
private Class<?> getAdaptiveExtensionClass() {
        getExtensionClasses();// 加载所有的扩展类信息
        if (cachedAdaptiveClass != null) {
            return cachedAdaptiveClass;//加载过了就直接返回
        }
        return cachedAdaptiveClass = createAdaptiveExtensionClass();//进行创建
    }

    private Class<?> createAdaptiveExtensionClass() {
        String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();//实例化一个新的Adaptive的代码生成器,并且进行代码生成
        ClassLoader classLoader = findClassLoader();// 获取类加载器
        org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();// 通过扩展点,寻找编译器, 有Java自带的编译器和Javassist的编译器
        return compiler.compile(code, classLoader);// 编译并且生成class
    }
获取扩展类及创建对象

当一个接口没有@Adaptive标识的实现类时createAdaptiveExtensionClass方法会生成一个代理类

getExtensionClasses方法是用来加载所有的扩展

private Map<String, Class<?>> getExtensionClasses() {
    Map<String, Class<?>> classes = cachedClasses.get();
    if (classes == null) {
        synchronized (cachedClasses) {
            classes = cachedClasses.get();
            if (classes == null) {
                classes = loadExtensionClasses();
                cachedClasses.set(classes);
            }
        }
    }
    return classes;
}
private Map<String, Class<?>> loadExtensionClasses() {
        cacheDefaultExtensionName();//获取SPI注解中的value 并赋值给cachedDefaultName

        Map<String, Class<?>> extensionClasses = new HashMap<>();

        for (LoadingStrategy strategy : strategies) {//加载接口对应的文件
            loadDirectory(extensionClasses, strategy.directory(), type.getName(), strategy.preferExtensionClassLoader(), strategy.excludedPackages());
            loadDirectory(extensionClasses, strategy.directory(), type.getName().replace("org.apache", "com.alibaba"), strategy.preferExtensionClassLoader(), strategy.excludedPackages());
        }

        return extensionClasses;
    }
private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir, String type,
                           boolean extensionLoaderClassLoaderFirst, String... excludedPackages) {
    String fileName = dir + type;
    try {
        Enumeration<java.net.URL> urls = null;
        ClassLoader classLoader = findClassLoader();
        
        // try to load from ExtensionLoader's ClassLoader first
        if (extensionLoaderClassLoaderFirst) {
            ClassLoader extensionLoaderClassLoader = ExtensionLoader.class.getClassLoader();
            if (ClassLoader.getSystemClassLoader() != extensionLoaderClassLoader) {
                urls = extensionLoaderClassLoader.getResources(fileName);
            }
        }
        
        if(urls == null || !urls.hasMoreElements()) {
            if (classLoader != null) {
                urls = classLoader.getResources(fileName);
            } else {
                urls = ClassLoader.getSystemResources(fileName);
            }
        }

        if (urls != null) {
            while (urls.hasMoreElements()) {
                java.net.URL resourceURL = urls.nextElement();
                loadResource(extensionClasses, classLoader, resourceURL, excludedPackages);//加载文件
            }
        }
    } catch (Throwable t) {
        logger.error("Exception occurred when loading extension class (interface: " +
                type + ", description file: " + fileName + ").", t);
    }
}
private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader,
                          java.net.URL resourceURL, String... excludedPackages) {
    try {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                final int ci = line.indexOf('#');
                if (ci >= 0) {
                    line = line.substring(0, ci);
                }
                line = line.trim();
                if (line.length() > 0) {
                    try {
                        String name = null;
                        int i = line.indexOf('=');
                        if (i > 0) {
                            name = line.substring(0, i).trim();
                            line = line.substring(i + 1).trim();
                        }
                        if (line.length() > 0 && !isExcluded(line, excludedPackages)) {
                            loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name);//加载文件中配置的类
                        }
                    } catch (Throwable t) {
                        IllegalStateException e = new IllegalStateException("Failed to load extension class (interface: " + type + ", class line: " + line + ") in " + resourceURL + ", cause: " + t.getMessage(), t);
                        exceptions.put(line, e);
                    }
                }
            }
        }
    } catch (Throwable t) {
        logger.error("Exception occurred when loading extension class (interface: " +
                type + ", class file: " + resourceURL + ") in " + resourceURL, t);
    }
}
private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {
    if (!type.isAssignableFrom(clazz)) {
        throw new IllegalStateException("Error occurred when loading extension class (interface: " +
                type + ", class line: " + clazz.getName() + "), class "
                + clazz.getName() + " is not subtype of interface.");
    }
    if (clazz.isAnnotationPresent(Adaptive.class)) {
        cacheAdaptiveClass(clazz);//加载被@Adaptive 标注的类
    } else if (isWrapperClass(clazz)) {
        cacheWrapperClass(clazz);//缓存包装类
    } else {
        clazz.getConstructor();
        if (StringUtils.isEmpty(name)) {
            name = findAnnotationName(clazz);
            if (name.length() == 0) {
                throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
            }
        }

        String[] names = NAME_SEPARATOR.split(name);
        if (ArrayUtils.isNotEmpty(names)) {
            cacheActivateClass(clazz, names[0]);//加载需要被激活的类 被@Activate标注
            for (String n : names) {
                cacheName(clazz, n);//缓存对应的class
                saveInExtensionClass(extensionClasses, clazz, n);//保存
            }
        }
    }
}
注入其他扩展类的对象

createAdaptiveExtension方法中的injectExtension是对对象进行扩展,如果一个对象中有其他的扩展类需要注入,该方法就是进行注入

private T injectExtension(T instance) {

    if (objectFactory == null) {
        return instance;
    }

    try {
        for (Method method : instance.getClass().getMethods()) {//遍历使用方法
            if (!isSetter(method)) {//如果是set方法 且只有一个参数 且是公开方法才处理
                continue;
            }
            /**
             * Check {@link DisableInject} to see if we need auto injection for this property
             */
            if (method.getAnnotation(DisableInject.class) != null) {// 如果设置了取消注册,则不进行处理
                continue;
            }
            Class<?> pt = method.getParameterTypes()[0];
            if (ReflectUtils.isPrimitives(pt)) {// 获取参数类型,并且非基础类型(String, Integer等类型)
                continue;
            }

            try {
                String property = getSetterProperty(method);// 获取需要set的扩展点名称
                Object object = objectFactory.getExtension(pt, property);//从ExtensionLoader中加载指定的扩展点  比如有一个方法为setRandom(LoadBalance loadBalance),那么则以为着需要加载负载均衡中名为random的扩展点
                if (object != null) {
                    method.invoke(instance, object);
                }
            } catch (Exception e) {
                logger.error("Failed to inject via method " + method.getName()
                        + " of interface " + type.getName() + ": " + e.getMessage(), e);
            }

        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
    return instance;
}
getExtension

主要作用是根据name对扩展点进行处理和进行加锁来创建真实的引用,其中都是有使用缓存来处理

public T getExtension(String name) {
    if (StringUtils.isEmpty(name)) {
        throw new IllegalArgumentException("Extension name == null");
    }
    if ("true".equals(name)) {// 获取当前SPi的默认扩展实现类
        return getDefaultExtension();
    }
    final Holder<Object> holder = getOrCreateHolder(name);
    Object instance = holder.get();
    if (instance == null) {
        synchronized (holder) {
            instance = holder.get();
            if (instance == null) {
                instance = createExtension(name);//创建代理对象
                holder.set(instance);
            }
        }
    }
    return (T) instance;
}
private T createExtension(String name) {
    Class<?> clazz = getExtensionClasses().get(name);// 加载所有的扩展类 放入缓存 获取name对应的扩展类
    if (clazz == null) {
        throw findException(name);
    }
    try {
        T instance = (T) EXTENSION_INSTANCES.get(clazz);// 从缓存中获取实例
        if (instance == null) {
            EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
            instance = (T) EXTENSION_INSTANCES.get(clazz);
        }
        injectExtension(instance);//注入其他扩展类的对象
        Set<Class<?>> wrapperClasses = cachedWrapperClasses;
        if (CollectionUtils.isNotEmpty(wrapperClasses)) {
            for (Class<?> wrapperClass : wrapperClasses) {// 遍历所有的包装类信息
                instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));//对包装的类进行包装实例化,并且返回自身引用 然后注入其他扩展类对象
            }
        }
        initExtension(instance);// 对扩展点进行初始化操作
        return instance;
    } catch (Throwable t) {
        throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
                type + ") couldn't be instantiated: " + t.getMessage(), t);
    }
}
getSupportedExtensions
public Set<String> getSupportedExtensions() {
    Map<String, Class<?>> clazzes = getExtensionClasses();//加载所有扩展类
    return Collections.unmodifiableSet(new TreeSet<>(clazzes.keySet()));
}
private Map<String, Class<?>> getExtensionClasses() {
    Map<String, Class<?>> classes = cachedClasses.get();
    if (classes == null) {
        synchronized (cachedClasses) {
            classes = cachedClasses.get();
            if (classes == null) {
                classes = loadExtensionClasses();//加载所有扩展类
                cachedClasses.set(classes);
            }
        }
    }
    return classes;
}
ExtensionFactory

通过传入扩展点类型和真正的名称来获取扩展类。

/**
 * Get extension.
 *
 * @param type object type.
 * @param name object name.
 * @return object instance.
 */
<T> T getExtension(Class<T> type, String name);

ExtensionFactory有三个实现

adaptive=org.apache.dubbo.common.extension.factory.AdaptiveExtensionFactory
spi=org.apache.dubbo.common.extension.factory.SpiExtensionFactory

还有一个是和spring相关的

spring=org.apache.dubbo.config.spring.extension.SpringExtensionFactory

AdaptiveExtensionFactory 实现中是使用 @Adaptive 标记的。主要作用是进行代理其他的ExtensionFactory。

private final List<ExtensionFactory> factories;

public AdaptiveExtensionFactory() {
    ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
    List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
    for (String name : loader.getSupportedExtensions()) {//获取ExtensionLoader的所有实现加入list
        list.add(loader.getExtension(name));
    }
    factories = Collections.unmodifiableList(list);//设置为不可修改
}

@Override
public <T> T getExtension(Class<T> type, String name) {
    for (ExtensionFactory factory : factories) {
        T extension = factory.getExtension(type, name);//交给具体的ExtensionFactory来处理
        if (extension != null) {
            return extension;
        }
    }
    return null;
}

服务注册与引用

目录说明

注册中心是Dubbo的重要组成部分,主要用于服务的注册与发现,可以选择Redis、Nacos、Zookeeper作为Dubbo的注册中心,Dubbo推荐用户使用Zookeeper作为注册中心。

  • Zookeeper中所有的信息都是在dubbo层级下的

  • dubbo根节点下面是当前所拥有的接口名称,如果有多个接口,则会以多个子节点的形式展开

  • 每个服务下面又分别有四个配置项

    • consumers: 当前服务下面所有的消费者列表(URL)
    • providers: 当前服务下面所有的提供者列表(URL)
    • configuration: 当前服务下面的配置信息信息,provider或者consumer会通过读取这里的配置信息来获取配置
    • routers: 当消费者在进行获取提供者的时,会通过这里配置好的路由来进行适配匹配规则。

dubbo基本上很多时候都是通过URL的形式来进行交互获取数据的,在URL中也会保存很多的信息

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

通过这张图可以了解到如下信息:

  • 提供者会在 providers 目录下进行自身的进行注册。
  • 消费者会在 consumers 目录下进行自身注册,并且监听 provider 目录,以此通过监听提供者增加或者减少,实现服务发现。
  • Monitor模块会对整个服务级别做监听,用来得知整体的服务情况。以此就能更多的对整体情况做监控
URL说明

dubbo://192.168.92.162:20889/com.edu.dubbo.service.HelloService?anyhost=true&application=service-provider&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.edu.dubbo.service.HelloService&methods=sayHello&owner=glc&pid=52874&release=2.7.5&side=provider&threadpool=watching&timestamp=1659682837097

Dubbo 采用 URL 的方式来作为约定的参数类型,被称为公共契约.

URL 的格式为protocol://host:port/path?key=value&key=value 具体的参数如下:

  • protocol:指的是 dubbo 中的各种协议,如:dubbo thrift http
  • username/password:用户名/密码
  • host/port:主机/端口
  • path:接口的名称
  • parameters:参数键值对

Dubbo中的URL与java中的URL的区别:

  • Dubbo提供了针对于参数的 parameter 的增加和减少(支持动态更改)

  • 提供缓存功能,对一些基础的数据做缓存

xml配置解析

一般常用 XML 或者注解来进行 Dubbo 的配置.

对于xml,Dubbo 利用了 Spring 配置文件扩展了自定义的解析,像 dubbo.xsd 就是用来约束 XML 配置时候的标签和对应的属性用的,然后 Spring 在解析到自定义的标签的时候会查找 spring.schemas 和 spring.handlers。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

DubboNamespaceHandler对标签进行解析,DubboNamespaceHandler的init方法如下:

@Override
public void init() {
    registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
    registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
    registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
    registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
    registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true));
    registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
    registerBeanDefinitionParser("metrics", new DubboBeanDefinitionParser(MetricsConfig.class, true));
    registerBeanDefinitionParser("ssl", new DubboBeanDefinitionParser(SslConfig.class, true));
    registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
    registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
    registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
    registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
    registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
    registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
}

对于service标签是委托给了ServiceBean进行解析

服务注册

容器创建完成之后,触发ContextRefreshEvent事件回调开始注册服务

代码的流程来看,注册的步骤如下:

  • 校验和初始化
  • 加载配置中心
  • 拼装服务的URL
  • 创建服务对应Invoker对象
  • 暴露和注册服务到本地和远程

对象构建转换的角度来看,注册的步骤如下:

  • 将服务实现类转成 Invoker
  • 将 Invoker 通过具体的协议转换成 Exporter

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

OneTimeExecutionApplicationContextEventListener类来专门负责dubbo的启动和停止,监听到ContextRefreshedEvent事件和ContextClosedEvent事件后来处理服务的启动和停止

DubboBootstrapApplicationListener继承了OneTimeExecutionApplicationContextEventListener,其onApplicationContextEvent方法如下:

@Override
public void onApplicationContextEvent(ApplicationContextEvent event) {
    if (event instanceof ContextRefreshedEvent) {
        onContextRefreshedEvent((ContextRefreshedEvent) event);
    } else if (event instanceof ContextClosedEvent) {
        onContextClosedEvent((ContextClosedEvent) event);
    }
}

private void onContextRefreshedEvent(ContextRefreshedEvent event) {
    dubboBootstrap.start();
}

private void onContextClosedEvent(ContextClosedEvent event) {
    dubboBootstrap.stop();
}

在启动时会调用DubboBootstrap类的.start()

public DubboBootstrap start() {
    if (started.compareAndSet(false, true)) {
        initialize();
        if (logger.isInfoEnabled()) {
            logger.info(NAME + " is starting...");
        }
        // 1. export Dubbo Services 暴露服务
        exportServices();

        // Not only provider register
        if (!isOnlyRegisterProvider() || hasExportedServices()) {
            // 2. export MetadataService 注册元数据服务
            exportMetadataService();
            //3. Register the local ServiceInstance if required 本地注册
            registerServiceInstance();
        }

        referServices();

        if (logger.isInfoEnabled()) {
            logger.info(NAME + " has started.");
        }
    }
    return this;
}

暴露服务的方法是exportServices():

private void exportServices() {
    configManager.getServices().forEach(sc -> {
        // TODO, compatible with ServiceConfig.export()
        ServiceConfig serviceConfig = (ServiceConfig) sc;
        serviceConfig.setBootstrap(this);

        if (exportAsync) {//异步暴露服务。
            ExecutorService executor = executorRepository.getServiceExporterExecutor();
            Future<?> future = executor.submit(() -> {
                sc.export();//异步调到具体的serviceBean的export方法
            });
            asyncExportingFutures.add(future);
        } else {
            sc.export();//同步调到具体的serviceBean的export方法
            exportedServices.add(sc);
        }
    });
}

调用ServiceConfig中的export():

public synchronized void export() {
    if (!shouldExport()) {
        return;
    }

    if (bootstrap == null) {
        bootstrap = DubboBootstrap.getInstance();
        bootstrap.init();
    }

    checkAndUpdateSubConfigs();//校验和初始化

    //init serviceMetadata
    serviceMetadata.setVersion(version);
    serviceMetadata.setGroup(group);
    serviceMetadata.setDefaultGroup(group);
    serviceMetadata.setServiceType(getInterfaceClass());
    serviceMetadata.setServiceInterfaceName(getInterface());
    serviceMetadata.setTarget(getRef());

    if (shouldDelay()) {
        DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
    } else {
        doExport();//暴露和注册
    }

    exported();
}

checkAndUpdateSubConfigs是进行了校验和初始化,主要步骤如下:

  • 检测 dubbo:service 中interface的属性是否为空为空则抛出异常
  • 检测 ProviderConfig、ApplicationConfig 等核心配置类对象是否为空,若为空,则尝试从其他配置类对象中获取相应的实例。
  • 检测并处理泛化服务和普通服务类
  • 检测本地存根(stub)配置,并进行相应的处理
  • 对 ApplicationConfig、RegistryConfig 等配置类进行检测,为空则尝试创建,若无法创建则抛出异常
private void checkAndUpdateSubConfigs() {
    // Use default configs defined explicitly with global scope
    completeCompoundConfigs();
    checkDefault();
    checkProtocol();
    // init some null configuration.
    List<ConfigInitializer> configInitializers = ExtensionLoader.getExtensionLoader(ConfigInitializer.class)
            .getActivateExtension(URL.valueOf("configInitializer://"), (String[]) null);
    configInitializers.forEach(e -> e.initServiceConfig(this));

    // if protocol is not injvm checkRegistry
    if (!isOnlyInJvm()) {
        checkRegistry();
    }
    this.refresh();

    if (StringUtils.isEmpty(interfaceName)) {
        throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
    }

    if (ref instanceof GenericService) {
        interfaceClass = GenericService.class;
        if (StringUtils.isEmpty(generic)) {
            generic = Boolean.TRUE.toString();
        }
    } else {
        try {
            interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                    .getContextClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        checkInterfaceAndMethods(interfaceClass, getMethods());
        checkRef();
        generic = Boolean.FALSE.toString();
    }
    if (local != null) {
        if ("true".equals(local)) {
            local = interfaceName + "Local";
        }
        Class<?> localClass;
        try {
            localClass = ClassUtils.forNameWithThreadContextClassLoader(local);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        if (!interfaceClass.isAssignableFrom(localClass)) {
            throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
        }
    }
    if (stub != null) {
        if ("true".equals(stub)) {
            stub = interfaceName + "Stub";
        }
        Class<?> stubClass;
        try {
            stubClass = ClassUtils.forNameWithThreadContextClassLoader(stub);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        if (!interfaceClass.isAssignableFrom(stubClass)) {
            throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + interfaceName);
        }
    }
    checkStubAndLocal(interfaceClass);
    ConfigValidationUtils.checkMock(interfaceClass, this);
    ConfigValidationUtils.validateServiceConfig(this);
    postProcessConfig();
}

真正执行暴露逻辑的是doExport()方法中的doExportUrls():

private void doExportUrls() {
    ServiceRepository repository = ApplicationModel.getServiceRepository();
    ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
    repository.registerProvider(
            getUniqueServiceName(),
            ref,
            serviceDescriptor,
            this,
            serviceMetadata
    );

    List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);//加载配置中心 获取需要注册的配置中心

    for (ProtocolConfig protocolConfig : protocols) {
        String pathKey = URL.buildKey(getContextPath(protocolConfig)
                .map(p -> p + "/" + path)
                .orElse(path), group, version);
        // In case user specified path, register service one more time to map it to path.
        repository.registerService(pathKey, interfaceClass);
        // TODO, uncomment this line once service key is unified
        serviceMetadata.setServiceKey(pathKey);
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);//拼装服务的URL和暴露、注册服务
    }
}

doExportUrlsFor1Protocol是关键方法,用来拼装服务的URL和注册服务:

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        String name = protocolConfig.getName();
        if (StringUtils.isEmpty(name)) {
            name = DUBBO;
        }

        Map<String, String> map = new HashMap<String, String>();//改map是用来生成url的
        map.put(SIDE_KEY, PROVIDER_SIDE);// 添加 side信息
        //通过反射将对象的字段信息添加到 map 中
        ServiceConfig.appendRuntimeParameters(map);
        AbstractConfig.appendParameters(map, getMetrics());
        AbstractConfig.appendParameters(map, getApplication());
        AbstractConfig.appendParameters(map, getModule());
        // remove 'default.' prefix for configs from ProviderConfig
        // appendParameters(map, provider, Constants.DEFAULT_KEY);
        AbstractConfig.appendParameters(map, provider);
        AbstractConfig.appendParameters(map, protocolConfig);
        AbstractConfig.appendParameters(map, this);
        MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
        if (metadataReportConfig != null && metadataReportConfig.isValid()) {
            map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
        }
        if (CollectionUtils.isNotEmpty(getMethods())) {
            for (MethodConfig method : getMethods()) {
                AbstractConfig.appendParameters(map, method, method.getName());
                String retryKey = method.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(method.getName() + ".retries", "0");
                    }
                }
                List<ArgumentConfig> arguments = method.getArguments();
                if (CollectionUtils.isNotEmpty(arguments)) {
                    for (ArgumentConfig argument : arguments) {
                        // convert argument type
                        if (argument.getType() != null && argument.getType().length() > 0) {
                            Method[] methods = interfaceClass.getMethods();
                            // visit all methods
                            if (methods.length > 0) {
                                for (int i = 0; i < methods.length; i++) {
                                    String methodName = methods[i].getName();
                                    // target the method, and get its signature
                                    if (methodName.equals(method.getName())) {
                                        Class<?>[] argtypes = methods[i].getParameterTypes();
                                        // one callback in the method
                                        if (argument.getIndex() != -1) {
                                            if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                                AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                            } else {
                                                throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                            }
                                        } else {
                                            // multiple callbacks in the method
                                            for (int j = 0; j < argtypes.length; j++) {
                                                Class<?> argclazz = argtypes[j];
                                                if (argclazz.getName().equals(argument.getType())) {
                                                    AbstractConfig.appendParameters(map, argument, method.getName() + "." + j);
                                                    if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                                        throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        } else if (argument.getIndex() != -1) {
                            AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                        } else {
                            throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                        }

                    }
                }
            } // end of methods for
        }

        if (ProtocolUtils.isGeneric(generic)) {
            map.put(GENERIC_KEY, generic);
            map.put(METHODS_KEY, ANY_VALUE);
        } else {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put(REVISION_KEY, revision);
            }

            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("No method found in service interface " + interfaceClass.getName());
                map.put(METHODS_KEY, ANY_VALUE);
            } else {
                map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
        }

        /**
         * Here the token value configured by the provider is used to assign the value to ServiceConfig#token
         */
        if(ConfigUtils.isEmpty(token) && provider != null) {
            token = provider.getToken();
        }

        if (!ConfigUtils.isEmpty(token)) {
            if (ConfigUtils.isDefault(token)) {
                map.put(TOKEN_KEY, UUID.randomUUID().toString());
            } else {
                map.put(TOKEN_KEY, token);
            }
        }
        //init serviceMetadata attachments
        serviceMetadata.getAttachments().putAll(map);

        // export service
        String host = findConfigedHosts(protocolConfig, registryURLs, map);
        Integer port = findConfigedPorts(protocolConfig, name, map);
        URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);//根据map生成map

        // You can customize Configurator to append extra parameters
        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {//添加自定义的参数
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }

        String scope = url.getParameter(SCOPE_KEY);
        // don't export when none is configured scope配置为none时不导出
        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

            // export to local if the config is not remote (export to remote only when config is remote)
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                exportLocal(url);//暴露、注册服务到本地
            }
            // export to remote if the config is not local (export to local only when config is local)
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    for (URL registryURL : registryURLs) {//将服务注册到所有注册中心上
                        //if protocol is only injvm ,not register
                        if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                            continue;
                        }
                        url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                        URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());//如果有监控中心就添加
                        }
                        if (logger.isInfoEnabled()) {
                            if (url.getParameter(REGISTER_KEY, true)) {
                                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                            } else {
                                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                            }
                        }

                        // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                        }
                        //创建服务对应Invoker对象 根据ref中的具体实现类对象生成一个invoker
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);//根据invoker生成一个包装对象
                        //暴露、注册服务到远程
                        Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {//没有注册中心就直接暴露、注册
                    if (logger.isInfoEnabled()) {
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                    }
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                    exporters.add(exporter);
                }
                /**
                 * @since 2.7.0
                 * ServiceData Store
                 */
                WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
                if (metadataService != null) {
                    metadataService.publishServiceDefinition(url);
                }
            }
        }
        this.urls.add(url);
    }

其中的核心代码如下:

if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

            // export to local if the config is not remote (export to remote only when config is remote)
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                exportLocal(url);//暴露、注册服务到本地
            }
            // export to remote if the config is not local (export to local only when config is local)
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    for (URL registryURL : registryURLs) {//将服务注册到所有注册中心上
                        //if protocol is only injvm ,not register
                        if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                            continue;
                        }
                        url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                        URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());//如果有监控中心就添加
                        }
                        if (logger.isInfoEnabled()) {
                            if (url.getParameter(REGISTER_KEY, true)) {
                                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                            } else {
                                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                            }
                        }

                        // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                        }
                        //创建服务对应Invoker对象 根据ref中的具体实现类对象生成一个invoker
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);//根据invoker生成一个包装对象
                        //暴露、注册服务到远程
                        Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {//没有注册中心就直接暴露、注册
                    if (logger.isInfoEnabled()) {
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                    }
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                    exporters.add(exporter);
                }
                /**
                 * @since 2.7.0
                 * ServiceData Store
                 */
                WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
                if (metadataService != null) {
                    metadataService.publishServiceDefinition(url);
                }
            }
        }

本地注册方法为exportLocal,使用的协议是injvm:

private void exportLocal(URL url) {
        URL local = URLBuilder.from(url)
                .setProtocol(LOCAL_PROTOCOL)
                .setHost(LOCALHOST_VALUE)
                .setPort(0)
                .build();
        Exporter<?> exporter = PROTOCOL.export(
                PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));//创建服务对应Invoker对象 然后暴露、注册服务到本地
        exporters.add(exporter);
        logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
    }

需要注意不管是本地注册还是远程注册在调用export时的调用链如下:ProtocolListenerWrapper.export()–>ProtocolFilterWrapper.export()–>QosProtocolWrapper.export()–>真正的Protocol实现.

ProtocolFilterWrapper的export 里面就会把 invoker 组装上各种 Filter

对于暴露、注册服务到远程的核心代码在RegistryProtocol.export()中

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    URL registryUrl = getRegistryUrl(originInvoker);
    // url to export locally
    URL providerUrl = getProviderUrl(originInvoker);

    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
    //  the same service. Because the subscribed is cached key with the name of the service, it causes the
    //  subscription information to cover.
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);//获取订阅 url
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);//创建监听器  监听overrideSubscribeUrl
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

    providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
    //export invoker 服务暴露
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

    // url to registry  获取注册中心
    final Registry registry = getRegistry(originInvoker);
    final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
    // decide if we need to delay publish
    boolean register = providerUrl.getParameter(REGISTER_KEY, true);
    if (register) {
        register(registryUrl, registeredProviderUrl);//服务注册
    }

    // Deprecated! Subscribe to override rules in 2.6.x or before. 订阅对应的url
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

    exporter.setRegisterUrl(registeredProviderUrl);//设置注册中心 url
    exporter.setSubscribeUrl(overrideSubscribeUrl);//设置订阅的url

    notifyExport(exporter);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<>(exporter);
}

服务暴露的方法是doLocalExport

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
    String key = getCacheKey(originInvoker);

    return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
        Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
        return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
    });
}

主要逻辑在protocol.export中,最终会调到DubboProtocol.export方法

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();//获取url

    // export service.
    String key = serviceKey(url);//key是接口全路径:port
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);

    //export an stub service for dispatching event
    Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }

        }
    }

    openServer(url);//打开服务
    optimizeSerialization(url);

    return exporter;
}

private void openServer(URL url) {
    // find server.
    String key = url.getAddress();//获取ip
    //client can export a service which's only for server to invoke
    boolean isServer = url.getParameter(IS_SERVER_KEY, true);
    if (isServer) {
        ProtocolServer server = serverMap.get(key);
        if (server == null) {//检查是否已经存在
            synchronized (this) {
                server = serverMap.get(key);
                if (server == null) {
                    serverMap.put(key, createServer(url));//创建server
                }
            }
        } else {
            // server supports reset, use together with override
            server.reset(url);//如果有了就重置一下
        }
    }
}

private ProtocolServer createServer(URL url) {
    url = URLBuilder.from(url)
            // send readonly event when server closes, it's enabled by default
            .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
            // enable heartbeat by default
            .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
            .addParameter(CODEC_KEY, DubboCodec.NAME)
            .build();
    String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);

    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    }

    ExchangeServer server;
    try {
        server = Exchangers.bind(url, requestHandler);//根据URL 调用对应的Server 默认netty.并且初始化handler
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }

    str = url.getParameter(CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }

    return new DubboProtocolServer(server);
}

注册方法是register

public void register(URL registryUrl, URL registeredProviderUrl) {
    Registry registry = registryFactory.getRegistry(registryUrl);//获取注册中心
    registry.register(registeredProviderUrl);//注册

    ProviderModel model = ApplicationModel.getProviderModel(registeredProviderUrl.getServiceKey());
    model.addStatedUrl(new ProviderModel.RegisterStatedURL(
            registeredProviderUrl,
            registryUrl,
            true
    ));
}

registry.register会调用FailbackRegistry中的register方法

public void register(URL url) {
    if (!acceptable(url)) {
        logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
        return;
    }
    super.register(url);// 保存已经注册的地址列表
    removeFailedRegistered(url);// 将一些错误的信息移除
    removeFailedUnregistered(url);
    try {
        // Sending a registration request to the server side 进行注册操作
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;

        // If the startup detection is opened, the Exception is thrown directly. 日志
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // Record a failed registration request to a failed list, retry regularly 异步进行重试
        addFailedRegistered(url);
    }
}

其中关键方法是doRegister,如果是使用zk做为注册中心就看ZookeeperRegistry中的

@Override
public void doRegister(URL url) {
    try {
      //创建地址
        zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
private String toUrlPath(URL url) {
        return toCategoryPath(url) + PATH_SEPARATOR + URL.encode(url.toFullString());// 分类地址 + url字符串
    }
private String toCategoryPath(URL url) {
        return toServicePath(url) + PATH_SEPARATOR + url.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);// 服务名称 + category(在当前代码中是providers)
    }

private String toServicePath(URL url) {
        String name = url.getServiceInterface();// 接口地址
        if (ANY_VALUE.equals(name)) {
            return toRootPath();
        }
        return toRootDir() + URL.encode(name);// 根节点 + 接口地址
    }
服务发现

Provider通过服务注册将自己的服务暴露出来,注册到注册中心,而 Consumer就是从注册中心得知 Provider 的信息,然后自己封装一个调用类,通过rpc远程调用从而达到调用Provider

dubbo中服务的发现分为三种:

  • 本地:走 injvm 协议,当一个服务端既是 Provider 又是 Consumer 时通过本地引用可以避免远程网络调用的开销.在进行服务引用时会先去本地缓存中找本地服务.如果要关闭可以配置 scope = remote
  • 直连:不需要启动注册中心,由 Consumer 直接配置写死 Provider 的地址.一般都是测试的情况下用
  • 通过注册中心:Consumer 通过注册中心得知 Provider 的相关信息,然后进行服务的引入.该情况比较复杂,包括多注册中心,同一个服务多个提供者的情况,如何抉择如何封装,如何进行负载均衡、容错。

对于reference标签的解析是通过ReferenceBean进行的.和服务注册一样也是在 Spring 容器刷新完成之后开始暴露,而服务的引入时机有两种,第一种是饿汉式,第二种是懒汉式。默认是懒汉式,如果需要修改可以通过dubbo:reference的init来开启饿汉式

饿汉式是通过实现 Spring 的InitializingBean接口中的 afterPropertiesSet方法,容器通过调用 ReferenceBeanafterPropertiesSet方法时引入服务。

ReferenceBean 类实现了FactoryBean,所以其获取bean的方法是getObject(),其调用了ReferenceConfig#get()方法

在get方法中主要是调用了init(),而init主要是来解析配置文件然后调用createProxy方法来生成代理对象,其主要步骤如下:

  • 检查配置,通过配置构建一个 map
  • 然后利用 map 来构建 URL
  • 通过 URL 上的协议利用自适应扩展机制调用对应的 protocol.refer 得到相应的 invoker 。如果有多个 URL ,先遍历构建出 invoker 然后再由 StaticDirectory 封装一下,然后通过 cluster 进行合并,暴露出一个 invoker 。
  • 构建代理,封装 invoker 返回服务引用,之后 Comsumer 调用的就是这个代理类。

源码如下:

//    ReferenceConfig 类
public synchronized T get() {
    if (destroyed) {
        throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
    }
    if (ref == null) {
        init();
    }
    return ref;
}
public synchronized void init() {
        if (initialized) {
            return;
        }

        if (bootstrap == null) {
            bootstrap = DubboBootstrap.getInstance();
            bootstrap.init();
        }

        checkAndUpdateSubConfigs();

        checkStubAndLocal(interfaceClass);
        ConfigValidationUtils.checkMock(interfaceClass, this);
        //用来解析配置文件的
        Map<String, String> map = new HashMap<String, String>();
        map.put(SIDE_KEY, CONSUMER_SIDE);

        ReferenceConfigBase.appendRuntimeParameters(map);
        if (!ProtocolUtils.isGeneric(generic)) {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put(REVISION_KEY, revision);
            }

            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("No method found in service interface " + interfaceClass.getName());
                map.put(METHODS_KEY, ANY_VALUE);
            } else {
                map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
            }
        }
        map.put(INTERFACE_KEY, interfaceName);
        AbstractConfig.appendParameters(map, getMetrics());
        AbstractConfig.appendParameters(map, getApplication());
        AbstractConfig.appendParameters(map, getModule());
        // remove 'default.' prefix for configs from ConsumerConfig
        // appendParameters(map, consumer, Constants.DEFAULT_KEY);
        AbstractConfig.appendParameters(map, consumer);
        AbstractConfig.appendParameters(map, this);
        MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
        if (metadataReportConfig != null && metadataReportConfig.isValid()) {
            map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
        }
        Map<String, AsyncMethodInfo> attributes = null;
        if (CollectionUtils.isNotEmpty(getMethods())) {
            attributes = new HashMap<>();
            for (MethodConfig methodConfig : getMethods()) {
                AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
                String retryKey = methodConfig.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(methodConfig.getName() + ".retries", "0");
                    }
                }
                AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
                if (asyncMethodInfo != null) {
//                    consumerModel.getMethodModel(methodConfig.getName()).addAttribute(ASYNC_KEY, asyncMethodInfo);
                    attributes.put(methodConfig.getName(), asyncMethodInfo);
                }
            }
        }

        String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
        if (StringUtils.isEmpty(hostToRegistry)) {
            hostToRegistry = NetUtils.getLocalHost();
        } else if (isInvalidLocalHost(hostToRegistry)) {
            throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
        }
        map.put(REGISTER_IP_KEY, hostToRegistry);

        serviceMetadata.getAttachments().putAll(map);
        //根据解析出的配置信息生成代理对象
        ref = createProxy(map);

        serviceMetadata.setTarget(ref);
        serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
        ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
        consumerModel.setProxyObject(ref);
        consumerModel.init(attributes);

        initialized = true;

        // dispatch a ReferenceConfigInitializedEvent since 2.7.4
        dispatch(new ReferenceConfigInitializedEvent(this, invoker));
    }
 @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
    private T createProxy(Map<String, String> map) {
        if (shouldJvmRefer(map)) {//判断是否是本地调用
            URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);//构建本地url
            invoker = REF_PROTOCOL.refer(interfaceClass, url);//进行本地调用
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else {
            urls.clear();
            if (url != null && url.length() > 0) { //点对点直连的地址或者是配置中心的地址 进行处理加入urls中 user specified URL, could be peer-to-peer address, or register center's address.
                String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (StringUtils.isEmpty(url.getPath())) {
                            url = url.setPath(interfaceName);//设置接口路径
                        }
                        if (UrlUtils.isRegistry(url)) {//判断是否是注册中心地址
                            urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));//如果是注册中心地址 就把map的值放到url的refer中 然后把url放到urls中
                        } else {
                            urls.add(ClusterUtils.mergeUrl(url, map));//如果是点对点直连的地址就把url和map进行合并 并且把服务提供者的配置移除部分
                        }
                    }
                }
            } else { // assemble URL from register center's configuration
                // if protocols not injvm checkRegistry
                if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {//判断是否是本地
                    checkRegistry();
                    List<URL> us = ConfigValidationUtils.loadRegistries(this, false);//加载配置中心
                    if (CollectionUtils.isNotEmpty(us)) {
                        for (URL u : us) {
                            URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);//加载监控中心
                            if (monitorUrl != null) {
                                map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                            }
                            urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));//把map的值放到url的refer中 然后把url放到urls中
                        }
                    }
                    if (urls.isEmpty()) {
                        throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                    }
                }
            }
            //url的协议都是registry
            if (urls.size() == 1) {
                invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));//如果只有一个url就直接转换成invoker
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(REF_PROTOCOL.refer(interfaceClass, url));//多个url 就都转成invoker
                    if (UrlUtils.isRegistry(url)) {
                        registryURL = url; // use last registry url 将最后一个注册中心的地址给registryURL
                    }
                }
                if (registryURL != null) { // registry url is available
                    // for multi-subscription scenario, use 'zone-aware' policy by default 指定cluster默认值为zone-aware策略
                    URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);
                    //创建StaticDirectory实例 并由CLUSTER对多个invoker进行合并 The invoker wrap relation would be like: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
                    invoker = CLUSTER.join(new StaticDirectory(u, invokers));
                } else { // not a registry url, must be direct invoke.
                    invoker = CLUSTER.join(new StaticDirectory(invokers));
                }
            }
        }

        if (shouldCheck() && !invoker.isAvailable()) {
            throw new IllegalStateException("Failed to check the status of the service "
                    + interfaceName
                    + ". No provider available for the service "
                    + (group == null ? "" : group + "/")
                    + interfaceName +
                    (version == null ? "" : ":" + version)
                    + " from the url "
                    + invoker.getUrl()
                    + " to the consumer "
                    + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        }
        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }
        /**
         * @since 2.7.0
         * ServiceData Store
         */
        String metadata = map.get(METADATA_KEY);
        WritableMetadataService metadataService = WritableMetadataService.getExtension(metadata == null ? DEFAULT_METADATA_STORAGE_TYPE : metadata);
        if (metadataService != null) {
            URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
            metadataService.publishServiceDefinition(consumerURL);
        }
        // create service proxy
        return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
    }

在通过注册中心引入的流程中调用REF_PROTOCOL.refer时协议是 registry 因此走的是 RegistryProtocol#refer

@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    url = getRegistryUrl(url);//获取注册的协议 zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=service-consumer&dubbo=2.0.2&pid=81321&refer=application%3Dservice-consumer%26dubbo%3D2.0.2%26init%3Dfalse%26interface%3Dcom.edu.dubbo.service.HelloService%26methods%3DsayHello%26pid%3D81321%26register.ip%3D192.168.92.162%26release%3D2.7.5%26side%3Dconsumer%26sticky%3Dfalse%26timeout%3D4000%26timestamp%3D1660125465966&release=2.7.5&timestamp=1660125503448
    Registry registry = registryFactory.getRegistry(url);//获取注册中心
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    // group="a,b" or group="*"
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
    String group = qs.get(GROUP_KEY);
    if (group != null && group.length() > 0) {
        if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    return doRefer(cluster, registry, type, url);//生成invoker
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);//创建 directory RegistryDirectory实现了NotifyListener 接口,注册中心的监听是其处理的。 进行路由的时候也是其处理的
        directory.setRegistry(registry);//设置注册中心
        directory.setProtocol(protocol);//设置协议
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);//获取消费者需要注册的地址
        if (directory.isShouldRegister()) {
            directory.setRegisteredConsumerUrl(subscribeUrl);
            registry.register(directory.getRegisteredConsumerUrl());//把消费者注册到注册中心
        }
        directory.buildRouterChain(subscribeUrl);//设置路由地址 在进行路由的时候会用到
        directory.subscribe(toSubscribeUrl(subscribeUrl));//向注册中心订阅 providers 节点、 configurators 节点 和 routers 节点,订阅了之后 RegistryDirectory 会收到这几个节点下的信息,会触发 每个服务者对应的DubboInvoker生成

        Invoker<T> invoker = cluster.join(directory);//封装directory 返回一个invoker
        List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
        if (CollectionUtils.isEmpty(listeners)) {
            return invoker;
        }

        RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
        for (RegistryProtocolListener listener : listeners) {
            listener.onRefer(this, registryInvokerWrapper);//订阅服务时通知侦听器
        }
        return registryInvokerWrapper;
    }
buildRouterChain设置路由

调用了RouterChain的buildChain生成RouterChain

RegistryDirectory#buildRouterChain:

public void buildRouterChain(URL url) {
    this.setRouterChain(RouterChain.buildChain(url));
}

对于RouterChain中invokers的设置是在directory调用subscribe方法进行订阅时在调用到RegistryDirectory#refreshInvoker方法时,在refreshInvoker生成了服务者invoker之后 调用了RouterChain#setInvokers方法设置了服务者列表

RouterChain#RouterChain

private RouterChain(URL url) {
    List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class)
            .getActivateExtension(url, "router");//获取改地址对应的所有激活的RouterFactory

    List<Router> routers = extensionFactories.stream()
            .map(factory -> factory.getRouter(url))
            .collect(Collectors.toList());//通过RouterFactory 获取Router

    initWithRouters(routers); //保存routes
}

/**
 * the resident routers must being initialized before address notification.
 * FIXME: this method should not be public
 */
public void initWithRouters(List<Router> builtinRouters) {
    this.builtinRouters = builtinRouters;
    this.routers = new ArrayList<>(builtinRouters);
    this.sort();
}
public void setInvokers(List<Invoker<T>> invokers) {
        this.invokers = (invokers == null ? Collections.emptyList() : invokers);
        routers.forEach(router -> router.notify(this.invokers));
    }
subscribe订阅

其中directory调用subscribe方法进行订阅时会触发dubboInvoker的生成,使用zk作为注册中心时的调用流程大致如下:

RegistryDirectory#subscribe–>FailbackRegistry#subscribe–>ZookeeperRegistry#doSubscribe(会获取多个服务提供者的地址)–>FailbackRegistry#notify–>FailbackRegistry#doNotify–>AbstractRegistry#notify–>RegistryDirectory#notify–>RegistryDirectory#refreshOverrideAndInvoker–>RegistryDirectory#refreshInvoker–>RegistryDirectory#toInvokers(根据多个服务提供者地址生成对应invoker)–>AbstractProtocol#refer–>DubboProtocol#protocolBindingRefer

RegistryDirectory#toInvokers

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
    Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
    if (urls == null || urls.isEmpty()) {
        return newUrlInvokerMap;
    }
    Set<String> keys = new HashSet<>();
    String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
    for (URL providerUrl : urls) {
        // If protocol is configured at the reference side, only the matching protocol is selected
        if (queryProtocols != null && queryProtocols.length() > 0) {
            boolean accept = false;
            String[] acceptProtocols = queryProtocols.split(",");
            for (String acceptProtocol : acceptProtocols) {
                if (providerUrl.getProtocol().equals(acceptProtocol)) {
                    accept = true;
                    break;
                }
            }
            if (!accept) {
                continue;
            }
        }
        if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
            continue;
        }
        if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
            logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
                    " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
                    " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
                    ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
            continue;
        }
        URL url = mergeUrl(providerUrl);

        String key = url.toFullString(); // The parameter urls are sorted
        if (keys.contains(key)) { // Repeated url
            continue;
        }
        keys.add(key);
        // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
        Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
        Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
        if (invoker == null) { // Not in the cache, refer again
            try {
                boolean enabled = true;
                if (url.hasParameter(DISABLED_KEY)) {
                    enabled = !url.getParameter(DISABLED_KEY, false);
                } else {
                    enabled = url.getParameter(ENABLED_KEY, true);
                }
                if (enabled) {//获取invoker
                    invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
                }
            } catch (Throwable t) {
                logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
            }
            if (invoker != null) { // Put new invoker in cache
                newUrlInvokerMap.put(key, invoker);
            }
        } else {
            newUrlInvokerMap.put(key, invoker);
        }
    }
    keys.clear();
    return newUrlInvokerMap;
}

DubboProtocol#protocolBindingRefer

@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);

    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);

    return invoker;
}
private ExchangeClient[] getClients(URL url) {
        // whether to share connection

        boolean useShareConnect = false;

        int connections = url.getParameter(CONNECTIONS_KEY, 0);
        List<ReferenceCountExchangeClient> shareClients = null;
        // if not configured, connection is shared, otherwise, one connection for one service
        if (connections == 0) {
            useShareConnect = true;

            /*
             * The xml configuration should have a higher priority than properties.
             */
            String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
            connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
                    DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
            shareClients = getSharedClient(url, connections);
        }

        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (useShareConnect) {
                clients[i] = shareClients.get(i);//默认是走共享客户端 通过远程地址找 client 

            } else {
                clients[i] = initClient(url);//初始化新的客户端
            }
        }

        return clients;
    }
private ExchangeClient initClient(URL url) {

        // client type setting. 获取客户端 默认是 netty
        String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
        //添加编解码
        url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
        // enable heartbeat by default 添加心跳
        url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));

        // BIO is not allowed since it has severe performance issue. 检查客户端是否合法
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: " + str + "," +
                    " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
        }

        ExchangeClient client;
        try {
            // connection should be lazy 懒加载
            if (url.getParameter(LAZY_CONNECT_KEY, false)) {
                client = new LazyConnectExchangeClient(url, requestHandler);

            } else {
                client = Exchangers.connect(url, requestHandler);//连接远程
            }

        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }

        return client;
    }
join封装directory
private <T> Invoker<T> buildClusterInterceptors(AbstractClusterInvoker<T> clusterInvoker, String key) {
    AbstractClusterInvoker<T> last = clusterInvoker;
    List<ClusterInterceptor> interceptors = ExtensionLoader.getExtensionLoader(ClusterInterceptor.class).getActivateExtension(clusterInvoker.getUrl(), key);// 获取所有的拦截器

    if (!interceptors.isEmpty()) {
        for (int i = interceptors.size() - 1; i >= 0; i--) {
            final ClusterInterceptor interceptor = interceptors.get(i);// 对拦截器进行一层封装
            final AbstractClusterInvoker<T> next = last;
            last = new InterceptorInvokerNode<>(clusterInvoker, interceptor, next);
        }
    }
    return last;
}

@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
    return buildClusterInterceptors(doJoin(directory), directory.getUrl().getParameter(REFERENCE_INTERCEPTOR_KEY));
}

其中doJoin默认调的是FailoverCluster#doJoin 生成一个FailoverClusterInvoker

在集群容错时doInvoke对应的实现类就是doJoin生成的AbstractClusterInvoker

@Override
public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
    return new FailoverClusterInvoker<>(directory);
}
服务缓存

Dubbo在订阅注册中心的回调处理逻辑当中会保存服务提供者信息到本地缓存文件当中(同步/异步两种方式),以URL纬度进行全量保存。

Dubbo在服务引用过程中会创建registry对象并加载本地缓存文件,会优先订阅注册中心,订阅注册中心失败后会访问本地缓存文件内容获取服务提供信息。

AbstractRegistry类的初始化方法会创建对象

public AbstractRegistry(URL url) {
    setUrl(url);
    if (url.getParameter(REGISTRY__LOCAL_FILE_CACHE_ENABLED, true)) {
        // Start file save timer
        syncSaveFile = url.getParameter(REGISTRY_FILESAVE_SYNC_KEY, false);
        // 默认保存路径(home/.dubbo/dubbo-registry-appName-address-port.cache)
        String defaultFilename = System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(APPLICATION_KEY) + "-" + url.getAddress().replaceAll(":", "-") + ".cache";
        String filename = url.getParameter(FILE_KEY, defaultFilename);
        // 创建文件
        File file = null;
        if (ConfigUtils.isNotEmpty(filename)) {
            file = new File(filename);
            if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
                if (!file.getParentFile().mkdirs()) {
                    throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
                }
            }
        }
        this.file = file;
        // When starting the subscription center,
        // we need to read the local cache file for future Registry fault tolerance processing.  加载已有的配置文件
        loadProperties();
        notify(url.getBackupUrls());
    }
}

notify会调到AbstractRegistry类的saveProperties方法会保存数据

private void saveProperties(URL url) {
    if (file == null) {
        return;
    }

    try {
        StringBuilder buf = new StringBuilder();
        Map<String, List<URL>> categoryNotified = notified.get(url);// 获取所有通知到的地址
        if (categoryNotified != null) {
            for (List<URL> us : categoryNotified.values()) {
                for (URL u : us) {// 多个地址进行拼接
                    if (buf.length() > 0) {
                        buf.append(URL_SEPARATOR);
                    }
                    buf.append(u.toFullString());
                }
            }
        }
        properties.setProperty(url.getServiceKey(), buf.toString());// 保存数据
        long version = lastCacheChanged.incrementAndGet();// 版本号+1 可以保证之后保存的记录,在重试的时候,不会重试之前的版本
        if (syncSaveFile) {//同步处理还是异步
            doSaveProperties(version);
        } else {
            registryCacheExecutor.execute(new SaveProperties(version));
        }
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
}

doSaveProperties是保存数据到文件的关键方法

public void doSaveProperties(long version) {
    if (version < lastCacheChanged.get()) {
        return;
    }
    if (file == null) {
        return;
    }
    // Save
    try {
        File lockfile = new File(file.getAbsolutePath() + ".lock");// 文件锁,保证只有一个线程能进行读取操作
        if (!lockfile.exists()) {
            lockfile.createNewFile();
        }
        try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
             FileChannel channel = raf.getChannel()) {
            FileLock lock = channel.tryLock();// 利用文件锁来保证并发的执行的情况下,只会有一个线程执行成功(原因在于可能是跨 VM的)
            if (lock == null) {
                throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
            }
            // Save
            try {
                if (!file.exists()) {
                    file.createNewFile();
                }
                try (FileOutputStream outputFile = new FileOutputStream(file)) {
                    properties.store(outputFile, "Dubbo Registry Cache");// 保存新的配置
                }
            } finally {
                lock.release();// 释放文件锁
            }
        }
    } catch (Throwable e) {// 执行出现错误时,则交给专门的线程去进行重试
        savePropertiesRetryTimes.incrementAndGet();
        if (savePropertiesRetryTimes.get() >= MAX_RETRY_TIMES_SAVE_PROPERTIES) {
            logger.warn("Failed to save registry cache file after retrying " + MAX_RETRY_TIMES_SAVE_PROPERTIES + " times, cause: " + e.getMessage(), e);
            savePropertiesRetryTimes.set(0);
            return;
        }
        if (version < lastCacheChanged.get()) {
            savePropertiesRetryTimes.set(0);
            return;
        } else {
            registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
        }
        logger.warn("Failed to save registry cache file, will retry, cause: " + e.getMessage(), e);
    }
}

集群容错

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

集群工作过程可分为两个阶段

第一个阶段是在服务消费者初始化期间,集群 Cluster 实现类为服务消费者创建 Cluster Invoker 实例,即上图中的 merge 操作。

第二个阶段是在服务消费者进行远程调用时。以 FailoverClusterInvoker 为例,该类型 Cluster Invoker 首先会调用 Directory 的 list 方法列举Invoker 列表(可将 Invoker 简单理解为服务提供者)。Directory 的用途是保存 Invoker列表,可简单类比为 List。其实现类 RegistryDirectory 是一个动态服务目录,可感知注册中心配置的变化,它所持有的 Invoker 列表会随着注册中心内容的变化而变化。每次变化后,RegistryDirectory 会动态增删Invoker,并调用 Router 的 route 方法进行路由,过滤掉不符合路由规则的 Invoker。当FailoverClusterInvoker 拿到 Directory 返回的 Invoker 列表后,它会通过 LoadBalance 从 Invoker 列表中选择一个 Invoker。最后 FailoverClusterInvoker 会将参数传给 LoadBalance 选择出的 Invoker实例的 invoke 方法,进行真正的远程调用

Dubbo 主要提供了这样几种容错方式:

  • Failover Cluster - 失败自动切换,失败时会重试其它服务器

  • Failfast Cluster - 快速失败,请求失败后快速返回异常结果不重试

  • Failsafe Cluster - 失败安全,出现异常直接忽略会对请求做负载均衡

  • Failback Cluster - 失败自动恢复,请求失败后会自动记录请求到失败队列中

  • Forking Cluster - 并行调用多个服务提供者 其中有一个返回 则立即返回结果

消费者在进行调用时经过层层调用最后会进入.AbstractCluster.InterceptorInvokerNode#invoke方法中,调用栈和invoker方法如下:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult;
    try {
        interceptor.before(next, invocation);
        asyncResult = interceptor.intercept(next, invocation);
    } catch (Exception e) {
        // onError callback
        if (interceptor instanceof ClusterInterceptor.Listener) {
            ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
            listener.onError(e, clusterInvoker, invocation);
        }
        throw e;
    } finally {
        interceptor.after(next, invocation);
    }
    return asyncResult.whenCompleteWithContext((r, t) -> {
        // onResponse callback
        if (interceptor instanceof ClusterInterceptor.Listener) {
            ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
            if (t == null) {
                listener.onMessage(r, clusterInvoker, invocation);
            } else {
                listener.onError(t, clusterInvoker, invocation);
            }
        }
    });
}

invoke中会进行拦截器链的调用,最终后调到.AbstractClusterInvoker#invoke方法中

@Override
public Result invoke(final Invocation invocation) throws RpcException {
    checkWhetherDestroyed();// 检查是否已经关闭了

    // binding attachments into invocation. 把RPCContext中的附加信息加到当前的invocation中
    Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
    }

    List<Invoker<T>> invokers = list(invocation);// 进行路由,找出路由后所有的invoker
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);//获取负载均衡器
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);//适配异步请求
    return doInvoke(invocation, invokers, loadbalance);// 处理请求
}
list

进行路由,找出路由后所有的invoker,会调用directory中的list方法,最终会调用RegistryDirectory#doList

protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
    return directory.list(invocation);
}

会调用directory中的list方法

public List<Invoker<T>> doList(Invocation invocation) {
    if (forbidden) {
        // 1. No service provider 2. Service providers are disabled
        throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " +
                getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +
                NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() +
                ", please check status of providers(disabled, not registered or in blacklist).");
    }

    if (multiGroup) {
        return this.invokers == null ? Collections.emptyList() : this.invokers;
    }

    List<Invoker<T>> invokers = null;
    try {
        // Get invokers from cache, only runtime routers will be executed. 获取路由后的服务者列表
        invokers = routerChain.route(getConsumerUrl(), invocation);
    } catch (Throwable t) {
        logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
    }

    return invokers == null ? Collections.emptyList() : invokers;
}

RouterChain#route

public List<Invoker<T>> route(URL url, Invocation invocation) {
    List<Invoker<T>> finalInvokers = invokers;
    for (Router router : routers) {//根据路由链进行路由
        finalInvokers = router.route(finalInvokers, url, invocation);
    }
    return finalInvokers;
}
initLoadBalance

主要是获取负载均衡器,默认是随机类型

protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
    if (CollectionUtils.isNotEmpty(invokers)) {
        return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(RpcUtils.getMethodName(invocation), LOADBALANCE_KEY, DEFAULT_LOADBALANCE));
    } else {
        return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(DEFAULT_LOADBALANCE);
    }
}
doInvoke

默认是调用FailoverClusterInvoker#doInvoke

@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyInvokers = invokers;
    checkInvokers(copyInvokers, invocation);// 判断invokers是否为空
    String methodName = RpcUtils.getMethodName(invocation);
    int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;//获取重试次数 比如设置了重试2次 +1后进行最多调3次
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers. 保存当前执行过的invoker
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        //Reselect before retry to avoid a change of candidate `invokers`.
        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        if (i > 0) {
            checkWhetherDestroyed();//重试时 每次都执行一次是否关闭当前consumer的判断
            copyInvokers = list(invocation);//重试时要重新路由
            // check again
            checkInvokers(copyInvokers, invocation);//重试时要重新校验invokers是否为空
        }
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);// 通过负载均衡选择具体的invoker
        invoked.add(invoker);//保存已经执行过的invoker
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            Result result = invoker.invoke(invocation);//invoker执行
            if (le != null && logger.isWarnEnabled()) {
                logger.warn("Although retry the method " + methodName
                        + " in the service " + getInterface().getName()
                        + " was successful by the provider " + invoker.getUrl().getAddress()
                        + ", but there have been failed providers " + providers
                        + " (" + providers.size() + "/" + copyInvokers.size()
                        + ") from the registry " + directory.getUrl().getAddress()
                        + " on the consumer " + NetUtils.getLocalHost()
                        + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                        + le.getMessage(), le);
            }
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception. 业务异常直接抛出
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException(le.getCode(), "Failed to invoke the method "
            + methodName + " in the service " + getInterface().getName()
            + ". Tried " + len + " times of the providers " + providers
            + " (" + providers.size() + "/" + copyInvokers.size()
            + ") from the registry " + directory.getUrl().getAddress()
            + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
            + Version.getVersion() + ". Last error is: "
            + le.getMessage(), le.getCause() != null ? le.getCause() : le);//一直没成功就报异常
}
select进行负载均衡

select调到AbstractClusterInvoker#select中

protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }
    String methodName = invocation == null ? StringUtils.EMPTY_STRING : invocation.getMethodName();

    boolean sticky = invokers.get(0).getUrl()
            .getMethodParameter(methodName, CLUSTER_STICKY_KEY, DEFAULT_CLUSTER_STICKY);

    //ignore overloaded method
    if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
        stickyInvoker = null;
    }
    //ignore concurrency problem
    if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
        if (availablecheck && stickyInvoker.isAvailable()) {
            return stickyInvoker;
        }
    }

    Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);//进行负载均衡

    if (sticky) {
        stickyInvoker = invoker;
    }
    return invoker;
}

private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }
    if (invokers.size() == 1) {
        return invokers.get(0);
    }
    Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

    //If the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.
    if ((selected != null && selected.contains(invoker))
            || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
        try {
            Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
            if (rInvoker != null) {
                invoker = rInvoker;
            } else {
                //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                int index = invokers.indexOf(invoker);
                try {
                    //Avoid collision
                    invoker = invokers.get((index + 1) % invokers.size());//进行负载均衡
                } catch (Exception e) {
                    logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                }
            }
        } catch (Throwable t) {
            logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
        }
    }
    return invoker;
}

loadbalance.select会调到AbstractLoadBalance#select

@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }
    if (invokers.size() == 1) {
        return invokers.get(0);
    }
    return doSelect(invokers, url, invocation);//根据容错的策略执行对应的策略
}

protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
invoke

invoker的调用在AbstractInvoker#invoke

@Override
public Result invoke(Invocation inv) throws RpcException {
    // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
    if (destroyed.get()) {// 判断系统是否已经关闭
        logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
    }
    RpcInvocation invocation = (RpcInvocation) inv;
    invocation.setInvoker(this);
    if (CollectionUtils.isNotEmptyMap(attachment)) {
        invocation.addObjectAttachmentsIfAbsent(attachment);
    }
    Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
    if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
        /**
         * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
         * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
         * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
         * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
         */
        invocation.addObjectAttachments(contextAttachments);
    }

    invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));// 设置执行的模式
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);// 设置执行id,主要用于适配异步模式使用

    AsyncRpcResult asyncResult;
    try {
        asyncResult = (AsyncRpcResult) doInvoke(invocation);//执行
    } catch (InvocationTargetException e) { // biz exception
        Throwable te = e.getTargetException();//业务异常
        if (te == null) {
            asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        } else {
            if (te instanceof RpcException) {
                ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
            }
            asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
        }
    } catch (RpcException e) {//rpc异常
        if (e.isBiz()) {
            asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        } else {
            throw e;
        }
    } catch (Throwable e) {
        asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
    }
    RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture()));//设置执行结果
    return asyncResult;
}

最终会调到DubboInvoker#doInvoke

@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(PATH_KEY, getUrl().getPath());
    inv.setAttachment(VERSION_KEY, version);

    ExchangeClient currentClient;// 客户端
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);// 是否有返回值,没有返回值也就是相当于发送了一个指令,不在乎服务端的返回 通常适用于异步请求
        int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);// 获取超时的配置
        if (isOneway) {// 不需要返回值信息(异步)
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);// 发送命令
            return AsyncRpcResult.newDefaultAsyncResult(invocation);// 告知为异步的结果
        } else {
            ExecutorService executor = getCallbackExecutor(getUrl(), inv);// 获取真正执行的线程池(ThreadPool中的SPI)
            CompletableFuture<AppResponse> appResponseFuture =
                    currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);// 发送请求并且等待结果
            // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter 设置完成的结果信息
            FutureContext.getContext().setCompatibleFuture(appResponseFuture);
            AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);// 创建新的结果信息并且返回
            result.setExecutor(executor);
            return result;
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

对于需要返回值的请求是调用的HeaderExchangeClient#request

@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
    return channel.request(request, timeout, executor);
}

最后会调到HeaderExchangeChannel#request

@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // create request. // 创建一个新的request对象
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);// 创建一个执行结果的回调信息处理
    try {
        channel.send(req);// 交给真正的业务渠道进行处理 渠道是Transporter这个SPI进行创建的 默认就是Netty
    } catch (RemotingException e) {
        future.cancel();// 请求出现异常则取消当前的请求封装
        throw e;
    }
    return future;
}

网络

数据包

dubbo协议采用固定长度的消息头(16字节)和不定长度的消息体来进行数据传输,消息头定义了底层框架(netty)在IO线程处理时需要的信息,协议的报文格式如下:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

协议详情
  • Magic- Magic High & Magic Low (16 bits)

    标识协议版本号,Dubbo 协议:0xdabb

  • Serialization ID (5 bit)

    标识序列化类型:比如 fastjson 的值为6。

  • Event (1 bit)

    标识是否是事件消息,例如,心跳事件。如果这是一个事件,则设置为1。

  • Two Way (1 bit)

    仅在 Req/Res 为1(请求)时才有用,标记是否期望从服务器返回值。如果需要来自服务器的返回值,则设置为1。

  • Req/Res (1 bit)

    标识是请求或响应。请求: 1; 响应: 0。

  • Status (8 bits)

    仅在 Req/Res 为0(响应)时有用,用于标识响应的状态。

    • 20 - OK
    • 30 - CLIENT_TIMEOUT
    • 31 - SERVER_TIMEOUT
    • 40 - BAD_REQUEST
    • 50 - BAD_RESPONSE
    • 60 - SERVICE_NOT_FOUND
    • 70 - SERVICE_ERROR
    • 80 - SERVER_ERROR
    • 90 - CLIENT_ERROR
    • 100 - SERVER_THREADPOOL_EXHAUSTED_ERROR
  • Request ID

    (64 bits)标识唯一请求。类型为long。

  • Data Length (32 bits)

    序列化后的内容长度(可变部分),按字节计数。int类型。

  • Variable Part(消息体)

    被特定的序列化类型(由序列化 ID 标识)序列化后,每个部分都是一个 byte [] 或者 byte

    • 如果是请求包 ( Req/Res = 1),则每个部分依次为:

      ​ Dubbo version

      ​ Service name

      ​ Service version

      ​ Method name

      ​ Method parameter types

      ​ Method arguments

      ​ Attachments

    • 如果是响应包(Req/Res = 0),则每个部分依次为:

      ​ 返回值类型(byte),用来标识从服务器端返回的值类型:

      ​ 返回空值:RESPONSE_NULL_VALUE 2

      ​ 正常响应值: RESPONSE_VALUE 1

      ​ 异常:RESPONSE_WITH_EXCEPTION 0

      ​ 返回值

      ​ 从服务端返回的响应bytes

**注意:**对于(Variable Part)可变部分,当前版本的Dubbo 框架使用json序列化时,在每部分内容间额外增加了换行符作为分隔

Dubbo version bytes (换行符)

Service name bytes (换行符)

优点
  • 协议设计上很紧凑,能用 1 个 bit 表示的,不会用一个 byte 来表示,比如 boolean 类型的标识。

  • 请求、响应的 header 一致,通过序列化器对 content 组装特定的内容,代码实现起来简单

缺点
  • 类似于 http 请求,通过 header 就可以确定要访问的资源,而 Dubbo 需要涉及到用特定序列化协议才可以将服务名、方法、方法签名解析出来,并且这些资源定位符是 string 类型或者 string数组,很容易转成 bytes,因此可以组装到 header 中。类似于 http2 的 header 压缩,对于 rpc 调用的资源也可以协商出来一个int来标识,从而提升性能,如果在 header 上组装资源定位符的话,该功能则更易实现。

  • 通过 req/res 是否是请求后,可以精细定制协议,去掉一些不需要的标识和添加一些特定的标识。比如 status , twoWay 标识可以严格定制,去掉冗余标识。还有超时时间是作为 Dubbo 的 attachment 进行传输的,理论上应该放到请求协议的header中,因为超时是网络请求中必不可少的。提到 attachment ,通过实现可以看到 attachment 中有一些是跟协议 content 中已有的字段是重复的,比如 path 和 version 等字段,这些会增大协议尺寸。

  • Dubbo 会将服务名com.alibaba.middleware.hsf.guide.api.param.ModifyOrderPriceParam ,转换为Lcom/alibaba/middleware/hsf/guide/api/param/ModifyOrderPriceParam; ,理论上是不必要的,最后追加一个 ; 即可。

  • Dubbo 协议没有预留扩展字段,没法新增标识,扩展性不太好,比如新增 响应上下文 的功能,只有改协议版本号的方式,但是这样要求客户端和服务端的版本都进行升级,对于分布式场景很不友好。

数据协议ExchangeCodec

ExchangeCodec 类,这个也是Dubbo在进行数据传输中的数据协议类。

类常量定义
// header length. 请求头长度
protected static final int HEADER_LENGTH = 16;
// magic header.
protected static final short MAGIC = (short) 0xdabb;
protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
// message flag. 消息中的内容
protected static final byte FLAG_REQUEST = (byte) 0x80;
protected static final byte FLAG_TWOWAY = (byte) 0x40;
protected static final byte FLAG_EVENT = (byte) 0x20;
protected static final int SERIALIZATION_MASK = 0x1f;
private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class);
encode
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
    if (msg instanceof Request) {
        encodeRequest(channel, buffer, (Request) msg);//处理请求
    } else if (msg instanceof Response) {
        encodeResponse(channel, buffer, (Response) msg);//处理响应
    } else {
        super.encode(channel, buffer, msg);
    }
}
encodeRequest
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
    Serialization serialization = getSerialization(channel);//请求的序列化类型
    // header. 请求头
    byte[] header = new byte[HEADER_LENGTH];
    // set magic number. 写入magic
    Bytes.short2bytes(MAGIC, header);

    // set request and serialization flag. 标记 是否是请求和序列化id
    header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

    if (req.isTwoWay()) {// 标记是否期望从服务器返回值(异步)
        header[2] |= FLAG_TWOWAY;
    }
    if (req.isEvent()) {//标识是否是事件消息
        header[2] |= FLAG_EVENT;
    }

    // set request id. 设置当前的请求ID
    Bytes.long2bytes(req.getId(), header, 4);

    // encode request data. 保存当前写入的位置,将其写入的位置往后面偏移,保留出写入内容大小的位置,先进行写入body 内容
    int savedWriteIndex = buffer.writerIndex();
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
    ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
    ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
    if (req.isEvent()) {
        encodeEventData(channel, out, req.getData());//是事件消息
    } else {
        encodeRequestData(channel, out, req.getData(), req.getVersion());//写入请求内容
    }
    out.flushBuffer();
    if (out instanceof Cleanable) {
        ((Cleanable) out).cleanup();
    }
    bos.flush();
    bos.close();
    int len = bos.writtenBytes();// 记录body中写入的长度
    checkPayload(channel, len);
    Bytes.int2bytes(len, header, 12);// 将其写入到header中的位置中

    // write 把写入的位置切回来
    buffer.writerIndex(savedWriteIndex);
    buffer.writeBytes(header); // write header. 写入请求头
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);//设置写入位置
}

DubboCodec#encodeRequestData

@Override
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
    RpcInvocation inv = (RpcInvocation) data;

    out.writeUTF(version);// 写入版本
    out.writeUTF(inv.getAttachment(PATH_KEY));// 写入接口全名称
    out.writeUTF(inv.getAttachment(VERSION_KEY));// 写入接口版本号

    out.writeUTF(inv.getMethodName());// 写入方法名称
    out.writeUTF(inv.getParameterTypesDesc());// 写入参数描述
    Object[] args = inv.getArguments();// 写入所有参数
    if (args != null) {
        for (int i = 0; i < args.length; i++) {
            out.writeObject(encodeInvocationArgument(channel, inv, i));
        }
    }
    out.writeAttachments(inv.getObjectAttachments());// 写入所有的附加信息
}
encodeResponse

和encodeRequest基本一样

protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
        int savedWriteIndex = buffer.writerIndex();
        try {
            Serialization serialization = getSerialization(channel);
            // header.
            byte[] header = new byte[HEADER_LENGTH];
            // set magic number.
            Bytes.short2bytes(MAGIC, header);
            // set request and serialization flag.
            header[2] = serialization.getContentTypeId();
            if (res.isHeartbeat()) {
                header[2] |= FLAG_EVENT;
            }
            // set response status. 写入状态码
            byte status = res.getStatus();
            header[3] = status;
            // set request id.
            Bytes.long2bytes(res.getId(), header, 4);

            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
            ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
            ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
            // encode response data or error message.
            if (status == Response.OK) {
                if (res.isHeartbeat()) {
                    encodeEventData(channel, out, res.getResult());
                } else {
                    encodeResponseData(channel, out, res.getResult(), res.getVersion());
                }
            } else {
                out.writeUTF(res.getErrorMessage());// 如果错误,则直接将错误信息写入,不需要再交由序列化
            }
            out.flushBuffer();
            if (out instanceof Cleanable) {
                ((Cleanable) out).cleanup();
            }
            bos.flush();
            bos.close();

            int len = bos.writtenBytes();
            checkPayload(channel, len);
            Bytes.int2bytes(len, header, 12);
            // write
            buffer.writerIndex(savedWriteIndex);
            buffer.writeBytes(header); // write header.
            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
        } catch (Throwable t) {
            // clear buffer
            buffer.writerIndex(savedWriteIndex);
            // send error message to Consumer, otherwise, Consumer will wait till timeout.
            if (!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {
                Response r = new Response(res.getId(), res.getVersion());
                r.setStatus(Response.BAD_RESPONSE);

                if (t instanceof ExceedPayloadLimitException) {// 如果是超过内容长度则重新设置内容大小并写入
                    logger.warn(t.getMessage(), t);
                    try {
                        r.setErrorMessage(t.getMessage());
                        channel.send(r);
                        return;
                    } catch (RemotingException e) {
                        logger.warn("Failed to send bad_response info back: " + t.getMessage() + ", cause: " + e.getMessage(), e);
                    }
                } else {
                    // FIXME log error message in Codec and handle in caught() of IoHanndler?
                    logger.warn("Fail to encode response: " + res + ", send bad_response info instead, cause: " + t.getMessage(), t);
                    try {
                        r.setErrorMessage("Failed to send response: " + res + ", cause: " + StringUtils.toString(t));
                        channel.send(r);
                        return;
                    } catch (RemotingException e) {
                        logger.warn("Failed to send bad_response info back: " + res + ", cause: " + e.getMessage(), e);
                    }
                }
            }

            // Rethrow exception
            if (t instanceof IOException) {
                throw (IOException) t;
            } else if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else if (t instanceof Error) {
                throw (Error) t;
            } else {
                throw new RuntimeException(t.getMessage(), t);
            }
        }
    }

DubboCodec#encodeResponseData

@Override
protected void encodeResponseData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
    Result result = (Result) data;
    // currently, the version value in Response records the version of Request  是否支持返回attachment参数
    boolean attach = Version.isSupportResponseAttachment(version);
    Throwable th = result.getException();
    if (th == null) {
        Object ret = result.getValue();// 如果没有异常信息,则直接写入内容
        if (ret == null) {
            out.writeByte(attach ? RESPONSE_NULL_VALUE_WITH_ATTACHMENTS : RESPONSE_NULL_VALUE);
        } else {
            out.writeByte(attach ? RESPONSE_VALUE_WITH_ATTACHMENTS : RESPONSE_VALUE);
            out.writeObject(ret);
        }
    } else {// 否则的话则将异常信息序列写入
        out.writeByte(attach ? RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS : RESPONSE_WITH_EXCEPTION);
        out.writeThrowable(th);
    }

    if (attach) {// 支持写入attachment,则写入
        // returns current version of Response to consumer side.
        result.getObjectAttachments().put(DUBBO_VERSION_KEY, Version.getProtocolVersion());
        out.writeAttachments(result.getObjectAttachments());
    }
}
decode
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int readable = buffer.readableBytes();// 可读字节数
    byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];// 选取可读字节数 和 HEADER_LENGTH 中小的
    buffer.readBytes(header);
    return decode(channel, buffer, readable, header);
}

@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
    // check magic number.
    if (readable > 0 && header[0] != MAGIC_HIGH
            || readable > 1 && header[1] != MAGIC_LOW) {// 检查魔数
        int length = header.length;
        if (header.length < readable) {
            header = Bytes.copyOf(header, readable);
            buffer.readBytes(header, length, readable - length);
        }
        for (int i = 1; i < header.length - 1; i++) {
            if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                buffer.readerIndex(buffer.readerIndex() - header.length + i);
                header = Bytes.copyOf(header, i);
                break;
            }
        }
        return super.decode(channel, buffer, readable, header);
    }
    // check length. // check length. 不完整的包 需要继续读取
    if (readable < HEADER_LENGTH) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // get data length.  获取数据长度
    int len = Bytes.bytes2int(header, 12);
    checkPayload(channel, len);

    int tt = len + HEADER_LENGTH;
    if (readable < tt) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // limit input stream.
    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

    try {
        return decodeBody(channel, is, header);// 解码数据
    } finally {
        if (is.available() > 0) {
            try {
                if (logger.isWarnEnabled()) {
                    logger.warn("Skip input stream " + is.available());
                }
                StreamUtils.skipUnusedStream(is);
            } catch (IOException e) {
                logger.warn(e.getMessage(), e);
            }
        }
    }
}

protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
    byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
    // get request id. 获取请求ID
    long id = Bytes.bytes2long(header, 4);
    if ((flag & FLAG_REQUEST) == 0) {//判断是请求还是响应
        // decode response. 响应
        Response res = new Response(id);
        if ((flag & FLAG_EVENT) != 0) {// 是否是event事件
            res.setEvent(true);
        }
        // get status. 获取请求的状态码
        byte status = header[3];
        res.setStatus(status);
        try {
            ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);// 进行数据内容解析
            if (status == Response.OK) {
                Object data;
                if (res.isHeartbeat()) {// 根据不同的类型来进行解析
                    data = decodeHeartbeatData(channel, in);
                } else if (res.isEvent()) {
                    data = decodeEventData(channel, in);
                } else {
                    data = decodeResponseData(channel, in, getRequestData(id));//解析body
                }
                res.setResult(data);
            } else {
                res.setErrorMessage(in.readUTF());
            }
        } catch (Throwable t) {
            res.setStatus(Response.CLIENT_ERROR);
            res.setErrorMessage(StringUtils.toString(t));
        }
        return res;
    } else {
        // decode request. 请求
        Request req = new Request(id);
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay((flag & FLAG_TWOWAY) != 0);
        if ((flag & FLAG_EVENT) != 0) {
            req.setEvent(true);
        }
        try {
            ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);//进行内容解析
            Object data;
            if (req.isHeartbeat()) {
                data = decodeHeartbeatData(channel, in);
            } else if (req.isEvent()) {
                data = decodeEventData(channel, in);
            } else {
                data = decodeRequestData(channel, in);
            }
            req.setData(data);
        } catch (Throwable t) {
            // bad request
            req.setBroken(true);
            req.setData(t);
        }
        return req;
    }
}
处理拆包和粘包问题
拆包

当发生TCP拆包问题时候 这里假设之前还没有发生过任何数据交互,系统刚刚初始化好,那么这个时候在 InternalDecoder里面的buffer属性会是EMPTY_BUFFER。当发生第一次inbound数据的时候,第一次在InternalDecoder里面接收的肯定是dubbo消息头的部分(这个由TCP协议保证),由于发生了拆包情 况,那么此时接收的inbound消息可能存在一下几种情况

  • 当前inbound消息只包含dubbo协议头的一部分
  • 当前inbound消息只包含dubbo的协议头
  • 当前inbound消息只包含dubbo消息头和部分消息体

发生上面三种情况,都会触发ExchangeCodec返回NEED_MORE_INPUT,由于 在DubboCountCodec对于返回NEED_MORE_INPUT会回滚读索引,所以此时的buffer里面的数据可以当作并没有发生过读取操作,并且DubboCountCodec的decode也会返回NEED_MORE_INPUT

@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int save = buffer.readerIndex();
    MultiMessage result = MultiMessage.create();
    do {
        Object obj = codec.decode(channel, buffer);
        if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
            buffer.readerIndex(save);//回滚索引
            break;
        } else {
            result.addMessage(obj);
            logMessageLength(obj, buffer.readerIndex() - save);
            save = buffer.readerIndex();
        }
    } while (true);
    if (result.isEmpty()) {
        return Codec2.DecodeResult.NEED_MORE_INPUT;
    }
    if (result.size() == 1) {
        return result.get(0);
    }
    return result;
}

在 InternalDecoder对于当判断返回NEED_MORE_INPUT,也会进行读索引回滚,并且退出循环,最后会执行finally内容,这里会判断inbound消息是否还有可读的,由于在DubboCountCodec里面进行了读索引回滚,所以此时的buffer里面不是完整的inbound消息,等待第二次的inbound消息的到来,当第二次 inbound消息过来的时候,再次经过判断。

@Override
public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception {
    int readable = in.limit();
    if (readable <= 0) {
        return;
    }

    ChannelBuffer frame;

    if (buffer.readable()) {//如果buffer可读
        if (buffer instanceof DynamicChannelBuffer) {
            buffer.writeBytes(in.buf());
            frame = buffer;
        } else {//把buffer中的数据也读出来
            int size = buffer.readableBytes() + in.remaining();
            frame = ChannelBuffers.dynamicBuffer(size > bufferSize ? size : bufferSize);
            frame.writeBytes(buffer, buffer.readableBytes());
            frame.writeBytes(in.buf());
        }
    } else {
        frame = ChannelBuffers.wrappedBuffer(in.buf());
    }

    Channel channel = MinaChannel.getOrAddChannel(session, url, handler);
    Object msg;
    int savedReadIndex;

    try {
        do {
            savedReadIndex = frame.readerIndex();
            try {
                msg = codec.decode(channel, frame);
            } catch (Exception e) {
                buffer = ChannelBuffers.EMPTY_BUFFER;
                throw e;
            }
            if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                frame.readerIndex(savedReadIndex);// 回滚索引
                break;
            } else {
                if (savedReadIndex == frame.readerIndex()) {
                    buffer = ChannelBuffers.EMPTY_BUFFER;
                    throw new Exception("Decode without read data.");
                }
                if (msg != null) {
                    out.write(msg);
                }
            }
        } while (frame.readable());
    } finally {
        if (frame.readable()) {// 判断消息是否可读
            frame.discardReadBytes();
            buffer = frame;
        } else {
            buffer = ChannelBuffers.EMPTY_BUFFER;
        }
        MinaChannel.removeChannelIfDisconnected(session);
    }
}
粘包

当发生TCP粘包的时候 是tcp将几个dubbo协议栈放在一个tcp包中,那么有可能发生下面几种情况

  • 当前inbound消息只包含一个dubbo协议栈
  • 当前inbound消息包含一个dubbo协议栈,同时包含部分另一个或者多个dubbo协议栈内容
    • 如果发生只包含一个协议栈,那么当前buffer通过ExchangeCodec解析协议之后,当前的buffer的 readeIndex位置应该是buffer尾部,那么在返回到InternalDecoder中message的方法readable返回的是false,那么就会对buffer重新赋予EMPTY_BUFFER实体
    • 如果包含一个以上的dubbo协议栈,当也会解析出其中一个dubbo协议栈,但是经过ExchangeCodec解析之后,frame的readIndex不在 buffer尾部,所以frame的readable方法返回的是true。那么则会继续遍历frame,读取下面的信息。最终要么frame刚好整数倍包含完整的dubbo协议栈,要不ExchangeCodec返回NEED_MORE_INPUT,最后将未读完的数据缓存到buffer中,等待下次inbound事件,将buffer中的消息合并到下次的inbound消息中,就又回到了拆包的问题上。

dubbo在处理tcp的粘包和拆包时是借助InternalDecoder的buffer缓存对象来缓存不完整的dubbo协议栈数据,等待下次inbound事件,合并进去。所以说在dubbo中解决TCP拆包和粘包的时候是通过buffer 变量来解决的。


原文地址:https://blog.csdn.net/DingKG/article/details/140515910

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!