自学内容网 自学内容网

Redis

1、Redis是什么?

非关系型数据库,以键值对(key-value)形式存储数据,也是一种NoSQL数据库
NoSQL可用于超大规模数据存储
image.png

2、有了Map为什么还需要Redis?

  • Map是Java提供的,是存在于虚拟机中的,所以是有大小限制的,不能存储大量数据
  • Map不能进行横向扩展和纵向扩展,但是Redis可以,并且Redis支持集群模式
  • Redis的读写速度很快
  • Redis支持多种持久化
  • Redis支持多种数据类型(字符串、哈希、列表、集合、有序集合…)
  • Redis支持多种数据淘汰策略(LRU算法、LFU 算法、随机、ttl、noeviction)

3、Redis能干嘛?

  • 可以存储用户的登录信息和认证信息
  • 系统的数据缓存(高速缓存)
  • 秒杀系统
  • 可以存储一些容忍丢失的数据
  • 接口防刷和限流

4、RESP协议

REdis Serialization Protocol,简称RESP
RESP是基于TCP传输的应用层协议,底层采用TCP传输
在RESP协议中,数据的类型取决于第一个字符:

  • +:单行字符串
  • -:表示错误的类型
  • *::表示整数
  • $:表示多行字符
  • *:表示数组
    在RESP协议中,构成协议的每一部分必须使用\r\n作为结束符
    示例:
set key value

*3\r\n        表示这个命令由三部分组成
$3\r\n        表示第一部分长度为3
set\r\n       表示第一部分的内容
$3\r\n        表示第二部分长度为3
key\r\n       表示第二部分的内容
$5\r\n        表示第三部分长度为35
value\r\n     表示第三部分的内容

5、通用命令

del key //删除一个键

exists key //是否存在一个键

expire key seconds //为一个键设置过期时间、单位秒

pexpire key millionSeconds //为一个键设置过期时间、单位毫秒

ttl key //查看键的剩余过期时间,单位秒

pttl key //查看键的过期时间,单位毫秒

keys * //查看所有的键

persist key //持久化键,相当于永不过期

type key //查看键的类型

select 0~15 //选择0~15之间的一个数据库

move key db //将一个键移动到另一个数据库中

flushdb //刷新数据库

dbsize //查看当前数据库中键的数量(当前数据库的大小)

lastsave //查看最近一次操作时间,时间戳

monitor //实时监控Redis接收到的命令

6、字符串命令

字符串(string)是Redis中的一种基本数据类型

set key value //为键设置一个值,如果存在就是修改,不存在就是添加

get key //获取一个键的值

mset key1 value1 key2 value2... //同时为多个键设置值

mget key1 key2 key3... //同时获取多个键的值

setex key seconds value //设置键的值,同时设置过期时间,单位秒

setnx key value //如果不存在键就为其设置值,如果存在就返回0,不改变原来的值

incr key //为整数键自增1

incrby key increment //指定键增加increment增量

decr key //为整数键自减1

decrby key increment //指定键减少increment增量

append key value //将value追加到指定key的末尾

7、Hash命令

Hash也是Redis中的一种基本数据类型

hset key field value //为键设置字段和值

hget key field //获取键中的字段值

hmset key field1 value1 field2 value2... //同时为键的多个字段设置值

hmget key field1 field2 filed3... //同时获取键中多个字段的值

hincrby key field increment //为键的字段设置增量

hsetnx key field value //不存在就添加字段和值

hexists key field //检查键中是否存在字段

hdel key field1 field2 ... //删除键中的字段

hgetall key //获取键中的所有字段和值

hkeys key //获取键中所有的字段

hvals key //获取键中所有的值

hlen key //获取键中字段的数量

8、List命令

lpush key value1 value2 value3.. //从左往右依次添加值到列表
rpush key value1 value2 value3.. //从右往左依次添加值到列表

lpushx key value //添加一个值到列表的头部
rpushx key value //添加一个值到列表的尾部

lpop key //从列表的尾部弹出一个值
rpop key //从列表的头部弹出一个值

lrange key start stop //从下标为start到下标为stop为列表进行切片,stop为-1表示最后一个

lindex key index //通过下标index取出列表中对用的值

llen key //获取列表的长度

lrem key count value //从列表中删除指定count数量的value,count>0代表从左往右依次寻找,count<0代表从右往左依次寻找,count==0代表全部删除

ltrim key start stop //对从下标start到stop区间的数据进行修剪,不在范围内的数据将会被删除

rpoplpush key1 key2 //从一个列表的尾部弹出一个值添加到另一个列表的头部

9、Set命令

使用场景:黑白名单、

sadd key member1 member2 ... //向set中添加成员

smembers key //获取set中所有的成员

spop key count //随机获取set中count数量(不写为1)的成员

sinter set1 set2 ... //获取set1和set2的交集

sunion set1 set2 ... //获取set1和set2的并集

sdiff set1 set2 ... //获取set1和set2的差集(以set顺序为准)

srem key member1 member2 ... //从set中删除指定的成员,不存在返回0

sismember key member //判断一个成员是否在set中,存在返回1,不存在返回0

10、Zset

使用场景:排行

zadd key score1 member1 score2 member2... //添加key设置成员及其分数

zincrby key increment member //为成员的分数设置增量(增量可以为正数或者负数)

zscore key member //获取成员的分数

zcard key //查看key中成员数量

zcount key min max //查找min到max范围的成员

zrem key member1 member2 ... //删除成员

zrange key start stop [withscores] //从下标start到stop根据分数对成员进行顺序排序

zrevrange key start stop [withscores] //从下标start到stop根据分数对成员进行倒序排序

zrangebyscore key min max [withscores] [limit offset count] //指定最小值到最大值区间内的成员进行顺序排序

zrevrangebyscore key max min [withscores] [limit offset count] //指定最大值到最小值区间内的成员进行逆序排序

11、Java连接redis

11.1、使用Socket与Redis交互

由于Redis的底层使用的是tcp传输,则可以通过Socket编程实现Java与Redis进行交互

[!note]
中文乱码未解决

public class RedisTest {  
  
    public static void main(String[] args) throws IOException {  
  
  
       // 创建Socket连接  
       Socket socket = new Socket("localhost", 6379);  
       OutputStream out = socket.getOutputStream();  
       InputStream in = socket.getInputStream();  
  
       // 构建Redis命令  
       String[] commands = {"set", "name", "liuxu"};  
  
       // 组装命令  
       StringBuilder sb = new StringBuilder();  
       int cc_len = commands.length;  
       sb.append("*").append(cc_len).append("\r\n");  
       for (String command : commands) {  
          int c_len = command.length();  
          sb.append("$").append(c_len).append("\r\n").append(command).append("\r\n");  
       }  
  
       // 将命令转化为RESP协议字符串  
       String command = sb.toString();  
       System.out.println(command);  
  
       // 发送命令  
       out.write(command.getBytes(StandardCharsets.UTF_8));  
       out.flush();  
  
       // 读取Redis响应  
       byte[] bytes = new byte[1024];  
       int length = in.read(bytes);  
       String response = new String(bytes, 0, length, StandardCharsets.UTF_8);  
  
       // 查询结果  
       String[] split = response.split("\r\n");  
  
       // 存储结果  
       List<Object> list = new ArrayList<>();  
  
       // 匹配开始符号:+ - $ : *  
       String start = String.valueOf(split[0].toCharArray()[0]);  
  
       // 匹配不同结果  
       if (split.length == 1 && !"-".equals(start)) {  
          list.add(Integer.parseInt(String.valueOf(split[0].toCharArray()[1])));  
       } else if (split.length == 1) {  
          list.add(split[0]);  
       } else {  
          list.addAll(Arrays.asList(split).subList(1, split.length));  
       }  
  
       // 处理不同结果  
       Object res;  
       // System.out.println(start);  
       switch (start) {  
          case "*":// 处理数组  
             res = list.stream().filter(i -> list.size() % 2 == 0).collect(Collectors.toList());  
             System.out.println(res);  
             break;  
          case ":":// 处理数字  
             Optional<Object> num = list.stream().findFirst();  
             res = num.orElse(null);  
             System.out.println(res);  
             break;  
          case "-":// 遇到错误信息  
             String error = list.get(0).toString();  
             throw new RuntimeException(error);  
          case "$":// 处理多行字符串  
             Optional<Object> first = list.stream().findFirst();  
             res = first.orElse(null);  
             System.out.println(res);  
             break;  
          case "+":// 处理单行字符串  
             Optional<Object> str = list.stream().findFirst();  
             res = str.orElse(null);  
             System.out.println(res);  
             break;  
       }  
    }  
}

11.2、使用Jedis

导入依赖

<dependency>  
    <groupId>redis.clients</groupId>  
    <artifactId>jedis</artifactId>  
    <version>5.1.2</version>  
</dependency>

11.2.1、存储字符串

创建一个简单的命令

@Test
public static void set() {  
    // 1. 连接Redis  
    Jedis jedis = new Jedis("127.0.0.1", 6379);  
    // 2. 操作Redis - 因为Redis的命令是什么,Jedis的方法就是什么  
    jedis.set("name", "李四");  
    // 3. 释放资源  
    jedis.close();  
}

@Test
public static void get(){  
    // 1. 连接Redis  
    Jedis jedis = new Jedis("127.0.0.1", 6379);  
    // 2. 操作Redis - 因为Redis的命令是什么,Jedis的方法就是什么  
    String s = jedis.get("name");  
    System.out.println(s);  //李四
    // 3. 释放资源  
    jedis.close();  
}

11.2.2、存储字符数组

11.2.3、SCAN 操作

使用命令keys * 来查询所有的键是阻塞的,但是使用scan是非阻塞的

11.2.3.1、字符串的scan
public void scan() {  
    Jedis jedis = new Jedis("localhost", 6379);  
    String cursor = ScanParams.SCAN_POINTER_START; // 游标,记录的是下标  
    ScanParams params = new ScanParams().match("*a*").count(3); // 参数,对应的是扫描条件  
    do {  
       ScanResult<String> result = jedis.scan(cursor, params);// 每次扫描的结果集  
       List<String> list = result.getResult();// 从结果集中获取查询的结果  
       list.forEach(System.out::println);  
       cursor = result.getCursor();// 从结果集中获取游标  
       System.out.println(cursor);  
       System.out.println("==========================");  
    } while (!cursor.equals(ScanParams.SCAN_POINTER_START));// 当游标归0的时候代表扫描完成  
    jedis.close();  
}
11.2.3.2、Hash的hscan
public void hscan() {  
    Jedis jedis = new Jedis("localhost", 6379);  
    String cursor = ScanParams.SCAN_POINTER_START; // 游标,记录的是下标  
    ScanParams params = new ScanParams().match("*a*").count(3); // 参数,对应的是扫描条件  
    do {  
    //hscan传入的参数: key cursor params
       ScanResult<Map.Entry<String, String>> result = jedis.hscan("hash", cursor, params);// 每次扫描的结果集  
       List<Map.Entry<String, String>> list = result.getResult();// 从结果集中获取查询的结果  
       list.forEach(entry -> System.out.println(entry.getKey() + ":" + entry.getValue()));  
       cursor = result.getCursor();// 从结果集中获取游标  
       // System.out.println(cursor);  
       System.out.println("==========================");  
    } while (!cursor.equals(ScanParams.SCAN_POINTER_START));// 当游标归0的时候代表扫描完成  
    jedis.close();  
}
11.2.3.3、Set的sscan
public void sscan() {  
    Jedis jedis = new Jedis("localhost", 6379);  
    String cursor = ScanParams.SCAN_POINTER_START; // 游标,记录的是下标  
    ScanParams params = new ScanParams().match("*a*").count(3); // 参数,对应的是扫描条件  
    do {  
       ScanResult<String> result = jedis.sscan("set", cursor, params);// 每次扫描的结果集  
       List<String> list = result.getResult();// 从结果集中获取查询的结果  
       list.forEach(System.out::println);  
       cursor = result.getCursor();// 从结果集中获取游标  
       // System.out.println(cursor);  
       System.out.println("==========================");  
    } while (!cursor.equals(ScanParams.SCAN_POINTER_START));// 当游标归0的时候代表扫描完成  
    jedis.close();  
}
11.2.3.4、ZSet的zscan
public void zscan() {  
    Jedis jedis = new Jedis("localhost", 6379);  
    String cursor = ScanParams.SCAN_POINTER_START; // 游标,记录的是下标  
    ScanParams params = new ScanParams().match("*a*").count(3); // 参数,对应的是扫描条件  
    do {  
       ScanResult<Tuple> result = jedis.zscan("zset", cursor, params);  
       List<Tuple> list = result.getResult();  
       list.forEach(it -> System.out.println(it.getElement() + ":" + it.getScore()));  
       cursor = result.getCursor();// 从结果集中获取游标  
       // System.out.println(cursor);  
       System.out.println("==========================");  
    } while (!cursor.equals(ScanParams.SCAN_POINTER_START));// 当游标归0的时候代表扫描完成  
    jedis.close();  
}
11.2.3.5、redisTemplate 的扫描操作

springboot 3.3 之前不支持 redisTemplate 直接的 scan 动作
引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
public List<Object> scan(String pattern, int count) {  
    RedisSerializer<?> keySerializer = redisTemplate.getKeySerializer();  
    ScanOptions options = ScanOptions.scanOptions().match(pattern).count(count).build();  
    return redisTemplate.execute((RedisCallback<List<Object>>) connection -> {  
       List<Object> keys = new ArrayList<>();  
       Cursor<byte[]> cursor = connection.scan(options);  
       while (cursor.hasNext()) {  
          byte[] next = cursor.next();  
          keys.add(keySerializer.deserialize(next));  
       }  
       cursor.close();  
       return keys;  
    });  
}  
  
  
/**  
 * 使用redisTemplate.opsForHash()封装scan  
 * * @param key  
 * @param pattern  
 * @param count  
 * @return  
 */  
public List<Map.Entry<Object, Object>> hScan(String key, String pattern, int count) {  
    List<Map.Entry<Object, Object>> list = new ArrayList<>();  
    Cursor<Map.Entry<Object, Object>> cursor = redisTemplate.opsForHash().scan(key, ScanOptions.scanOptions().match(pattern).count(count).build());  
    while (cursor.hasNext()) {  
       Map.Entry<Object, Object> next = cursor.next();  
       list.add(next);  
    }  
    return list;  
}  
  
public Set<Object> sScan(String key, String pattern, int count) {  
    Set<Object> result = new HashSet<>();  
    Cursor<Object> cursor = redisTemplate.opsForSet().scan(key, ScanOptions.scanOptions().match(pattern).count(count).build());  
    while (cursor.hasNext()) {  
       result.add(cursor.next());  
    }  
    return result;  
}  
  
public Set<ZSetOperations.TypedTuple<Object>> zScan(String key, String pattern, int count) {  
    Set<ZSetOperations.TypedTuple<Object>> result = new HashSet<>();  
    Cursor<ZSetOperations.TypedTuple<Object>> cursor = redisTemplate.opsForZSet().scan(key, ScanOptions.scanOptions().match(pattern).count(count).build());  
    while (cursor.hasNext()) {  
       result.add(cursor.next());  
    }  
    return result;  
}

11.2.4、 Redis 数据类型应用场景

  • String:点赞数、评论数、粉丝数等(incr

  • Hash:购物车、抢购、限购等

    因为数据的存储形式应该为

    用户:{key1:value1, key2:value2,…….}

  • List:消息队列、最新评论、最近回复等

    List能够实现左近右出,所以能做消息队列

    List能够实现先进后出,所以能够做最新评论、最近回复

  • Set:交集、并集、差集、黑白名单等。

  • ZSet:延迟队列、排行榜、限流等

    由于ZSet的每个成员都存储了score和member,所以能够实现排序

    延迟队列:使用每个对象的未来处理时间作为score,当前时间等于score时进行处理

    排行榜:使用每个对象的分数、访问次数等作为score,根据score进行zrevrangebyscore操作

    限流:使用每个对象的时间片段内的访问次数作为score,超过阈值jiu

11.3、Jedis连接池

public void jedisPoolTest() {  
    // 创建 JedisPoolConfig 对象来配置连接池参数  
    JedisPoolConfig config = new JedisPoolConfig();  
    config.setMaxTotal(50); // 最大连接数(可根据实际需求调整)  
    config.setMaxIdle(10); // 最大空闲连接数  
    config.setMinIdle(0); // 最小空闲连接数  
  
    // 创建 Jedis 连接池对象  
    JedisPool jedisPool = new JedisPool(config, "localhost", 6379);  
  
    // 从连接池中获取连接  
    Jedis jedis = jedisPool.getResource();  
  
    // 使用 Jedis 进行操作(示例:设置一个键值对)  
    jedis.set("name", "liuxu");  
    System.out.println(jedis.get("name"));  
  
    // 关闭连接,将其归还到连接池中(注意:不是真正关闭连接)  
    jedis.close();  
  
    // 关闭连接池  
    jedisPool.close();  
}

11.4、Jedis管道操作

管道操作,是将客户端的多个命令进行打包,然后一次性发送给服务器进行执行,管道执行的多条命令实际上相当于一次命令,大大减少了网络传输的开销。需要注意到是用pipeline方式打包命令发送,redis必须在处理完所有命令前先缓存起所有命令的处理结果。打包的命令越多,缓存占用的内存就越多,所以并不是打包的命令越多越好

public void pipeline() {  
    Jedis jedis = new Jedis("localhost", 6379);  
    Pipeline pipeline = jedis.pipelined();  
    for (int i = 0; i < 20; i++) {  
       pipeline.incrBy("age", i);  
    }  
    List<Object> objects = pipeline.syncAndReturnAll();  
    objects.forEach(System.out::println);  
    jedis.close();  
}

11.5、Redis事务

Redis的事务需要使用到watch机制,开启事务之前需要对某个键进行监听,如果在事务执行之前对这个键进行了修改,那么事务会自动取消;事务执行完毕或事务被取消,对键的监听自动取消

  • watch(key):对某个键开启监听
  • 需要输入的命令:被放入一个队列中
  • exec():执行事务
  • discard():取消执行事务

Redis事务的原理: 在开启事务后,会执行一系列的命令,但这些命令并非真正执行,而是将这些命令放在一个队列中。如果执行事务,那么这个队列中的命令全部执行,如果取消了事务,这个队列中的命令全部作废

public void transactional() throws InterruptedException {  
    Jedis jedis = new Jedis("localhost", 6379);  
    jedis.set("name", "张三");  
    jedis.set("age", "20");  
    jedis.watch("name"); // 监听 键name 一旦name键被其他的操作修改,该键上的事务自动取消  
    System.out.println("-------------开始执行事务---------------");  
    Transaction transaction = jedis.multi();// 开启事务  
    Thread.sleep(10000L);// 睡眠过程中可以修改name和age的值,这样可以使得watch监听自动消除,事务从而取消  
    transaction.set("name", "李四");  
    transaction.set("age", "25");  
    transaction.exec();// 执行事务  
    System.out.println("-------------事务执行完毕---------------");  
    List<String> list = jedis.mget("name", "age");  
    list.forEach(System.out::println);  
    jedis.close();  
}

11.6、Redis密码设置

Redis的密码可以通过修改安装目录下的redis.windows.conf的requirepass配置

requirepass 123456

使用命令行连接

redis-cli.exe auth 123456

使用 Jedis 连接

Jedis jedis = new Jedis("localhost", 6379);
jedis.auth("123456")

使用 JedisPool 连接

JedisConfig config = new JedisConfig();
JedisPool pool = new JedisPool(config, "localhost", 6379, 2000, "123456");
Jedis jedis = pool.getResource();

11.7、持久化策略

Redis 3. x 后提供的持久化机制有三种:RDB、AOF(Append Only File)、混合持久化。其中 RDB 是默认的持久化策略,AOF 是关闭的

11.7.1、RDB

使用快照的方式将数据以二进制的形式存储在文件中,其数据存储在硬盘上。二进制的数据对于计算机来说传输方便、速度页快。对快照的配置主要由两个参数决定

  • 时间(秒)
  • 改动键的个数

在默认的配置文件中已经有三个示例

save 900 1 # 900秒内至少改动1个键
save 300 10 # 300秒内至少改动10个键
save 60 10000 # 60秒内至少改动10000个键

由此可以看到,RDB 是在一个时间范围内出发事件才会持久化,如果在时间范围内没有触发事件就有数据丢失的风险
并且在 RDB 模式下,首先会清除 rdb 文件中所有的文件,再将下一次快照写入 rdb 文件,所以也有数据丢失的风险
既然 RDB 有数据丢失的风险,为什么还要使用?
因为 RDB 是以二进制的格式存储数据,在进行数据同步的时候速度很快,所以要使用 RDB

11.7.2、AOF

AOF 持久化存储的是一个文本文件,速度相对于 RDB 比较慢,而且后期文件比较大,传输困难。
在 aof 文件中存储的是每一次操作的 RESP 协议的命令 [[#4、RESP协议]]
可以在配置文件中开启 AOF 设置

appendonly yes # 开启AOF持久化
appendfilename "appendonly.aof" # AOF持久化文件

#appendfsync always # 每一条命令都会持久化,性能较低
appendfsync everysec # 每一秒都会持久化,折中的方案
#appendfsync no # 不同步,数据保存在内存中

11.7.3、混合持久化

开启混合持久化

aof-use-rdb-preamble yes # 开启混合持久化

使用 AOF 进行持久化时,如果在短时间内执行了大量同样的操作,就会记录很多相同的命令,这样 aof 文件就会很大,所以需要进行优化。如果需要优化就需要手动开启 bgrewriteaof 命令来优化。
Redis 提供了自动开启混合持久化

auto-aof-rewrite-percentage 100 # 触发的条件:下面设置的文件大小增加100%,也就是1倍,才会重写
auto-aof-rewrite-min-size 64mb # aof文件大小达到64M才可能重写记录的命令

此时 aof 文件中存储的就是二进制数据和文本数据

重复的命令只会记录一次文本,其余的命令以二进制形式存储

11.8、数据淘汰策略

Redis服务器的内存是有限的,硬件一旦确定了,内存大小也就确定了。当不停的向内存中写数据时,就会出现内存写满的情况,此时,就需要使用到Redis的数据淘汰策略了。Redis中设计了多种数据的淘汰策略,详情如下:

volatile主要针对的是设置了过期时间的key,如果需要淘汰redis中的数据,那么这些设置了过期时间的key优先被淘汰,如果空间依然不足,那么才会在没有设置过期时间的key进行淘汰。

allkeys主要针对的是所有的键,不管有没有设置过期时间

maxmemory-policy volatile-lru  #这个就是配置缓存的淘汰策略的
maxmemory <bytes> #这个是配置Redis的缓存的大小

11.8.1、LRU

# 优先淘汰掉设置了过期时间的key,然后才淘汰掉使用的比较少的key,假设key没有设置过期时间,那么不会优先淘
# 汰,这种模式也是在开发中使用的比较多的一种缓存策略模式
#volatile-lru -> Evict using approximated LRU among the keys with an expire set.
# 对所有key通用,优先删除最近最少使用的Key
# allkeys-lru -> Evict any key using approximated LRU.

11.8.2、LFU

# Redis中存储的每一个key都有一个内部时钟,当key使用频率高时,内部时钟会递增,当key使用频率低时,内部时钟会
# 递减,该策略是淘汰设置了过期时间且内部时钟最小的key
# volatile-lfu -> Evict using approximated LFU among the keys with an expire set.
# Redis中存储的每一个key都有一个内部时钟,当key使用频率高时,内部时钟会递增,当key使用频率低时,内部时钟会
# 递减,该策略是淘汰所有key中内部时钟最小的key
# allkeys-lfu -> Evict any key using approximated LFU.

11.8.3、random

# 随机淘汰具有过期时间的key
# volatile-random -> Remove a random key among the ones with an expire set.
# 淘汰的是随机的key
# allkeys-random -> Remove a random key, any key.

11.8.4、volatile-ttl

# 根据key剩余的过期时间(ttl值)来进行数据淘汰,  ttl值越小,越先被淘汰
# volatile-ttl -> Remove the key with the nearest expire time (minor TTL)

11.8.5、noeviction

# 只要缓存满了,就不继续服务器里面的写请求,读请求是可以完成的,这种模式缓存里面的所有数据都不会丢失,但会导致
# 参与Redis的业务会失败
# noeviction -> Don't evict anything, just return an error on write operations.

12、Redis集群

12.1、主从复制

主从复制:蒋前一台服务器的数据复制到其他服务器,前者叫主节点(master),后者叫从节点(slave);数据的复制是单向的,只能从主节点到从节点
image.png

12.1.1、工作流程:

  • 从服务器连接主服务器,发送SYNC命令;
  • 主服务器接收到SYNC命名后,开始执行BGSAVE命令生成RDB文件并使用缓冲区记录此后执行的所有写命令;
  • 主服务器BGSAVE执行完后,向所有从服务器发送快照文件,并在发送期间继续记录被执行的写命令;
  • 从服务器收到快照文件后丢弃所有旧数据,载入收到的快照;
  • 主服务器快照发送完毕后开始向从服务器发送缓冲区中的写命令;
  • 从服务器完成对快照的载入,开始接收命令请求,并执行来自主服务器缓冲区的写命令;(从服务器初始化完成
  • 主服务器每执行一个写命令就会向从服务器发送相同的写命令,从服务器接收并执行收到的写命令(从服务器初始化完成后的操作

12.1.2、优点:

  • 主从复制实现了读写分离
  • salve能够降低 Master 的读的压力
  • Master 和 salve 都是非阻塞的,即同步的同时也能够提供服务
  • 主从复制是高可用的基石

12.1.3、缺点:

  • 不具备自动容错和恢复的功能,主节点和从节点宕机都会导致部分读写失败,只能等待重启或者手动切换 ip
  • 较难支持在线扩容
  • 主机宕机,切换 IP 后还有可能导致数据不一致的问题(主机还有部分数据未同步到从服务器)

12.1.4、Jedis 访问主从复制

引入依赖

<dependency>  
    <groupId>redis.clients</groupId>  
    <artifactId>jedis</artifactId>  
    <version>5.1.2</version>  
</dependency>
@Test
public void MS(){
Jedis master = new Jedis("localhost",6379);
Jedis slave1 = new Jedis("localhost",6380);
Jedis slave2 = new Jedis("localhost",6381);
master.auth("123456");
slave1.auth("123456");
slave2.auth("123456");

//从节点绑定主节点
slave1.slaveof("localhost",6379);
slave2.slaveof("localhost",6379);

//主节点写入,从节点读取
master.set("name", "liuxu");
String name1 = slave1.get("name");
String name2 = slave2.get("name");
System.out.println(name1);
System.out.println(name2);
}

12.2、哨兵模式

image.png

12.2.1、主要功能

哨兵模式是建立在主从的基础之上的,主要功能有两个:

  • 监控主服务器和从服务器是否正常运行
  • 主服务器宕机之后自动将从服务器升级为主服务器

12.2.2、工作流程

  • 每个Sentinel(哨兵)进程以每秒钟一次的频率向整个集群中的Master主服务器,Slave从服务器以及其他Sentinel(哨兵)进程发送一个 PING 命令。
  • 如果一个实例(instance)距离最后一次有效回复 PING 命令的时间超过 down-after-milliseconds 选项所指定的值, 则这个实例会被 Sentinel(哨兵)进程标记为主观下线(SDOWN)
  • 如果一个Master主服务器被标记为主观下线(SDOWN),则正在监视这个Master主服务器的所有 Sentinel(哨兵)进程要以每秒一次的频率确认Master主服务器的确进入了主观下线状态
  • 当有足够数量的 Sentinel(哨兵)进程(大于等于配置文件指定的值)在指定的时间范围内确认Master主服务器进入了主观下线状态(SDOWN), 则Master主服务器会被标记为客观下线(ODOWN)
  • 在一般情况下, 每个 Sentinel(哨兵)进程会以每 10 秒一次的频率向集群中的所有Master主服务器、Slave从服务器发送 INFO 命令。
  • 当Master主服务器被 Sentinel(哨兵)进程标记为客观下线(ODOWN)时,Sentinel(哨兵)进程向下线的 Master主服务器的所有 Slave从服务器发送 INFO 命令的频率会从 10 秒一次改为每秒一次。
  • 若没有足够数量的 Sentinel(哨兵)进程同意 Master主服务器下线, Master主服务器的客观下线状态就会被移除。若 Master主服务器重新向 Sentinel(哨兵)进程发送 PING 命令返回有效回复,Master主服务器的主观下线状态就会被移除。
  • 若有足够数量的 Sentinel(哨兵)进程同意 Master主服务器下线,监视该服务的所有sentinel(哨兵)将进行协商,并选出一个领头的sentinel(哨兵),然后对Master 主服务器进行故障转移操作:
    • 从所有Slave从服务器里面挑选一个Slave从服务器,将其转成主服务器,领头Sentinel(哨兵)向被选出从服务器(新主)发送SLAVEOF no one命令,以每秒一次的频率向被升级的从服务器发送INFO命令,观察回复的角色信息,直到Slave从服务器变成Master主服务器。
    • 之前下线的Master主服务器下的所有Slave从服务器改为复制新的Master主服务器
    • 将之前下线的Master主服务器改为新的Master主服务器下的Slave从服务器

12.2.3、优点:

  • 具有主从复制的优点
  • 主从可以自动切换,更健壮,可用性更高

12.2.4、缺点:

  • 较难支持在线扩容

12.2.5、Jedis 访问哨兵模式

引入依赖

<dependency>  
    <groupId>redis.clients</groupId>  
    <artifactId>jedis</artifactId>  
    <version>5.1.2</version>  
</dependency>

测试案例

@Test
public void Sentinel(){
Set<String> sentinels = new HashSet<>();
//只需要配置哨兵的服务器信息即可
sentinels.add(new HostAndPort("localhost", 6379).toString());
sentinels.add(new HostAndPort("localhost", 6380).toString());
sentinels.add(new HostAndPort("localhost", 6381).toString());
JedisSentinelPool pool = new JedisSentinelPool("master", sentinels, "123456");
Jedis jedis = pool.getResource();
jedis.select(1);
String result = jedis.set("success", "OK");
System.out.println(result);
jedis.close();
pool.close();
}

12.2.6、springboot 集成 Redis 哨兵

引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

修改 application. yaml 文件

spring:
redis:
  sentinel:
    master: master
    nodes:
    - 127.0.0.1:6379
    - 127.0.0.1:6380
    - 127.0.0.1:6381
  jedis:
    pool:
      max-active: 100
      max-idle: 10
      min-idle: 3
      max-wait: -1
  password: 123456

Redis 配置类

@Configuration  
public class RedisConfig {  
  
    @Bean  
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory){  
       RedisTemplate<String,Object> redisTemplate = new RedisTemplate<>();  
       redisTemplate.setConnectionFactory(factory);  
  
       StringRedisSerializer keySerializer = new StringRedisSerializer();  
  
       redisTemplate.setKeySerializer(keySerializer);  
       redisTemplate.setHashKeySerializer(keySerializer);  
  
       Jackson2JsonRedisSerializer<?> valueSerializer = new Jackson2JsonRedisSerializer<>(Object.class);  
       ObjectMapper mapper = new ObjectMapper();  
       mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);  
       mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);  
       valueSerializer.setObjectMapper(mapper);  
  
       redisTemplate.setValueSerializer(valueSerializer);  
       redisTemplate.setHashValueSerializer(valueSerializer);  
       return redisTemplate;  
    }  
}

测试案例

@Test  
public void sentinelTest(){  
    redisUtil.set("name", "liuxu");  
    Object name = redisUtil.get("name");  
    System.out.println(name);  
}

12.3、Redis Cluster

主从复制的缺陷:
主从复制的容量限制取决于主服务器和从服务器之间最小的容量,并且主从复制实现在线扩容比较难;并且主从复制是读写分离的,可以通过增加从服务器的容量来降低读的压力, 但是写和存储的上限就是 master 的上限,因此小数据是可以使用哨兵模式,但是大数据量怎么办呢?
从 Redis 3.0 开始,redis 集群可以采用去中心化的方式搭建,客户端直接和节点相连,通过相同的 hash 算法计算出 key 对应的 slot(槽),然后直接在对应的槽上进行响应的操作。每个不同的节点之间通过 gossip 协议交换相互的状态,以及探测新加入节点的信息。redis 集群支持动态加入节点、动态迁移 slot,以及自动故障转移
image.png

图片中的命令不支持 Redis 3.0.504 版本

Cluster 集群至少由三个 master 才能组成一个集群,每个 master 至少挂载一个 salve。
对于每个节点来说,每个 master 可以挂载至少一个 slave,这些 salve 只有数据备份的功能;只有当 master 挂机了,才让 slave 替代 master 成为新的 master。

12.3.1、数据分片:

Cluster 集群采用了数据分片(sharding)而不是非一致性哈希(consistency hashing)来实现:一个 Redis 集群包含了 16384 个哈希槽(hash slot),而存储的键就属于这 16384 个哈希槽之一,计算公式
C R C 16 ( k e y ) m o d    16384 CRC16(key) \mod 16384 CRC16(key)mod16384
集群中的每个节点就处理自己对应部分哈希槽(hash slot)上的数据,每个节点上的 slot 数量是均分的。
假如集群由三个节点组成:

  • 节点 A 负责[0-5100]区间的 slot
  • 节点 A 负责[5101-11000]区间的 slot
  • 节点 A 负责[11001-16383]区间的 slot
    如果新添加了一个节点 D,只需要将 A、B、C 节点上的某些哈希槽迁移到 D 上就可以了
    如果要删除节点 A,只需要将 A 节点上的 slot 迁移到其他节点,然后再移除节点 A

12.3.2、故障转移

由于集群中每个节点是主从复制的模式,所以当 master 出现故障的时候,集群会自动将某个 slave 升级为 master,当原来的 master 恢复之后,再重新成为主节点的从节点

12.3.3、数据访问

Cluster 是一个去中心化的集群,节点之间通过 meet 命令相互握手,通过 gossip 进行状态感知。每个节点会保存一份数据分布表,并将自己的 slot 信息发送给其他节点。
当需要访问数据的时候,客户端会先尝试执行,如果操作的 key 正好在对应的 slot 区间,就由负责这部分 slot 的节点处理,如果不在,就会返回 MOVED 错误和这个 key 对应的 slot 区间所属的节点。
错误信息:在访问集群的时候,节点可能返回 ASK 错误。这是因为 key 所在的节点的 slot 正在迁移所导致的,如果向 slot 的源节点访问,如果 key 在源节点上能执行成功,如果不在,就会返回 ASK 错误,描述信息会附带目标迁移节点的信息。客户端要先向目标迁移节点发送 ASKING 命令,再执行原来的命令。

12.3.4、Jedis 访问 Cluster 集群

//创建一连接,JedisCluster对象,在系统中是单例存在
Set<HostAndPort> nodes = new HashSet<>();
nodes.add(new HostAndPort("121.199.174.183", 7001));
nodes.add(new HostAndPort("121.199.174.183", 7002));
nodes.add(new HostAndPort("121.199.174.183", 7003));
nodes.add(new HostAndPort("121.199.174.183", 7004));
nodes.add(new HostAndPort("121.199.174.183", 7005));
nodes.add(new HostAndPort("121.199.174.183", 7006));
JedisCluster cluster = new JedisCluster(nodes);
//执行JedisCluster对象中的方法,方法和redis一一对应。
cluster.set("cdqf2107", "快毕业了");
String result = cluster.get("cluster-test");
System.out.println(result);
//程序结束时需要关闭JedisCluster对象
cluster.close();

12.3.5、springboot 集成

引入依赖

<!--springboot对redis支持-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--jackosn序列化支持-->
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
  <groupId>com.fasterxml.jackson.datatype</groupId>
  <artifactId>jackson-datatype-jdk8</artifactId>
</dependency>

<!--支持redis存储的时候将jdk8提供的新的时间类存储进去-->
<dependency>
  <groupId>com.fasterxml.jackson.datatype</groupId>
  <artifactId>jackson-datatype-jsr310</artifactId>
</dependency>

修改 application. yaml 文件

spring:  
  redis:  
    cluster:  
      nodes:  
        - 10.7.183.60:6379  
        - 10.7.183.60:6380  
        - 10.7.183.60:6381  
        - 10.7.183.60:6382  
        - 10.7.183.60:6383  
        - 10.7.183.60:6384  
    jedis:  
      pool:  
        max-active: 100  
        max-idle: 10  
        min-idle: 3  
        max-wait: -1

配置类(直接拷贝)

@Configuration
public class RedisConfig {
    //spring data提供的整合包在使用的时候,核心类都是XxxTemplate
    //我们整合的redis核心类就是RedisTemplate

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory){
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory); //设置RedisTemplate使用的redis连接工厂

        RedisSerializer<String> keySerializer = new StringRedisSerializer();

        ObjectMapper mapper = new ObjectMapper();//对象映射器,因为要把对象转换成JSON格式
        mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
        //对象映射器配置哪些属性可见:PropertyAccessor.ALL表示所有的属性可以是可以转换成JSON格式的
        //Visibility.ANY表示类中定义的属性使用任何访问修饰符都可以
        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
//        LocalDateTime
//        LocalDate
//        LocalTime
        mapper.registerModule(new JavaTimeModule());
        //支持JDK8的相关处理
        mapper.registerModule(new Jdk8Module());
        //支持存储地理位置
        mapper.registerModule(new GeoModule());
        GenericJackson2JsonRedisSerializer valueSerializer = new GenericJackson2JsonRedisSerializer(mapper);

        template.setKeySerializer(keySerializer);
        template.setValueSerializer(valueSerializer);

        template.setHashKeySerializer(keySerializer);
        template.setHashValueSerializer(valueSerializer);
        return template;
    }
}

测试案例

@Test  
public void sentinelTest(){  
    redisUtil.set("name", "liuxu");  
    Object name = redisUtil.get("name");  
    System.out.println(name);  
}

13、分布式锁

13.1、缓存击穿

缓存击穿是指缓存中的 key 失效了,请求直接打到了数据库,描述为在缓存上开了个洞
image.png
解决方案:给查询加上一个互斥锁,只允许一个线程去查询数据,其他线程等待

13.2、缓存穿透

缓存穿透是指要查询的数据缓存中没有,数据库也没有,一般为恶意攻击
image.png
解决方案:即使数据库中没有数据,也在缓存中设置一个 null 或者 “” 或者使用布隆过滤器

13.2.1、布隆过滤器

类似于 HashMap 的实现,布隆过滤器会提供多个哈希函数计算值对应的下标,对于哈希表来说,多次插入的值通过计算得到的下标位置可能重复,所以只能知道改下标位置上可能有值,但是两次的值是否相同却并不知道,产生的原因是 hash 冲突,所以布隆过滤器有一个误判率 FPP;但是如果位置上没有值,就能判断一定没有这个值。
利用布隆过滤器我们可以预先把数据查询的主键,比如用户 ID 或文章 ID 缓存到过滤器中。当根据 ID 进行数据查询的时候,我们先判断该 ID 是否存在,若存在的话,则进行下一步处理。若不存在的话,直接返回,这样就不会触发后续的数据库查询。需要注意的是缓存穿透不能完全解决,我们只能将其控制在一个可以容忍的范围内。

13.3、缓存雪崩

缓存雪崩与缓存击穿类似,缓存雪崩是指同一时间缓存中的数据大面积失效,那么请求就大面积打到数据库上,可能导致数据库压力过大从而宕机
image.png
解决方案:为缓存数据的过期时间加上一个随机值(毫秒、纳秒等小单位随机值)

13.4、如何实现一个分布式锁

分布式锁的原理是使用 Redis 的特性:key 是唯一的。如果 Redis 中不存在某个 key,则可以设置某个 key,如果某个 key 已经存在,则不会存在第二个相同的 key。我们 iiu 可以在 Redis 中添加一个 key 并为其设置一个过期时间,表示该线程获取到了锁,等待业务完成后再删除这个 key释放锁,其余线程再获取锁,这样就实现了一个分布式锁。
需要注意的是,如果 key 的过期时间小于业务处理时间,则需要续约。对于这个问题,Redission 采用看门狗(watch dog)来处理,其主要就是使用 Timer 和 TimeTask 来实现。当 key 快过期的时候,计时器就执行计时器任务为这个 key 续约,直到业务完成取消计时器,再删除 key 释放锁。
其次,对于删除出现的问题(业务时间比 key 的过期时间长,业务完成时,锁已经被其他线程拿到,这个线程删除 key 释放锁就可能删除其他线程的 key 导致冲突),可以将 key 的值设置为当前线程的 ID(不一定,但是能够区分两个不同线程),再执行后续操作
对于判断是否获取到锁和删除锁两个步骤需要同时执行,就要使用 lua 脚本来实现(lua 脚本具有原子性)

-- 删除锁 判断key是否存在,存在就删除key,返回1,否则返回0
if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end

-- 添加锁 判断是否存在key,存在就设置值和过期时间,返回1,否则返回0
if redis.call('exists', KEYS[1]) == 0 then redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2]); return 1; else return 0; end

-- 续约,如果key存在,就重置过期时间,返回1,否则返回0
if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('pexpire', KEYS[1], ARGV[2]) else return 0 end

14、RedisUtil. java

配置文件直接复制 [[#12.3.5、springboot 集成]]

@Component  
public class RedisService {  
  
  
    @Autowired  
    private RedisTemplate<String, Object> redisTemplate;  
  
    @Autowired  
    private JedisPool jedisPool;  

  
    public void set(String key, Object value) {  
       redisTemplate.opsForValue().set(key, value);  
    }  
  
    public void set(String key, Object value, long expire, TimeUnit unit) {  
       redisTemplate.opsForValue().set(key, value, expire, unit);  
    }  
  
    public void setNx(String key, Object value, long expire, TimeUnit unit) {  
       redisTemplate.opsForValue().setIfAbsent(key, value, expire, unit);  
    }  
  
    public Object get(String key) {  
       return redisTemplate.opsForValue().get(key);  
    }  
  
    public void hset(String key, String field, Object value) {  
       redisTemplate.opsForHash().put(key, field, value);  
    }  
  
    public void hmset(String key, Map<String, Object> fieldValues) {  
       redisTemplate.opsForHash().putAll(key, fieldValues);  
    }  
  
    public Object hget(String key, String field) {  
       return redisTemplate.opsForHash().get(key, field);  
    }  
  
    public Map<Object, Object> hgetAll(String key) {  
       return redisTemplate.opsForHash().entries(key);  
    }  
  
    public boolean hexists(String key, String field) {  
       return redisTemplate.opsForHash().hasKey(key, field);  
    }  
  
    public void hdelete(String key, String... fields) {  
       redisTemplate.opsForHash().delete(key, (Object) fields);  
    }  
  
    public void lpush(String key, Object value) {  
       redisTemplate.opsForList().leftPush(key, value);  
    }  
  
    public void rpush(String key, Object value) {  
       redisTemplate.opsForList().rightPush(key, value);  
    }  
  
    public Object lpop(String key) {  
       return redisTemplate.opsForList().leftPop(key);  
    }  
  
    public Object rpop(String key) {  
       return redisTemplate.opsForList().rightPop(key);  
    }  
  
    public List<Object> range(String key, long start, long end) {  
       return redisTemplate.opsForList().range(key, start, end);  
    }  
  
    public void sadd(String key, Object... values) {  
       redisTemplate.opsForSet().add(key, values);  
    }  
  
    public boolean sIsMember(String key, Object value) {  
       return Boolean.TRUE.equals(redisTemplate.opsForSet().isMember(key, value));  
    }  
  
    public Set<Object> sinter(String key, String... otherKeys) {  
       return redisTemplate.opsForSet().intersect(key, Arrays.asList(otherKeys));  
    }  
  
    public Set<Object> sunion(String key, String... otherKeys) {  
       return redisTemplate.opsForSet().union(key, Arrays.asList(otherKeys));  
    }  
  
    public Set<Object> sdiff(String key, String... otherKeys) {  
       return redisTemplate.opsForSet().difference(key, Arrays.asList(otherKeys));  
    }  
  
    public void zadd(String key, Object value, double score) {  
       redisTemplate.opsForZSet().add(key, value, score);  
    }  
  
    public void zrem(String key, Object... values) {  
       redisTemplate.opsForZSet().remove(key, values);  
    }  
  
    public Long zcard(String key) {  
       return redisTemplate.opsForZSet().zCard(key);  
    }  
  
    public Set<Object> zrange(String key, long start, long end) {  
       return redisTemplate.opsForZSet().range(key, start, end);  
    }  
  
    public Set<Object> zRevRange(String key, long start, long end) {  
       return redisTemplate.opsForZSet().reverseRange(key, start, end);  
    }  
  
    public Set<Object> zRangeByScore(String key, double min, double max) {  
       return redisTemplate.opsForZSet().rangeByScore(key, min, max);  
    }  
  
    public Set<Object> zRevRangeByScore(String key, double min, double max) {  
       return redisTemplate.opsForZSet().reverseRangeByScore(key, min, max);  
    }  
  
    public Set<ZSetOperations.TypedTuple<Object>> zRangeByScoreWithScore(String key, double min, double max) {  
       return redisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max);  
    }  
  
    public Set<ZSetOperations.TypedTuple<Object>> zRevRangeByScoreWithScore(String key, double min, double max) {  
       return redisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, min, max);  
    }  
  
    public List<Object> scan(String pattern, int count) {  
       RedisSerializer<?> keySerializer = redisTemplate.getKeySerializer();  
       ScanOptions options = ScanOptions.scanOptions().match(pattern).count(count).build();  
       return redisTemplate.execute((RedisCallback<List<Object>>) connection -> {  
          List<Object> keys = new ArrayList<>();  
          Cursor<byte[]> cursor = connection.scan(options);  
          while (cursor.hasNext()) {  
             byte[] next = cursor.next();  
             keys.add(keySerializer.deserialize(next));  
          }  
          cursor.close();  
          return keys;  
       });  
    }  
  
  
    /**  
     * 使用redisTemplate.opsForHash()封装scan  
     *     * @param key  
     * @param pattern  
     * @param count  
     * @return  
     */  
    public List<Map.Entry<Object, Object>> hScan(String key, String pattern, int count) {  
       List<Map.Entry<Object, Object>> list = new ArrayList<>();  
       Cursor<Map.Entry<Object, Object>> cursor = redisTemplate.opsForHash().scan(key, ScanOptions.scanOptions().match(pattern).count(count).build());  
       while (cursor.hasNext()) {  
          Map.Entry<Object, Object> next = cursor.next();  
          list.add(next);  
       }  
       return list;  
    }  
  
    public Set<Object> sScan(String key, String pattern, int count) {  
       Set<Object> result = new HashSet<>();  
       Cursor<Object> cursor = redisTemplate.opsForSet().scan(key, ScanOptions.scanOptions().match(pattern).count(count).build());  
       while (cursor.hasNext()) {  
          result.add(cursor.next());  
       }  
       return result;  
    }  
  
    public Set<ZSetOperations.TypedTuple<Object>> zScan(String key, String pattern, int count) {  
       Set<ZSetOperations.TypedTuple<Object>> result = new HashSet<>();  
       Cursor<ZSetOperations.TypedTuple<Object>> cursor = redisTemplate.opsForZSet().scan(key, ScanOptions.scanOptions().match(pattern).count(count).build());  
       while (cursor.hasNext()) {  
          result.add(cursor.next());  
       }  
       return result;  
    }  
  
    /**  
     * 使用Jedis封装hscan  
     *     * @param key  
     * @param pattern  
     * @param count  
     * @return  
     */  
    @Deprecated  
    public List<Map.Entry<String, String>> hscan(String key, String pattern, int count) {  
       List<Map.Entry<String, String>> entryList = new ArrayList<>();  
       String cursor = String.valueOf(0);  
       ScanParams params = new ScanParams().match(pattern).count(count);  
       do {  
          ScanResult<Map.Entry<String, String>> result = jedisPool.getResource().hscan(key, cursor, params);  
          List<Map.Entry<String, String>> list = result.getResult();  
          entryList.addAll(list);  
          cursor = result.getCursor();  
       } while (!cursor.equals(String.valueOf(0)));  
       return entryList;  
    }  
  
    @Deprecated  
    public List<Object> sscan(String key, String pattern, int count) {  
       List<Object> setList = new ArrayList<>();  
       String cursor = String.valueOf(0);  
       ScanParams params = new ScanParams().match(pattern).count(count);  
       do {  
          ScanResult<String> result = jedisPool.getResource().sscan(key, cursor, params);  
          setList.addAll(result.getResult());  
          cursor = result.getCursor();  
       } while (!cursor.equals(String.valueOf(0)));  
       return setList;  
    }  
  
    @Deprecated  
    public List<Tuple> zscan(String key, String pattern, int count) {  
       List<Tuple> setList = new ArrayList<>();  
       String cursor = String.valueOf(0);  
       ScanParams params = new ScanParams().match(pattern).count(count);  
       do {  
          ScanResult<Tuple> result = jedisPool.getResource().zscan(key, cursor, params);  
          setList.addAll(result.getResult());  
          cursor = result.getCursor();  
       } while (!cursor.equals(String.valueOf(0)));  
       return setList;  
    }  
  
  
    public void tryLock(String lockKey, long expire, TimeUnit unit, BusinessWork work) {  
       long currentTime = System.currentTimeMillis();  // 系统当前时间  
       long millis = unit.toMillis(expire); // key的过期时间  
       long maxTime = currentTime + millis; // key最后存活的最大时间点  
       while (System.currentTimeMillis() <= maxTime) {  
          String luaScript = "if redis.call('exists', KEYS[1]) == 0 then redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2]); return 1; else return 0; end";  
          RedisScript<Integer> script = new DefaultRedisScript<>(luaScript, Integer.class);  
          long lockValue = Thread.currentThread().getId();  
          Integer result = (Integer) redisTemplate.execute(script, Collections.singletonList(lockKey), Arrays.asList(lockValue, millis));  
          if (result != null && result == 1) {// 拿到了分布式锁  
             Timer timer = new Timer(); // 创建一个计时器对象  
             TimerTask task = new TimerTask() {// 创建一个计时器任务  
                @Override  
                public void run() {  
                   String s = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('pexpire', KEYS[1], ARGV[2]) else return 0 end";  
                   RedisScript<Integer> script = new DefaultRedisScript<>(s, Integer.class);  
                   Integer result = (Integer) redisTemplate.execute(script, Collections.singletonList(lockKey), Arrays.asList(lockValue, millis));  
                   if (result != null && result == 1) {  
                      System.out.println("续约成功");  
                   }  
                }  
             };  
             long period = 2 * millis / 3;  
             timer.schedule(task, period, period);// 计时器调度计时器任务  
             // 这里就可以执行相关的业务操作了  
             work.processBusiness();  
             // 业务执行完成后,直接将计时器给取消掉  
             timer.cancel();  
             // 业务处理器完成后,还需要释放分布式锁  
             luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";  
             script = new DefaultRedisScript<>(luaScript, Integer.class);  
             result = (Integer) redisTemplate.execute(script, Collections.singletonList(lockKey), Collections.singletonList(lockValue));  
             if (result != null && result == 1) {  
                // 释放分布式锁成功  
                System.out.println("释放分布式锁成功");  
             }  
             break;  
          }  
       }  
    }  
  
}

15、自制 Redis 服务启动包

https://gitee.com/liuxu0206/picture/raw/master/typora/markdown_pic/my-redis-spring-boot-starter.jar

<dependency>  
    <groupId>cn.cnmd.redis</groupId>  
    <artifactId>my-redis-spring-boot-starter</artifactId>  
    <version>1.0.0</version>  
</dependency>

原文地址:https://blog.csdn.net/qq_70314704/article/details/140599885

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!