学习 Redis 的过程中遇到了一些值得记录的东西,所以就写下来
Redis 缓存刷新 Token
用户在登录后需要根据Token来进行鉴权和用户信息,如果用户一直在活跃则需要 Redis 来做缓存(仅想代替 Session 的情况下,如果七天过期等长期有效则不需要),但是缓存需要设置过期时间,所以需要在用户进行请求的时候顺带刷新用户的token。
我们可以设置一个 RefreshTokenInterceptor 并把其优先级设高一点,来给用户刷新 Token ,这样每次用户请求的时候都能够刷新用户的过期时间。
Spring 集成了 Redis 的工具,所以我们可导包 spring-boot-starter-data-redis
后,在 Config 下注入 StringRedisTemplate( Interceptor 无法直接使用 @AutoWired )传入 Interceptor 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Configuration public class MvcConfig implements WebMvcConfigurer {
@Autowired private StringRedisTemplate stringRedisTemplate;
@Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).order(0); registry.addInterceptor(new LoginInterceptor()) .excludePathPatterns( ) .order(1); } }
|
然后编写 RefreshTokenInterceptor (使用了 Hotool 工具包)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| @Slf4j public class RefreshTokenInterceptor implements HandlerInterceptor {
private final StringRedisTemplate stringRedisTemplate;
public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate){ this.stringRedisTemplate = stringRedisTemplate; }
@Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { log.info("请求的地址:{}", request.getRequestURI());
String token = request.getHeader("authorization"); if (StrUtil.isBlank(token)) { return true; } Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(LOGIN_USER_KEY + token);
if (userMap.isEmpty()) { return true; } UserDTO user = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false); UserHolder.saveUser(user); stringRedisTemplate.expire(LOGIN_USER_KEY + token, LOGIN_USER_TTL, TimeUnit.MINUTES);
return true; } }
|
这样就完成了对 Token 的刷新。
缓存的更新策略
一些数据在更改后,如果没有及时更新缓存,则会导致数据不一致,所以我们需要一个更新策略,让其数据保持一致性。
如果在数据要求不是强一致的情况下,我们可以靠过期时间让其失效后,当有请求再次访问时自动进行更新。
如果对一致性有要求,我们则可以通过 update 方法中手动重新设置缓存数据。
但是这样同样也会出现一个问题——如果数据经常更新但是并没有用户访问,则不停的更新缓存会对Redis有很多无效的 IO 操作。
所以相比较于更新缓存,直接删除缓存数据,等有用户访问的时候再次写入缓存是在此情况下更好的策略。
1 2 3 4 5 6 7 8 9 10 11 12
| @Transactional public Result update(Shop shop) { updateById(shop); Long id = shop.getId(); if (id == null) { return Result.fail("店铺ID不能为空"); } stringRedisTemplate.delete(CACHE_SHOP_KEY + id); return Result.ok(); }
|
但同样要注意,在以上策略下都会有一定的缓存击穿的风险,所以需要视业务情况而定。
缓存穿透问题
缓存穿透的问题出现于查询数据库中没有的数据,数据库没有此数据,自然不会有缓存,所以缓存穿透是绕过缓存直接将请求打到数据库中。如果有人进行恶意攻击,则很容易让数据库宕机。
以下有几种解决方法:
- 将空数据也写入缓存
将空字段写入缓存中并设置较短的过期时间,相同的请求不会直接请求到数据库上。不过这个方法缺点也很明显,只要有人使用不同字段恶意攻击,即可挤占内存,使得其他字段被自动清理。
- 使用布隆过滤器
布隆过滤器可以根据已添加的数据进行哈希运算,可以判断值是否可能存在与集合中,如果布隆过滤器返回 false 则此值一定不在集合中。我们可以使用 Redis 自带的布隆过滤器拦截大部分会导致缓存击穿的请求,少量布隆过滤器判断失误的请求可以将空数据写入缓存,或维护一个表来将布隆过滤器误判的值存入其中。
布隆过滤器的缺点是删除添加进过滤器的值很难删除,如果有一个值后续失效了,布隆过滤器会放行,可以通过维护一个失效的表来进行一定程度的解决。
- 在字段名字设置上进行一定处理使其符合一套自定规则
这个办法仅需要判断 ID 等是否符合规则,如果不符合规则拦截即可
- 频繁请求的IP封禁
以上方法可以相互配合使用,基本上可以解决绝大部分的缓存穿透情况。
附上一段解决缓存穿透示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public <R, ID> R getWithPassThrough( String keyPrefix,ID id, Class<R> type, Function<ID, R> dbFallback, Long expireTime, TimeUnit timeUnit) { String key = CACHE_SHOP_KEY + id; String json = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isNotBlank(json)) { return JSONUtil.toBean(json, type); } if (json != null) { return null; } R r = dbFallback.apply(id); if (r == null) { stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES); return null; } this.set(key, r, expireTime, timeUnit); return r; }
|
缓存雪崩问题
缓存雪崩是指大量的 Key 在同一时间或者相近时间失效,或者 Redis 服务器忽然出现问题,导致大量请求直接访问数据库。
缓存雪崩的问题一般出现在大量的 Key 设置在相邻或同一时间,在预热或设置过期时间的时候只需要加入随机数使其过期时间平均分布在不同时间段即可。
Redis 服务问题则可以使用 Redis 集群部署来保证服务的运行。
也可以通过服务降级,多级缓存等方式来解决。
缓存击穿问题
缓存击穿一般发生于热点 Key 过期时间到了,并发的大量请求直接到达数据库使其宕机。
对于此问题根据对数据一致性的需求采用不同的方式来应对。
- 加锁,如果没有获取到缓存,则线程开始获取锁,然后开始重建缓存数据,其他线程则等待重试。
- 逻辑过期
在字段中添加一个过期时间,当过期后获取锁后,开启一个异步线程去重建数据,然后再由异步线程释放锁。如果一致性要求不高可以返回过期数据。
以逻辑过期为示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
| private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
public <R, ID> R queryWithLogicalExpire(String prefix, //前缀 ID id, Class<R> type, //类 Function<ID, R> dbFallback, //Lambda表达式 Long time, //过期时间 TimeUnit timeUnit) { String key = CACHE_SHOP_KEY + id; String json = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isBlank(json)) { return null; } RedisData redisData = JSONUtil.toBean(json, RedisData.class); R r = JSONUtil.toBean((JSONObject) redisData.getData(), type); LocalDateTime expireTime = redisData.getExpireTime(); if (expireTime.isAfter(LocalDateTime.now())) { return r; } String lockKey = LOCK_SHOP_KEY + id; boolean lock = tryLock(lockKey); if (BooleanUtil.isTrue(lock)) { json = stringRedisTemplate.opsForValue().get(key); redisData = JSONUtil.toBean(json, RedisData.class); LocalDateTime expireTime1 = redisData.getExpireTime(); if (expireTime1.isAfter(LocalDateTime.now())) { return JSONUtil.toBean((JSONObject) redisData.getData(), type); } CACHE_REBUILD_EXECUTOR.submit(() -> { try { R r1 = dbFallback.apply(id); this.setWithLogicalExpire(prefix, key, r1, time, timeUnit); } catch (Exception e) { throw new RuntimeException(e); } finally { unlock(lockKey); } }); } else { return r; } return r; }
public void setWithLogicalExpire(String prefix, String key, Object value, Long expireTime, TimeUnit timeUnit) { RedisData redisData = new RedisData(); redisData.setData(value); redisData.setExpireTime(LocalDateTime.now().plusSeconds(timeUnit.toSeconds(expireTime)));
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData)); }
private void unlock(String key) { stringRedisTemplate.delete(key); }
|
秒杀问题
在高并发的情况下,如果两个线程同时查询到数据库的相同字段,所以在做库存扣减的时候容易发生两个库存同时扣减,导致超卖。
在单体应用下,我们可以直接加锁来解决并发问题。但是在此之前,我们需要用Redis来生成一个全局订单ID,以防止重复重复订单ID。
全局ID生成器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Component public class RedisIdWorker { @Autowired private StringRedisTemplate stringRedisTemplate;
private static final long BEGIN_TIMESTAMP = 1671801060L;
private static final int COUNT_BITS = 32;
public long nextId(String keyPrefix) { LocalDateTime now = LocalDateTime.now(); long nowSecond = now.toEpochSecond(ZoneOffset.UTC); long timeStamp = nowSecond - BEGIN_TIMESTAMP; String date = now.format(DateTimeFormatter.ofPattern("yyyyMMdd")); long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date); return timeStamp << COUNT_BITS | count; } }
|
借助Redis生成全局ID,我们可以在分布式框架下生成一个唯一的Long类型的ID
使用Redis来实现互斥锁
既然是秒杀场景,那么锁是必然需要的,在分布式场景下单体应用本身的锁则会无效,所以我们需要Redis的字段来充当锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public class SimpleRedisLock implements ILock {
private String name; private StringRedisTemplate stringRedisTemplate; private static final String KEY_PREFIX = "lock:"; private static final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) { this.name = name; this.stringRedisTemplate = stringRedisTemplate; }
@Override public boolean tryLook(long timeoutSec) { String threadId = ID_PREFIX+Thread.currentThread().getId(); Boolean success = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); }
@Override public void unlock() { String threadId = ID_PREFIX+Thread.currentThread().getId(); String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name); if (threadId.equals(id)){ stringRedisTemplate.delete(KEY_PREFIX + name); } } }
|
因为我们并没有锁续期的操作,所以为了防止其他实例误删,我们加上了UUID以及线程标识,在解锁的时候,我们需要判断是否是自己的锁
但是此时产生了一个新的问题,我们解锁的时候的操作并不是原子性的,如果在进入if后发生了阻塞,并且等待时间超过了自动释放时间,则也会将其他线程的锁释放。
Lua脚本实现释放锁
为了保证原子性,我们引入Lua脚本来执行锁的释放
1 2 3 4 5 6
|
if(redis.call('get',KEYS[1]) == ARGV[1]) then return redis.call('del',KEYS[1]) end return 0;
|
将Lua脚本放入resource目录中,然后再自定义锁中改造一下代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static { UNLOCK_SCRIPT = new DefaultRedisScript<>(); UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua")); UNLOCK_SCRIPT.setResultType(Long.class); }
@Override public void unlock() { stringRedisTemplate.execute( UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX+name), ID_PREFIX+Thread.currentThread().getId()); }
|
至此我们就保证了释放锁的原子性,实现了一个简单的分布式锁
不过这些并不需要我们完全手动实现,Redisson提供了许多的锁,包含了许多功能。例如:可重入锁,看门狗机制续期锁,Redis集群分布式锁等。
一人一单秒杀情景
在单体应用下,我们可以通过直接给方法上锁,但是同时要考虑事务,所以我们要将其中生成订单的业务抽离出来并加上@Transaction交由Spring管理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| public Result seckillVoucher(Long voucherId) { SeckillVoucher voucher = seckillVoucherService.getById(voucherId); if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { return Result.fail("秒杀尚未开始!"); } if (voucher.getEndTime().isBefore(LocalDateTime.now())){ return Result.fail("秒杀已经结束!"); } if (voucher.getStock() < 1) { return Result.fail("库存不足"); } Long userId = UserHolder.getUser().getId(); synchronized (userId.toString().intern()) { IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } }
@Transactional public Result createVoucherOrder(Long voucherId) { Long userId = UserHolder.getUser().getId(); int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count(); if (count > 0){ return Result.fail("用户已经购买过一次!"); } boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") .eq("voucher_id", voucherId) .gt("stock",0) .update(); if (!success) { return Result.fail("库存不足!"); }
VoucherOrder voucherOrder = new VoucherOrder(); long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); save(voucherOrder); return Result.ok(); }
|
如果我们使用Redis充当分布式锁,则我们可以修改seckillVoucher方法使用Redission提供的锁来实现分布式的秒杀场景
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| @Autowired private RedissonClient redissonClient;
@Override public Result seckillVoucher(Long voucherId) { SeckillVoucher voucher = seckillVoucherService.getById(voucherId); if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { return Result.fail("秒杀尚未开始!"); } if (voucher.getEndTime().isBefore(LocalDateTime.now())){ return Result.fail("秒杀已经结束!"); } if (voucher.getStock() < 1) { return Result.fail("库存不足"); } Long userId = UserHolder.getUser().getId();
RLock lock = redissonClient.getLock("lock:order:" + userId); //获取锁 boolean isLock = lock.tryLock(); // 判断是否获取锁成功 if (!isLock) { // 获取锁失败,返货错误或重试 return Result.fail("不允许重复下单"); } try { IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } finally { lock.unlock(); } }
|