个人技术分享

悲观锁 和 乐观锁

Java中的synchronized就是悲观锁的一个实现,悲观锁可以确保无论哪个线程持有锁,都能独占式的访问临界区代码,虽然悲观锁的实现比较简单,但是还是会存在不少问题。

悲观锁总是假设会发生最坏的情况,每次线程去读取数据的时候,也会上锁。这样其他线程在读取数据的时候也会被阻塞,直到它拿到锁,传统的关系型数据库就用到了很多的悲观锁,如行锁表锁读锁写锁

悲观锁会存在以下几个问题:

  1. 在多线程环境下, 加锁和释放锁都会导致线程上下文的切换以及调度延时,会引发一系列性能问题。
  2. 一个线程持有锁的时候,会导致其他抢锁线程都被临时挂起
  3. 如果一个线程优先级高的线程等待一个优先级低的线程释放锁,就会导致线程优先级倒置,从而引发性能风险。

解决以上悲观锁的方式,就是使用乐观锁去替代 悲观锁。乐观锁其实时一种思想,在使用乐观锁的时候,每次线程都去读取的数据的时候都认为其他线程不会进行修改,所以不会上锁,仅仅在更新的时候判断一下其他线程有没有去更新这个数据

数据库操作中的带版本号的数据更新,JUC原子类中都使用了乐观锁的方式来提高性能。

这里针对于悲观锁,我们就就不在举例说明了,感兴趣的话可以去学习一下看一下之前的synchronized章节。

1.基于CAS实现乐观锁

乐观锁的实现步骤主要就两个

(1)冲突监测

(2)数据更新

乐观锁时一种比较典型的CAS原子操作,JUC强大的高并发性能就是建立在CAS原子操作上的,CAS操作中包含三个操作数

(1)需要操作的内存位置(V)

(2)进行比较的预期原值(A)

(3)拟写入的新值(B)

如果内存 V的位置的值 预期原值 A 比较一致,那么CPU会自动将该位置的值替换为新的值 B否则CPU不做任何操作

下面我们通过一个案例来了解一下CAS中的乐观锁,在Java中,乐观锁的一种常见实现方式是借助于java.util.concurrent.atomic包下的原子类,比如AtomicInteger。下面我们通过一个简单的银行账户转账的例子来展示如何使用CAS(Compare-And-Swap)操作实现乐观锁。

/**
 * CAS乐观锁的一个实现
 */
public class OptimismLockDemo {

    private final Logger logger = LoggerFactory.getLogger(OptimismLockDemo.class);


    @Test
    @DisplayName("测试JUC的CAS操作")
    public void testOptimismLock() {
        // 初始余额设置100
        Bank bank = new Bank(100);

        // 线程1存钱 100
        Thread t1 = new Thread(() -> {
            if (bank.updateCount(100)) {
                logger.error("存钱100成功!");
            } else {
                logger.error("取钱成功!");
            }
        }, "t1");

        // 线程2 取钱 100
        Thread t2 = new Thread(() -> {
            if (bank.updateCount(-100)) {
                logger.error("取钱100成功!");
            } else {
                logger.error("取钱失败!");
            }

        }, "t2");


        // 启动t1 和 t2 线程
        t1.start();
        t2.start();

        // 等待全部线程执行完毕
        try {
            t2.join();
            t1.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }


    }


    static class Bank {
        // 定义余额
        private AtomicInteger balance;

        Bank(int initBalance) {
            this.balance = new AtomicInteger(initBalance);
        }


        public boolean updateCount(int money) {
            // 获取当前余额
            int currentMoney = balance.get();


            // 计算出预期的结果
            int newMoney = currentMoney + money;

            // 如果是转出且余额不足,直接返回false,否则尝试更新余额
            if (money < 0 && newMoney < 0) {
                return false;
            }
            // 尝试更新余额,这里就是CAS操作的核心
            // compareAndSet比较并交换,如果当前余额仍为currentMoney 则更新为newMoney,否则就告知更新失败
            return balance.compareAndSet(currentMoney, newMoney);
        }
    }
}

在这个例子中,我们创建了一个初始余额为100的账户,然后启动了两个线程分别执行转出100元和转入100元的操作。由于转账方法基于CAS实现,因此在并发环境下能够确保转账的原子性和正确性,避免了传统锁机制可能导致的性能瓶颈。

在这里插入图片描述

在这里插入图片描述

2.自旋锁

在实际情况中,一个成功的数据更新操作可能需要多次执行CAS(比较并交换)操作,这就是所谓的CAS自旋。通过反复尝试,直至更新成功,这样的机制无需锁定资源,实现了多线程环境下变量状态的高效协同,我们称之为“无锁同步”或“非阻塞同步”。这种方式,正是乐观锁的核心思想之一,它体现了在并发编程领域追求高性能与低冲突的“乐观”策略。

2.1.不可重入自旋锁

自旋锁(SpinLock)的基本含义就是:当一个线程在获取锁 的时候,如果锁已经被其他线程获取,那么该调用线程就一致那里循环监测锁是否已经被释放,一直到获取那个锁之后才会退出循环。

package com.hrfan.thread.lock.type.spin;

import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * 自旋锁
 * 不可重入锁
 */
public class SpinLockDemo {


    private static final Logger log = LoggerFactory.getLogger(SpinLockDemo.class);

    @Test
    @DisplayName("测试不可重入自旋锁")
    public void testSpinLock() {
        SpinLock spinLock = new SpinLock();
        // 这里为了模拟多线程的一个并发性能 我们使用线程池来进行测试
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 3; i++) {
            executorService.submit(() -> {
                spinLock.lock();
                try {
                    // 模拟执行操作
                    log.error("抢锁成功!执行操作");
                } finally {
                    spinLock.unlock();

                }
            });
        }
        executorService.shutdown();


    }


    static class SpinLock implements Lock {

        /**
         * 当前锁的拥有者
         */
        private AtomicReference<Thread> owner = new AtomicReference<>();


        @Override
        public void lock() {
            // 抢占锁
            // 获取当前线程
            Thread thread = Thread.currentThread();
            // 开始抢占锁 (不断cas 直到owner的值为null 才更新为当前线程)
            while (!owner.compareAndSet(null, thread)) {
                log.error("抢锁失败!让出剩余CPU时间片!");
                // 如果抢锁失败(即当前锁已被其他线程持有),则让出CPU时间片给其他线程,稍后再试
                Thread.yield();
            }
        }

        @Override
        public void unlock() {
            // 通过代码 我们可以发现,SpinLock是不支持重入的,在一个线程获取锁没有释放之前,它不可能再次获得锁。
            // 释放锁
            Thread thread = Thread.currentThread();
            // 只有拥有者才能够释放锁
            if (thread == owner.get()) {
                log.error("释放锁成功!");
                // 这里设置为拥有者为空,不需要在使用compareAndSet() 因为上面已经判断
                owner.set(null);
            }
        }

        @Override
        public void lockInterruptibly() throws InterruptedException {

        }

        @Override
        public boolean tryLock() {
            return false;
        }

        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return false;
        }


        @Override
        public Condition newCondition() {
            return null;
        }
    }
}

在这里插入图片描述

2.2.可重入自旋锁

为了实现可重入自旋锁,这里引入一个计数器,用来记录一个线程获取锁的次数。

package com.hrfan.thread.lock.type.spin;

import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * 可重入自旋锁
 */
public class ReentrantSpinLockDemo {
    ReentrantSpinLock lock = new ReentrantSpinLock();

    @Test
    @DisplayName("测试可重入锁")
    public void test() throws InterruptedException {


        new Thread(() -> {
            lock.lock();
            try {
                log.error("开始执行任务! 重入次数{}", lock.getCount());
                reentrantSpinLock();
            } finally {
                lock.unlock();
            }
        }, "t1").start();
        Thread.sleep(1000);
    }

    public void reentrantSpinLock() {
        lock.lock();
        try {
            log.error("开始执行重入方法! 重入次数{}", lock.getCount());
        } finally {
            lock.unlock();
        }
    }


    private static final Logger log = LoggerFactory.getLogger(ReentrantSpinLockDemo.class);

    static class ReentrantSpinLock implements Lock {

        /**
         * 当前锁的拥有者
         * 使用拥有者的Thread作为同步状态,而不是使用一个简单的整数作为同步状态
         */
        private AtomicReference<Thread> owner = new AtomicReference<>();


        /**
         * 记录一个线程同步获取锁的状态
         */
        private int count = 0;

        @Override
        public void lock() {
            // 抢占锁
            Thread thread = Thread.currentThread();
            // 如果是重入 增加重入次数返回
            if (thread == owner.get()) {
                count++;
                return;
            }

            // 如果不是重入 那么进行自旋操作
            while (!owner.compareAndSet(null, thread)) {
                log.error("抢锁失败!让出剩余CPU时间片!");
                // 如果抢锁失败(即当前锁已被其他线程持有),则让出CPU时间片给其他线程,稍后再试
                Thread.yield();
            }
        }


        @Override
        public void unlock() {
            // 只有拥有者才能释放锁
            Thread thread = Thread.currentThread();
            if (thread == owner.get()) {
                // 如果发现count的次数不是 0减少重入次数 并返回
                if (count > 0) {
                    count--;
                } else {
                    // 直接将拥有者设置为空
                    owner.set(null);
                }
            }
        }

        @Override
        public void lockInterruptibly() throws InterruptedException {

        }

        @Override
        public boolean tryLock() {
            return false;
        }

        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return false;
        }


        @Override
        public Condition newCondition() {
            return null;
        }

        public int getCount() {
            return count;
        }
    }
}

在这里插入图片描述

自旋锁的特点:线程在获取锁的时候,如果锁被其他线程持有,当前线程循环等待,直到获取锁。线程抢锁期间线程的状态不会改变,一直时运行状态,在操作系统层面线程处于用户态。

自旋锁的问题:在争用激烈的场景下,如果某个线程持有的锁的时间太长,就会导致其他自旋线程的CPU资源耗尽,另外,如果大量的线程进行空自旋,还可能导致硬件层面的总线风暴

2.3.CLH自旋锁

前面提到了CAS自旋可能引发的一些性能问题,尤其是它可能导致CPU层面的总线争用(总线风暴)。面对这一挑战,Java并发包(JUC)中的轻量级锁如何有效规避这一问题呢?其解决方案在于利用队列机制对试图获取锁的线程进行有序排列,从而大幅度减少了CAS操作的频次,从根本上减轻了对CPU和总线的压力。

具体而言,CLH锁作为一种典型的基于队列(常采用单向链表形式)实现的自旋锁机制,为这一问题提供了高效的解答。在CLH锁的模型中,任何一个请求加锁的线程首先会尝试通过CAS操作将自己的信息节点添加到队列的末端。一旦成功入队,该线程随后仅需在其直接前驱节点上执行相对简单的自旋等待操作,直至前驱释放锁资源。

这一设计精妙之处在于,CLH锁仅在节点初次尝试加入队列时涉及一次CAS操作。一旦入队完成,后续的自旋过程无需进一步的CAS介入,仅需执行标准的自旋逻辑即可。因此,在高度竞争的并发场景下,CLH锁能够显著削减CAS操作的总量,有效避开了可能引发的总线风暴现象。

在Java并发工具包(JUC)中,显式锁的实现底层依赖于AbstractQueuedSynchronizer(AQS),这一框架实质上是对CLH锁原理的一种扩展与变体应用,进一步强化了锁的管理和线程调度能力,展现了高级的并发控制策略。

上面提到的简单自旋 其实就是

普通自旋是指线程在未能立即获取锁时,不进行复杂的操作如CAS(比较并交换)而是执行简单的循环检查某个条件是否满足(比如前驱节点是否已经释放了锁)。

下面我们通过一个案例来学习一下CLH自旋锁

package com.hrfan.thread.lock.type.spin;

import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sound.sampled.FloatControl;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * CLH版本自旋锁
 */
public class CLHSpinLockDemo {
    private static final Logger log = LoggerFactory.getLogger(CLHSpinLockDemo.class);
    public static int count = 0;

    @Test
    @DisplayName("测试CLH自旋锁")
    public void test() {
        long startTime = System.currentTimeMillis();
        // 创建CLH自旋锁
        CLHSpinLock lock = new CLHSpinLock();
        // ReentrantLock lock = new ReentrantLock();
        // 线程数量
        int threads = 10;
        // 每次执行的次数
        int turns = 10000;
        // 通过线程池来创建线程
        ExecutorService executorService = Executors.newFixedThreadPool(threads);
        // 创建计时器
        CountDownLatch latch = new CountDownLatch(threads);
        for (int i = 0; i < threads; i++) {
            executorService.submit(() -> {
                for (int j = 0; j < turns; j++) {
                    // 创建锁
                    lock.lock();
                    try {
                        count++;
                    } finally {
                        lock.unlock();
                    }
                }
                // 更新计时器
                latch.countDown();
            });
        }
        // 等待全部线程执行完毕
        try {
            latch.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        long endTime = System.currentTimeMillis();
        log.error("-------------线程执行结束,最终结果为:{}", count);
        log.error("耗时:{}", endTime - startTime);
    }

    static class Node {
        // 当前线程正在抢占锁,或者已经占有锁 true
        // 当前线程已经释放锁 下一个线程可以占有锁了 false
        volatile boolean locked;


        // 前驱节点 需要监听其lock字段
        Node preNode;


        public Node(boolean locked, Node preNode) {
            this.locked = locked;
            this.preNode = preNode;

        }

        // 空节点
        public static final Node EMPTY = new Node(false, null);


        public boolean isLocked() {
            return locked;
        }

        public void setLocked(boolean locked) {
            this.locked = locked;
        }

        public Node getPreNode() {
            return preNode;
        }

        public void setPreNode(Node preNode) {
            this.preNode = preNode;
        }
    }


    static class CLHSpinLock implements Lock {
        // 创建当前节点的本地变量
        private static ThreadLocal<Node> currentNodeLocal = new ThreadLocal<Node>();

        // CLH队列的尾指针,使用AtomicReference,方法CAS操作
        AtomicReference<Node> tail = new AtomicReference<>(null);

        public CLHSpinLock() {
            // 设置尾部节点
            tail.getAndSet(Node.EMPTY);
        }

        @Override
        public void lock() {
            // 加锁操作
            // 将节点添加到等待队列的尾部
            Node curNode = new Node(true, null);
            Node preNode = tail.get();
            // 通过CAS自旋 将当前节点插入到队列的尾部
            while (!tail.compareAndSet(preNode, curNode)) {
                preNode = tail.get();
            }
            // 设置前驱节点
            curNode.setPreNode(preNode);

            // 自旋监听前驱节点的locked变量,直到其值为false,如果前驱节点为true说明上一个线程还没释放锁
            while (curNode.getPreNode().isLocked()) {
                // 抢锁失败 让出cpu时间片
                Thread.yield();
            }

            // 到这里说明已经抢到了锁
            // log.error("已经抢锁成功!");
            // 将当前节点缓存到线程本地变量中 释放锁的时候需要使用
            currentNodeLocal.set(curNode);

        }

        @Override
        public void lockInterruptibly() throws InterruptedException {

        }

        @Override
        public boolean tryLock() {
            return false;
        }

        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return false;
        }

        @Override
        public void unlock() {

            // 释放锁
            // 获取当前线程的threadLocal
            Node node = currentNodeLocal.get();
            // 将locked标志改为false
            node.setLocked(false);
            // 将前驱节点设置为null 断开引用方便垃圾回收
            node.setPreNode(null);
            // 释放当前缓存中的线程信息
            currentNodeLocal.set(null);
        }

        @Override
        public Condition newCondition() {
            return null;
        }
    }
}

在这里插入图片描述

在这里插入图片描述