首页 > 技术文章 > Java面试必问之-缓存

JayV 2020-09-16 13:29 原文

缓存问题

缓存穿透

​ 缓存穿透是指缓存和数据库中都没有的数据,而用户不断发起请求,如发起为id为“-1”的数据或id为特别大不存在的数据。这时的用户很可能是攻击者,攻击会导致数据库压力过大。

解决方案:

  1. 接口层增加校验,如用户鉴权校验,id做基础校验,id<=0的直接拦截;
  2. 从缓存取不到的数据,在数据库中也没有取到,这时也可以将key-value对写为key-null,缓存有效时间可以设置短点,如30秒(设置太长会导致正常情况也没法使用)。这样可以防止攻击用户反复用同一个id暴力攻击

缓存击穿

​ 缓存击穿是指缓存中没有但数据库中有的数据(一般是缓存时间到期),这时由于并发用户特别多,同时读缓存没读到数据,又同时去数据库去取数据,引起数据库压力瞬间增大,造成过大压力

解决方案:

  1. 设置热点数据永远不过期。
  2. 加互斥锁,互斥锁参考代码如下:

image-20200916132808397

缓存雪崩

​ 缓存雪崩是指缓存中数据大批量到过期时间,而查询数据量巨大,引起数据库压力过大甚至down机。和缓存击穿不同的是,缓存击穿指并发查同一条数据,缓存雪崩是不同数据都过期了,很多数据都查不到从而查数据库。

解决方案

  1. 缓存数据的过期时间设置随机,防止同一时间大量数据过期现象发生。
  2. 如果缓存数据库是分布式部署,将热点数据均匀分布在不同的缓存数据库中。
  3. 设置热点数据永远不过期。

缓存策略

1、存储集合的选择

实现本地缓存,存储容器肯定是 key/value 形式的数据结构,在 Java 中,也就是我们常用的 Map 集合。Map 中有 HashMap、Hashtable、ConcurrentHashMap 几种供我们选择,如果不考虑高并发情况下数据安全问题,我们可以选择HashMap,如果考虑高并发情况下数据安全问题,我们可以选择 Hashtable、ConcurrentHashMap 中的一种集合,但是我们优先选择 ConcurrentHashMap,因为 ConcurrentHashMap 的性能比 Hashtable 要好。

2、过期缓存处理

因为缓存直接存储在内存中,如果我们不处理过期缓存,内存将被大量无效缓存占用,这不是我们想要的,所以我们需要清理这些失效的缓存。过期缓存处理可以参考 Redis 的策略来实现,Redis 采用的是定期删除 + 懒惰淘汰策略。

定期删除策略

定期删除策略是每隔一段时间检测已过期的缓存,并且降之删除。这个策略的优点是能够确保过期的缓存都会被删除。同时也存在着缺点,过期的缓存不一定能够及时的被删除,这跟我们设置的定时频率有关系,另一个缺点是如果缓存数据较多时,每次检测也会给 cup 带来不小的压力。

懒惰淘汰策略

懒惰淘汰策略是在使用缓存时,先判断缓存是否过期,如果过期将它删除,并且返回空。这个策略的优点是只有在查找的时候,才判断是否过期,对 CUP 影响较小。同时这种策略有致命的缺点,当存入了大量的缓存,这些缓存都没有被使用并且已过期,都将成为无效缓存,这些无效的缓存将占用你大量的内存空间,最后导致服务器内存溢出。

我们简单的了解了一下 Redis 的两种过期缓存处理策略,每种策略都存在自己的优缺点。所以我们在使用过程中,可以将两种策略组合起来,结合效果还是非常理想的。

3、缓存淘汰策略

缓存淘汰跟过期缓存处理要区别开来,缓存淘汰是指当我们的缓存个数达到我们指定的缓存个数时,毕竟我们的内存不是无限的。如果我们需要继续添加缓存的话,我们就需要在现有的缓存中根据某种策略淘汰一些缓存,给新添加的缓存腾出位置,下面一起来认识几种常用的缓存淘汰策略。

先进先出策略

最先进入缓存的数据在缓存空间不够的情况下会被优先被清除掉,以腾出新的空间接受新的数据。该策略主要比较缓存元素的创建时间。在一些对数据实效性要求比较高的场景下,可考虑选择该类策略,优先保障最新数据可用。

最少使用策略

无论是否过期,根据元素的被使用次数判断,清除使用次数较少的元素释放空间。该策略主要比较元素的hitCount(命中次数),在保证高频数据有效性场景下,可选择这类策略。

最近最少使用策略

无论是否过期,根据元素最后一次被使用的时间戳,清除最远使用时间戳的元素释放空间。该策略主要比较缓存最近一次被get使用时间。在热点数据场景下较适用,优先保证热点数据的有效性。

随机淘汰策略

无论是否过期,随机淘汰某个缓存,如果对缓存数据没有任何要求,可以考虑使用该策略。

不淘汰策略

当缓存达到指定值之后,不淘汰任何缓存,而是不能新增缓存,直到有缓存淘汰时,才能继续添加缓存。

本地缓存

ConcurrentHashMap

可以采用 ConcurrentHashMap 作为存储集合,这样即使在高并发的情况下,我们也能够保证缓存的安全。过期缓存处理在这里我只使用了定时删除策略,并没有使用定时删除 + 懒惰淘汰策略,你可以自己动手尝试一下使用这两种策略进行过期缓存处理。在缓存淘汰方面,我在这里采用了最少使用策略。好了,技术选型都知道了,我们一起来看看代码实现。

缓存对象类

public class Cache implements Comparable<Cache>{
    // 键
    private Object key;
    // 缓存值
    private Object value;
    // 最后一次访问时间
    private long accessTime;
    // 创建时间
    private long writeTime;
    // 存活时间
    private long expireTime;
    // 命中次数
    private Integer hitCount;

添加缓存

/**
 * 添加缓存
 *
 * @param key
 * @param value
 */
public void put(K key, V value,long expire) {
    checkNotNull(key);
    checkNotNull(value);
    // 当缓存存在时,更新缓存
    if (concurrentHashMap.containsKey(key)){
        Cache cache = concurrentHashMap.get(key);
        cache.setHitCount(cache.getHitCount()+1);
        cache.setWriteTime(System.currentTimeMillis());
        cache.setAccessTime(System.currentTimeMillis());
        cache.setExpireTime(expire);
        cache.setValue(value);
        return;
    }
    // 已经达到最大缓存
    if (isFull()) {
        Object kickedKey = getKickedKey();
        if (kickedKey !=null){
            // 移除最少使用的缓存
            concurrentHashMap.remove(kickedKey);
        }else {
            return;
        }
    }
    Cache cache = new Cache();
    cache.setKey(key);
    cache.setValue(value);
    cache.setWriteTime(System.currentTimeMillis());
    cache.setAccessTime(System.currentTimeMillis());
    cache.setHitCount(1);
    cache.setExpireTime(expire);
    concurrentHashMap.put(key, cache);
}

获取缓存

/**
 * 获取缓存
 * @param key
 * @return
 */
public Object get(K key) {
    checkNotNull(key);
    if (concurrentHashMap.isEmpty()) return null;
    if (!concurrentHashMap.containsKey(key)) return null;
    Cache cache = concurrentHashMap.get(key);
    if (cache == null) return null;
    cache.setHitCount(cache.getHitCount()+1);
    cache.setAccessTime(System.currentTimeMillis());
    return cache.getValue();
}

获取最少使用的缓存

    /**
     * 获取最少使用的缓存
     * @return
     */
    private Object getKickedKey() {
        Cache min = Collections.min(concurrentHashMap.values());
        return min.getKey();
    }

过期缓存检测方法

/**
 * 处理过期缓存
 */
class TimeoutTimerThread implements Runnable {
    public void run() {
        while (true) {
            try {
                TimeUnit.SECONDS.sleep(60);
                expireCache();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 创建多久后,缓存失效
     *
     * @throws Exception
     */
    private void expireCache() throws Exception {
        System.out.println("检测缓存是否过期缓存");
        for (Object key : concurrentHashMap.keySet()) {
            Cache cache = concurrentHashMap.get(key);
            long timoutTime = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime()
                    - cache.getWriteTime());
            if (cache.getExpireTime() > timoutTime) {
                continue;
            }
            System.out.println(" 清除过期缓存 : " + key);
            //清除过期缓存
            concurrentHashMap.remove(key);
        }
    }
}

guava cache组件

某些热点数据在短时间内可能会被成千上万次访问,所以除了放在redis之外,还可以放在本地内存,也就是JVM的内存中。

我们可以使用google的guava cache组件实现本地缓存,之所以选择guava是因为它可以控制key和value的大小和超时时间,可以配置LRU策略且guava是线程安全的。

首先引入guava cache

<dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>18.0</version>
</dependency>

编写CacheService接口

public interface CacheService {
    //存方法
    void setCommonCache(String key,Object value);
    //取方法
    Object getFromCommonCache(String key);
}

实现CacheService(@PostConstruct注解的方法将会在依赖注入完成后被自动调用。)

@Service
public class CacheServiceImpl implements CacheService {

    private Cache<String,Object> commonCache=null;


    @PostConstruct
    public void init(){
        commonCache= CacheBuilder.newBuilder()
                //缓存初始容量10
                .initialCapacity(10)
                //最多100个key,超过按LRU策略移除
                .maximumSize(100)
                //写入后多少秒过期
                .expireAfterWrite(60, TimeUnit.SECONDS).build();
    }
    @Override
    public void setCommonCache(String key, Object value) {
        commonCache.put(key,value);
    }

    @Override
    public Object getFromCommonCache(String key) {
        return commonCache.getIfPresent(key);
    }
}

使用CacheService

@RequestMapping(value = "/get",method = {RequestMethod.GET})
    @ResponseBody
    public CommonReturnType getItem(@RequestParam(name = "id")Integer id){
        //在本地缓存中查找
        ItemModel itemModel= (ItemModel) cacheService.getFromCommonCache("item_"+id);

        if(itemModel==null){
            //本地缓存没有则到redis缓存中查找
            itemModel= (ItemModel) redisTemplate.opsForValue().get("item_"+id);
            if(itemModel==null){
                //都没有则到数据库查找,找到后放入redis中
                itemModel = itemService.getItemById(id);
                redisTemplate.opsForValue().set("item_"+id,itemModel);
                redisTemplate.expire("item_"+id,10, TimeUnit.MINUTES);
            }
            //本地缓存没有时,在redis或数据库找到后再放入本地缓存
            cacheService.setCommonCache("item_"+id,itemModel);
        }


        ItemVO itemVO = convertVOFromModel(itemModel);

        return CommonReturnType.create(itemVO);

    }

面试

缓存雪崩、缓存穿透、缓存预热、缓存更新、缓存降级等问题

一、缓存雪崩

我们可以简单的理解为:由于原有缓存失效,新缓存未到期间 (例如:我们设置缓存时采用了相同的过期时间,在同一时刻出现大面积的缓存过期),所有原本应该访问缓存的请求都去查询数据库了,而对数据库CPU和内存造成巨大压力,严重的会造成数据库宕机。从而形成一系列连锁反应,造成整个系统崩溃。
解决办法
大多数系统设计者考虑用加锁( 最多的解决方案)或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。还有一个简单方案就是将缓存失效时间分散开,例如设置随机缓存时间。

二、缓存穿透

缓存穿透是指用户查询数据,在数据库没有,自然在缓存中也不会有。这样就导致用户查询的时候,在缓存中找不到,每次都要去数据库再查询一遍,然后返回空(相当于进行了两次无用的查询)。这样请求就绕过缓存直接查数据库,这也是经常提的缓存命中率问题。
解决办法;
最常见的则是采用布隆过滤器,将所有可能存在的数据哈希到一个足够大的bitmap中,一个一定不存在的数据会被这个bitmap拦截掉,从而避免了对底层存储系统的查询压力。
另外也有一个更为简单粗暴的方法,如果一个查询返回的数据为空(不管是数据不存在,还是系统故障),我们仍然把这个空结果进行缓存,但它的过期时间会很短,最长不超过五分钟。通过这个直接设置的默认值存放到缓存,这样第二次到缓冲中获取就有值了,而不会继续访问数据库,这种办法最简单粗暴。

5TB的硬盘上放满了数据,请写一个算法将这些数据进行排重。如果这些数据是一些32bit大小的数据该如何解决?如果是64bit的呢?

对于空间的利用到达了一种极致,那就是Bitmap和布隆过滤器(Bloom Filter)。
Bitmap: 典型的就是哈希表
缺点是,Bitmap对于每个元素只能记录1bit信息,如果还想完成额外的功能,恐怕只能靠牺牲更多的空间、时间来完成了。

Bloom-Filter布隆过滤器(推荐)

就是引入了k(k>1相互独立的哈希函数,保证在给定的空间、误判率下,完成元素判重的过程。
它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。
Bloom-Filter算法的核心思想就是利用多个不同的Hash函数来解决“冲突”。
Hash存在一个冲突(碰撞)的问题,用同一个Hash得到的两个URL的值有可能相同。为了减少冲突,我们可以多引入几个Hash,如果通过其中的一个Hash值我们得出某元素不在集合中,那么该元素肯定不在集合中。只有在所有的Hash函数告诉我们该元素在集合中时,才能确定该元素存在于集合中。这便是Bloom-Filter的基本思想。

Bloom-Filter一般用于在大数据量的集合中判定某元素是否存在。

缓存穿透与缓存击穿的区别

缓存击穿:指一个key非常热点,大并发集中对这个key进行访问,当这个key在失效的瞬间,仍然持续的大并发访问就穿破缓存,转而直接请求数据库。
解决方案;在访问key之前,采用SETNX(set if not exists)来设置另一个短期key来锁住当前key的访问,访问结束再删除该短期key。

三、缓存预热

缓存预热就是系统上线后,将相关的缓存数据直接加载到缓存系统。这样就可以避免在用户请求的时候,先查询数据库,然后再将数据缓存的问题!用户直接查询事先被预热的缓存数据!
解决思路:
1、直接写个缓存刷新页面,上线时手工操作下;
2、数据量不大,可以在项目启动的时候自动进行加载;
3、定时刷新缓存;

四、缓存更新

除了缓存服务器自带的缓存失效策略之外(Redis默认的有6种策略可供选择),我们还可以根据具体的业务需求进行自定义的缓存淘汰,常见的策略有两种:
(1)定时去清理过期的缓存;
(2)当有用户请求过来时,再判断这个请求所用到的缓存是否过期,过期的话就去底层系统得到新数据并更新缓存。
两者各有优劣,第一种的缺点是维护大量缓存的key是比较麻烦的,第二种的缺点就是每次用户请求过来都要判断缓存失效,逻辑相对比较复杂!具体用哪种方案,大家可以根据自己的应用场景来权衡。

五、缓存降级

当访问量剧增、服务出现问题(如响应时间慢或不响应)或非核心服务影响到核心流程的性能时,仍然需要保证服务还是可用的,即使是有损服务。系统可以根据一些关键数据进行自动降级,也可以配置开关实现人工降级。
降级的最终目的是保证核心服务可用,即使是有损的。而且有些服务是无法降级的(如加入购物车、结算)。
以参考日志级别设置预案:
(1)一般:比如有些服务偶尔因为网络抖动或者服务正在上线而超时,可以自动降级;
(2)警告:有些服务在一段时间内成功率有波动(如在95~100%之间),可以自动降级或人工降级,并发送告警;
(3)错误:比如可用率低于90%,或者数据库连接池被打爆了,或者访问量突然猛增到系统能承受的最大阀值,此时可以根据情况自动降级或者人工降级;
(4)严重错误:比如因为特殊原因数据错误了,此时需要紧急人工降级。

服务降级的目的,是为了防止Redis服务故障,导致数据库跟着一起发生雪崩问题。因此,对于不重要的缓存数据,可以采取服务降级策略,例如一个比较常见的做法就是,Redis出现问题,不去数据库查询,而是直接返回默认值给用户。

热点数据和冷数据是什么

热点数据,缓存才有价值
对于冷数据而言,大部分数据可能还没有再次访问到就已经被挤出内存,不仅占用内存,而且价值不大。频繁修改的数据,看情况考虑使用缓存
对于上面两个例子,寿星列表、导航信息都存在一个特点,就是信息修改频率不高,读取通常非常高的场景。
对于热点数据,比如我们的某IM产品,生日祝福模块,当天的寿星列表,缓存以后可能读取数十万次。再举个例子,某导航产品,我们将导航信息,缓存以后可能读取数百万次。
数据更新前至少读取两次,缓存才有意义。这个是最基本的策略,如果缓存还没有起作用就失效了,那就没有太大价值了。
那存不存在,修改频率很高,但是又不得不考虑缓存的场景呢?有!比如,这个读取接口对数据库的压力很大,但是又是热点数据,这个时候就需要考虑通过缓存手段,减少数据库的压力,比如我们的某助手产品的,点赞数,收藏数,分享数等是非常典型的热点数据,但是又不断变化,此时就需要将数据同步保存到Redis缓存,减少数据库压力。

推荐阅读