Redis的学习笔记
Wucheng

学习 Redis 的过程中遇到了一些值得记录的东西,所以就写下来

Redis 缓存刷新 Token

用户在登录后需要根据Token来进行鉴权和用户信息,如果用户一直在活跃则需要 Redis 来做缓存(仅想代替 Session 的情况下,如果七天过期等长期有效则不需要),但是缓存需要设置过期时间,所以需要在用户进行请求的时候顺带刷新用户的token。

我们可以设置一个 RefreshTokenInterceptor 并把其优先级设高一点,来给用户刷新 Token ,这样每次用户请求的时候都能够刷新用户的过期时间。

Spring 集成了 Redis 的工具,所以我们可导包 spring-boot-starter-data-redis 后,在 Config 下注入 StringRedisTemplate( Interceptor 无法直接使用 @AutoWired )传入 Interceptor 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Configuration
public class MvcConfig implements WebMvcConfigurer {

@Autowired
private StringRedisTemplate stringRedisTemplate;

@Override
public void addInterceptors(InterceptorRegistry registry) {
// token刷新拦截器
registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).order(0);
// 登录拦截器
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns(
// ...
)
.order(1);
}
}

然后编写 RefreshTokenInterceptor (使用了 Hotool 工具包)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Slf4j
public class RefreshTokenInterceptor implements HandlerInterceptor {

private final StringRedisTemplate stringRedisTemplate;

public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate){
this.stringRedisTemplate = stringRedisTemplate;
}

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
log.info("请求的地址:{}", request.getRequestURI());

// 获取请求头中的token
String token = request.getHeader("authorization");
if (StrUtil.isBlank(token)) {
return true;
}
// 基于token获取redis的用户
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(LOGIN_USER_KEY + token);

if (userMap.isEmpty()) {
return true;
}
// 将查询到的hash数据转为UserDTO对象
UserDTO user = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
// 保存在ThreadLocal
UserHolder.saveUser(user);
// 刷新token有效期
stringRedisTemplate.expire(LOGIN_USER_KEY + token, LOGIN_USER_TTL, TimeUnit.MINUTES);

return true;
}
}

这样就完成了对 Token 的刷新。

缓存的更新策略

一些数据在更改后,如果没有及时更新缓存,则会导致数据不一致,所以我们需要一个更新策略,让其数据保持一致性。

如果在数据要求不是强一致的情况下,我们可以靠过期时间让其失效后,当有请求再次访问时自动进行更新。

如果对一致性有要求,我们则可以通过 update 方法中手动重新设置缓存数据。

但是这样同样也会出现一个问题——如果数据经常更新但是并没有用户访问,则不停的更新缓存会对Redis有很多无效的 IO 操作。

所以相比较于更新缓存,直接删除缓存数据,等有用户访问的时候再次写入缓存是在此情况下更好的策略。

1
2
3
4
5
6
7
8
9
10
11
12
@Transactional
public Result update(Shop shop) {
// 更新数据库
updateById(shop);
Long id = shop.getId();
if (id == null) {
return Result.fail("店铺ID不能为空");
}
// 删除缓存
stringRedisTemplate.delete(CACHE_SHOP_KEY + id);
return Result.ok();
}

但同样要注意,在以上策略下都会有一定的缓存击穿的风险,所以需要视业务情况而定。

缓存穿透问题

缓存穿透的问题出现于查询数据库中没有的数据,数据库没有此数据,自然不会有缓存,所以缓存穿透是绕过缓存直接将请求打到数据库中。如果有人进行恶意攻击,则很容易让数据库宕机。

以下有几种解决方法:

  1. 将空数据也写入缓存
    将空字段写入缓存中并设置较短的过期时间,相同的请求不会直接请求到数据库上。不过这个方法缺点也很明显,只要有人使用不同字段恶意攻击,即可挤占内存,使得其他字段被自动清理。
  2. 使用布隆过滤器
    布隆过滤器可以根据已添加的数据进行哈希运算,可以判断值是否可能存在与集合中,如果布隆过滤器返回 false 则此值一定不在集合中。我们可以使用 Redis 自带的布隆过滤器拦截大部分会导致缓存击穿的请求,少量布隆过滤器判断失误的请求可以将空数据写入缓存,或维护一个表来将布隆过滤器误判的值存入其中。
    布隆过滤器的缺点是删除添加进过滤器的值很难删除,如果有一个值后续失效了,布隆过滤器会放行,可以通过维护一个失效的表来进行一定程度的解决。
  3. 在字段名字设置上进行一定处理使其符合一套自定规则
    这个办法仅需要判断 ID 等是否符合规则,如果不符合规则拦截即可
  4. 频繁请求的IP封禁

以上方法可以相互配合使用,基本上可以解决绝大部分的缓存穿透情况。

附上一段解决缓存穿透示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public <R, ID> R getWithPassThrough(
String keyPrefix,ID id, Class<R> type, Function<ID, R> dbFallback,
Long expireTime, TimeUnit timeUnit) {
String key = CACHE_SHOP_KEY + id;
// 从redis查询缓存
String json = stringRedisTemplate.opsForValue().get(key);
// 判断是否存在
if (StrUtil.isNotBlank(json)) {
// 存在,直接返回
return JSONUtil.toBean(json, type);
}
// 如果json不为空的话则只剩下为空值的情况,即为json为""
if (json != null) {
return null;
}
// 缓存中不存在,传入lambda表达式,返回数据查询的结果对象
R r = dbFallback.apply(id);
// 数据库也不存在,返回错误
if (r == null) {
// 将空值写入redis
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
return null;
}
// 存在,写入redis
this.set(key, r, expireTime, timeUnit);
// 返回
return r;
}

缓存雪崩问题

缓存雪崩是指大量的 Key 在同一时间或者相近时间失效,或者 Redis 服务器忽然出现问题,导致大量请求直接访问数据库。

缓存雪崩的问题一般出现在大量的 Key 设置在相邻或同一时间,在预热或设置过期时间的时候只需要加入随机数使其过期时间平均分布在不同时间段即可。

Redis 服务问题则可以使用 Redis 集群部署来保证服务的运行。

也可以通过服务降级,多级缓存等方式来解决。

缓存击穿问题

缓存击穿一般发生于热点 Key 过期时间到了,并发的大量请求直接到达数据库使其宕机。

对于此问题根据对数据一致性的需求采用不同的方式来应对。

  1. 加锁,如果没有获取到缓存,则线程开始获取锁,然后开始重建缓存数据,其他线程则等待重试。
  2. 逻辑过期
    在字段中添加一个过期时间,当过期后获取锁后,开启一个异步线程去重建数据,然后再由异步线程释放锁。如果一致性要求不高可以返回过期数据。

以逻辑过期为示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// 线程池
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);

public <R, ID> R queryWithLogicalExpire(String prefix, //前缀
ID id, Class<R> type, //类
Function<ID, R> dbFallback, //Lambda表达式
Long time, //过期时间
TimeUnit timeUnit) {
String key = CACHE_SHOP_KEY + id;
// 从redis查询缓存
String json = stringRedisTemplate.opsForValue().get(key);
// 判断是否存在,因为热点key一般会设置为永不过期,过期则说明redis中没有,要么redis挂了
if (StrUtil.isBlank(json)) {
return null;
}
// 缓存命中,反序列化对象(RedisData用来放逻辑过期的时间)
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
LocalDateTime expireTime = redisData.getExpireTime();
// 判断是否过期
if (expireTime.isAfter(LocalDateTime.now())) {
return r;
}
// 已过期,则缓存重建
// 获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
boolean lock = tryLock(lockKey);
// 判断是否获取锁成功
if (BooleanUtil.isTrue(lock)) {
// 二次检查
json = stringRedisTemplate.opsForValue().get(key);
redisData = JSONUtil.toBean(json, RedisData.class);
LocalDateTime expireTime1 = redisData.getExpireTime();
if (expireTime1.isAfter(LocalDateTime.now())) {
return JSONUtil.toBean((JSONObject) redisData.getData(), type);
}
// 成功,开启异步线程实现缓存创建
CACHE_REBUILD_EXECUTOR.submit(() -> {
try {
// 传入的lambda表达式,返回查询结果
R r1 = dbFallback.apply(id);
// 设置逻辑过期,并释放锁
this.setWithLogicalExpire(prefix, key, r1, time, timeUnit);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
//释放锁
unlock(lockKey);
}
});
} else {
// 失败,返回过期商铺信息
return r;
}
// 返回
return r;
}

//设置逻辑过期并释放锁
public void setWithLogicalExpire(String prefix,
String key,
Object value,
Long expireTime,
TimeUnit timeUnit) {
// 设置逻辑过期
RedisData redisData = new RedisData();
redisData.setData(value);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(timeUnit.toSeconds(expireTime)));

// 写入redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
}

// 释放锁
private void unlock(String key) {
stringRedisTemplate.delete(key);
}

秒杀问题

在高并发的情况下,如果两个线程同时查询到数据库的相同字段,所以在做库存扣减的时候容易发生两个库存同时扣减,导致超卖。

在单体应用下,我们可以直接加锁来解决并发问题。但是在此之前,我们需要用Redis来生成一个全局订单ID,以防止重复重复订单ID。

全局ID生成器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Component
public class RedisIdWorker {
@Autowired
private StringRedisTemplate stringRedisTemplate;
/**
* 开始时间戳
*/
private static final long BEGIN_TIMESTAMP = 1671801060L;

/**
* 序列号的位数
*/
private static final int COUNT_BITS = 32;
/**
* 生成全局唯一id并且返回
* @param: keyPrefix
* @return long
*/
public long nextId(String keyPrefix) {
// 生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timeStamp = nowSecond - BEGIN_TIMESTAMP;
// 生成序列号
// 获取当前日期
String date = now.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
// 从Redis从获取今天的订单个数
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
// 拼接返回(通过位运算)
return timeStamp << COUNT_BITS | count;
}
}

借助Redis生成全局ID,我们可以在分布式框架下生成一个唯一的Long类型的ID

使用Redis来实现互斥锁

既然是秒杀场景,那么锁是必然需要的,在分布式场景下单体应用本身的锁则会无效,所以我们需要Redis的字段来充当锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class SimpleRedisLock implements ILock {

private String name;
private StringRedisTemplate stringRedisTemplate;
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";

public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}

@Override
public boolean tryLook(long timeoutSec) {
// 获取线程表示
String threadId = ID_PREFIX+Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
// 防止空指针
return Boolean.TRUE.equals(success);
}

@Override
public void unlock() {
// 获取线程标识
String threadId = ID_PREFIX+Thread.currentThread().getId();
// 获取锁中的标识
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断标识是否一致
if (threadId.equals(id)){
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
}

因为我们并没有锁续期的操作,所以为了防止其他实例误删,我们加上了UUID以及线程标识,在解锁的时候,我们需要判断是否是自己的锁

但是此时产生了一个新的问题,我们解锁的时候的操作并不是原子性的,如果在进入if后发生了阻塞,并且等待时间超过了自动释放时间,则也会将其他线程的锁释放。

Lua脚本实现释放锁

为了保证原子性,我们引入Lua脚本来执行锁的释放

1
2
3
4
5
6
-- KEYS[1]表示输入的第一个参数 ARGV[1]表示输入的第一个值
-- 即 set key value这行命令中,传递的key为KEYS[1],value为ARGV[1]
if(redis.call('get',KEYS[1]) == ARGV[1]) then
return redis.call('del',KEYS[1])
end
return 0;

将Lua脚本放入resource目录中,然后再自定义锁中改造一下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;

static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}

@Override
public void unlock() {
//调用lua脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX+name), //传入类型必须为List
ID_PREFIX+Thread.currentThread().getId());
}

至此我们就保证了释放锁的原子性,实现了一个简单的分布式锁

不过这些并不需要我们完全手动实现,Redisson提供了许多的锁,包含了许多功能。例如:可重入锁,看门狗机制续期锁,Redis集群分布式锁等。

一人一单秒杀情景

在单体应用下,我们可以通过直接给方法上锁,但是同时要考虑事务,所以我们要将其中生成订单的业务抽离出来并加上@Transaction交由Spring管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
   public Result  seckillVoucher(Long voucherId) {
// 查询优惠券的信息
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 判断秒杀是在时间范围内
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀尚未开始!");
}
if (voucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已经结束!");
}
// 判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足");
}
Long userId = UserHolder.getUser().getId();

// 因为事务交给spring管理,所以如果直接调用方法会导致事务失效。IOC的原理是使用代理对象,所以我们如果要使得事务有效需要先获取到对象的代理对象,再由代理对象调用方法。
// 获取代理对象(引入AspectJ后在启动类中添加注解@EnableAspectJAutoProxy(exposeProxy = true))
synchronized (userId.toString().intern()) {
// 获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
}

@Transactional
public Result createVoucherOrder(Long voucherId) {
// 查询订单
Long userId = UserHolder.getUser().getId();
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 判断是否存在
if (count > 0){
return Result.fail("用户已经购买过一次!");
}
// 扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock",0)
.update();
if (!success) {
return Result.fail("库存不足!");
}

// 生成订单
VoucherOrder voucherOrder = new VoucherOrder();
// 获取全局ID
long orderId = redisIdWorker.nextId("order");

voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
return Result.ok();
}

如果我们使用Redis充当分布式锁,则我们可以修改seckillVoucher方法使用Redission提供的锁来实现分布式的秒杀场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Autowired
private RedissonClient redissonClient;

@Override
public Result seckillVoucher(Long voucherId) {
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀尚未开始!");
}
if (voucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已经结束!");
}
if (voucher.getStock() < 1) {
return Result.fail("库存不足");
}
Long userId = UserHolder.getUser().getId();

RLock lock = redissonClient.getLock("lock:order:" + userId);
//获取锁
boolean isLock = lock.tryLock();
// 判断是否获取锁成功
if (!isLock) {
// 获取锁失败,返货错误或重试
return Result.fail("不允许重复下单");
}

try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
lock.unlock();
}
}