新闻资讯

新闻资讯 行业动态

关于缓存穿透、缓存击穿、缓存雪崩的模拟与解决(Redis)

编辑:008     时间:2020-03-21

前言

在我们日常的开发中,无不都是使用数据库来进行数据的存储,但当遇到大量数据并发请求的需求,如秒杀、热点数据请求等,若所有请求都直接打到数据库上会占用大量的硬盘资源,系统在极短的时间内完成成千上万次的读/写操作,极其容易造成数据库系统瘫痪。

此时我们会引入缓存层来阻挡大部分的请求,减轻数据库压力。但引入缓存层往往带来缓存穿透,缓存击穿,缓存雪崩等问题。

本文以Redis为例模拟且解决以上三个问题。

缓存击穿

缓存击穿是指缓存中没有但数据库中有的数据(一般是缓存时间到期),这时由于并发用户特别多,同时读缓存没读到数据,又同时去数据库去取数据,此时如果你的代码没有实现同步机制,会造成小部分的请求直接打到数据库上,给数据库带来一定的压力。

模拟需求

模拟需求:某秒杀活动即将开始,模拟1w个请求同时发生,要获取某商品的商品详情信息

期望:只能有1个请求打到数据库,其他请求均打到Redis或其他缓存中

错误示例

我们先看错误示例,以下示例代码没有做任何同步,模拟情况一。

import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; /**
 * @author HeyS1
 * @date 2020/3/12
 * @description */ public class ConcurrentTest { //请求次数 private int reqestQty = 10000; //倒计时器,当发送reqestQty次请求后继续执行主线程 private CountDownLatch latch = new CountDownLatch(reqestQty); //记录请求落在数据库上的次数 private AtomicInteger dbSelectCount = new AtomicInteger(); //记录请求落在缓存中的次数 private AtomicInteger cacheSelectCount = new AtomicInteger(); //用HashMap模拟缓存储存 private Map<String, String> cache = new HashMap<>(); public static void main(String[] args) { new ConcurrentTest().go();
    } private void go() { //同时创建1w个线程获取 for (int i = 0; i < reqestQty; i++) { new Thread(() -> { this.getGoodsDetail("商品id");
                latch.countDown();
            }).start();
        } // 计数器大于0 时,await()方法会阻塞程序继续执行 try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("数据库查询次数:" + dbSelectCount.get());
        System.out.println("缓存查询次数:" + cacheSelectCount.get());
    } /**
     * 获取商品数据
     *
     * @param key 商品id
     * @return */ public String getGoodsDetail(String key) { //先从缓存查询,存在则直接返回 String data = this.selectCache(key); if (data != null) { return data;
        } //不存在则从数据库查询且将数据放入缓存 data = this.selectDB(key);
        cache.put(key, data); return data;
    } /**
     * 从缓存中获取数据
     *
     * @param key
     * @return */ public String selectCache(String key) {
        cacheSelectCount.addAndGet(1);//记录次数 System.out.println(Thread.currentThread().getId() + " 从cache获取数据===="); return cache.get(key);
    } /**
     * 从数据库中获取数据
     *
     * @param key
     * @return */ public String selectDB(String key) {
        sleep(100);//模拟查询数据库花费100ms dbSelectCount.addAndGet(1);//记录次数 System.out.println(Thread.currentThread().getId() + " 从db获取数据===="); return "数据中的数据";
    } private static void sleep(long m) { try {
            Thread.sleep(m);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

结果:

数据库查询次数:202
缓存查询次数:10000

可以看出,如果getGoodsDetail方法没做任何处理,还是会有少数请求直接打到数据库,这就是缓存穿透

/**
     * 获取商品数据
     *
     * @param key 商品id
     * @return */ public String getGoodsDetail(String key) { //先从缓存查询,存在则直接返回 String data = this.selectCache(key); if (data != null) { return data;
        } //不存在则从数据库查询且将数据放入缓存 data = this.selectDB(key);
        cache.put(key, data); return data;
    }

 

解决方案1:synchronized 

使用synchronized 同步代码块可解决该问题,此方案最简单,但有缺点:在分布式系统/集群下是无法确保各节点同步,也就是说如果是秒杀等保证库存不超卖的情景下,不能用此方案。但只是查询商品详情这种需求,其实问题也不大,具体看业务。

 

只需修改一下上面示例中的getGoodsDetail方法即可

/**
     * 获取商品数据
     *
     * @param key 商品id
     * @return */ public String getGoodsDetail(String key) { //先从缓存查询,存在则直接返回 String data = this.selectCache(key); if (data != null) { return data;
        } //同步代码块 synchronized (this) { //这里还需要再次查询缓存,防止其他等待进入同步代码块的线程的查询打到数据库上 data = this.selectCache(key); if (data != null) { return data;
            } //不存在则从数据库查询且将数据放入缓存 data = this.selectDB(key);
            cache.put(key, data); return data;
        }
    }
数据库查询次数:1
缓存查询次数:10276

解决方案2:redis分布式锁

该方案适合集群或分布式架构,单机使用也可以,但没意义。

分布式锁实现方式有一般有3种,本文使用Redis来实现

1. 数据库乐观锁;
2. 基于Redis的分布式锁;
3. 基于ZooKeeper的分布式锁

首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:

  1. 互斥性。在任意时刻,只有一个客户端能持有锁。 
  2. 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
  3. 具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。 
  4. 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。(如果线程 C1 获得锁,但由于业务处理时间过长,锁在线程 C1 还未处理完业务之前已经过期了,这时线程 C2 获得锁,在线程 C2 处理业务期间线程 C1 完成业务执行释放锁操作,但这时线程 C2 仍在处理业务线程 C1 释放了线程 C2 的锁,导致线程 C2 业务处理实际上没有锁提供保护机制;同理线程 C2 可能释放线程 C3 的锁,从而导致严重的问题。)

 

完整示例:

这里用到RedisTemplate,请自行使用Spring集成Redis

@RunWith(SpringRunner.class) @SpringBootTest(classes = App.class) @Slf4j public class ConcurrentTest3 { @Autowired RedisTemplate<String, String> redisTemplate; //请求次数 private int reqestQty = 10000; //倒计时器,当发送reqestQty次请求后继续执行主线程 private CountDownLatch latch = new CountDownLatch(reqestQty); //记录请求落在数据库上的次数 private AtomicInteger dbSelectCount = new AtomicInteger(); //记录请求落在缓存中的次数 private AtomicInteger cacheSelectCount = new AtomicInteger(); @Test public void go() { //同时创建1w个线程获取 for (int i = 0; i < reqestQty; i++) {
            new Thread(() -> { this.getGoodsDetail("商品id");
                latch.countDown();
            }).start();
        } //计数器大于0 时,await()方法会阻塞程序继续执行 try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("数据库查询次数:" + dbSelectCount.get());
        System.out.println("缓存查询次数:" + cacheSelectCount.get());
    } public String getGoodsDetail(String key) { //先从缓存查询,存在则直接返回 String data = this.selectCache(key); if (data != null) { return data;
        } /**
         * requestId主要用来确保解锁的时候,A客户端不要把B客户端获得的锁给释放了
         */ String lockKey = "锁的Key";
        String requestId = "请求客户端ID"; //加锁 if (!this.lock(lockKey, requestId, 10)) { //加锁失败,证明其他线程已获得了锁,此时只需等待一会,再次调用本方法即可 sleep(100); this.getGoodsDetail(key);
        } //加锁成功 //这里还需要再次查询缓存,防止其他等待的线程获得锁时又打到数据库上 data = this.selectCache(key); if (data != null) { return data;
        } //从数据库查询且将数据放入缓存 data = this.selectDB(key);
        redisTemplate.opsForValue().set(key, data, 60, TimeUnit.SECONDS); //释放锁 this.unLock(lockKey, requestId); return data;
    } /**
     * 使用redis特性实现互斥锁(setnx)
     *
     * @param lockKey
     * @param requestId
     * @param expireTime 锁过期时间,即超过该时间仍然未被解锁,则自动解锁,防止死锁
     * @return */ public boolean lock(String lockKey, String requestId, int expireTime) { Boolean res = redisTemplate.opsForValue().setIfAbsent(lockKey, requestId, Duration.ofSeconds(expireTime)); return res != null && res;
    } /**
     * 释放锁,使用Lua脚本,确保原子性
     *
     * @param lockKey
     * @param requestId
     * @return */ public boolean unLock(String lockKey, String requestId) {
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(script, Boolean.class); Boolean res = redisTemplate.execute(redisScript, Collections.singletonList(lockKey), requestId); return res != null && res;
    } /**
     * 从缓存中获取数据
     *
     * @param key
     * @return */ public String selectCache(String key) {
        cacheSelectCount.addAndGet(1);//记录次数 System.out.println(Thread.currentThread().getId() + " 从cache获取数据===="); return redisTemplate.opsForValue().get(key);
    } /**
     * 从数据库中获取数据
     *
     * @param key
     * @return */ public String selectDB(String key) {
        sleep(100);//模拟查询数据库花费100ms dbSelectCount.addAndGet(1);//记录次数 System.out.println(Thread.currentThread().getId() + " 从db获取数据===="); return "数据中的数据";
    } private static void sleep(long m) { try {
            Thread.sleep(m);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
结果:

数据库查询次数:1
缓存查询次数:32095

参考文章:Redis实现分布式锁的正确使用方式(java版本)

 

解决方案拓展

只要实现了同步机制,基本就可以从根本上解决击穿的问题,当然,我们还有一些方法可以去避免发生穿透的发生,比如

  1. 热点数据永不过期;
  2. 缓存预热,比如秒杀活动开始前,先在redis初始化数据。
  3. 编写脚本,去扫描即将过期但此时访问量巨大的缓存,去延迟它的过期时间。

 

缓存穿透

正常情况下,我们去查询数据都是存在。

那么请求去查询一条压根儿数据库中根本就不存在的数据,也就是缓存和数据库都查询不到这条数据,但是请求每次都会打到数据库上面去。

这种查询不存在数据的现象我们称为缓存穿透。

一般情况是黑客攻击,拿着不存在的ID去发送大量的请求,这样产生的请求到数据库去查询,可能会导致你的数据库由于压力过大而宕掉。

解决方案1:缓存空值

之所以会发生穿透,就是因为缓存中没有存储这些空数据的key。从而导致每次查询都到数据库去了。

那么我们就可以为这些key对应的值设置为null或空字符串 丢到缓存里面去。后面再出现查询这个key 的请求的时候,直接返回null 。

这样,就不用在到数据库中去走一圈了,但是别忘了设置过期时间且时间不宜过长,如5分钟。

解决方案2:布隆过滤器

方案1基本能解决业务场景上的问题,但是遇到网站攻击,不停给redis请求不一样的Key,会导致redis内存爆掉。

此时就需要用到另一种方案:布隆过滤器

布隆过滤器是一个神奇的数据结构,可以用来判断一个元素是否在一个集合中

具体自行查阅网上文章

 

缓存雪崩

缓存雪崩其实是概念性问题,和缓存击穿相似。

缓存雪崩是指缓存中数据大批量到过期时间,而查询数据量巨大,引起数据库压力过大甚至down机。和缓存击穿不同的是:缓存击穿指并发查同一条数据,缓存雪崩是不同数据都过期了,很多数据都查不到从而查数据库。

只要解决了击穿和穿透的问题,就不存在雪崩了,即使某个时间点大量数据过期,也不会直接打到数据库,前提是你的Redis得扛得住。

当然,我们也得尽量避免这个问题,我们可以才用给缓存数据设定随机的过期时间来解决这个问题,避免大量缓存在同一时间过期。

其实上面“解决方案拓展”中提到方法也适用于雪崩,要根据业务灵活运用方案。


原文链接:https://my.oschina.net/yejunxi/blog/3193509

郑重声明:本文版权归原作者所有,转载文章仅为传播更多信息之目的,如作者信息标记有误,请第一时间联系我们修改或删除,多谢。

回复列表

相关推荐