自学内容网 自学内容网

消息中间件之RocketMQ源码分析(十二)

Namesrv启动流程

Broker启动流程
BrokerStartup.java类主要负责为真正的启动过程做准备,解析脚本传过来的参数,初始化Broker配置,创建BrokerController实例等工作。BrokerController.java类是Broker的掌控者,它管理和控制Broker的各个模块,包含通信模块、存储模块、索引模块、定时任务等。在BrokerController全部模块初始化并启动成功后,将在日志中输出info信息"boot success"

在这里插入图片描述

第一步:初始化启动环境

这是由./bin/mqbroker和./bin/runbroker.sh两个脚本来完成的,/bin/mqbroker脚本主要用于设置RocketMQ根目录环境变量

if [ -z "$ROCKETMQ_HOME" ] ; then
 ....
fi
export ROCKETMQ_HOME
sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup $@

./bin/runbroker.sh脚本的主要功能是检测JDK的环境配置和JVM的参数配置。JDK的环境配置的检查逻辑的实现代码如下:


[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"

export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=$(dirname $0)/..
export CLASSPATH=.:${BASE_DIR}/conf:${BASE_DIR}/lib/*:${CLASSPATH}

下面是JVM的参数配置,通常-Xms -Xmx -Xmn -XX:MaxDirectMemorySize这四个参数会随着部署RocketMQ服务器的物理内存大小的变化而进行相应的改变

choose_gc_options()
{
    JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | head -1 | cut -d'"' -f2 | sed 's/^1\.//' | cut -d'.' -f1)
    if [ -z "$JAVA_MAJOR_VERSION" ] || [ "$JAVA_MAJOR_VERSION" -lt "8" ] ; then
      JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
    else
      JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
    fi

    if [ -z "$JAVA_MAJOR_VERSION" ] || [ "$JAVA_MAJOR_VERSION" -lt "9" ] ; then
      JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
      JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
    else
      JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
      JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log:time,tags:filecount=5,filesize=30M"
    fi
}

choose_gc_log_directory

JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g"
choose_gc_options
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"

第二步:初始化BrokerController

该初始化主要包含ROcketMQ启动命令行参数解析、Broker各个模块配置参数解析、Broker各个模块初始化、进程关机Hook初始化等过程.

RocketMQ启动命令行参数解析。其代码在BrokerStartup.createBrokerController()方法中,RocketMQ的启动参数支持启动命令指定,也可以在配置文件中进行配置。通常,命令行参数的优先级大于配置文件。通过第三方库将命令行输入参数解析为commandLine对象,再获取输入参数值。命令行参数的启动比较简单,如果大量的RocketMQ配置项放在启动命令中,就会导致启动命令较长,难以维护,一般推荐启动RocketMQ使用配置文件的方式。配置文件在createBrokerController()
方法中被解析的代码如图所示
在这里插入图片描述
在brokerConfig、nettyServerConfig、nettyClientConfig、messageStoreConfig这些基本配置对象初始化完毕后,还有后续代码依据各种启动条件重新调整部分参数。在各个配置对象初始化完毕后,程序会调用BrokerController.initialize()方法对Broker的各个模块进行初始化
在这里插入图片描述
xxxConfigManager.load()方法的功能是加载Broker基础数据配置,包含Broker中的Topic、消费位点、订阅关系、消费过滤(无实际数据需要加载).这些配置加载成功后,初始化存储层服务对象messageStore和Broker监控统计对象brokerStats.然后,Broker会初始化通信层服务和一些列定时任务,通信层服务主要初始化正常通信通道、VIP通信通道和通信线程池。这里以VIP通道为例,分析通信层服务初始化,以消费进度定时持久化为例,分析定时任务初始化。fastConfig就是VIP通信层的配置,其配置对象"克隆"自正常通信的配置对象,唯独通信端口是nettyServerConfig.getListenPort()-2,
也就是10911-2.利用fastConfig初始化fastRemotingServer的结果也就是我们常用的VIP通道.
从fastConfig和fastRemotingServer的实现类命名来看,RocketMQ的通信层实现本质上是基于Netty的,那么通信层又是如何处理客户端发送的Netty请求的呢?
在这里插入图片描述
在这里插入图片描述
通信层对象初始化完成后,会调用this.registerProcessor()方法,这里将正常的通信层对象和VIP通道的通信层对象与各个请求处理器进行关联,比如将发送消息的请求交给接收消息的请求处理器进行处理
在这里插入图片描述

消费进度定时持久化。
Broker在接收到消费者上报的消费进度后,会定期持久化到物理文件中,当消费者因为重新发布或者宕机而重启时,能从消费进度中得知恢复,不至于重复消费,持久化周期可以通过参数flushConsumerOffsetInterval(以ms为单位)进行配置
在这里插入图片描述


原文地址:https://blog.csdn.net/Cover_sky/article/details/136223010

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!