自学内容网 自学内容网

dolphinscheduler服务RPC框架源码解析(二)RPC核心注解@RpcService和@RpcMethod设计实现

1.工程目录

从3.2.1版本之后这个dolphinscheduler中的RPC框架工程就从原来的dolphinscheduler-remote工程重构到了dolphinscheduler-extract工程。
在这里插入图片描述

  • dolphinscheduler 父项目
    • dolphinscheduler-extract RPC服务项目
      • dolphinscheduler-extract-alert 监控告警服务RPC接口定义、请求响应封装设计工程
      • dolphinscheduler-extract-base RPC框架核心工程
      • dolphinscheduler-extract-common RPC框架通用工程
      • dolphinscheduler-extract-master Master调度服务的RPC接口定义、请求响应封装设计工程
      • dolphinscheduler-extract-worker Worker任务执行服务的RPC接口定义、请求响应封装设计工程

1.核心注解的设计

Dolphinscheduler中的RPC核心注解包含**@RpcService@RpcMethod**。

1.1.@RpcService注解

在这里插入图片描述
这个注解的主要作用就是用来标记被它注解的接口是一个RPC服务接口

/**
 *  这个注解的主要作用就是用来标记被它注解的接口是一个RPC服务接口
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RpcService {
}

1.1.@RpcMethod注解

这个注解需要再被@RpcService注解的接口类中定义的方法中进行使用,表明这个方法是一个RPC方法。

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RpcMethod {

    long timeout() default 3000L;

}

2.核心注解的使用

这两个注解再Dolphinscheduler中一般结合进行使用。用来定义一个RPC服务接口,这个接口需要再服务提供者服务中存在具体的接口实现类。,以下是一个RPC远程日志管理的接口定义。这个RPC服务接口再MasterServer服务和WorkerServer服务中都有对应的实现类。

@RpcService
public interface ILogService {

    @RpcMethod
    TaskInstanceLogFileDownloadResponse getTaskInstanceWholeLogFileBytes(TaskInstanceLogFileDownloadRequest taskInstanceLogFileDownloadRequest);

    @RpcMethod
    TaskInstanceLogPageQueryResponse pageQueryTaskInstanceLog(TaskInstanceLogPageQueryRequest taskInstanceLogPageQueryRequest);

    @RpcMethod
    GetAppIdResponse getAppId(GetAppIdRequest getAppIdRequest);

    @RpcMethod
    void removeTaskInstanceLog(String taskInstanceLogAbsolutePath);

}

MasterServer服务中实现类

package org.apache.dolphinscheduler.server.master.rpc;

import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.LogUtils;
import org.apache.dolphinscheduler.extract.common.ILogService;
import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdRequest;
import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse;

import java.util.Collections;
import java.util.List;

import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Service;

@Slf4j
@Service
public class MasterLogServiceImpl implements ILogService {

    @Override
    public TaskInstanceLogFileDownloadResponse getTaskInstanceWholeLogFileBytes(TaskInstanceLogFileDownloadRequest logicTaskInstanceLogFileDownloadRequest) {
        byte[] bytes =
                LogUtils.getFileContentBytes(logicTaskInstanceLogFileDownloadRequest.getTaskInstanceLogAbsolutePath());
        // todo: if file not exists, return error result
        return new TaskInstanceLogFileDownloadResponse(bytes);
    }

    @Override
    public TaskInstanceLogPageQueryResponse pageQueryTaskInstanceLog(TaskInstanceLogPageQueryRequest taskInstanceLogPageQueryRequest) {

        List<String> lines = LogUtils.readPartFileContent(
                taskInstanceLogPageQueryRequest.getTaskInstanceLogAbsolutePath(),
                taskInstanceLogPageQueryRequest.getSkipLineNum(),
                taskInstanceLogPageQueryRequest.getLimit());

        String logContent = LogUtils.rollViewLogLines(lines);
        return new TaskInstanceLogPageQueryResponse(logContent);
    }

    @Override
    public GetAppIdResponse getAppId(GetAppIdRequest getAppIdRequest) {
        return new GetAppIdResponse(Collections.emptyList());
    }

    @Override
    public void removeTaskInstanceLog(String taskInstanceLogAbsolutePath) {
        FileUtils.deleteFile(taskInstanceLogAbsolutePath);
    }
}

WorkerServer服务中实现类

package org.apache.dolphinscheduler.server.worker.rpc;

import static org.apache.dolphinscheduler.common.constants.Constants.APPID_COLLECT;
import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_COLLECT_WAY;

import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.LogUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.extract.common.ILogService;
import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdRequest;
import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;

import java.util.List;

import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Service;

@Slf4j
@Service
public class WorkerLogServiceImpl implements ILogService {

    @Override
    public TaskInstanceLogFileDownloadResponse getTaskInstanceWholeLogFileBytes(TaskInstanceLogFileDownloadRequest taskInstanceLogFileDownloadRequest) {
        byte[] bytes = LogUtils
                .getFileContentBytes(taskInstanceLogFileDownloadRequest.getTaskInstanceLogAbsolutePath());
        // todo: if file not exists, return error result
        return new TaskInstanceLogFileDownloadResponse(bytes);
    }

    @Override
    public TaskInstanceLogPageQueryResponse pageQueryTaskInstanceLog(TaskInstanceLogPageQueryRequest taskInstanceLogPageQueryRequest) {
        List<String> lines = LogUtils.readPartFileContent(
                taskInstanceLogPageQueryRequest.getTaskInstanceLogAbsolutePath(),
                taskInstanceLogPageQueryRequest.getSkipLineNum(),
                taskInstanceLogPageQueryRequest.getLimit());

        String logContent = LogUtils.rollViewLogLines(lines);
        return new TaskInstanceLogPageQueryResponse(logContent);
    }

    @Override
    public GetAppIdResponse getAppId(GetAppIdRequest getAppIdRequest) {
        String appInfoPath = null;
        WorkerTaskExecutor workerTaskExecutor = WorkerTaskExecutorHolder.get(getAppIdRequest.getTaskInstanceId());
        if (workerTaskExecutor != null) {
            // todo: remove this kind of logic, and remove get appId method, the appId should be send by worker rather
            // than query by master
            appInfoPath = workerTaskExecutor.getTaskExecutionContext().getAppInfoPath();
        }
        String logPath = getAppIdRequest.getLogPath();
        List<String> appIds = org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils.getAppIds(logPath, appInfoPath,
                PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY));
        return new GetAppIdResponse(appIds);
    }

    @Override
    public void removeTaskInstanceLog(String taskInstanceLogAbsolutePath) {
        FileUtils.deleteFile(taskInstanceLogAbsolutePath);
    }
}

3.核心注解的扫描和解析

核心注解的扫描和解析一般都是有RPC的服务提供者进行,Dolphinscheduler中的RPC服务提供者是基于Netty的RpcServer。RpcServer启动时,会获取Spring容器中的所有被@RpcService注解的接口的实现类以及接口的方法被@RpcMethod注解修复的服务Bean注册中调用程序, 等待向客户端提供服务。服务器启动后,它将监听端口并等待客户端连接。
在这里插入图片描述

public class RpcServer implements ServerMethodInvokerRegistry, AutoCloseable {

    private final NettyRemotingServer nettyRemotingServer;

    public RpcServer(NettyServerConfig nettyServerConfig) {
        this.nettyRemotingServer = NettyRemotingServerFactory.buildNettyRemotingServer(nettyServerConfig);
    }

    public void start() {
        nettyRemotingServer.start();
    }

    @Override
    public void registerServerMethodInvokerProvider(Object serverMethodInvokerProviderBean) {
    // 获取到服务提供者服务Bean的所有接口,如果接口被@RpcService注解, 说明是RPC服务接口的实现类
    // 再获取这个接口中所有被@RpcMethod方法注解的方法,然后将这个Bean对象及这些方法注册到提供者服务中,等待RPC客户端调用
        for (Class<?> anInterface : serverMethodInvokerProviderBean.getClass().getInterfaces()) {
            if (anInterface.getAnnotation(RpcService.class) == null) {
                continue;
            }
            for (Method method : anInterface.getDeclaredMethods()) {
            // 获取RPC接口中所有被@RpcMethod方法注解的方法
                RpcMethod rpcMethod = method.getAnnotation(RpcMethod.class);
                if (rpcMethod == null) {
                    continue;
                }
                ServerMethodInvoker serverMethodInvoker =
                        new ServerMethodInvokerImpl(serverMethodInvokerProviderBean, method);
                // 创建serverMethodInvoker对象注册到提供者服务中,等待RPC客户端调用
                nettyRemotingServer.registerMethodInvoker(serverMethodInvoker);
                log.debug("Register ServerMethodInvoker: {} to bean: {}",
                        serverMethodInvoker.getMethodIdentify(), serverMethodInvoker.getMethodProviderIdentify());
            }
        }
    }

    @Override
    public void close() {
        nettyRemotingServer.close();
    }
}

以上就是RPC服务接口定义、使用及服务提供者如何扫描解析RPC服务接口实现并注册的整个实现过程。希望大家看完都能有所收获,如果觉得文章写的还不错,喜欢的童鞋们请点赞收藏,送你一朵小红花哈~~~~~~


原文地址:https://blog.csdn.net/qq_41865652/article/details/144423354

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!