自学内容网 自学内容网

thrift rpc 四种类型的服务端的实现详细介绍

thrift rpc 四种类型的服务端的实现详细介绍

这里主要是使用 thrift 开发的时候服务器端的实现,以及 thrift 提供给我们多钟的服务的实现,以及每个实现的服务器的特点和 API 介绍,TServer 主要包含以下几种实现

  • TSimpleServer
    • 阻塞的但线程模式,他并没有实战价值,只是对于学习 thrift 服务器的简单入门是十分友好的
  • TThreadPoolServer
    • 阻塞线程池的设计模式
  • TNonblockingServer
    • 非阻塞的但线程模式
  • TSelectorServer
    • 他的实现主要是 selector 模型,主从 selector 模式,跟我们了解过的 netty 中使用的设计十分类似

TSimpleServer

既然他是一个没有实战价值的服务实现,那为什么我们要学习他兵器在入门阶段使用它呢,因为就是因为它的简单,才利于学习,利于我们了解 thrift 中对于服务的设计,对于接下来对多线程乃至主从 selector 的学习也会有很好的铺垫作用。

TSimpleServer 的内部实现原理

内部的实现是依靠 JDK 的 SocketServer + accept 并且没有使用 thread 来提高效率,是一个阻塞的但线程模式。

// 打开查看方法 org.apache.thrift.server.TSimpleServer#serve
// 这里有一个执行的片段 client = serverTransport_.accept(); 这就是实现 accept 的核心逻辑

TThreadPoolServer

线程池的版本他使用了一个线程池来解决的阻塞的请求,但是他也是阻塞的,一个线程池只能处理一个请求,它的大致流程 accept -> 接收到请求 -> 分配线程执行 它的好处不言而喻可以同时提供给多个客户端的服务,但是坏处是如果线程池的设置太多会占用太多资源,如果某个线程发生阻塞这个时候只能阻塞等待这个线程执行完毕,但是如果太少又没有办法给更多的客户端提供服务

private static void tThreadPoolServer() throws TTransportException {
    // TTransportFactory
    try (final TServerTransport serverTransport = new TServerSocket(9090)) {
        final TThreadPoolServer threadPoolServer = new TThreadPoolServer(
                new TThreadPoolServer.Args(serverTransport)
                        .protocolFactory(new TBinaryProtocol.Factory())
                        .processor(new UserService.Processor<>(new UserServiceImpl()))
        );

        threadPoolServer.serve();
    }
}

和 simple 的版本最重要的区别就是在 TServer 使用的是 TThreadPoolServer,他内置一个线程池

/**
 * 可以在构造器中看到有一个线程池的创建和复制逻辑
 */
public TThreadPoolServer(Args args) {
  super(args);

  stopTimeoutUnit = args.stopTimeoutUnit;
  stopTimeoutVal = args.stopTimeoutVal;

  // 赋值线程池
  executorService_ =
      args.executorService != null ? args.executorService : createDefaultExecutorService(args);
}

他的线程池创建逻辑

public int minWorkerThreads = 5;
public int maxWorkerThreads = Integer.MAX_VALUE;

private static ExecutorService createDefaultExecutorService(Args args) {
  return new ThreadPoolExecutor(
      args.minWorkerThreads, // 默认的核心线程
      args.maxWorkerThreads, // 最大的线程池
      60L,
      TimeUnit.SECONDS,
      new SynchronousQueue<>(),
      // 线程工厂
      new ThreadFactory() {
        final AtomicLong count = new AtomicLong();

        @Override
        public Thread newThread(Runnable r) {
          Thread thread = new Thread(r);
          thread.setDaemon(true);
          thread.setName (
              String.format("TThreadPoolServer WorkerProcess-%d", count.getAndIncrement()));
          return thread;
        }
      });
}

public Args maxWorkerThreads(int n) {
  maxWorkerThreads = n;
  return this;
}

我们可以看到最大的默认线程池的数量是 Integer.MAX_VALUE,但是是可以配置的,我们尽量不要使用它的默认配置可以自己创建线程池,如果不自己创建可以在使用默认线程池的时候指定最大的线程池,使用方法 maxWorkerThreads,防止在使用的时候出现 OOM

执行流程

accept -> 分配 worker 线程执行任务(这里面的内容就是但线程的执行流程)

非阻塞 IO

thrift 的规定中,非阻塞的 server 必须使用的 tTransportTFramedTransport,但是如果使用 TFramedTransport 必须对应的 tProtocol 则必须是搭配的是 TCompactProtocol,因此在客户端的选择上肯定也是要使用的网络传输 -> TFramedTransport,使用的协议肯定自然也是 -> TCompactProtocol,但是为什么会存在这样强制的搭配选择呢?大概官方的意图是既然选择了 NIO 的模式肯定是选择了更高效的模式,所以一不做二不休,直接一次性到位,必须搭配对应更高效的压缩的模式来传输数据。

TNonBlockingServer

TNonBlockingServer 是一个非阻塞 IO,底层的实现是使用的 NIO,但是他使用的是但线程,并没有使用多线程,这就导致在实际生产中并不会选择这个作为我们的服务端的实现方式。不过可以见到看一下他们的编码实现方式

服务端的代码实现
public static void main(String[] args) throws TTransportException {
    tNonBlockingServer();
}

private static void tNonBlockingServer() throws TTransportException {
    try (final TNonblockingServerTransport framedTransport = new TNonblockingServerSocket(9090)) {
        final TNonblockingServer nonblockingServer = new TNonblockingServer(
                new TNonblockingServer.Args(framedTransport)
                        .protocolFactory(new TCompactProtocol.Factory())
                        .processor(new UserService.Processor<>(new UserServiceImpl()))
        );
        framedTransport.accept();
        nonblockingServer.serve();
    }
}
客户端的代码实现
public static void main(String[] args) {
    simpleNioClient();
}

public static void simpleNioClient() {
    try (TFramedTransport tFramedTransport = new TFramedTransport(new TSocket("localhost", 9090))) {
        tFramedTransport.open();
        System.out.println(new UserService.Client(new TCompactProtocol(tFramedTransport))
                .queryUserByNameAndPassword("new User()", "222"));
    } catch (TException e) {
        throw new RuntimeException(e);
    }
}

TThreadSelectorServer

TThreadSelectorServer 其实底层的实现是一个主从的 Reactor 模型,一下是他的架构实现图,关于主从 reactor 的调度流程

/**
 * 服务端的代码实现
 */
private static void tThreadSelectorServer() throws TTransportException {
    try (TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(9090)) {
        serverSocket.accept();
        final TThreadedSelectorServer selectorServer = new TThreadedSelectorServer(
                new TThreadedSelectorServer.Args(serverSocket)
                        .protocolFactory(new TCompactProtocol.Factory())
                        .processor(new UserService.Processor<>(new UserServiceImpl()))
        );
        selectorServer.serve();
    }
}

默认的实现线程池大小是 5,可以配置 workerThreads 这个其实就是指定的核心线程数。默认的是 5 使用的创建线程池的方式为 new ThreadPoolExecutor(...),但是也可以自己创建线程池,使用 executorService 这个参数进行配置。底层其实和 TNonBlockingServer 是一样的,知识在单线程的基础上增加了线程池的支持使其在执行 work 任务的时候可以更高效的使用线程池来执行任务。

在这里插入图片描述

注意点地方

  • 客户端的 thrift 版本必须和服务端使用的 thrift 的版本一致
  • 客户端使用的纯属和协议必须和服务端的一直,也就是 tTransport,tProtocol

原文地址:https://blog.csdn.net/qq_39116472/article/details/143470712

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!