自学内容网 自学内容网

muduo库的使用

1. 下载安装mudu库

  1. git方法

git clone https://github.com/chenshuo/muduo.git

  1. 安装依赖环境

sudo apt-get install libz-dev libboost-all-dev

  1. 进入目录运行脚本编译安装

[……]$ ./build.sh
[……]$ ./build.sh install

此时会在同级目录下生成一个build目录

2. muduo库的原理

Muduoku是由陈硕大佬开发的一个基于非阻塞IO和时间驱动的C++高并发TCP网络编程库。它是一款使用epoll多路转接,基于主从Reactor模型的网络库,其使用的线程模型是one loop per thread,所谓one loop per thread指的是:

  • 一个线程只能有一个事件循环(EventLoop),用于响应计时器和IO事件。
  • 一个文件描述符只能由一个线程进行读写,换句话说就是一个TCP连接必须属于某个EventLoop管理

在我们之前我们讲解的select,poll,epoll这些多路转接模型的工作原理都是基于单线程的事件监听。将listen_socket添加到事件监控中,然后根据listen_socket监听,一旦由client进行连接就创建socket并也把这个socket添加到事件监听中,然后就可以对已添加的socket进行事件监控,一旦触发了某个事件就对其进行处理。这种通过多路转接模型进行事件触发的IO处理的模型就叫做Reactor模型。但是这也会带来很大的缺陷,前面我们说过了他们都是基于单线程实现的,也就是说执行流只会有一个,如果client有大量的请求连接的话,假如同一时间有1000个请求,那么第1000个请求就需要等带前999个请求处理完后才会执行第1000个请求,而没处理一个请求时需要花费一定时间的,这个效率对用户的体验是非常差的,所以对于高并发场景的单个Reactor模型是远远不够的。

所以muduo库就诞生了,muduo库的模型是由一个主Reactor和多个子Reactor组成的,每个Reactor由一个线程进行维护,主Reactor负责监听新的连接请求,并将新连接分发给子Reactor。主Reactor使用epoll机制监听端口,当有新的连接请求时,将其分配给某个子Reactor‌。子Reactor负责处理分配给它的连接的所有IO操作。每个子Reactor在一个独立的线程中运行,减少了线程间的上下文切换,提高了效率‌

在这里插入图片描述

3. muduo库常见接口介绍

3.1 TcpServer类基础介绍

主要代码逻辑框架

typedef std::shared_ptr<TcpConnection> TcpConnectionPtr;
typedef std::function<void(const TcpConnectionPtr &)> ConnectionCallback;
typedef std::function<void(const TcpConnectionPtr &,
                           Buffer *,
                           Timestamp)>
    MessageCallback;
class InetAddress : public muduo::copyable
{
public:
    InetAddress(StringArg ip, uint16_t port, bool ipv6 = false);
};
class TcpServer : noncopyable
{
public:
    enum Option
    {
        kNoReusePort,
        kReusePort,
    };
    TcpServer(EventLoop *loop,
              const InetAddress &listenAddr,
              const string &nameArg,
              Option option = kNoReusePort); // 第四个参数:是否设置为地址重用
    // 设置线程数量,也就是设置几个Reactor,默认是只有一个线程也就是主线程
    void setThreadNum(int numThreads);
    // 开始时间监控,获取新的连接
    void start();
    /// 当⼀个新连接建⽴成功的时候被调⽤(一个连接建立成功/断开时执行,比如一个广播聊天,一个用户上线里就通知某某上线了,当他线下了也通知某某下线了。)
    void setConnectionCallback(const ConnectionCallback &cb)
    {
        connectionCallback_ = cb;
    }
    /// 消息的业务处理回调函数---这是收到新连接消息的时候被调⽤的函数
    void setMessageCallback(const MessageCallback &cb)
    {
        messageCallback_ = cb;
    }
};

3.2 EventLoop类基础介绍

class EventLoop : noncopyable  
{  
public:  
    // 开始事件循环,直到调用quit()方法为止  
    void loop();  
      
    // 请求退出事件循环  
    void quit();  
      
    // 在指定的时间运行回调函数一次  
    // 参数time是回调应该被调用的时间戳  
    // 参数cb是当时间到达时应该被调用的回调函数  
    // 返回TimerId,可用于取消定时器  
    TimerId runAt(Timestamp time, TimerCallback cb);  
      
    // 在当前时间加上指定的延迟后运行回调函数一次  
    // 参数delay是延迟时间(秒)  
    // 参数cb是当延迟时间过后应该被调用的回调函数  
    // 返回TimerId,可用于取消定时器  
    TimerId runAfter(double delay, TimerCallback cb);  
      
    // 每隔指定的时间间隔重复运行回调函数  
    // 参数interval是时间间隔(秒)  
    // 参数cb是每隔interval秒应该被调用的回调函数  
    // 返回TimerId,可用于取消定时器  
    TimerId runEvery(double interval, TimerCallback cb);  
      
    // 取消指定的定时器  
    // 参数timerId是之前通过runAt、runAfter或runEvery方法返回的定时器ID  
    void cancel(TimerId timerId);  
      
private:  
    // 原子变量,用于指示事件循环是否应该退出  
    std::atomic<bool> quit_;
      
    // 指向Poller对象的智能指针,Poller负责轮询I/O事件
    std::unique_ptr<Poller> poller_;  
      
    // 互斥锁,用于保护多线程访问共享数据  
    mutable MutexLock mutex_;  
    
    // 存储待执行函数的向量,这些函数将在事件循环的某个点被执行
    std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);  
};  

3.3TcpConnection类基础介绍

class TcpConnection : noncopyable,  
                      public std::enable_shared_from_this<TcpConnection>  
{  
public:  
    // 构造函数,用于创建TcpConnection对象  
    // 参数包括事件循环指针、连接名称、套接字文件描述符、本地地址和远程地址  
    TcpConnection(EventLoop* loop,  
                  const string& name,  
                  int sockfd,  
                  const InetAddress& localAddr,  
                  const InetAddress& peerAddr);  
  
    // 检查连接是否已建立  
    bool connected() const { return state_ == kConnected; }  
  
    // 检查连接是否已断开  
    bool disconnected() const { return state_ == kDisconnected; }  
  
    // 发送字符串消息(使用C++11的移动语义)  
    void send(string&& message);  
  
    // 发送原始数据  
    void send(const void* message, int len);  
  
    // 使用StringPiece发送消息(StringPiece是Google的字符串切片类,用于高效处理字符串片段)  
    void send(const StringPiece& message);  
  
    // 发送Buffer对象中的数据  
    void send(Buffer* message);  
  
    // 关闭连接  
    void shutdown();  
  
    // 设置连接上下文,上下文可以是任意类型的数据,通过boost::any存储  
    void setContext(const boost::any& context)  
    { context_ = context; }  
  
    // 获取连接上下文(常量引用)  
    const boost::any& getContext() const  
    { return context_; }  
  
    // 获取连接上下文的可修改指针(注意:这可能不是线程安全的,使用时需要小心)  
    boost::any* getMutableContext()  
    { return &context_; }  
  
    // 设置连接建立时的回调函数  
    void setConnectionCallback(const ConnectionCallback& cb)  
    { connectionCallback_ = cb; }  
  
    // 设置接收到消息时的回调函数  
    void setMessageCallback(const MessageCallback& cb)  
    { messageCallback_ = cb; }  
  
private:  
    // 连接的状态枚举  
    enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };  
  
    // 指向事件循环的指针,用于在连接上执行定时任务或异步操作  
    EventLoop* loop_;  
  
    // 连接建立时的回调函数  
    ConnectionCallback connectionCallback_;  
  
    // 接收到消息时的回调函数  
    MessageCallback messageCallback_;  
  
    // 发送完成时的回调函数
    WriteCompleteCallback writeCompleteCallback_;  
  
    // 上下文信息,可以是任意类型的数据,通过boost::any存储  
    boost::any context_;  
  
    // 连接的状态
    StateE state_;  
 
};

3.4 Buffer类基础介绍

// 缓冲区类
// Buffer 类是一个字节缓冲区类,支持从两端读写数据,以及处理整数和基本字符串。  
// 它继承自 muduo::copyable,表明这个类是可以被拷贝的。  
class Buffer : public muduo::copyable  
{  
public:  
    // 定义了一个便宜的前置空间大小,用于优化读操作。  
    static const size_t kCheapPrepend = 8;  
    // 定义了缓冲区的初始大小。  
    static const size_t kInitialSize = 1024;  
  
    // 构造函数,接受一个可选的初始大小参数。  
    // 缓冲区实际大小为 kCheapPrepend + initialSize,其中 kCheapPrepend 用于优化读操作。  
    explicit Buffer(size_t initialSize = kInitialSize)  
        : buffer_(kCheapPrepend + initialSize),  
          readerIndex_(kCheapPrepend),  
          writerIndex_(kCheapPrepend)  
    {}  
  
    // 与另一个Buffer对象交换内容。  
    void swap(Buffer& rhs);  
  
    // 返回可读字节数,即 writerIndex_ - readerIndex_。  
    size_t readableBytes() const;  
  
    // 返回可写字节数,即 buffer_.size() - writerIndex_。  
    size_t writableBytes() const;  
  
    // 返回一个指向可读数据的指针,但不移动读写索引。  
    const char* peek() const;  
  
    // 查找并返回指向缓冲区中第一个EOL(如"\r\n")的指针,从头开始搜索。  
    const char* findEOL() const;  
  
    // 查找并返回指向缓冲区中从指定位置开始的第一个EOL的指针。  
    const char* findEOL(const char* start) const;  
  
    // 从缓冲区中移除指定长度的数据。  
    void retrieve(size_t len);  
  
    // 移除并返回缓冲区中下一个 int64_t 类型的数据。  
    void retrieveInt64();  
  
    // 移除并返回缓冲区中下一个 int32_t 类型的数据。  
    void retrieveInt32();  
  
    // 移除并返回缓冲区中下一个 int16_t 类型的数据。  
    void retrieveInt16();  
  
    // 移除并返回缓冲区中下一个 int8_t 类型的数据。  
    void retrieveInt8();  
  
    // 移除并返回缓冲区中所有可读数据作为字符串。  
    string retrieveAllAsString();  
  
    // 移除并返回缓冲区中指定长度的数据作为字符串。  
    string retrieveAsString(size_t len);  
  
    // 向缓冲区末尾追加 StringPiece 对象。  
    void append(const StringPiece& str);  
  
    // 向缓冲区末尾追加指定长度的数据。  
    void append(const char* /*restrict*/ data, size_t len);  
  
    // 向缓冲区末尾追加指定长度的数据(泛型版本)。  
    void append(const void* /*restrict*/ data, size_t len);  
  
    // 返回一个指向缓冲区末尾(用于写入)的指针。  
    char* beginWrite();  
  
    // 返回一个指向缓冲区末尾(用于写入)的常量指针。  
    const char* beginWrite() const;  
  
    // 更新写入索引,表示已经写入了指定长度的数据。  
    void hasWritten(size_t len);  
  
    // 向缓冲区末尾追加一个 int64_t 类型的数据。  
    void appendInt64(int64_t x);  
  
    // 向缓冲区末尾追加一个 int32_t 类型的数据。  
    void appendInt32(int32_t x);  
  
    // 向缓冲区末尾追加一个 int16_t 类型的数据。  
    void appendInt16(int16_t x);  
  
    // 向缓冲区末尾追加一个 int8_t 类型的数据。  
    void appendInt8(int8_t x);  
  
    // 从缓冲区读取一个 int64_t 类型的数据,并移动读索引。  
    int64_t readInt64();  
  
    // 从缓冲区读取一个 int32_t 类型的数据,并移动读索引。  
    int32_t readInt32();  
  
    // 从缓冲区读取一个 int16_t 类型的数据,并移动读索引。  
    int16_t readInt16();  
  
    // 从缓冲区读取一个 int8_t 类型的数据,并移动读索引。  
    int8_t readInt8();  
  
    // 从缓冲区中查看(不移动读索引)下一个 int64_t 类型的数据。  
    int64_t peekInt64() const;  
  
    // 从缓冲区中查看(不移动读索引)下一个 int32_t 类型的数据。  
    int32_t peekInt32() const;  
  
    // 从缓冲区中查看(不移动读索引)下一个 int16_t 类型的数据。  
    int16_t peekInt16() const;  
  
    // 从缓冲区中查看(不移动读索引)下一个 int8_t 类型的数据。  
    int8_t peekInt8() const;  
  
    // 在缓冲区开头(readerIndex_ 之前)追加一个 int64_t 类型的数据。  
    void prependInt64(int64_t x);  
  
    // 在缓冲区开头(readerIndex_ 之前)追加一个 int32_t 类型的数据。  
    void prependInt32(int32_t x);  
  
    // 在缓冲区开头(readerIndex_ 之前)追加一个 int16_t 类型的数据。  
    void prependInt16(int16_t x);  
  
    // 在缓冲区开头(readerIndex_ 之前)追加一个 int8_t 类型的数据。  
    void prependInt8(int8_t x);  
  
    // 在缓冲区开头(readerIndex_ 之前)追加指定长度的数据。  
    void prepend(const void* /*restrict*/ data, size_t len);  
  
private:  
    std::vector<char> buffer_; // 存储字节数据的向量。  
    size_t readerIndex_; // 读索引,指向下一个可读字节的位置。  
    size_t writerIndex_; // 写索引,指向下一个可写字节的位置。  
    static const char kCRLF[]; // 可能的行结束符,如 "\r\n"。  
};

3.5 TcpClient类基础介绍

class TcpClient : noncopyable  
{  
public:  
    // 构造函数,用于创建 TcpClient 对象。  
    // 需要提供事件循环指针、服务器地址和客户端名称。  
    TcpClient(EventLoop* loop,  
              const InetAddress& serverAddr,  
              const string& nameArg);  
  
    // 析构函数,声明为 out-of-line(在类定义外部实现),  
    // 以便处理 std::unique_ptr 成员(尽管在这个类的定义中没有直接显示)。  
    ~TcpClient();  
  
    // 连接到服务器。  
    void connect();  
  
    // 关闭连接。  
    void disconnect();  
  
    // 停止客户端操作,可能包括关闭连接和清理资源。  
    void stop();  
  
    // 获取客户端对应的通信连接 TcpConnection 对象的接口。  
    // 注意:在发起 connect 后,连接可能尚未建立成功。  
    TcpConnectionPtr connection() const  
    {  
        MutexLockGuard lock(mutex_); // 加锁以保护 connection_  
        return connection_; // 返回当前连接(如果有的话)  
    }  
  
    // 设置连接成功时的回调函数。  
    void setConnectionCallback(ConnectionCallback cb)  
    {  
        connectionCallback_ = std::move(cb); // 使用移动语义设置回调  
    }  
  
    // 设置收到服务器发送的消息时的回调函数。  
    void setMessageCallback(MessageCallback cb)  
    {  
        messageCallback_ = std::move(cb); // 使用移动语义设置回调  
    }  
  
private:  
    EventLoop* loop_; // 指向事件循环的指针,用于处理异步事件  
    ConnectionCallback connectionCallback_; // 连接成功时的回调函数  
    MessageCallback messageCallback_; // 收到消息时的回调函数  
    // 注意:WriteCompleteCallback 在此类的定义中没有直接出现,但可能在其他地方使用  
    TcpConnectionPtr connection_ GUARDED_BY(mutex_); // 当前连接(受 mutex_ 保护)  
    mutable MutexLock mutex_; // 用于保护 connection_ 的互斥锁  
};  
  
// CountDownLatch 类是一个同步辅助类,用于让一个或多个线程等待直到其他线程的一系列操作完成。  
// 它继承自 noncopyable 以防止被复制。  
class CountDownLatch : noncopyable  
{  
public:  
    // 构造函数,初始化计数器。  
    explicit CountDownLatch(int count);  
  
    // 等待计数器变为零。如果计数器不为零,则当前线程将阻塞。  
    void wait()  
    {  
        MutexLockGuard lock(mutex_); // 加锁以保护 count_ 和 condition_  
        while (count_ > 0) // 如果计数器大于零,则等待  
        {  
            condition_.wait(); // 释放锁并进入等待状态,直到被唤醒  
        }  
    }  
  
    // 将计数器减一。如果计数器变为零,则唤醒所有等待的线程。  
    void countDown()  
    {  
        MutexLockGuard lock(mutex_); // 加锁以保护 count_ 和 condition_  
        --count_; // 计数器减一  
        if (count_ == 0) // 如果计数器变为零  
        {  
            condition_.notifyAll(); // 唤醒所有等待的线程  
        }  
    }  
  
    // 获取当前计数器的值(主要用于调试)。  
    int getCount() const;  
  
private:  
    mutable MutexLock mutex_; // 用于保护 count_ 和 condition_ 的互斥锁  
    Condition condition_ GUARDED_BY(mutex_); // 条件变量,与 mutex_ 一起使用以实现等待/通知机制  
    int count_ GUARDED_BY(mutex_); // 计数器,表示需要等待的操作数量  
};

4. 示例加深理解

编写一个简单的翻译模块。

4.1 服务端

#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/net/Buffer.h>
#include <iostream>
#include <string>
#include <unordered_map>

class DictServer{
public:
    DictServer(int port = 9090):_server(&_baseloop, 
        muduo::net::InetAddress("127.0.0.1",port), 
        "DictServer", 
        muduo::net::TcpServer::kReusePort)//第四个参数:是否启动地址重用
    {
        // 设置回调函数
        // 设置连接(连接建立/管理)事件的回调
        _server.setConnectionCallback(std::bind(&DictServer::onConnection, this, std::placeholders::_1));
        // 设置连接消息的回调
        _server.setMessageCallback(std::bind(&DictServer::onMessage, 
                                    this, 
                                    std::placeholders::_1, 
                                    std::placeholders::_2, 
                                    std::placeholders::_3));
    }
// 启动服务器
    void start(){
        _server.start();// 开始监听
        _baseloop.loop();// 开始事件循环监控
    }

    void onConnection(const muduo::net::TcpConnectionPtr& conn)
    {
       if (conn->connected()){
            std::cout << "连接成功\n";
       }else{
            std::cout << "连接失败\n";
       }
    }

    void onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buf, muduo::Timestamp time){
        static std::unordered_map<std::string, std::string> dict_map = {
            {"hello", "你好"},
            {"world", "世界"},
            {"map","图"}
        };
        // 获取所有数据,不需要提供rceve接收消息的接口,直接从缓冲区中拿数据
        std::string msg = buf->retrieveAllAsString();
        auto it = dict_map.find(msg);
        std::string res;
        if (it != dict_map.end()){
            res = it->second;
        }else{
            res = "未知单词!";
        }
        conn->send(res);
    }
private:
// 事件循环对象,用户处理网络事件
    muduo::net::EventLoop _baseloop; // 这个一定要声明在_server前面,因为需要用EventLoop来构造TcpServer 
    // TCP服务对象,用于监听和接收连接
    muduo::net::TcpServer _server;
};

int main()
{
    DictServer server(9090);
    server.start();
    return 0;
}

4.2 客户端

#include <muduo/net/TcpClient.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/net/Buffer.h>
#include <muduo/base/CountDownLatch.h> // 用来同步连接,即连接成功后才开始发消息
#include <iostream>
#include <string>

class DictClient
{
public:
    DictClient(const std::string& sip, int port) 
    : _baseloop(_threadloop.startLoop()), // 这里threadloop返回的是一个EventLoop类型
      _client(_baseloop, muduo::net::InetAddress(sip, port), "DictClient"),
      _downlatch(1) // 这里初始化为1,为0时才会被唤醒
    {
        // 设置回调函数
        // 设置连接(连接建立/管理)事件的回调
        _client.setConnectionCallback(std::bind(&DictClient::onConnection, this, std::placeholders::_1));
        // 设置连接消息的回调
        _client.setMessageCallback(std::bind(&DictClient::onMessage,
                                             this,
                                             std::placeholders::_1,
                                             std::placeholders::_2,
                                             std::placeholders::_3));
        _client.connect();
        _downlatch.wait(); // 阻塞等待连接成功后被唤醒
        //_baseloop.loop(); // 客户端不能这样写,因为loop是一个死循环,一旦这里执行了loop就一定不会走到send发送数据那一块
        // 所以这里要给baseloop.loop在一个新的线程里跑,muduo库提供了一个EventLoopThread可以做到
        // 此时我们就不需要使用_baseloop.loop();循环了EventLoopThread里面会帮我们去loop、
    }

    bool send(const std::string &msg)
    {
        if (_conn->connected() == false)
        {
            return false;
            std::cout << "连接已断开,发送数据失败" << std::endl;
        }
        _conn->send(msg);
        return true;
    }

    void onConnection(const muduo::net::TcpConnectionPtr &conn)
    {
        if (conn->connected())
        {
            std::cout << "连接成功\n";
            _downlatch.countDown(); // 连接成功计数器--,为0时唤醒阻塞
            _conn = conn;
        }
        else
        {
            std::cout << "连接失败\n";
            _conn.reset(); // 清空
        }
    }

    void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp time)
    {
        std::string res = buf->retrieveAllAsString();
        std::cout << res << std::endl;
    }

private:
    muduo::net::TcpConnectionPtr _conn; // TcpConnection的智能指针,用于存放连接对象
    muduo::CountDownLatch _downlatch; // 计数器用于实现同步
    muduo::net::EventLoopThread _threadloop; // 定义这个的时候EventLoopThread 内部执行startloop便可以触发EventLoop 的loop事件循环,
    muduo::net::EventLoop *_baseloop;指向EventLoopThread中EventLoop指针
    muduo::net::TcpClient _client; // TcpClient对象,用于连接和发送数据
};

int main()
{
    DictClient client("127.0.0.1", 9090);
    while (true)
    {
        std::string msg;
        std::cin >> msg;
        client.send(msg);
    }

    return 0;
}

这里我们可以跳转到startLoop的实现:

EventLoop* EventLoopThread::startLoop()
{
  assert(!thread_.started());
  thread_.start();

  EventLoop* loop = NULL;
  {
    MutexLockGuard lock(mutex_);
    while (loop_ == NULL)
    {
      cond_.wait();
    }
    loop = loop_;
  }
  return loop;
}

在跳转到thread_.start()

void Thread::start()
{
  assert(!started_);
  started_ = true;
  // FIXME: move(func_)
  detail::ThreadData* data = new detail::ThreadData(func_, name_, &tid_, &latch_);
  if (pthread_create(&pthreadId_, NULL, &detail::startThread, data)) // 这里就会创建一个线程,这个线程实现的方法就是EventLoop的loop事件循环监控行为
  {
    started_ = false;
    delete data; // or no delete?
    LOG_SYSFATAL << "Failed in pthread_create";
  }
  else
  {
    latch_.wait();
    assert(tid_ > 0);
  }
}
void* startThread(void* obj)
{
  ThreadData* data = static_cast<ThreadData*>(obj);
  data->runInThread();
  delete data;
  return NULL;
}

void runInThread()
{
……
    func_();
    ……
};

这个func_()是一个回调函数,我们可以去看一下EventLoopThread的构造函数,构造了一个
void EventLoopThread::threadFunc()
{
……
  EventLoop loop;
  loop.loop(); // 这个就是
……
}

4.3 Makefile编写

CFLAG = -std=c++11 -I ../../build/release-install-cpp11/include/ #指定头文件路径(这里需要根据自身的情况具体填写)
LFLAG = -L ../../build/release-install-cpp11/lib -lmuduo_net -lmuduo_base -lpthread #指定包含库文件路径,注意这里lmuduo_net要先连接(这里需要根据自身的情况具体填写)
all:server client
server:server.cpp
g++ -o $@ $^ $(CFLAG) $(LFLAG)

client:client.cpp
g++ -o $@ $^ $(CFLAG) $(LFLAG)
.PHONY:clean
clean:
rm -fr server client

原文地址:https://blog.csdn.net/qq_74276498/article/details/143921890

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!