自学内容网 自学内容网

C++ redis实现分布式锁

分布式锁概述

在单机环境中,常见的锁机制如互斥锁(Mutex)用于保护共享资源。然而,在分布式系统中,由于多个进程可能运行在不同的机器上,单纯依赖本地锁无法实现资源的同步访问。这就需要分布式锁来确保跨多个节点的互斥访问。

分布式锁的关键特性:

  • 互斥性(Mutual Exclusion): 同一时刻只有一个客户端可以获取锁。
  • 死锁避免(Deadlock Avoidance): 锁有超时机制,防止因客户端故障导致的死锁。
  • 容错性(Fault Tolerance): 即使部分节点失效,锁机制仍然可靠。

分布式锁的常见实现

基于 Redis 的分布式锁

Redis 提供了简单且高效的分布式锁机制,尤其是通过 SET 命令配合 NXPX 选项,可以实现基本的锁功能。更高级的实现如 Redlock 进一步提高了分布式环境下的可靠性。

优点:

  • 简单易用
  • 高性能
  • 支持自动过期

缺点:

  • 单点故障风险(除非使用 Redis 集群或主从架构)

基于 ZooKeeper 的分布式锁

ZooKeeper 提供了更强大的分布式协调功能,包括分布式锁。通过创建顺序节点和监听节点变化,可以实现锁的获取和释放。

优点:

  • 高可靠性
  • 提供了更丰富的协调功能,如选举、组管理等

缺点:

  • 配置和维护相对复杂
  • 相比 Redis,性能略低

基于 etcd 的分布式锁

etcd 是一个分布式键值存储,具备强一致性和高可用性。通过其租约(Lease)和事务(Transaction)机制,可以实现分布式锁。

优点:

  • 高一致性
  • 集成了服务发现和配置管理功能

缺点:

  • 类似于 ZooKeeper,需要额外的学习和维护

在 C++ 中实现分布式锁

以下以基于 Redis 的分布式锁为例,介绍如何在 C++ 中实现分布式锁。我们将使用 hiredis 作为 Redis 的 C 客户端库,并封装分布式锁的逻辑。

示例:使用 Redis 实现分布式锁

步骤:

  1. 连接到 Redis 服务器
  2. 尝试获取锁(使用 SET 命令的 NXPX 选项)
  3. 释放锁(确保只有持有锁的客户端才能释放)

注意: 为了确保锁的安全释放,建议使用唯一标识符(如 UUID)来标记锁的拥有者。

示例代码:

#include <hiredis/hiredis.h>
#include <string>
#include <chrono>
#include <thread>
#include <uuid/uuid.h>
#include <iostream>

// 生成唯一标识符
std::string generate_uuid() {
    uuid_t uuid;
    uuid_generate(uuid);
    char uuid_str[37]; // 36 characters + null terminator
    uuid_unparse(uuid, uuid_str);
    return std::string(uuid_str);
}

class RedisDistributedLock {
public:
    RedisDistributedLock(redisContext* context, const std::string& lock_key, int lock_timeout_ms = 10000)
        : context_(context), lock_key_(lock_key), lock_timeout_ms_(lock_timeout_ms), locked_(false) {
        uuid_ = generate_uuid();
    }

    // 尝试获取锁
    bool lock() {
        // 使用 SET 命令,并设置 NX 和 PX 选项
        std::string command = "SET " + lock_key_ + " " + uuid_ + " NX PX " + std::to_string(lock_timeout_ms_);
        redisReply* reply = (redisReply*)redisCommand(context_, command.c_str());
        if (reply == nullptr) {
            std::cerr << "Redis command failed\n";
            return false;
        }

        bool success = false;
        if (reply->type == REDIS_REPLY_STATUS && std::string(reply->str) == "OK") {
            success = true;
            locked_ = true;
        }
        freeReplyObject(reply);
        return success;
    }

    // 释放锁
    bool unlock() {
        if (!locked_) {
            return false;
        }

        // 使用 Lua 脚本确保原子性:检查值是否匹配,再删除
        const char* lua_script = 
            "if redis.call('GET', KEYS[1]) == ARGV[1] then "
            "   return redis.call('DEL', KEYS[1]) "
            "else "
            "   return 0 "
            "end";

        redisReply* reply = (redisReply*)redisCommand(context_, "EVAL %s 1 %s %s", 
            lua_script, lock_key_.c_str(), uuid_.c_str());

        if (reply == nullptr) {
            std::cerr << "Redis EVAL command failed\n";
            return false;
        }

        bool success = false;
        if (reply->type == REDIS_REPLY_INTEGER && reply->integer == 1) {
            success = true;
            locked_ = false;
        }
        freeReplyObject(reply);
        return success;
    }

    ~RedisDistributedLock() {
        if (locked_) {
            unlock();
        }
    }

private:
    redisContext* context_;
    std::string lock_key_;
    std::string uuid_;
    int lock_timeout_ms_;
    bool locked_;
};

int main() {
    // 连接到 Redis
    redisContext* context = redisConnect("127.0.0.1", 6379);
    if (context == nullptr || context->err) {
        if (context) {
            std::cerr << "Connection error: " << context->errstr << "\n";
            redisFree(context);
        } else {
            std::cerr << "Connection error: can't allocate redis context\n";
        }
        return 1;
    }

    std::string lock_key = "my_distributed_lock";
    RedisDistributedLock lock(context, lock_key, 5000); // 锁超时 5 秒

    if (lock.lock()) {
        std::cout << "Lock acquired!\n";

        // 执行临界区代码
        std::this_thread::sleep_for(std::chrono::seconds(2));

        // 释放锁
        if (lock.unlock()) {
            std::cout << "Lock released!\n";
        } else {
            std::cout << "Failed to release lock.\n";
        }
    } else {
        std::cout << "Failed to acquire lock.\n";
    }

    // 关闭 Redis 连接
    redisFree(context);
    return 0;
}

依赖项:

  • hiredis:Redis 的 C 客户端库
  • libuuid:用于生成唯一标识符

编译示例:

确保已安装 hiredislibuuid,然后使用以下命令编译:

g++ -std=c++11 -o distributed_lock main.cpp -lhiredis -luuid -lpthread

解释:

  1. 生成唯一标识符:
    使用 libuuid 生成一个唯一的 UUID,确保每个客户端有独特的标识。
  2. 获取锁:
    通过 SET <key> <value> NX PX <timeout> 命令尝试设置锁。如果键不存在,则设置成功,返回 OK;否则获取失败。
  3. 释放锁:
    通过执行 Lua 脚本,确保只有拥有锁的客户端才能释放锁。脚本首先检查键的值是否与 UUID 匹配,匹配则删除键。
  4. 自动释放锁:
    在析构函数中,如果锁仍被持有,则尝试释放锁,以防止因忘记释放锁导致的死锁。

分布式锁的注意事项

  1. 锁超时:
    设置合理的锁超时时间,防止因客户端故障导致的死锁。确保超时时间足够完成临界区代码。
  2. 锁重入:
    分布式锁通常不支持重入。如果需要重入锁机制,需在应用层实现相关逻辑。
  3. 时钟同步:
    在使用 Redlock 等算法时,服务器之间时钟的同步至关重要,以确保锁超时的准确性。
  4. 高可用性:
    为了提高锁的可靠性,建议使用多个 Redis 实例(如集群模式或主从架构),确保单点故障不会影响锁的功能。
  5. 性能:
    分布式锁引入了网络延迟,需评估锁机制对系统性能的影响,避免过度使用锁导致性能瓶颈。

分布式锁在不同进程中的工作原理

基本原理

分布式锁依赖于一个共享的存储系统(如 Redis)来协调多个进程之间的锁状态。所有需要竞争同一资源的进程都会连接到同一个 Redis 实例,并尝试通过设置相同的锁键(lock_key)来获取锁。由于 Redis 的 SET 命令中的 NX 选项保证了键的唯一性,因此在同一时间只有一个进程能够成功获取锁。

关键点

  1. 共享的 Redis 服务器:
    • 所有进程必须连接到同一个 Redis 服务器实例或集群。如果分布在不同的机器上,确保网络连接稳定且 Redis 服务对所有进程可达。
  2. 统一的锁键 (lock_key):
    • 所有进程在尝试获取锁时,必须使用相同的锁键。这确保了他们竞争的是同一个锁。
  3. 唯一的标识符 (uuid):
    • 每个进程在尝试获取锁时生成一个唯一的 uuid,用于标识锁的持有者。这防止了一个进程错误地释放另一个进程的锁。
  4. 锁超时机制:
    • 锁的超时时间(lock_timeout_ms)应根据实际需求合理设置,以防止锁被长期占用,导致其他进程无法获取。

多进程使用示例

假设我们有两个独立的进程(进程 A 和进程 B),它们都尝试获取同一个锁:

进程 A

#include <hiredis/hiredis.h>
#include <string>
#include <chrono>
#include <thread>
#include <uuid/uuid.h>
#include <iostream>

// ... [RedisDistributedLock 类定义与之前相同] ...

int main() {
    redisContext* context = redisConnect("127.0.0.1", 6379);
    if (context == nullptr || context->err) {
        if (context) {
            std::cerr << "Connection error: " << context->errstr << "\n";
            redisFree(context);
        } else {
            std::cerr << "Connection error: can't allocate redis context\n";
        }
        return 1;
    }

    std::string lock_key = "my_distributed_lock";
    RedisDistributedLock lock(context, lock_key, 5000); // 锁超时 5 秒

    if (lock.lock()) {
        std::cout << "进程 A: 锁已获取!\n";

        // 执行临界区代码
        std::this_thread::sleep_for(std::chrono::seconds(10)); // 模拟长时间操作

        if (lock.unlock()) {
            std::cout << "进程 A: 锁已释放!\n";
        } else {
            std::cout << "进程 A: 释放锁失败。\n";
        }
    } else {
        std::cout << "进程 A: 获取锁失败。\n";
    }

    redisFree(context);
    return 0;
}

进程 B

#include <hiredis/hiredis.h>
#include <string>
#include <chrono>
#include <thread>
#include <uuid/uuid.h>
#include <iostream>

// ... [RedisDistributedLock 类定义与之前相同] ...

int main() {
    redisContext* context = redisConnect("127.0.0.1", 6379);
    if (context == nullptr || context->err) {
        if (context) {
            std::cerr << "Connection error: " << context->errstr << "\n";
            redisFree(context);
        } else {
            std::cerr << "Connection error: can't allocate redis context\n";
        }
        return 1;
    }

    std::string lock_key = "my_distributed_lock";
    RedisDistributedLock lock(context, lock_key, 5000); // 锁超时 5 秒

    if (lock.lock()) {
        std::cout << "进程 B: 锁已获取!\n";

        // 执行临界区代码
        std::this_thread::sleep_for(std::chrono::seconds(2));

        if (lock.unlock()) {
            std::cout << "进程 B: 锁已释放!\n";
        } else {
            std::cout << "进程 B: 释放锁失败。\n";
        }
    } else {
        std::cout << "进程 B: 获取锁失败。\n";
    }

    redisFree(context);
    return 0;
}

运行结果

假设进程 A 先启动并成功获取锁,进程 B 在 A 持有锁期间尝试获取锁会失败:

进程 A: 锁已获取!
进程 B: 获取锁失败。

当进程 A 释放锁后,如果进程 B 重新尝试获取锁(例如通过重试机制),它将能够成功获取锁。

增强多进程协作的建议

  1. 锁重试机制:
    • 在多个进程中,锁获取失败时可以实现重试机制,等待一定时间后再次尝试获取锁,或使用指数退避(exponential backoff)策略以减少争用。
  2. 续约机制:
    • 对于需要长时间持有锁的场景,可以实现锁续约机制,定期延长锁的有效期,防止因长时间操作导致锁过期。
  3. 错误处理与恢复:
    • 确保在进程崩溃或异常退出时,锁能够被正确释放。虽然 Redis 的锁超时机制在一定程度上缓解了这一问题,但结合适当的应用层错误处理是必要的。
  4. 高可用 Redis 部署:
    • 考虑使用 Redis 集群或主从架构,以提高 Redis 服务的可用性和容错性,避免 Redis 成为单点故障。
  5. 分布式锁库的使用:
    • 除了自定义实现,可以考虑使用现有的分布式锁库或框架,如 Redisson(虽然主要是 Java 的,但有类似的 C++ 库)或者其他专门为 C++ 设计的库,以简化开发并确保实现的可靠性。

完整多进程示例

以下是一个更完整的示例,展示如何在多个进程中使用上述 RedisDistributedLock 类,并实现重试机制:

锁类(RedisDistributedLock.hpp

#ifndef REDIS_DISTRIBUTED_LOCK_HPP
#define REDIS_DISTRIBUTED_LOCK_HPP

#include <hiredis/hiredis.h>
#include <string>
#include <uuid/uuid.h>
#include <iostream>

std::string generate_uuid() {
    uuid_t uuid;
    uuid_generate(uuid);
    char uuid_str[37]; // 36 characters + null terminator
    uuid_unparse(uuid, uuid_str);
    return std::string(uuid_str);
}

class RedisDistributedLock {
public:
    RedisDistributedLock(redisContext* context, const std::string& lock_key, int lock_timeout_ms = 10000)
        : context_(context), lock_key_(lock_key), lock_timeout_ms_(lock_timeout_ms), locked_(false) {
        uuid_ = generate_uuid();
    }

    // 尝试获取锁
    bool lock() {
        std::string command = "SET " + lock_key_ + " " + uuid_ + " NX PX " + std::to_string(lock_timeout_ms_);
        redisReply* reply = (redisReply*)redisCommand(context_, command.c_str());
        if (reply == nullptr) {
            std::cerr << "Redis command failed\n";
            return false;
        }

        bool success = false;
        if (reply->type == REDIS_REPLY_STATUS && std::string(reply->str) == "OK") {
            success = true;
            locked_ = true;
        }
        freeReplyObject(reply);
        return success;
    }

    // 尝试释放锁
    bool unlock() {
        if (!locked_) {
            return false;
        }

        const char* lua_script = 
            "if redis.call('GET', KEYS[1]) == ARGV[1] then "
            "   return redis.call('DEL', KEYS[1]) "
            "else "
            "   return 0 "
            "end";

        redisReply* reply = (redisReply*)redisCommand(context_, "EVAL %s 1 %s %s", 
            lua_script, lock_key_.c_str(), uuid_.c_str());

        if (reply == nullptr) {
            std::cerr << "Redis EVAL command failed\n";
            return false;
        }

        bool success = false;
        if (reply->type == REDIS_REPLY_INTEGER && reply->integer == 1) {
            success = true;
            locked_ = false;
        }
        freeReplyObject(reply);
        return success;
    }

    ~RedisDistributedLock() {
        if (locked_) {
            unlock();
        }
    }

private:
    redisContext* context_;
    std::string lock_key_;
    std::string uuid_;
    int lock_timeout_ms_;
    bool locked_;
};

#endif // REDIS_DISTRIBUTED_LOCK_HPP

进程代码(process.cpp

#include "RedisDistributedLock.hpp"
#include <string>
#include <chrono>
#include <thread>
#include <iostream>

int main(int argc, char* argv[]) {
    // 检查进程名称
    std::string process_name = (argc > 1) ? argv[1] : "Process";

    // 连接到 Redis
    redisContext* context = redisConnect("127.0.0.1", 6379);
    if (context == nullptr || context->err) {
        if (context) {
            std::cerr << process_name << ": Connection error: " << context->errstr << "\n";
            redisFree(context);
        } else {
            std::cerr << process_name << ": Connection error: can't allocate redis context\n";
        }
        return 1;
    }

    std::string lock_key = "my_distributed_lock";
    RedisDistributedLock lock(context, lock_key, 5000); // 锁超时 5 秒

    int retry_count = 5;
    int retry_delay_ms = 1000;

    for (int i = 0; i < retry_count; ++i) {
        if (lock.lock()) {
            std::cout << process_name << ": Lock acquired!\n";

            // 执行临界区代码
            std::this_thread::sleep_for(std::chrono::seconds(2));

            if (lock.unlock()) {
                std::cout << process_name << ": Lock released!\n";
            } else {
                std::cout << process_name << ": Failed to release lock.\n";
            }
            break;
        } else {
            std::cout << process_name << ": Failed to acquire lock. Retrying in " 
                      << retry_delay_ms << " ms...\n";
            std::this_thread::sleep_for(std::chrono::milliseconds(retry_delay_ms));
        }

        if (i == retry_count - 1) {
            std::cout << process_name << ": Could not acquire lock after " << retry_count << " attempts.\n";
        }
    }

    redisFree(context);
    return 0;
}

编译与运行

  1. 编译:

    假设文件结构如下:

    .
    ├── RedisDistributedLock.hpp
    └── process.cpp
    

    使用以下命令编译:

    g++ -std=c++11 -o process process.cpp -lhiredis -luuid -lpthread
    
  2. 运行多个进程:

    在不同的终端窗口,启动多个进程实例:

    ./process Process_A
    ./process Process_B
    

预期输出

进程 A:

Process_A: Lock acquired!
Process_A: Lock released!

进程 B:

Process_B: Failed to acquire lock. Retrying in 1000 ms...
Process_B: Lock acquired!
Process_B: Lock released!

这样,进程 B 在进程 A 持有锁期间无法获取锁,但在进程 A 释放锁后,进程 B 成功获取并释放锁。具体结果取决于启动顺序和锁超时设置。

进一步优化与考虑

  1. 锁续租(Lock Renewal):
    • 如果某些进程需要在持有锁期间执行长时间操作,可以实现锁续租机制,定期重新设置锁的过期时间,防止在操作期间锁自动失效。
  2. 多 Redis 实例(Redlock):
    • 为了提高分布式锁的容错性,可以实现 Redlock 算法,该算法在多个独立的 Redis 实例上获取锁,以减少单点故障的风险。
  3. 使用现成的库:
    • 虽然自定义实现灵活,但使用现成的、经过社区验证的分布式锁库能够减少错误风险,并提供更多功能。例如,可以考虑使用专门的分布式锁库,如 cpp-redis 结合自定义逻辑,或者查找其他适用于 C++ 的分布式锁解决方案。
  4. 错误处理与日志记录:
    • 在实际应用中,增强错误处理和日志记录对调试和运维非常重要。确保所有 Redis 操作的错误都被妥善处理,并记录足够的日志信息。
  5. 性能优化:
    • 分布式锁涉及网络通信,可能会带来一定的延迟。在高性能要求的场景下,评估分布式锁对系统整体性能的影响,并优化锁的获取与释放逻辑。

原文地址:https://blog.csdn.net/secondtonone1/article/details/142844796

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!