学习Redis
第十章 Redis
回顾
1. Nginx作为虚拟主机怎么配
http{
server{
listen 端口号
host_name localhost;
location / {
root 磁盘位置;
index index.html;
}
}
}
2. Nginx 反向代理配置
location /test/ {
proxy_pass http://服务器IP:服务器端口;
}
3. Nginx负载均衡配置
upstream 服务器列表名称 { # 默认使用的负载均衡规则是轮询
server 服务器IP:服务器端口;
server 服务器IP:服务器端口;
}
upstream 服务器列表名称 {
server 服务器IP:服务器端口 weight=数值; #数值越大的机器说明性能越优越,那么分配的请求当然更多
server 服务器IP:服务器端口 weight=数值;
}
upstream 服务器列表名称 {
ip_hash; # 根据用户请求的IP求得其hash值,然后根据hash值来计算应该将该请求分配在哪一台服务器
server 服务器IP:服务器端口;
server 服务器IP:服务器端口;
}
location /test/ {
proxy_pass http://服务器列表名称/;
}
第一节 Redis介绍
1. Redis是什么
Redis全称为 Remote Dictionary Server,表示远程字典服务器,是跨平台的非关系型数据库。Redis 是一个开源的使用键值对(Key-Value)存储数据库,也是一种NoSQL数据库。
NoSQL,指的是非关系型的数据库。NoSQL有时也称作Not Only SQL的缩写,是对不同于传统的关系型数据库的数据库管理系统的统称。
NoSQL用于超大规模数据的存储。(例如谷歌或Facebook每天为他们的用户收集万亿比特的数据)。这些类型的数据存储不需要固定的模式,无需多余操作就可以横向扩展。
2. 有Map了为啥还要Redis?
- map存储数据是在 JVM 的内存中,但 JVM 的内存是有限的 不能大量的存储数据
- map不能进行横向和纵向扩展, 但Redis可以,Redis支持集群模式
- Redis与map一样,也是基于内存的(所有数据都是放到内存的) – 速度也快,读的速度是110000次/s,写的速度是81000次/s 。
- Redis支持多种持久化模式(把内存的数据 可以直接保存到硬盘)
- Redis支持多种数据类型,字符串(String)、哈希(Hash)、列表(list)、集合(set)和有序集合(sorted set)
- Redis支持多种数据的淘汰策略(LRU算法)
3. Redis能干什么
- 可以用于存储用户的登陆信息和认证信息(单点登录 SSO,在分布式系统中的任一一台服务器登录,访问其他的服务器均不需要再进行登录)
- Redis中最大的功能可能就是做系统数据的缓存(这是这个时代的一个技术特点)
- 可以用来做秒杀系统
- 用来存储一些可以容忍丢失的数据
因为Redis本身是基于内存的,怎么都有可能存在数据丢失的风险,所以Redis的适用场景,一定是对数据要求 不严格的地方,比如:评论数、点赞数、最热商品 - 接口的防刷和限流
4. Redis中的数据存储
5. RESP协议
Redis 的客户端和服务端之间采取了一种名为 Redis序列化的协议(REdis Serialization Protocol,简称RESP),是基于 TCP 的应用层协议 ,RESP 底层采用的是 TCP 的连接方式,通过 TCP 进行数据传输,然后根据解析规则解析相应信息。
在RESP协议中,数据的类型取决于第一个字节:
- +开始表示单行字符串
- -开始表示错误类型
- :开始表示整数
- $开始表示多行字符串
- *开始表示数组
在RESP协议中,构成协议的每一部分必须使用\r\n作为结束符
示例:
SET key value
*3\r\n #3表示这个命令由3部分组成
$3\r\n # 第一部分的长度是3
SET\r\n # 第一部分的内容
$3\r\n # 第二部分的长度是3
key\r\n # 第二部分的内容
$5\r\n # 第三部分的长度是5
value\r\n # 第三部分的内容
第二节 安装Redis
安装redis 及 redis 图形化界面
第三节 Redis 命令
1. 通用命令
因为redis存储数据采用的是key-value进行存储,不管value是什么类型,针对的key操作都属于通用操作
del key #删除键
exists key #检测键是否存在,1-表示存在,0-表示不存在
expire key seconds #为键设置过期时间 单位是秒
pexpire key milliSeconds #为键设置过期时间 单位是毫秒
keys pattern #使用正则表达式查找键,尽量不要使用这个名来来查找,这个名来查找时可能会造成服务器卡顿
persist key #持久化键,过期时间就相当于没有了
ttl key #获取键的剩余过期时间 单位是秒
pttl key #获取键的剩余过期时间 单位是毫秒
type key #获取键的存储的值的类型
select 0~15 #选择操作的库
move key db #移动键到另外一个库中
flushdb #清空当前所在的数据库
flushall #清空全部数据库
dbsize #查看当前数据库中有多少个键
lastsave #查看最后一次操作的时间
monitor #实时监控Redis服务接收到的命令
2. String命令
set key value #设置键的值,如果存在就是修改,不存在就是增加
get key #获取键的值
mset key value[key value ...] #批量设置键的值
mget key [key ...] #批量获取键的值
setex key seconds value #设置键的值,同时设置键的过期时间
setnx key value #当键不存在时才设置键的值
incr key #将键存储的值增加1,只有存储数字的时候有效
incrby key increment #将键存储的值增加给定的增量,只有存储数字的时候有效
decr key #将键存储的值减去1,只有存储数字的时候有效
decrby key decrement #将键存储的值减少给定减量,只有存储数字的时候有效
append key value #当键存在且存储的值是一个字符串时,将值追加到存储的字符串的末尾
3. Hash命令
hset key field value #设置键存储的字段和值
hget key field #获取键存储的字段值
hmset key field value[field value ...] #批量设置键的存储的字段和值
hmget key [field ...] #批量获取键存储的字段值
hincrby key field increment #键存储的字段值自增1
hsetnx key field value #不存在字段就添加
hexists key field #检查键存储的字段是否存在
hdel key field [field ...] #批量删除键中存储的字段
hgetall key #获取当前键存储的字段和值
hkeys key #获取当前键存储的所有字段
hvals key #获取当前键存储的所有值
hlen key #获取当前键存储的字段数量
4. List命令
#存储数据(从左侧插入数据,从右侧插入数据)
lpush key value [value ...]
rpush key value [value ...]
lpushx key value #将一个值插入到已存在的列表头部
rpushx key value #为已存在的列表添加值
lset key index value #通过索引设置列表元素的值
#弹栈方式获取数据(左侧弹出数据,从右侧弹出数据)
lpop key
rpop key
lrange key start stop #获取列表指定范围内的元素 -1表示末尾
lindex key index #获取指定索引位置的数据
llen key #获取列表的长度
#删除列表中的数据(他是删除当前列表中的count个value值,count > 0从左侧向右侧删除,count < 0从右侧向左侧删除,count == 0删除列表中全部的value)
lrem key count value
#对一个列表进行修剪(trim),就是说,让列表只保留指定区间内的元素,不在指定区间之内的元素都将被删除。
ltrim key start stop
#将一个列表中最后的一个数据,插入到另外一个列表的头部位置
rpoplpush list1 list2
5. Set命令
#存储数据
sadd key member [member ...]
#获取数据(获取全部数据)
smembers key
#随机获取一个数据(获取的同时,移除数据,count默认为1,代表弹出数据的数量)
spop key [count]
#交集(取多个set集合交集)
sinter set1 set2 ...
#并集(获取全部集合中的数据)
sunion set1 set2 ...
#差集(获取多个集合中不一样的数据)
sdiff set1 set2 ...
#删除数据
srem key member [member ...]
#查看当前的set集合中是否包含这个值
sismember key member
6. Zset (Sorted Set)命令
#添加数据(score必须是数值。member不允许重复的。)
zadd key score member [score member ...]
#修改member的分数(如果member是存在于key中的,正常增加分数,如果memeber不存在,这个命令就相当于zadd)
zincrby key increment member
#查看指定的member的分数
zscore key member
#获取zset中数据的数量
zcard key
#根据score的范围查询member数量
zcount key min max
#删除zset中的成员
zrem key member [member...]
#根据分数从小到大排序,获取指定范围内的数据(withscores如果添加这个参数,那么会返回member对应的分数)
zrange key start stop [withscores]
#根据分数从大到小排序,获取指定范围内的数据(withscores如果添加这个参数,那么会返回member对应的分数)
zrevrange key start stop [withscores]
#根据分数的返回去获取member(withscores代表同时返回score,添加limit,就和MySQL中一样,如果不希望等于min或者max的值被查询出来可以采用 ‘(分数’ 相当于 < 但是不等于的方式,最大值和最小值使用+inf和-inf来标识)
zrangebyscore key min max [withscores] [limit offset count]
#根据分数的返回去获取member(withscores代表同时返回score,添加limit,就和MySQL中一样)
zrevrangebyscore key max min [withscores] [limit offset count]
思考:在添加一个键的时候,需要判断该键是否存在,同时还需要设置该键的过期时间?
按照常规思维来说可以使用两步完成,setnx 命令进行判断 setex命令设置过期时间。但是这样做会存在一个问题,在极端情况下,Redis 执行完 setnx 命令后宕机了或者断电了,那么 setex 命令就没有得到执行,这个添加进去的键就没有过期时间。因此,在做这种操作的需要考虑的是操作的原子性。
第四节 Java 连接 Redis
<dependencies>
<!-- jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.20</version>
</dependency>
</dependencies>
1. Jedis 操作 Redis
1.1 字符串形式存储
public class Demo1 {
@Test
public void set(){
//1. 连接Redis
Jedis jedis = new Jedis("121.199.174.183",6379);
//2. 操作Redis - 因为Redis的命令是什么,Jedis的方法就是什么
jedis.set("name","李四");
//3. 释放资源
jedis.close();
}
@Test
public void get(){
//1. 连接Redis
Jedis jedis = new Jedis("121.199.174.183",6379);
//2. 操作Redis - 因为Redis的命令是什么,Jedis的方法就是什么
String value = jedis.get("name");
System.out.println(value);
//3. 释放资源
jedis.close();
}
}
1.2 字节数组形式存储
1.3 SCAN 操作
SCAN操作可以根据提供的匹配方式和扫描数量来进行扫描,但是每次扫描的结果不一定与扫描数量匹配,只是返回一个在扫描数量范围左右的结果。可能比扫描数量多,也可能比扫描的数量少。
@Test
public void scan(){
Jedis jedis = new Jedis("121.199.174.183", 6379);
jedis.select(0);
jedis.flushDB();
jedis.mset("name", "张三", "age", "20", "address", "四川成都", "phone", "13612345678", "class", "计科1班");
//scan就是扫描 扫描需要配置扫描的条件 这个条件就是ScanParams
ScanParams params = new ScanParams().match("*a*").count(3);
//这个静态变量表示的就是光标开始的位置,默认值是0
String cursor = ScanParams.SCAN_POINTER_START;
do {
//扫描后会得到扫描的结果ScanResult
ScanResult<String> scanResult = jedis.scan(cursor, params);
List<String> result = scanResult.getResult();
result.forEach(System.out::println);
System.out.println("===================");
cursor = scanResult.getStringCursor();
//当光标位置再次归0 表示扫描完成
} while (!ScanParams.SCAN_POINTER_START.equals(cursor));
jedis.close();
}
@Test
public void hscan(){
Jedis jedis = new Jedis("121.199.174.183", 6379);
jedis.select(1);
jedis.flushDB();
Map<String, String> map = new HashMap<>();
map.put("name", "张三");
map.put("age", "20");
map.put("address", "四川成都");
map.put("phone", "13612345678");
map.put("class", "计科1班");
jedis.hmset("user", map);
ScanParams params = new ScanParams().match("a*").count(3);
String cursor = ScanParams.SCAN_POINTER_START;
do {
ScanResult<Map.Entry<String, String>> scanResult = jedis.hscan("user", cursor, params);
List<Map.Entry<String, String>> result = scanResult.getResult();
result.forEach(System.out::println);
System.out.println("===================");
cursor = scanResult.getStringCursor();
} while (!ScanParams.SCAN_POINTER_START.equals(cursor));
jedis.close();
}
@Test
public void sscan(){
Jedis jedis = new Jedis("121.199.174.183", 6379);
jedis.select(2);
jedis.flushDB();
jedis.sadd("infos", "bac","abc","cab","cba","acb");
ScanParams params = new ScanParams().match("a*").count(3);
String cursor = ScanParams.SCAN_POINTER_START;
do{
ScanResult<String> infos = jedis.sscan("infos", cursor, params);
List<String> result = infos.getResult();
result.forEach(System.out::println);
System.out.println("=====================");
cursor = infos.getStringCursor();
} while (!ScanParams.SCAN_POINTER_START.equals(cursor));
jedis.close();
}
@Test
public void zscan(){
Jedis jedis = new Jedis("121.199.174.183", 6379);
jedis.select(3);
jedis.flushDB();
Random r = new Random();
List<String> members = Arrays.asList("bac","abc","cab","cba","acb");
Map<String, Double> membersAndScores = new HashMap<>();
members.forEach(str->membersAndScores.put(str, (double)r.nextInt(100)));
jedis.zadd("infos", membersAndScores, ZAddParams.zAddParams().nx());
ScanParams params = new ScanParams().match("*a*").count(3);
String cursor = ScanParams.SCAN_POINTER_START;
do{
ScanResult<Tuple> infos = jedis.zscan("infos", cursor, params);
List<Tuple> result = infos.getResult();
result.forEach(t-> System.out.println(t.getElement() + "=>" + t.getScore()));
System.out.println("===============");
cursor = infos.getStringCursor();
}while (!ScanParams.SCAN_POINTER_START.equals(cursor));
jedis.close();
}
1.4 Redis 数据类型应用场景
-
String: 点赞数、评论数、粉丝数、查询缓存等
Jedis jedis = new Jedis("127.0.0.1", 6379); jedis.incr("comments");//将评论数增加1
-
Hash:购物车、抢购、限购、限量发放优惠券、激活码等
Jedis jedis = new Jedis("127.0.0.1", 6379); //用户添加购物车时,先判断有无该商品的记录 if(jedis.hexists("zhangsan", "goods0001")){ //有记录就在原来的数量上增加 jedis.hincrBy("zhangsan", "goods0001", 3); } else { //没有记录就执行新增 jedis.hset("zhangsan", "goods001", "3"); }
Jedis jedis = new Jedis("127.0.0.1", 6379); //参与抢购的商品编号和抢购的数量 Map<String,String> goods = new HashMap<>(); goods.put("goods0001", "100"); goods.put("goods0002", "100"); goods.put("goods0003", "100"); goods.put("goods0004", "100"); //p0001商家参与抢购的商品信息 jedis.hmset("p0001", goods); //当用户抢购一件商品时,减少售卖数量 jedis.hincrBy("p0001", "goods0001", -1);
-
List: 消息队列、最新评论、最近回复等
//消息队列 可以简单的理解为存放消息的队列,既然是队列,就具有先进先出的特性 //使用redis来模拟这个特性就是消息队列 Jedis jedis = new Jedis("127.0.0.1", 6379); //用户发出的第一条消息 jedis.lpush("message_queue", "第一条消息"); //用户发出的第二条消息 jedis.lpush("message_queue", "第二条消息"); //用户发出的第三条消息 jedis.lpush("message_queue", "第三条消息"); //服务器端只需要依次取就可以了 while (true){ //如果message_queue不存在,那么就阻塞等待1秒,如果还不存在,就抛出异常 List<String> messages = jedis.brpop(1000,"message_queue"); messages.forEach(System.out::println); }
//消息队列 可以简单的理解为存放消息的队列,既然是队列,就具有先进先出的特性 //使用redis来模拟这个特性就是消息队列 Jedis jedis = new Jedis("127.0.0.1", 6379); jedis.lpush("comments", "第一条评论"); jedis.lpush("comments", "第二条评论"); jedis.lpush("comments", "第三条评论"); jedis.lpush("comments", "第四条评论"); jedis.lpush("comments", "第五条评论"); jedis.lpush("comments", "第六条评论"); //展示评论列表,评论展示都是最后的评论在最前,取前5条展示 List<String> comments = jedis.lrange("comments", 0, 4); comments.forEach(System.out::println);
-
Set:交集、差集、并集、黑白名单等。如好友、关注、粉丝、感兴趣的人集合
Jedis jedis = new Jedis("127.0.0.1", 6379); //无效的IP地址列表,只要在集合中,就说明这些IP存在非法操作,也就是所谓的黑名单 jedis.sadd("invalidIpAddress", "10.121.11.76","10.121.11.85","10.121.11.32","10.121.11.21","10.121.11.54"); String currentIp = "10.121.11.85"; if(jedis.sismember("invalidIpAddress", currentIp)){ System.out.println("黑名单用户"); }
Jedis jedis = new Jedis("127.0.0.1", 6379); //zhangsan和lisi好友列表 jedis.sadd("zhangsan", "lisi", "longhua", "ligang"); jedis.sadd("jinfeng", "longqiang", "ligang", "qiqi"); //求张三和李四两人共同的好友 Set<String> friends = jedis.sinter("zhangsan", "jinfeng"); System.out.println("============两人共同的好友=============="); friends.forEach(System.out::println); Set<String> allFriends = jedis.sunion("zhangsan", "jinfeng"); System.out.println("============两人所有的好友=============="); allFriends.forEach(System.out::println);
-
Zset:延迟队列、排行榜、限流等
//所谓的延迟队列,就是指队列中的消息需要在给定的时间点执行,而这个时间点一定是在将来 Jedis jedis = new Jedis("127.0.0.1", 6379); long time = System.currentTimeMillis(); //消息制作:消息的分数是一个时间戳,这样,根据分数来查就是根据时间戳查 jedis.zadd("delay_queue", time + 500, "500毫秒后消费的消息"); jedis.zadd("delay_queue", time + 1000, "1000毫秒后消费的消息"); jedis.zadd("delay_queue", time + 1500, "1500毫秒后消费的消息"); while (true){ long date = System.currentTimeMillis(); System.out.println("当前消费时间:" + date); //按分数来查询,这里的分数用的是当前时间 Set<Tuple> messages = jedis.zrangeByScoreWithScores("delay_queue", 0, date); messages.forEach(t-> { System.out.println(t.getElement() + " => " + (long)t.getScore()); jedis.zrem("delay_queue", t.getElement()); }); Thread.sleep(100); }
Jedis jedis = new Jedis("127.0.0.1", 6379); Random r = new Random(); String[] names = {"AA", "BB","CC","DD","EE", "FF","GG"}; for(String name: names){ int score = r.nextInt(100000) + 10000; System.out.println(name + " => " + score); jedis.zadd("attack", score, name); } System.out.println("攻击排行榜"); Set<Tuple> attacks = jedis.zrevrangeWithScores("attack", 0, 4); attacks.forEach(t -> System.out.println(t.getElement() + " => " + t.getScore()));
//假设每个用户5秒内访问了10次,则将被限流,后续请求将不再被处理
Jedis jedis = new Jedis("127.0.0.1", 6379);
Random r = new Random();
String[] names = {"AA", "BB","CC","DD","EE", "FF","GG"};
long currentTime = System.currentTimeMillis();
for(String name: names){
//使用随机数模拟访问次数
int times = r.nextInt(20);
System.out.println(name + " => 访问 " + times + "次");
for(int i=0; i<times; i++){
//这里考虑sorted set的原因是 key只存在一个,但可以有多个得分,这里使用的是时间来作为得分,那么统计的时候就
//只需要统计5秒内的成员数量,这样就能得到5秒内的访问次数。虽然使用hash也能得到这样的访问次数,但是hash在移出
//过期的记录时没有规则,sorted set可以根据得分范围来进行移出,也就是可以通过时间范围来进行移出,这样可以避免
//记录无限增长的情况
jedis.zadd(name, currentTime- r.nextInt(5000), name + System.nanoTime());
}
jedis.zremrangeByScore(name, 0, currentTime - 5000);//移出前5秒的访问记录
}
System.out.println("=========================================");
Set<String> results = jedis.zrevrangeByScore(name, time, time - 5000);//统计5秒内访问次数
if(results.size() >= 10){
System.out.println(name + " => 被限流了");
} else {
System.out.println(name + " => 当前访问次数为" + results.size() + ",可以继续访问");
}
2. Jedis 连接池
说到连接池,大家自然而然会想到我们之前学习过的数据源连接池,其作用就是为了避免频繁的创建和关闭连接而带来的性能开销。使用Redis连接池操作也是一样的道理。
@Test
public void jedisPoolTest(){
//JedisPool pool = new JedisPool("121.199.174.183", 6379);
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxTotal(100); //连接池中最大活跃数
poolConfig.setMaxIdle(10); //最大空闲数
poolConfig.setMinIdle(5); //最小空闲数
poolConfig.setMaxWaitMillis(2000);//连接池空了后,2秒内未获取jedis就超时
JedisPool pool = new JedisPool(poolConfig,"121.199.174.183", 6379);
Jedis jedis = pool.getResource(); //获取资源,JedisPool池中的资源就是Jedis
jedis.set("test", "连接池");
jedis.close();
pool.close();
}
3. Jedis管道操作
在操作Redis的时候,执行一个命令需要先发送请求到Redis服务器,这个过程需要经历网络的延迟,Redis 还需要给客户端一个响应。如果需要一次性执行多个命令,这种方式的效率就很低下了。为了解决这个问题,可以通过Redis的管道,先将命令放到客户端的一个Pipeline中,之后一次性的将全部命令都发送到Redis服务,Redis服务一次性的将全部的返回结果响应给客户端。
@Test
public void jedisPipeLineTest(){
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxTotal(100); //连接池中最大活跃数
poolConfig.setMaxIdle(10); //最大空闲数
poolConfig.setMinIdle(5); //最小空闲数
poolConfig.setMaxWaitMillis(2000);//连接池空了后,2秒内未获取jedis就超时
JedisPool pool = new JedisPool(poolConfig,"121.199.174.183", 6379);
Jedis jedis = pool.getResource(); //获取资源,JedisPool池中的资源就是Jedis
Pipeline pipeline = jedis.pipelined();
pipeline.set("name","张三");
pipeline.set("sex","男");
pipeline.set("age","20");
List<Object> results = pipeline.syncAndReturnAll();
results.forEach(System.out::println);
jedis.close();
pool.close();
}
4. Redis 事务
Redis 事务的实现需要使用到watch监听机制,在开启事务之前,先使用watch监听器机制将涉及到的相关键进行监听。在开启事务后,如果有其他操作对事务涉及到的键进行了修改,则事务自动取消。如果事务执行完成或者事务被取消,则watch监听自动消除。
- 开启事务命令:multi
- 输入要执行的命令:被放入到一个队列中
- 执行事务命令:exec
- 取消事务命令:discard
Redis事务的原理: 在开启事务后,会执行一系列的命令,但这些命令并非真正执行,而是将这些命令放在一个队列中。如果执行事务,那么这个队列中的命令全部执行,如果取消了事务,这个队列中的命令全部作废
@Test
public void jedisTransactionTest() throws InterruptedException {
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(100);
config.setMaxIdle(10);
config.setMinIdle(5);
config.setMaxWaitMillis(2000);
JedisPool pool = new JedisPool(config, "121.199.174.183", 6379);
Jedis jedis = pool.getResource();
jedis.set("name", "张三");
jedis.set("age", "20");
jedis.watch("name"); //监听 键name 一旦name键被其他的操作修改,该键上的事务自动取消
Thread.sleep(10000L);//睡眠过程中可以修改name和age的值,这样可以使得watch监听自动消除,事务从而取消
Transaction transaction = jedis.multi();//开启事务
transaction.set("name", "李四");
transaction.set("age", "25");
List<Object> objects = transaction.exec();//执行事务
objects.forEach(System.out::println);
jedis.close();
pool.close();
}
5. Redis 密码设置
在安装目录下找到 redis.windows.conf
文件,这个文件就是 Redis 的配置文件。可以在该配置文件中配置 Redis 密码。
requirepass 密码
在 windows 下,密码生效首先需要关闭 Redis 服务, 然后在命令行中进入 Redis 安装目录,然后执行命令
redis-service.exe redis.windows.conf
redis-cli 连接 Redis: 首先需要执行命令
auth 密码
Jedis 连接 Redis:
Jedis jedis = new Jedis("121.199.174.183", 6379);
jedis.auth(密码);
JedisPool 连接 Redis:
JedisPool pool = new JedisPool(config, "121.199.174.183", 6379, 2000, 密码);
6. Redis 持久化机制
Redis 提供了三种持久化机制: RDB 、 AOF(Append Only File)和混合持久化。其中 RDB 是 Redis 的默认持久化机制。 AOF 默认情况下是关闭的。
-
RDB
RDB持久化文件存储的是一个二进制的文件,速度比较快,传输也很方便。RDB持久化是通过快照(snapshotting)完成的,当符合一定条件时Redis会自动将内存中的所有数据进行快照并存储在硬盘上。进行快照的条件可以由用户在配置文件中自定义,由两个参数构成:时间(秒)和改动的键的个数。当在指定的时间内被更改的键的个数大于指定的数值时就会进行快照。RDB是redis默认采用的持久化方式,在配置文件中已经预置了3个条件:
save 900 1 save 300 10 save 60 10000
save 900 1
表示在900秒内,有1个key改变了,就执行RDB持久化。save 300 10
表示在300秒内,有10个key改变了,就执行RDB持久化。save 60 10000
表示在60秒内,有10000个key改变了,就执行RDB持久化。由此可以看出,RDB 是在一个时间范围内触发事件才进行持久化,如果在未触发事件的情况下, Redis 宕机或者断电了,将会造成数据丢失。因此无法保证数据的绝对安全
RDB模式如何进行数据同步呢?
RDB模式下,首先清除原来rdb文件中的所有内容,清空内容之后,再将内存所有的数据同步到rdb文件中, 但这可能造成所有的数据都丢失
RDB模式缺点很明显,为什么还要使用RDB呢?
因为在生产环境中,Redis服务器是不可能一直运行,那么就可能出现宕机、重启等情况。这时就需要做数据同步,数据同步时,主要考虑的点是恢复速度。而 RDB 保存数据采用的二进制,在数据恢复的时候,识别速度快,这也是为什么要使用RDB的原因。
-
AOF
AOF持久化文件存储的是一个文本文件,速度相对 RDB 较慢,到了后期文件会比较大,传输较为困难。开启AOF持久化后每执行一条会更改Redis中的数据的命令,Redis就会将该命令写入硬盘中的AOF文件。在 Redis 中有 AOF 的配置项:
appendonly yes # 开启AOF持久化 appendfilename "appendonly.aof" # AOF持久化文件 #appendfsync always appendfsync everysec #appendfsync no
appendfsync always
表示每执行一个写操作,立即持久化到AOF文件中,性能比较低。
appendfsync everysec
表示每秒执行一次持久化,在开发时,综合考虑使用这种方案。
appendfsync no
表示不同步,数据只保存在内存中,操作系统需要时刷新数据即可。aof-use-rdb-preamble no # 关闭混合持久化
AOF 保存文件时,如果执行相同的命令,AOF文件会记录多次,并没有优化,如果需要优化,则需要手动使用
bgrewriteaof
命令来优化。但手动重写很显然不能满足生产环境的需要,因此,redis提供了如下配置来实现:auto-aof-rewrite-percentage 100 # 触发的条件:下面设置的文件大小增加100%,也就是1倍,才会重写 auto-aof-rewrite-min-size 64mb # aof文件大小达到64M才可能重写记录的命令
由此可以看出,AOF 相对于 RDB 来说更为安全。因此,官方推荐同时开启 RDB 和 AOF。
思考:Redis 同时开启 RDB 和 AOF, 它们之间如何协作的?
aof-use-rdb-preamble yes # 开启混合持久化,需要开启aof才有效
混合持久化实际上就是每一次aof被优化的时候,都会将原来的 aof 变成 rdb,再向aof中写内容的时候 依然是 aof,这样在同一个持久化文件中 既有 aof,又有 rdb,这就是混合持久化
7. Redis数据淘汰策略
Redis服务器的内存是有限的,硬件一旦确定了,内存大小也就确定了。当不停的向内存中写数据时,就会出现内存写满的情况,此时,就需要使用到Redis的数据淘汰策略了。Redis中设计了多种数据的淘汰策略,详情如下:
volatile主要针对的是设置了过期时间的key,如果需要淘汰redis中的数据,那么这些设置了过期时间的key优先被淘汰,如果空间依然不足,那么才会在没有设置过期时间的key进行淘汰。
allkeys主要针对的是所有的键,不管有没有设置过期时间
LFU:根据访问频率来决定淘汰哪一个key,访问频率指的是在一定时间范围内的访问次数。
# 优先淘汰掉设置了过期时间的key,然后才淘汰掉使用的比较少的key,假设key没有设置过期时间,那么不会优先淘
# 汰,这种模式也是在开发中使用的比较多的一种缓存策略模式
#volatile-lru -> Evict using approximated LRU among the keys with an expire set.
# 对所有key通用,优先删除最近最少使用的Key
# allkeys-lru -> Evict any key using approximated LRU.
# Redis中存储的每一个key都有一个内部时钟,当key使用频率高时,内部时钟会递增,当key使用频率低时,内部时钟会
# 递减,该策略是淘汰设置了过期时间且内部时钟最小的key
# volatile-lfu -> Evict using approximated LFU among the keys with an expire set.
# Redis中存储的每一个key都有一个内部时钟,当key使用频率高时,内部时钟会递增,当key使用频率低时,内部时钟会
# 递减,该策略是淘汰所有key中内部时钟最小的key
# allkeys-lfu -> Evict any key using approximated LFU.
# 随机淘汰具有过期时间的key
# volatile-random -> Remove a random key among the ones with an expire set.
# 淘汰的是随机的key
# allkeys-random -> Remove a random key, any key.
# 根据key剩余的过期时间(ttl值)来进行数据淘汰, ttl值越小,越先被淘汰
# volatile-ttl -> Remove the key with the nearest expire time (minor TTL)
# 只要缓存满了,就不继续服务器里面的写请求,读请求是可以完成的,这种模式缓存里面的所有数据都不会丢失,但会导致
# 参与Redis的业务会失败
# noeviction -> Don't evict anything, just return an error on write operations.
maxmemory-policy volatile-lru #这个就是配置缓存的淘汰策略的
maxmemory <bytes> #这个是配置Redis的缓存的大小
第五节 Redis 集群
1. 主从复制
1.1 主从复制介绍
主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器。前者称为主节点(master),后者称为从节点(slave);数据的复制是单向的,只能由主节点到从节点。
1.2 主从复制原理
- 从服务器连接主服务器,发送SYNC命令;
- 主服务器接收到SYNC命名后,开始执行BGSAVE命令生成RDB文件并使用缓冲区记录此后执行的所有写命令;
- 主服务器BGSAVE执行完后,向所有从服务器发送快照文件,并在发送期间继续记录被执行的写命令;
- 从服务器收到快照文件后丢弃所有旧数据,载入收到的快照;
- 主服务器快照发送完毕后开始向从服务器发送缓冲区中的写命令;
- 从服务器完成对快照的载入,开始接收命令请求,并执行来自主服务器缓冲区的写命令;(从服务器初始化完成)
- 主服务器每执行一个写命令就会向从服务器发送相同的写命令,从服务器接收并执行收到的写命令(从服务器初始化完成后的操作)
1.3 主从复制优缺点
-
优点
-
支持主从复制,主机会自动将数据同步到从机,可以进行读写分离
-
为了分载Master的读操作压力,Slave服务器可以为客户端提供只读操作的服务,写服务仍然必须由Master来完成
-
Slave同样可以接受其它Slave的连接和同步请求,这样可以有效的分载Master的同步压力。
-
Master Server是以非阻塞的方式为Slaves提供服务。所以在Master-Slave同步期间,客户端仍然可以提交查询或修改请求。
-
Slave Server同样是以非阻塞的方式完成数据同步。在同步期间,如果有客户端提交查询请求,Redis则返回同步之前的数据
-
主从复制是高可用的基石,除了上述作用以外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是Redis高可用的基础。
-
-
缺点
- Redis不具备自动容错和恢复功能,主机从机的宕机都会导致前端部分读写请求失败,需要等待机器重启或者手动切换前端的IP才能恢复。
- 主机宕机,宕机前有部分数据未能及时同步到从机,切换IP后还会引入数据不一致的问题,降低了系统的可用性。
- Redis较难支持在线扩容,在集群容量达到上限时在线扩容会变得很复杂。
1.2 主从复制搭建
集群搭建不能将Redis存放在C盘,因为C盘创建文件需要权限。如果已经安装在C盘,那么只需要将整个Redis文件拷贝至其他盘即可。
-
编辑redis.windows.conf文件,将其中的bind 121.199.174.183 修改为 bind 本机IP地址。例如:
如果是每个redis服务器都是独立的文件夹,那么appendfilename属性可以不用设置。
bind 192.168.2.7 #这个是本机IP appendfilename "appendonly6379.aof"
-
拷贝redis.windows.conf文件三分,分别重命名为 redis.master6379.conf、redis.slave6380.conf、redis.slave6381.conf
-
修改 redis.slave6380.conf 文件,修改内容如下:
port 6380 slaveof 192.168.2.7 6379 masterauth 123456 #如果master有设置密码,必须添加到这里。没有设置,可以不用配置这项 appendfilename "appendonly6380.aof"
-
修改 redis.slave6381.conf 文件,修改内容如下:
port 6381 slaveof 192.168.2.7 6379 masterauth 123456 #如果master有设置密码,必须添加到这里。没有设置,可以不用配置这项 appendfilename "appendonly6381.aof"
-
使用这三个配置文件分别启动Redis,主从复制搭建完成
redis-server.exe redis.master6379.conf redis-server.exe redis.slave6380.conf redis-server.exe redis.slave6381.conf
1.3 Jedis 连接 Redis 主从复制集群
redis.clients.jedis.exceptions.JedisDataException: NOAUTH Authentication required.
出现这个错误表示没有设置jedis连接的密码
@Test
public void jedisMasterSlaveTest(){
Jedis master = new Jedis("192.168.2.7", 6379);
Jedis slave1 = new Jedis("192.168.2.7", 6380);
Jedis slave2 = new Jedis("192.168.2.7", 6381);
slave1.slaveof("192.168.2.7", 6379);
slave2.slaveof("192.168.2.7", 6379);
master.set("name", "张三");
System.out.println(slave1.get("name"));
System.out.println(slave2.get("name"));
}
2. 哨兵
2.1 哨兵模式介绍
哨兵模式是建立在主从复制的基础上,当主服务器中断服务后,可以将一个从服务器升级为主服务器,以便继续提供服务,在低版本中,这个过程需要人工手动来操作。 在Redis 2.8及以后的版本中,提供了哨兵工具来实现自动化的系统监控和故障恢复功能。哨兵的作用就是监控Redis系统的运行状况。它的功能包括以下两个:
- 监控主服务器和从服务器是否正常运行
- 主服务器出现故障时自动将从服务器转换为主服务器
2.2 哨兵的工作方式
- 每个Sentinel(哨兵)进程以每秒钟一次的频率向整个集群中的Master主服务器,Slave从服务器以及其他Sentinel(哨兵)进程发送一个 PING 命令。
- 如果一个实例(instance)距离最后一次有效回复 PING 命令的时间超过 down-after-milliseconds 选项所指定的值, 则这个实例会被 Sentinel(哨兵)进程标记为主观下线(SDOWN)
- 如果一个Master主服务器被标记为主观下线(SDOWN),则正在监视这个Master主服务器的所有 Sentinel(哨兵)进程要以每秒一次的频率确认Master主服务器的确进入了主观下线状态
- 当有足够数量的 Sentinel(哨兵)进程(大于等于配置文件指定的值)在指定的时间范围内确认Master主服务器进入了主观下线状态(SDOWN), 则Master主服务器会被标记为客观下线(ODOWN)
- 在一般情况下, 每个 Sentinel(哨兵)进程会以每 10 秒一次的频率向集群中的所有Master主服务器、Slave从服务器发送 INFO 命令。
- 当Master主服务器被 Sentinel(哨兵)进程标记为客观下线(ODOWN)时,Sentinel(哨兵)进程向下线的 Master主服务器的所有 Slave从服务器发送 INFO 命令的频率会从 10 秒一次改为每秒一次。
- 若没有足够数量的 Sentinel(哨兵)进程同意 Master主服务器下线, Master主服务器的客观下线状态就会被移除。若 Master主服务器重新向 Sentinel(哨兵)进程发送 PING 命令返回有效回复,Master主服务器的主观下线状态就会被移除。
- 若有足够数量的 Sentinel(哨兵)进程同意 Master主服务器下线,监视该服务的所有sentinel(哨兵)将进行协商,并选出一个领头的sentinel(哨兵),然后对Master 主服务器进行故障转移操作:
- 从所有Slave从服务器里面挑选一个Slave从服务器,将其转成主服务器,领头Sentinel(哨兵)向被选出从服务器(新主)发送SLAVEOF no one命令,以每秒一次的频率向被升级的从服务器发送INFO命令,观察回复的角色信息,直到Slave从服务器变成Master主服务器。
- 之前下线的Master主服务器下的所有Slave从服务器改为复制新的Master主服务器
- 将之前下线的Master主服务器改为新的Master主服务器下的Slave从服务器
2.3 哨兵模式的优缺点
-
优点
- 哨兵模式是基于主从模式的,所有主从的优点,哨兵模式都具有。
- 主从可以自动切换,系统更健壮,可用性更高。
-
缺点
- Redis较难支持在线扩容,在集群容量达到上限时在线扩容会变得很复杂。
2.4 哨兵模式搭建
准备7台 Redis 服务器: 1 主 3 从 3 哨兵
主服务器Master 配置:
# 换成电脑的IP
bind 192.168.2.7
port 6400
requirepass 123456
从服务器Slave配置:(服务器的端口不能相同)
# 换成电脑的IP
bind 192.168.2.7
port 6401
slaveof 192.168.2.7 6400
masterauth 123456
requirepass 123456
哨兵Sentinel配置:
bind 192.168.2.7
port 6403
# Sentinel去监视一个名为master的主redis实例,
# 这个主实例的IP地址为本机地址192.168.2.7,端口号为6400,
# 而将这个主实例判断为失效至少需要2个Sentinel进程的同意,只要同意Sentinel的数量不达标,自动failover就不会执行
sentinel monitor master 192.168.2.7 6400 2
# down-after-milliseconds指定了Sentinel认为Redis实例已经失效所需的毫秒数。
# 当实例超过该时间没有返回PING,或者直接返回错误,那么Sentinel将这个实例标记为主观下线。
# 只有一个Sentinel进程将实例标记为主观下线并不一定会引起实例的自动故障迁移:只有在足够数量的Sentinel都将一
# 个实例标记为主观下线之后,实例才会被标记为客观下线。这时自动故障迁移才会执行
sentinel down-after-milliseconds master 5000
# parallel-syncs指定了在执行故障转移时,最多可以有多少个从Redis实例在同步新的主实例,
# 在从Redis实例较多的情况下这个数字越小,同步的时间越长,完成故障转移所需的时间就越长
sentinel parallel-syncs master 1
# 如果在failover-timeout该时间(ms)内未能完成故障迁移(failover)操作,则认为该failover失败
sentinel failover-timeout master 15000
sentinel auth-pass master 123456
注意:哨兵设置密码后,创建连接池的时候需要指定哨兵的密码。如果没有设置,那么创建连接池的时候可以不需要使用密码。但是spring-boot对redis不支持哨兵设置密码
启动顺序
-
Master
# 示例 redis-server.exe sentinel/redis.master6400.conf
-
Slave
# 示例 redis-server.exe sentinel/redis.slave6401.conf
-
Sentinel
Sentinel 启动时使用命令如下
redis-server.exe 哨兵配置文件路径 --sentinel # 示例 redis-server.exe sentinel/redis.sentinel6404.conf --sentinel
2.5 Jedis 连接 Redis 哨兵
@Test
public void redisSentinelTest(){
Set<String> sentinels = new HashSet<>();
//只需要配置哨兵的服务器信息即可
sentinels.add(new HostAndPort("10.7.173.76", 6404).toString());
sentinels.add(new HostAndPort("10.7.173.76", 6405).toString());
sentinels.add(new HostAndPort("10.7.173.76", 6406).toString());
JedisSentinelPool pool = new JedisSentinelPool("master", sentinels, "123456");
Jedis jedis = pool.getResource();
jedis.select(1);
String result = jedis.set("success", "OK");
System.out.println(result);
jedis.close();
pool.close();
}
2.6 springboot 整合 Redis 哨兵
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
spring:
redis:
sentinel:
master: master
nodes:
- 10.7.173.76:6404
- 10.7.173.76:6405
- 10.7.173.76:6406
jedis:
pool:
max-active: 100
max-idle: 10
min-idle: 3
max-wait: -1
password: 123456
package com.qf.spring.boot.redis;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfiguration {
@Bean
public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory){
RedisTemplate<String,Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(factory);
StringRedisSerializer keySerializer = new StringRedisSerializer();
redisTemplate.setKeySerializer(keySerializer);
redisTemplate.setHashKeySerializer(keySerializer);
Jackson2JsonRedisSerializer<?> valueSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper mapper = new ObjectMapper();
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
valueSerializer.setObjectMapper(mapper);
redisTemplate.setValueSerializer(valueSerializer);
redisTemplate.setHashValueSerializer(valueSerializer);
return redisTemplate;
}
}
package com.qf.spring.boot.redis;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
@SpringBootTest
class SpringbootRedisApplicationTests {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@Test
void redisTemplateTest() {
redisTemplate.opsForValue().set("springBoot", "success");
}
}
3. Redis Cluster
3.1 主从复制的弊端
假设master节点的内存只有4G,那slave节点所能存储的数据上限也只能是4G。主从复制架构中是读写分离的,可以通过增加slave节点来扩展主从的读并发能力,但是写能力和存储能力是无法进行扩展的,就只能是master节点能够承载的上限。所以,当你只需要存储4G的数据时候的,基于主从复制和基于Sentinel的高可用架构是完全够用的。
但是如果面临的是海量的数据的时候呢?16G、64G、256G甚至1T呢?现在互联网的业务里面,如果体量足够大,肯定会面临缓存海量缓存数据的场景的。
这就是为什么需要引入Redis Cluster。
3.2 Redis Cluster介绍
redis从3.0开始支持集群功能。redis集群采用无中心节点方式实现,无需proxy代理,客户端直接与redis集群的每个节点连接,根据同样的hash算法计算出key对应的slot,然后直接在slot对应的redis节点上执行命令。在redis看来,响应时间是最苛刻的条件,增加一层带来的开销是redis不能接受的。因此,redis实现了客户端对节点的直接访问,为了去中心化,节点之间通过gossip协议交换互相的状态,以及探测新加入的节点信息。redis集群支持动态加入节点,动态迁移slot,以及自动故障转移。
3.3 Redis Cluster 架构
n个主从架构组合在一起就是Redis Cluster。Redis Cluster要求至少需要3个master才能组成一个集群,同时每个master至少需要有一个slave节点。
如果一个主从能够存储32G的数据,如果这个集群包含了两个主从,则整个集群就能够存储64G的数据。
主从架构中,可以通过增加slave节点的方式来扩展读请求的并发量,那Redis Cluster中是如何做的呢?虽然每个master下都挂载了一个slave节点,但是在Redis Cluster中的读、写请求其实都是在master上完成的。
slave节点只是充当了一个数据备份的角色,当master发生了宕机,就会将对应的slave节点提拔为master,来重新对外提供服务。
3.4 数据分片
Redis 集群使用数据分片(sharding)而非一致性哈希(consistency hashing)来实现: 一个 Redis 集群包含 16384 个哈希槽(hash slot), 数据库中的每个键都属于这 16384 个哈希槽的其中一个, 集群使用公式 CRC16(key) % 16383 来计算键 key 属于哪个槽, 其中 CRC16(key) 语句用于计算键 key 的 CRC16 校验和 。集群中的每个节点负责处理一部分哈希槽。 举个例子, 一个集群可以有三个节点, 其中:
- 节点 A 负责处理 0 号至 5500 号哈希槽。
- 节点 B 负责处理 5501 号至 11000 号哈希槽。
- 节点 C 负责处理 11001 号至 16384 号哈希槽。
这种将哈希槽分布到不同节点的做法使得用户可以很容易地向集群中添加或者删除节点。 比如说:
- 如果用户将新节点 D 添加到集群中, 那么集群只需要将节点 A 、B 、 C 中的某些槽移动到节点 D 就可以了。
- 如果用户要从集群中移除节点 A , 那么集群只需要将节点 A 中的所有哈希槽移动到节点 B 和节点 C , 然后再移除空白(不包含任何哈希槽)的节点 A 就可以了。
因为将一个哈希槽从一个节点移动到另一个节点不会造成节点阻塞, 所以无论是添加新节点还是移除已存在节点, 又或者改变某个节点包含的哈希槽数量, 都不会造成集群下线。
3.5 故障转移
在一部分节点下线或者无法与集群的大多数(majority)节点进行通讯的情况下, 为了使得集群仍然可以正常运作, Redis 集群对节点使用了主从复制功能: 集群中的每个节点都有 1 个至 N 个复制品(replica), 其中一个复制品为主节点(master), 而其余的 N-1 个复制品为从节点(slave)。
集群主节点出现故障,发生故障转移,其他主节点会把故障主节点的从节点自动提为主节点,原来的主节点恢复后,自动成为新主节点的从节点。
3.6 数据访问
redis cluster是一个去中心化的集群,每个节点都会跟其他节点保持连接,用来交换彼此的信息。节点组成集群的方式使用cluster meet命令,meet命令可以让两个节点相互握手,然后通过gossip协议交换信息。每个节点会保存一份数据分布表,节点会将自己的slot信息发送给其他节点。
客户端在初始化的时候只需要知道一个节点的地址即可,客户端会先尝试向这个节点执行命令,比如“get key”,如果key所在的slot刚好在该节点上,则能够直接执行成功。如果slot不在该节点,则节点会返回MOVED错误,同时把该slot对应的节点告诉客户端。客户端可以去该节点执行命令。
错误信息: 在访问集群的时候,节点可能会返回ASK错误。这种错误是在key对应的slot正在进行数据迁移时产生的,这时候向slot的原节点访问,如果key在迁移源节点上,则该次命令能直接执行。如果key不在迁移源节点上,则会返回ASK错误,描述信息会附上迁移目的节点的地址。客户端这时候要先向迁移目的节点发送ASKING命令,然后执行之前的命令。
3.7 Redis 安装
-
将 reids 源码通过 Xftp 放入
/user/local
文件夹下 -
进入
/user/local
解压源码 -
升级 C ++ 编译器
yum -y install centos-release-scl yum -y install devtoolset-9-gcc devtoolset-9-gcc-c++ devtoolset-9-binutils scl enable devtoolset-9 bash
-
进入
redis-5.0.14
编译源码make
-
安装 redis
# PREFIX= 这个关键字的作用是编译的时候用于指定程序存放的路径 make PREFIX=/usr/local/redis-5.0.14 install
-
启动 redis
./bin/redis-server& ./redis.conf
-
启动 redis-cli
./bin/redis-cli shutdown #关闭redis
3.8 Redis Cluster 搭建
-
/usr下创建redis-cluster目录,其下创建7001、7002、…、7006目录
-
修改配置文件redis.conf
daemonize yes port 7001 #分别对每个机器的端口号进行设置 dir /usr/redis-cluster/7001/ #指定数据文件存放位置,必须要指定不同的目录位置,不然会丢失数据 cluster-enabled yes #启动集群模式 cluster-config-file nodes-7001.conf #集群节点信息文件,这里 700* 和port对应上 cluster-node-timeout 5000 # bind 121.199.174.183 #去掉bind绑定访问ip信息 protected-mode no #关闭保护模式 appendonly yes #如果要设置密码需要增加如下配置: requirepass 123456 #设置redis访问密码 masterauth 123456 #设置集群节点间访问密码,跟上面一致
-
拷贝配置文件redis.conf至创建的这些目录中,然后修改端口
-
启动每个结点redis服务
/usr/redis-5.0.14/bin/redis-server /usr/redis-cluster/7001/redis.conf /usr/redis-5.0.14/bin/redis-server /usr/redis-cluster/7002/redis.conf /usr/redis-5.0.14/bin/redis-server /usr/redis-cluster/7003/redis.conf /usr/redis-5.0.14/bin/redis-server /usr/redis-cluster/7004/redis.conf /usr/redis-5.0.14/bin/redis-server /usr/redis-cluster/7005/redis.conf /usr/redis-5.0.14/bin/redis-server /usr/redis-cluster/7006/redis.conf
-
查看启动
ps -ef |grep redis
-
云服务器打开端口
7001 ~ 7006 17001 ~ 17006 #上面的端口全部打开
-
执行创建集群命令
#代表为每个创建的主服务器节点创建一个从服务器节点 -a表示访问密码 设置了密码需要使用-a /usr/redis-5.0.3/src/redis-cli -a 123456 --cluster create --cluster-replicas 1 121.199.174.183:7001 121.199.174.183:7002 121.199.174.183:7003 121.199.174.183:7004 121.199.174.183:7005 121.199.174.183:7006 # 未设置密码 /usr/redis-5.0.14/src/redis-cli --cluster create --cluster-replicas 1 121.199.174.183:7001 121.199.174.183:7002 121.199.174.183:7003 121.199.174.183:7004 121.199.174.183:7005 121.199.174.183:7006
-
查询集群信息
/usr/redis-5.0.14/src/redis-cli -c -h 121.199.174.183 -p 7001 # -c表示以集群方式连接redis # -h指定ip地址 # -p指定端口号
-
关闭集群
/usr/redis-5.0.14/src/redis-cli -c -h 121.199.174.183 -p 7001 shutdown /usr/redis-5.0.14/src/redis-cli -c -h 121.199.174.183 -p 7002 shutdown /usr/redis-5.0.14/src/redis-cli -c -h 121.199.174.183 -p 7003 shutdown /usr/redis-5.0.14/src/redis-cli -c -h 121.199.174.183 -p 7004 shutdown /usr/redis-5.0.14/src/redis-cli -c -h 121.199.174.183 -p 7005 shutdown # 这个是设置了密码 关闭使用的命令 /usr/redis-5.0.14/src/redis-cli -a 123456 -c -h 121.199.174.183 -p 7006 shutdown
3.9 jedis 操作 Redis Cluster
//创建一连接,JedisCluster对象,在系统中是单例存在
Set<HostAndPort> nodes = new HashSet<>();
nodes.add(new HostAndPort("121.199.174.183", 7001));
nodes.add(new HostAndPort("121.199.174.183", 7002));
nodes.add(new HostAndPort("121.199.174.183", 7003));
nodes.add(new HostAndPort("121.199.174.183", 7004));
nodes.add(new HostAndPort("121.199.174.183", 7005));
nodes.add(new HostAndPort("121.199.174.183", 7006));
JedisCluster cluster = new JedisCluster(nodes);
//执行JedisCluster对象中的方法,方法和redis一一对应。
cluster.set("cdqf2107", "快毕业了");
String result = cluster.get("cluster-test");
System.out.println(result);
//程序结束时需要关闭JedisCluster对象
cluster.close();
redis使用最多的场景就是缓存,请解释说明什么是缓存击穿?什么是缓存穿透?什么是缓存雪崩?
缓存击穿说白了就是将缓存给打穿了,缓存没有起到缓冲的作用。
所谓的缓存穿透值指的是缓存中没有查询的数据,数据库中也没有查询的数据。
大面积的缓存击穿称为缓存雪崩。
什么情况下会出现缓存击穿?
在key过期的时候,缓存处于失效状态,查询将直接进入数据库
什么情况下会出现缓存穿透?
缓存中没有查询的数据,数据库中也没有查询的数据
什么情况下会出现缓存雪崩?
在同一时间点,如果出现大量的抗住高并发的key过期
缓存击穿解决方案
给查询加上一个互斥锁,只允许一个线程去查询数据,其他线程等待
缓存穿透解决方案
即使查询没有结果,也要在缓存中缓存一个null值或者“”值。
缓存雪崩解决方案
在缓存的时候,给key添加一个随机数,让key的过期时间不相同
你使用过分布式锁吗?
用过。
你怎么实现的分布式锁?
我们采用的是redis来实现分布式锁。
能具体的描述一下吗?(如果没有问,就在上一个问题接着说)
redis 分布式锁的实现原理是利用key的特性,如果一个key不存在,我们就可以在redis中进行添加操作,同时为这个key设置一个过期时间,如果添加成功,则说明拿到了分布式锁;否则,获取锁失败,那么将在一段时间范围内反复尝试获取分布式锁。拿到锁后,开始执行业务,业务执行完成后,删除redis中存储的key,分布式锁得到释放。
如果业务处理时间过长,分布式锁已经过期,后面再删除的时候就会出问题,你是如何解决的呢?
对于业务处理时间过长这个问题,可以是Redission实现的看门狗来对分布式锁进行过期时间的延长,看门狗的实现原理是使用计时器延迟调度任务实现的,主要用的就是Timer和TimerTask。对于删除出现的问题,可以在加锁的时候设置的值使用当前线程的ID,在删除的时候,先检测这个分布式锁是否是当前线程的分布式锁,如果是,则可以删除;如果不是,则直接放弃。
判断和删除是两个不同的操作,如果保证他们都能得到执行?
这个可以使用 lua 脚本来实现原子性,在脚本中进行判断和删除
if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1])
else
return 0
end
String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";// lua脚本,用来释放分布式锁
jedis.eval(luaScript,Collections.singletonList(key),Collections.singletonList(lockValue));
jedis.close();
-- 续约过期时间
if redis.call('get',KEYS[1]) == ARGV[1] then redis.call('pexpire', KEYS[1], ARGV[2])
return 1
else
return 0
end
原文地址:https://blog.csdn.net/weixin_45621552/article/details/140486932
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!