缓存与分布式锁
哪些数据适合放入缓存
- 即时性、数据一致性要求不高的
- 访问量大且更新频率不高的数据
选择redis做为缓存中间件
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
问题记录与分析
产生堆外内存溢出:OutOfDirectMemoryError
- springboot2.0 以后默认使用lettuce作为操作redis的客户端,它使用netty进行网络通信。
- lettuce的bug导致netty堆外内存溢出
解决方案: 切换到jedis(或者升级lettuce)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
高并发下缓存失效问题-缓存穿透
缓存穿透:
指查询一个一定不存在的数据,由于缓存是不命中,将要去查询数据库,但是数据库也没有该记录,我们将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义
风险:
利用不存在数据进行攻击,数据库瞬时压力增大,最终导致崩溃
解决:
null结果缓存,并加入短暂过期时间
高并发下缓存失效问题-缓存雪崩
缓存雪崩:
缓存雪崩是指我们设置缓存时key采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。
解决:
原有的失效时间基础上增加一个随机值,比如1-5min随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。
高并发下缓存失效问题-缓存击穿
缓存击穿:
- 对于一些设置了过期时间的key,如果这些key可能会在某些时间点被超高并发地访问,是一种非常”热点“的数据。
- 如果这个key在大量请求同时进来前正好失效,那么所有key的数据查询都落到db,我们称之为缓存击穿。
解决:
加锁,大量并发只让一个请求去查,其他请求等待,查到以后释放锁,其他请求获取到锁,先查缓存,就会有数据,不用去db。
加锁实践:
springboot所有的组件在容器中都是单例的,可以使用synchronized(this),JUC(Lock)等解决单体应用中的问题,但是分布式系统中,要想锁住所有数据,就必须使用分布式锁
通过分析 分布式锁必须保证加锁(占位+过期时间)和删除锁(判断+删除)的原子性。
加锁可以使用redis setnx ex命令来操作,但是删除锁的时候 ,要先判断再删除,想把这两步操作做成原则性的,需要采用redis+lusj脚本的方式来操作。
public Map<String, List<Catalog2Vo>> getCatalogJsonWithRedisLock() {
// 1. 占分布式锁
String uuid = UUID.randomUUID().toString();
// 设置锁和设置过期时间必须是原子性的 不能通过redis的两条命令设置,这里的命令等价于redis命令setnx ex
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
if (lock) {
log.info("获取分布式锁成功 ...");
Map<String, List<Catalog2Vo>> dataFromDb;
try {
dataFromDb = getDataFromDb();
} finally {
// 删除锁 必须判断是当前锁 再删除,所以,为了保证原子性操作 需要采取redis+Lua脚本 完成
String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 执行脚本
redisTemplate.execute(new DefaultRedisScript<>(luaScript, Long.class),
Collections.singletonList("lock"), uuid);
}
return dataFromDb;
} else {
// 加锁失败
log.info("加锁失败,获取分布式锁 等待重试");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return getCatalogJsonWithRedisLock();
}
}
Redisson
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上
导入依赖
<!--使用redisson做为分布式锁,分布式对象等功能框架 -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.0</version>
</dependency>
配置单个redis
@Configuration
public class MyRedissonConfig {
@Bean(destroyMethod = "shutdown")
public RedissonClient redisson() throws IOException {
// 1. 创建配置
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
// 2. 根据配置创建出Redisson实例
return Redisson.create(config);
}
}
测试:
@ResponseBody
@GetMapping("/hello")
public String hello() {
// 1. 获取一把锁 ,只要锁的名字一样,就是同一把锁
RLock lock = redisson.getLock("my-lock");
// 2. 加锁
lock.lock(); // 阻塞式等待
// 锁的自动续期:如果业务超长,运行期间自动给锁续上新的30s 不用担心业务时间长,锁自动过期被删掉
// 加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除
try {
System.out.println("加锁成功 执行业务 ..." + Thread.currentThread().getId());
Thread.sleep(30000);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 3. 解锁
System.out.println("释放锁 ..." + Thread.currentThread().getId());
lock.unlock();
}
return "hello";
}
lock.lock()
- 如果我们传递了时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间
- 如果我们未指定锁的超时时间,就使用30*1000 (看门狗的默认时间:LockWatchdogTimeout),只要占锁成功,就会启动一个定时任务:重新给锁设置过期时间,新的过期时间就是看门狗的默认时间。这个定时任务执行间隔(internalLockLeaseTime)为: (看门狗时间/3)
源代码:
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
传递时间时执行的方法:执行lua脚本
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
定时任务来做续期
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
最佳实践是传时间,省去了整个续期的操作,给定合理的过期时间即可。
读写锁测试:
/**
* 测试读写锁 - 写
*/
@GetMapping("/write")
@ResponseBody
public String writeValue() {
RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
RLock rLock = lock.writeLock();
String s = "";
try {
rLock.lock();
s = UUID.randomUUID().toString();
// 模拟业务时长
Thread.sleep(30000);
redisTemplate.opsForValue().set("writeValue", s);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return s;
}
/**
* 测试读写锁 - 读
* <p>
* 保证一定能读到最新的数据,修改期间,写锁是一个排他锁(互斥锁),读锁是一个共享锁
* 写锁没释放 读就必须等待
*/
@GetMapping("/read")
@ResponseBody
public String readValue() {
RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
RLock rLock = lock.readLock();
rLock.lock();
String writeValue = "";
try {
writeValue = redisTemplate.opsForValue().get("writeValue");
} catch (Exception e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return writeValue;
}
总结:
- 读+读 :相当于无锁,并发读,只会在redis中记录好,所有当前的读锁,他们都会同时加锁成功
- 写+读 :等待写锁释放
- 写+写 :阻塞方式
- 读+写 :有读锁,写也需要等待
即只要有写的存在,都必须等待
信号量测试:
/**
* 测试信号量
* <p>
* 模拟车库停车
* 车位 3 测试的时候先在redis中先设置当前车位数 set park 3
*/
@GetMapping("/park")
@ResponseBody
public String park() throws InterruptedException {
RSemaphore semaphore = redisson.getSemaphore("park");
// acquire()是阻塞的,当没有车位时会一直等到有释放时才返回
// 如果不想阻塞 可以使用 tryAcquire() 会返回一个布尔值
semaphore.acquire(); // 获取一个信号,获取一个值,即占一个车位
return "ok";
}
/**
* 出库
*/
@GetMapping("/go")
@ResponseBody
public String go() throws InterruptedException {
RSemaphore semaphore = redisson.getSemaphore("park");
semaphore.release(); // 释放一个信号,即空出一个车位
return "ok";
}
闭锁测试:
/**
* 测试闭锁
* <p>
* 模拟学校关闭大门 只要当5个班级人都走完了 才可以关闭大门
*/
@GetMapping("/lockDoor")
@ResponseBody
public String lockDoor() throws InterruptedException {
RCountDownLatch door = redisson.getCountDownLatch("door");
door.trySetCount(5);
door.await(); //等待闭锁都完成
return "放假喽!关大门了哈";
}
@GetMapping("/outDoor/{id}")
@ResponseBody
public String outDoor(@PathVariable("id") Long id) {
// 这里只是模拟 不用考虑真实场景
RCountDownLatch door = redisson.getCountDownLatch("door");
door.countDown(); // 计数减一
return id + "班的人都走完了";
}
缓存数据一致性
- 双写模式
- 失效模式
无论是双写模式还是失效模式,都会导致缓存的不一致问题。即多个实例同时更新会出事。怎么办?
- 如果是用户纬度数据(订单数据、用户数据),这种并发几率非常小,不用考虑这个问题,缓存数据加
上过期时间,每隔一段时间触发读的主动更新即可
-
如果是菜单,商品介绍等基础数据,也可以去使用canal订阅binlog的方式。
-
缓存数据+过期时间也足够解决大部分业务对于缓存的要求。
-
通过加锁保证并发读写,写写的时候按顺序排好队。读读无所谓。所以适合使用读写锁。(业务不关心
脏数据,允许临时脏数据可忽略);
总结:
- 我们能放入缓存的数据本就不应该是实时性、一致性要求超高的。所以缓存数据的时候加上过期时间,保
证每天拿到当前最新数据即可。
-
我们不应该过度设计,增加系统的复杂性
-
遇到实时性、一致性要求高的数据,就应该查数据库,即使慢点。
整合springcache
整合springcache,简化缓存开发
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
指定使用redis来缓存
spring.cache.type=redis
spring提供的几个注解:
@Cacheable
: Triggers cache population:触发将数据保存到缓存的操作@CacheEvict
: Triggers cache eviction:触发将数据从缓存中删除的操作@CachePut
: Updates the cache without interfering with the method execution:不影响方法执行更新缓存@Caching
: Regroups multiple cache operations to be applied on a method:组合以上多个操作@CacheConfig
: Shares some common cache-related settings at class-level:在类级别共享缓存的相同配置
测试:
- 开启缓存功能:启动类上加上 @EnableCaching注解
- 只需要使用注解就能完成缓存
使用缓存后的默认行为:
- 如果缓存中有数据,则方法不会调用,即直接返回缓存中的数据
- key默认自动生成:缓存的名字::SimpleKey []
- 缓存的value值,默认使用jdk序列化机制,将序列化后的数据存到redis
- 默认ttl时间:-1,即用不过期
以上默认行为导致的结果与我们实际需求不同,所有我们可以自定义这些配置:
自定义:
- 指定生成的key : 通过key属性指定,接收一个spEL表达式
- 指定缓存数据的过期时间:配置文件中配置
- 将保存的value数据转为json格式
原理:CacheAutoConfiguration
-> RedisCacheConfiguration
-> 自动配置了RedisCacheManager
-> 初始化所有的缓存 -> 每个缓存决定使用什么配置 -> 如果redisCacheConfiguration
有就用已有的,没有就使用默认配置
所以,想要改缓存的配置,只需要给容器中注入一个 RedisCacheConfiguration
即可
就会应用到当前RedisCacheManager
管理的所有缓存分区中;
可以参考源码中默认配置来自己写一个RedisCacheConfiguration:
源码中的默认配置:
* <dd>{@link org.springframework.data.redis.serializer.StringRedisSerializer}</dd>
* <dt>value serializer</dt>
* <dd>{@link org.springframework.data.redis.serializer.JdkSerializationRedisSerializer}</dd>
从这注释中可以看出 k采用的是字符串序列化,v采用的是jdk序列化
自定义配置:
@EnableCaching
@Configuration
@EnableConfigurationProperties(CacheProperties.class)
public class MyRedisCacheConfig {
@Bean
RedisCacheConfiguration configuration(CacheProperties cacheProperties) {
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()));
config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericFastJsonRedisSerializer()));
CacheProperties.Redis redisProperties = cacheProperties.getRedis();
if (redisProperties.getTimeToLive() != null) {
config = config.entryTtl(redisProperties.getTimeToLive());
}
if (redisProperties.getKeyPrefix() != null) {
config = config.prefixCacheNameWith(redisProperties.getKeyPrefix());
}
if (!redisProperties.isCacheNullValues()) {
config = config.disableCachingNullValues();
}
if (!redisProperties.isUseKeyPrefix()) {
config = config.disableKeyPrefix();
}
return config;
}
}
redis缓存的其他配置
spring.cache.type=redis
# 单位为毫秒
spring.cache.redis.time-to-live=3600000
# 缓存的key 加前缀, 可以用来区分redis中的值哪些是缓存用的数据
# 如果这里配置了前缀 就代替了默认前缀 之前的默认前缀 缓存名字::
spring.cache.redis.key-prefix=CHCHE_
# 是否使用配置的前缀
spring.cache.redis.use-key-prefix=true
# 是否缓存空值,开启 防止缓存穿透
spring.cache.redis.cache-null-values=true
缓存数据测试:
@Cacheable(value = {"category"}, key = "#root.method.name")
@Override
public List<CategoryEntity> getLevelOne() { ... }
删除缓存测试:
// 分类数据更新的时候 触发删除缓存 指定缓存分区 再指定key 注意这里的key 接收的是一个spEL表达式,如果是普通字符串 需要里面加单引号
@CacheEvict(value = "category", key = "'getLevelOne'")
如果一个要删除多个缓存,就可以使用@Caching,它可以组合其他注解
@Caching(evict = {
@CacheEvict(value = "category", key = "'getLevelOne'"),
@CacheEvict(value = "category", key = "'getCatalogJson'")
})
或者可以指定删除某个缓存分区下的所有缓存,这也是使用缓存分区的好处
@CacheEvict(value = "category", allEntries = true)
所以我们约定,存储同一类型的数据 使用同一个缓存分区
且为了方便管理 配置文件中,不知道自定义前缀,就使用默认的 分区名为前缀
Spring-Cache 的不足:
使用@Cacheable时可以指定sync = true解决缓存击穿问题,但是不是分布式锁。