首頁技術文章正文

Web前端培訓:深入理解Redis分布式鎖

更新時間:2022-11-11 來源:黑馬程序員 瀏覽量:

IT培訓班

  相信很多同學都聽說過分布式鎖,但也僅僅停留在概念的理解上,這篇文章會從分布式鎖的應用場景講起,從實現的角度上深度剖析redis如何實現分布式鎖。

  一、超賣問題

  我們先來看超賣的概念:

  當寶貝庫存接近0時,如果多個買家同時付款購買此寶貝,或者店鋪后臺在架數量大于倉庫實際數量,將會出現超賣現象。超賣現象本質上就是買到了比倉庫中數量更多的寶貝。

  > 本文主要解決超賣問題的第一種,同時多人購買寶貝時,造成超賣。

  測試代碼

  那么超賣問題是如何產生的呢?我們準備一段代碼進行測試:

@Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 第一種實現,進程內就存在線程安全問題
     * 可以只啟動一個進程測試
     */
    @RequestMapping("/deduct_stock1")
    public void deductStock1(){

        String stock = stringRedisTemplate.opsForValue().get("stock");
        int stockNum = Integer.parseInt(stock);
        if(stockNum > 0){
            //設置庫存減1
            int realStock = stockNum - 1;
            stringRedisTemplate.opsForValue().set("stock",realStock + "");
            System.out.println("設置庫存" + realStock);
        }else{
            System.out.println("庫存不足");
        }

    }

  這段代碼中,使用redis先獲取庫存數量(當然實際場景中不會只保存一個全局庫存數,應該根據每一個商品單元(sku)保存一份庫存數)。

 String stock = stringRedisTemplate.opsForValue().get("stock");
 int stockNum = Integer.parseInt(stock);

  接下來,判斷庫存數是否大于0:

  - 如果大于0,將庫存數減一,通過set命令,寫回redis

  >這里沒有使用redis的decrement命令,因為此命令在redis單線程模型下是線程安全的,而為了可以模擬線程不安全的情況將其拆成三步操作。

  //設置庫存減1
  int realStock = stockNum - 1;
  stringRedisTemplate.opsForValue().set("stock",realStock + "");
  System.out.println("設置庫存" + realStock);

  - 如果小于等于0,提示庫存不足

  JMeter測試

  通過JMeter進行并發(fā)測試,看下會不會出現超賣的問題:

      1.啟動tomcat

  這種情況下,只需要啟動一個tomcat就會出現超賣。我們先啟動一個tomcat在8080端口上。

1668135340736_1.jpg

  2.下載JMeter

  Apache JMeter是Apache組織開發(fā)的基于Java的壓力測試工具。

  從官網上下載即可:

  [https://jmeter.apache.org/download_jmeter.cgi](https://links.jianshu.com/go?to=https%3A%2F%2Fjmeter.apache.org%2Fdownload_jmeter.cgi)

  下載完之后解壓,運行bin目錄下的jmeter.bat,顯示如下界面:

1668135364378_2.jpg

  如果嫌字體太小,可以選擇放大:

1668135376942_3.jpg

  3.配置JMeter

  在Test Plan上點擊右鍵,創(chuàng)建`線程組(Thread Group)`

1668135398609_4.jpg

  配置一下具體參數:

1668135413660_5.jpg

  - `Number of Threads` 同時并發(fā)線程數

  - `Ramp-Up Period(in-seconds)` 代表隔多長時間執(zhí)行,0代表同時并發(fā)。假設線程數為100, 估計的點擊率為每秒10次, 那么估計的理想ramp-up period 就是 100/10 = 10 秒

  - `Loop Count` 循環(huán)次數

  > 這里給出500是為了直接測試并發(fā)500搶,看看能不能正好把500個貨物搶完。

  添加Http請求:

1668135439553_6.jpg

  添加請求URL:

1668135453144_7.jpg

  添加聚合結果,用來顯示整體的運行情況:

1668135465198_8.jpg

  到此為止JMeter的配置結束。

  4.設置庫存量

  啟動redis-server,使用redis-client連接:

1668135483864_9.jpg

  把庫存數設置為500。

  5.開始測試

  點擊運行按鈕,啟動測試:

1668135500642_10.jpg

  首先我們看到聚合報告里輸出的結果:

1668135538911_11.jpg

  錯誤率0%,樣本數500,證明500個請求都已經執(zhí)行,但是發(fā)現控制臺輸出如下:

<img src="assets/12.png" alt="img" style="zoom:67%;" />

    很顯然,一份商品都被賣了多次,這顯然是不合理的。

  原因分析

  現在我們只啟動了一個tomcat,在單jvm進程的情況下,tomcat會使用線程池接收請求:

1668135598645_13.jpg

  而由于每個線程可能同時獲取到庫存量,所以庫存量在兩個線程中顯示的都是500,然后兩個線程就繼續(xù)進行扣減庫存操作,得出499寫回redis中,在這個過程中,顯然存在線程安全的問題。同一個商品被賣出了2份,超賣問題就出現了。

  二、加鎖優(yōu)化

  synchronized鎖

  要保證單jvm中線程安全,最簡單直接的方式就是添加synchronized關鍵字,那么這樣行不行呢,我們來做一個測試:

  /**
     * 第二種實現,使用synchronized加鎖
     * 可以只啟動一個進程測試
     */
    @RequestMapping("/deduct_stock2")
    public void deductStock2(){

        synchronized (this){
            String stock = stringRedisTemplate.opsForValue().get("stock");
            int stockNum = Integer.parseInt(stock);
            if(stockNum > 0){
                //設置庫存減1
                int realStock = stockNum - 1;
                stringRedisTemplate.opsForValue().set("stock",realStock + "");
                System.out.println("設置庫存" + realStock);
            }else{
                System.out.println("庫存不足");
            }
        }

    }

  在進行扣減庫存前,先通過synchronized關鍵字,對資源加鎖,這樣就只有一個線程能進入到扣減庫存的代碼塊中。來測試一下:

  重置庫存

set stock 500

  修改接口地址

1668135664179_14.jpg

  測試

<img src="assets/15.png" alt="img" style="zoom:67%;" />

  可以看到,庫存被扣減為0,并且沒有出現超賣的情況(設置了500庫存,并且500個人搶,正好搶完)。

  但是這種方案顯然是不行的,在生產環(huán)境上如果部署多個tomcat實例,那么就會出現如下情況:

1668135702539_16.jpg

  多個進程無法共享jvm內存中的鎖,所以會出現多把鎖,這種情況下也會出現超賣問題。

  三、分布式鎖的實現

  多Tomcat實例下的超賣演示

  接下來我們演示一下如何在多個Tomcat情況下,演示超賣的問題:

       1.啟動兩個tomcat服務

  在IDEA中配置兩個spring boot的啟動項,使用vm參數指定不同的端口號

  ```undefined

  -Dserver.port=8080

  ```

1668135752623_17.jpg

1668135765972_18.jpg

  2.配置nginx

  編寫~/nginx_redis/conf/nginx.conf如下:

user  nginx;
worker_processes  1;

error_log  /var/log/nginx/error.log warn;
pid        /var/run/nginx.pid;


events {
    worker_connections  1024;
}


http {
    include       /etc/nginx/mime.types;
    default_type  application/octet-stream;

    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';


upstream redislock{
     server 192.168.226.1:8080 weight=1;
     server 192.168.226.1:8081 weight=1;
}

 server {
        listen       80;
        server_name  localhost;
    location /{
           root html;
           proxy_pass http://redislock;
    }
}

    access_log  /var/log/nginx/access.log  main;

    sendfile        on;
    #tcp_nopush     on;

    keepalive_timeout  65;

    #gzip  on;

    include /etc/nginx/conf.d/*.conf;
}

  > 192.168.226.1這是我宿主機的IP

  準備一個虛擬機(也可以使用windows下的nginx),使用docker啟動nginx:

docker pull nginx
docker run -di -p 10085:80 --name nginx-redis-hc   -v ~/nginx_redis/html:/usr/share/nginx/html   -v ~/nginx_redis/conf/nginx.conf:/etc/nginx/nginx.conf   -v ~/nginx_redis/logs:/var/log/nginx   nginx

  在宿主機下使用`虛擬機的IP地址:10085`訪問nginx,如果出現如下頁面就代表成功:

1668135849014_19.jpg

  3.測試

  修改接口地址為nginx:

1668135863401_20.jpg

  運行查看兩個tomcat的控制臺:

  - tomcat1

<img src="assets/21.png" alt="img" style="zoom:67%;" />

  - tomcat2

<img src="assets/22.png" alt="img" style="zoom:67%;" />

  
       沒有將庫存清空,證明存在超賣問題。

  手動實現分布式鎖

  使用redis手動實現分布式鎖,需要用到命令`setnx`。先來介紹一下setnx:

  SETNX key value[]

  > 可用版本: >= 1.0.0

  >

  > 時間復雜度: O(1)

  只在鍵 `key` 不存在的情況下, 將鍵 `key` 的值設置為 `value` 。

  若鍵 `key` 已經存在, 則 `SETNX` 命令不做任何動作。

  `SETNX` 是『SET if Not eXists』(如果不存在,則 SET)的簡寫。

  返回值

  命令在設置成功時返回 `1` , 設置失敗時返回 `0` 。

  代碼示例

redis> EXISTS job                # job 不存在
# job 不存在
(integer) 0

redis> SETNX job "programmer"    # job 設置成功
(integer) 1

redis> SETNX job "code-farmer"   # 嘗試覆蓋 job ,失敗
(integer) 0

redis> GET job                   # 沒有被覆蓋

  使用redis構建分布式鎖流程如下:

1668135991872_23.jpg

  image.png

  - 線程1申請鎖(`setnx`),拿到了鎖。

  - 線程2申請鎖,由于線程1已經擁有了鎖,`setnx`返回0失敗,這一步用戶操作會失敗。

  - 線程1執(zhí)行扣減庫存操作并釋放鎖。

  - 線程2再次申請鎖,獲取到鎖并執(zhí)行扣減庫存,然后釋放鎖。

  > 注意這里線程沒有拿到鎖,如果不嘗試while(true)重新獲取鎖,這個操作就直接失敗了。

  代碼實現

/**
     * 第三種實現,使用redis中的setIfAbsent(setnx命令)實現分布式鎖
     */
    @RequestMapping("/deduct_stock3")
    public void deductStock3(){

        //在獲取到鎖的時候,給鎖分配一個id
        String opId = UUID.randomUUID().toString();
        Boolean stockLock = stringRedisTemplate
                .opsForValue().setIfAbsent("stockLock", opId, Duration.ofSeconds(30);

        if(stockLock){

            try{
                String stock = stringRedisTemplate.opsForValue().get("stock");
                int stockNum = Integer.parseInt(stock);
                if(stockNum > 0){
                    //設置庫存減1
                    int realStock = stockNum - 1;
                    stringRedisTemplate.opsForValue().set("stock",realStock + "");
                    System.out.println("設置庫存" + realStock);
                }else{
                    System.out.println("庫存不足");
                }

            }catch(Exception e){
                e.printStackTrace();
            }finally {
                if(opId.equals(stringRedisTemplate
                        .opsForValue().get("stockLock"))){
                    stringRedisTemplate.delete("stockLock");
                }
            }

        }

    }

  測試略過,這里有幾個知識點需要說明

  setIfAbsent設置超時

  如果setIfAbsent不設置超時時間,假設線程執(zhí)行業(yè)務代碼時間時死鎖或者其他原因導致長時間不釋放,那么會影響其他線程獲取到鎖,這個時候整體業(yè)務就會出現不可用。

Boolean stockLock = stringRedisTemplate
                .opsForValue().setIfAbsent("stockLock", opId, Duration.ofSeconds(30);

  設置超時時間為30秒,該時間一般大于業(yè)務執(zhí)行的最大時間。

  每次獲取到鎖,設置唯一ID

  考慮這樣的場景

1668136077380_24.jpg

  - 線程1獲取鎖扣減庫存,但是由于操作不當,長時間卡住,這樣會觸發(fā)超時時間鎖被釋放。

  - 線程2獲取到鎖,扣減庫存。

  - 線程1的代碼拋出異常,執(zhí)行finally釋放鎖,但是釋放的是進程B的鎖。

  解決方案就是在**加鎖前生成UUID**,釋放的時候校驗UUID是否正確,如果不正確,說明加鎖線程不是當前線程。

  使用Redisson實現分布式鎖

  setnx雖好,但是實現起來畢竟太過麻煩,一不小心就可能陷入并發(fā)編程的陷阱中,那么有沒有更加簡單的實現方式呢?答案就是`redisson`。

  > Redisson是架設在[Redis](https://links.jianshu.com/go?to=http%3A%2F%2Fwww.oschina.net%2Fp%2Fredis)基礎上的一個Java駐內存數據網格(In-Memory Data Grid)。【[Redis官方推薦](https://links.jianshu.com/go?to=http%3A%2F%2Fwww.redis.io%2Fclients)】

  > Redisson在基于NIO的[Netty](https://links.jianshu.com/go?to=http%3A%2F%2Fnetty.io%2F)框架上,充分的利用了Redis鍵值數據庫提供的一系列優(yōu)勢,在Java實用工具包中常用接口的基礎上,為使用者提供了一系列具有分布式特性的常用工具類。使得原本作為協調單機多線程并發(fā)程序的工具包獲得了協調分布式多機多線程并發(fā)系統(tǒng)的能力,大大降低了設計和研發(fā)大規(guī)模分布式系統(tǒng)的難度。同時結合各富特色的分布式服務,更進一步簡化了分布式環(huán)境中程序相互之間的協作。

  總而言之,`redisson`提供了一系列較為完善的工具類,其中就包含了分布式鎖。用`redisson`實現分布式鎖的流程極為簡單。

  引入依賴

       <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.14.0</version>
        </dependency>

  創(chuàng)建Redisson實例

  @Bean
    public RedissonClient redisson(){
        // 1. Create config object
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
//        config.useClusterServers()
//                // use "rediss://" for SSL connection
//                .addNodeAddress("redis://127.0.0.1:7181");
        return Redisson.create(config);
    }

  編寫分布式鎖代碼

  @Autowired
    private RedissonClient redissonClient;
    /**
     * 第四種實現,使用redisson實現
     */
    @RequestMapping("/deduct_stock4")
    public void deductStock4(){

        RLock lock = redissonClient.getLock("redisson:stockLock");
        try{
            //加鎖
            lock.lock();
            String stock = stringRedisTemplate.opsForValue().get("stock");
            int stockNum = Integer.parseInt(stock);
            if(stockNum > 0){
                //設置庫存減1
                int realStock = stockNum - 1;
                stringRedisTemplate.opsForValue().set("stock",realStock + "");
                System.out.println("設置庫存" + realStock);
            }else{
                System.out.println("庫存不足");
            }

        }catch(Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }

    }

  其中加鎖代碼基本與進程內加鎖一致,就不再詳細解讀,讀者自行實踐即可。

  Redisson分布式鎖原理

  `Redisson分布式鎖`的主要原理非常簡單,利用了lua腳本的原子性。

  在分布式環(huán)境下產生并發(fā)問題的主要原因是三個操作并不是原子操作:

  - 獲取庫存

  - 扣減庫存

  - 寫入庫存

  那么如果我們把三個操作合并為一個操作,在默認單線程的Redis中運行,是不會產生并發(fā)問題的。源碼如下:

 <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

  這一段源碼中,`redisson`利用了lua腳本的原子性,校驗key是否存在,如果不存在就創(chuàng)建key并利用incrby加一操作(這步操作主要是為了實現可重入性)。`redisson`實現的分布式鎖具備如下特性:

  - 鎖失效

  - 鎖續(xù)租

  > 執(zhí)行時間長的鎖快要到期時會自動續(xù)租

  - 可重入

  - 操作原子性

  鎖續(xù)租原理

  使用如下代碼進行測試鎖續(xù)租的情況

@Test
void test() throws InterruptedException {
    RLock testlock1111 = redissonClient.getLock("testlock");
    testlock1111.lock();
    try{
        Thread thread = new Thread(() -> {
            while(true){
                Long testlock = redisTemplate.getExpire("testlock");
                System.out.println(LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME) + " ttl:" + testlock);
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        thread.start();
        thread.join();
    }finally {
        if(testlock1111.isHeldByCurrentThread()){
            testlock1111.unlock();
        }
    }

}

  我們會發(fā)現,每隔10秒會自動續(xù)租一次,保證鎖不被釋放。

<img src="assets/25.png" alt="image-20220721153251169" style="zoom:67%;" />

  那么這種續(xù)租的行為是如何實現的呢?考慮這種情況:如果線程加鎖之后,進程宕機,線程無法執(zhí)行解鎖代碼,那么這個鎖就無法得到釋放(注意,不是加鎖線程不允許亂解鎖),為了避免這種情況的發(fā)生,鎖都會設置一個過期時間。比如使用**lock**無參命令會默認設置30秒的過期時間。那么30秒之后呢?如果線程還在工作,自動釋放依然會產生線程安全的問題。所以Redisson使用了watch dog看門狗機制來實現自動續(xù)租。

  核心代碼及注釋:

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    //lock()無參方法leaseTime為-1,所以進else分值
    if (leaseTime > 0) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        //通過lua腳本加鎖
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        // 異步方法,等到加鎖成功會回調,第一次加鎖ttlRemaining為空,leaseTime為-1
        if (ttlRemaining == null) {
            if (leaseTime > 0) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                //設置延時任務
                scheduleExpirationRenewal(threadId);
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}

  接下來分析scheduleExpirationRenewal的過程:

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    //創(chuàng)建一個延遲任務
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            //執(zhí)行l(wèi)ua腳本進行續(xù)租
            CompletionStage<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                //執(zhí)行l(wèi)ua續(xù)租,鎖還在就續(xù)租,鎖不在返回false就取消續(xù)租的行為
                if (res) {
                    // reschedule itself
                    renewExpiration();
                } else {
                    cancelExpirationRenewal(null);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);  //internalLockLeaseTime默認值30,所以每10秒會續(xù)租一次,續(xù)租到30秒
   
    ee.setTimeout(task);
}

  其中,renewExpirationAsync執(zhí)行的lua腳本如下:

protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}

  判斷hash中是否存在鎖,如果存在就設置過期時間為30秒,返回1。如果不存在就返回0。

  總結

  本文介紹了超賣問題產生的原因:操作不具備原子性,同時提出了集中解決思路。

  - `synchronized鎖`,無法保證多實例下的線程安全

  - `setnx`手動實現,坑很多、代碼較為復雜

  - `redisson`實現,能夠保證多實例下線程安全,代碼簡單可靠

分享到:
在線咨詢 我要報名
和我們在線交談!