lucky
发布于 2023-12-25 / 35 阅读
0

Java AQS学习笔记

并发锁笔记

并发编程中的锁是什么?

锁是并发编程中一种常用的同步机制,它可以保证多个线程在访问共享资源时不会发生冲突。

Java 内置的锁是如何实现的?
最常用的最简单的锁要数 ReentrantLock,使用它加锁时如果没有立即加成功,就会阻塞当前的线程等待其它线程释放锁之后再重新尝试加锁。

  • 那线程是如何实现阻塞自己的?
  • 其它线程释放锁之后又是如果唤醒当前线程的?
  • 当前线程是如何得出自己没有加锁成功这一结论的?

思考实现一种简单的锁 ReentrantLock:即满足互斥条件就可,不考虑可重入性。

锁的状态怎么表示?
可以设置个标识例如state。state = 0 标识空闲,state = 1 标识被占用

1703675271982.jpg

线程加锁、释放锁意味着什么?

完成state的值由0->1转换的即锁成功

完成state的值由1->0转换的即释放锁成功

1703675132721.jpg

当多个线程争用同一把锁时,如何管理这些线程?
一个有效的方法是通过队列排队FIFO,按序获取锁。

下面思考一种简单的锁。

思路一:

1.只有第一个线程节点有权占用锁
2.将等待获取锁的线程加入队列尾
3.每个线程轮询检查自己是不是第一个节点前面,如果是,则获取锁,否则继续检查

这种方法简单,但性能差。因为线程在等待锁的过程中,一直在轮询,CPU不能做其它的事情,CPU利用率低。

思路二:
改进思路一中第3步,线程轮询检查时考虑随机sleep,让出CPU

1.只有第一个线程节点有权占用锁
2.将等待获取锁的线程加入队列尾
3.线程轮询检查自己是不是第一个节点前面。如果是,则获取锁,否则随机sleep一段时间,然后继续检查

sleep 时间太短,会导致线程频繁切换,效率低。sleep 时间太长,会导致线程一直在等待,CPU利用率低。

思路三:
改进思路一中第3步,既然sleep时间不确定。可以考虑直接让线程阻塞,让出CPU

1.只有第一个线程节点有权占用锁
2.将等待获取锁的线程加入队列尾
3.线程轮询检查自己是不是第一个节点前面。如果是,则获取锁,否则线程阻塞,等待系统唤醒

线程直接阻塞,那什么时间可以被唤醒呢?

等待系统唤醒?
1:唤醒的时间无法确定
2:唤醒的是哪一个线程也不确定。如果不是第一个节点,被唤醒的线程还得继续阻塞。导致无用的上下文切换

思路四:
改进思路一中第3步,既然系统自然唤醒不靠谱,那就加一个线程用于维护队列(名称:队列管理线程),负责唤醒唤醒第一个节点。这样就不用每个等待线程都轮询检查了,只需要唤醒第一个节点即可。

1.只有第一个线程节点有权占用锁
2.将等待获取锁的线程加入队列尾,阻塞当前进程
3.队列管理线程循环执行:唤醒第一个节点,分配锁,并记录持有线程锁的线程,例如ownerThread
4.当工作线程释放锁时,将自己从队列中移除,清除ownerThread
5.队列管理线程重复第3步

缺点:队列管理线程一直在循环,空耗cpu好像又回到老路了

思路五:
仔细想想思路四中,队列管理线程好像也没处理什么东西,只是唤醒第一个节点,分配锁。这件事可以交给工作线程可以处理吗?

1.只有第一个线程节点有权占用锁
2.将等待获取锁的线程加入队列尾
    2.1 如果当前是第一个节点,则获取锁,并记录持有线程锁的线程,例如ownerThread
    2.2 如果当前不是第一个节点,则线程阻塞,等待唤醒
3.当持有锁的线程释放锁时,
 3.1将自己从队列中移除(此时线程节点必定是第一个节点),清除ownerThread
 3.2唤醒等待队列中的下一个节点

##现在我们来看看Java中是如何管理锁等待队列的

线程阻塞原语

Java 的线程阻塞和唤醒是通过 Unsafe 类的 park 和 unpark 方法做到的。

public class Unsafe {
  ...
  public native void park(boolean isAbsolute, long time);
  public native void unpark(Thread t);
  ...
}

这两个方法都是 native 方法,它们本身是由 C 语言来实现的核心功能。park 的意思是停车,让当前运行的线程 Thread.currentThread() 休眠,unpark 的意思是解除停车,唤醒指定线程。这两个方法在底层是使用操作系统提供的信号量机制来实现的。具体实现过程要深究 C 代码,这里暂时不去具体分析。park 方法的两个参数用来控制休眠多长时间,第一个参数 isAbsolute 表示第二个参数是绝对时间还是相对时间,单位是毫秒。

线程从启动开始就会一直跑,除了操作系统的任务调度策略外,它只有在调用 park 的时候才会暂停运行。锁可以暂停线程的奥秘所在正是因为锁在底层调用了 park 方法。

parkBlocker

线程对象 Thread 里面有一个重要的属性 parkBlocker,它保存当前线程因为什么而 park。等待锁AQS、等待异步任务完成FutureTask、SynchronousQueue同步队列等待数据等等

class Thread {
  ...
  volatile Object parkBlocker;//表明当前线程为何挂起,如等待锁AQS,异步任务FutureTask等
  ...
}

当线程被 unpark 唤醒后,这个属性会被置为 null。Unsafe.park 和 unpark 并不会帮我们设置 parkBlocker 属性,负责管理这个属性的工具类是 LockSupport,它对 Unsafe 这两个方法进行了简单的包装。

class LockSupport {
  ...
  public static void park(Object blocker) {
     Thread t = Thread.currentThread();
     setBlocker(t, blocker);
     UNSAFE.park(false, 0L);
     setBlocker(t, null); // 线程被唤醒来后置null
  }

  public static void unpark(Thread thread) {
     if (thread != null)
        UNSAFE.unpark(thread);
     }
  ...
}

Java 的锁数据结构正是通过调用 LockSupport 来实现休眠与唤醒的。
对象等待锁时,线程对象里面的 parkBlocker 字段的值就是AQS【排队管理器】

排队管理器

AbstractQueuedSynchronizer 类是一个抽象类,它是所有的锁队列管理器的父类,JDK 中的各种形式的锁其内部的队列管理器都继承了这个类,它是 Java 并发世界的核心基石。比如 ReentrantLock、ReadWriteLock、CountDownLatch、Semaphone、ThreadPoolExecutor 内部的队列管理器都是它的子类。这个抽象类暴露了一些抽象方法,每一种锁都需要对这个管理器进行定制。而 JDK 内置的所有并发数据结构都是在这些锁的保护下完成的,它是JDK 多线程高楼大厦的地基。

abstract class AbstractOwnableSynchronizer {
    ...
    private Thread exclusiveOwnerThread;//锁持有者
    ...
}


class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {

    static final class Node {

        static final Node SHARED = new Node();//节点类型共享
        static final Node EXCLUSIVE = null;//节点类型独占

        Node prev;
        Node next;
        Thread thread; // 每个节点一个线程
        Node nextWaiter; // 请求的是共享锁还是独占锁
        int waitStatus; // 精细状态描述字
        ...
    }


    volatile Node head;  // 队头线程将优先获得锁
    volatile Node tail;  // 抢锁失败的线程追加到队尾
    volatile int state;  //锁计数,
    // 资源数量,不同情景又不同的涵义,近似又有区别大致如下:
    // 1. 独占锁:state为0锁空闲,1锁占用
    // 2. 信号量:state为资源数量
    // 3. 栅栏:state为栅栏数量
    // 4. 可重入锁:state为锁计数
  


    protected final int getState() {
        return state;
    }

    protected final void setState(int newState) {
        state = newState;
    }

    /* CAS */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }


    /**
     * 循环,节点入队。返回待入队节点的前一个节点
    **/
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }


    /***
     * 优先直接插入队尾,失败则尝试入队操作
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     ***/
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    //子类实现
    protected boolean tryAcquire(int acquire) {
        throw new UnsupportedOperationException();
    }


    /***
    * 循环执行,直到成功获取资源,返回线程是否被中断
    * 
    **/
    final boolean acquireQueued(final Node node, int acquire) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //检查节点前置节点是否是头节点,若是则尝试获取资源
                final Node p = node.predecessor();
                if (p == head && tryAcquire(acquire)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //检查是否需要阻塞等待,避免CPU空耗,内部调用LockSupport.park将当前线程休眠
                //判断条件为是否有前置唤醒节点
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    //Thread.interrupted() 方法会清除中断标记
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

    /**
    *  1.尝试获取资源,成功则返回
    *  2.addWaiter将当前线程加入等待队列
    *  3.acquireQueued 内部循环执行直到成功获取资源并返回线程是否被中断
    *  4.如何线程被中断,则设置线程中断标志
    **/
    public final void acquire(int acquire) {
        if (!tryAcquire(acquire) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), acquire))
            selfInterrupt();
    }


    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                //内部尝试唤醒等待队列中的一个节点,
                //AKA:存在一个线程被唤醒,接着尝试获取资源,即acquireQueued内循环继续执行 
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    //子类实现
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    ...
}


锁管理器维护的只是一个普通的双向列表形式的队列,这个数据结构很简单,但是仔细维护起来却相当复杂,因为它需要精细考虑多线程并发问题,每一行代码都写的无比小心。

JDK 锁管理器的实现者是 Douglas S. Lea,Java 并发包几乎全是他单枪匹马写出来的,在算法的世界里越是精巧的东西越是适合一个人来做。

Douglas S. Lea是纽约州立大学奥斯威戈分校计算机科学教授和现任计算机科学系主任,专门研究并发编程和并发数据结构的设计。他是Java Community Process的执行委员会成员,主持JSR 166,它为Java编程语言添加了并发实用程序。

线程被唤醒的可能因素

  • 其它线程 unpark 了当前线程
  • 时间到了自然醒(park 有时间参数)
  • 其它线程 interrupt 了当前线程
  • 其它未知原因导致的「假醒」

文档中没有明确说明何种未知原因会导致假醒,它倒是说明了当 park 方法返回时并不意味着锁自由了,醒过来的线程在重新尝试获取锁失败后将会再次 park 自己。所以加锁的过程需要写在一个循环里,在成功拿到锁之前可能会进行多次尝试。

ReentrantLock

public class ReentrantLock implements Lock {

    abstract static class Sync extends AbstractQueuedSynchronizer {
    
        abstract void lock();

    
         /**
         * 如果当前锁空闲,尝试占用锁,同时设置锁的持有者为当前线程
         * 如果当前锁非空闲,判断锁持有者是否为当前线程,如果是,则增加重入计数,否则返回失败
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        /**
         * 释放资源,只有status为0时才能释放锁
         **/
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        ...

    }

    static final class NonfairSync extends Sync {
    
        /**
         * 优先尝试抢占锁,失败则排队
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

    private final Sync sync;


    public ReentrantLock() {
        this.sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        this.sync = fair ? new FairSync() : new NonfairSync();
    }

    ...
}


条件变量

关于条件变量,需要提出的第一个问题是为什么需要条件变量,只有锁还不够么?考虑下面的伪代码,当某个条件满足时,才去干某件事

public class A {

    private final Lock locker = new ReentrantLock();

    private volatile boolean codition = false;

    void doSomething() {
        // 搞事需要加锁,判断能不能搞事也需要加锁
        locker.lock();
        while(!condition_is_true()) {  // 先看能不能搞事
            locker.unlock();  // 搞不了就歇会再看看能不能搞
            sleep(1);
            locker.lock(); // 搞事需要加锁,判断能不能搞事也需要加锁
        }
        justdoit();  // 搞事
        locker.unlock();
    }


    private boolean condition_is_true(){
        return this.codition
    }

  
    private boolean update_condition_is_true(boolean codition){
        //修改搞事条件也要获取锁
        locker.lock();
        this.codition = codition
        locker.unlock();
    }

}

当条件不满足时,就循环重试(其它线程会通过加锁来修改条件),但是需要间隔 sleep,不然 CPU 就会因为空转而飙高。这里存在一个问题,那就是 sleep 多久不好控制。间隔太久,会拖慢整体效率,甚至会错过时机(条件瞬间满足了又立即被重置了),间隔太短,又回导致 CPU 空转。有了条件变量,这个问题就可以解决了

public class A {

    private final Lock locker = new ReentrantLock();

    private final Lock cond = locker.newCondtion();

    private volatile boolean codition = false;

    void doSomethingWithCond() {
        // 搞事需要加锁,判断能不能搞事也需要加锁
        locker.lock();
        while(!condition_is_true()) {  // 先看能不能搞事
            cond.await();  // 搞不了就歇会,等待通知
            //必须循环,被唤醒的因素很多
            //例如其它线程 unpark 了当前线程,时间到了自然醒(park 有时间参数),其它线程 interrupt 了当前线程,其它未知原因导致的「假醒」等等
        }
        justdoit();  // 搞事
        locker.unlock();
    }


    private boolean condition_is_true(){
        return this.codition
    }

  
    private boolean update_condition_is_true(boolean codition){
        //修改搞事条件也要获取锁
        locker.lock();
        this.codition = codition
        cond.signal();  // 搞完了,通知其它线程
        locker.unlock();
    }

}





await() 方法会一直阻塞在 cond 条件变量上直到被另外一个线程调用了 cond.signal() 或者 cond.signalAll() 方法后才会返回。

await() 阻塞时会自动释放当前线程持有的锁,await() 被唤醒后会再次尝试持有锁(可能又需要排队),拿到锁成功之后 await() 方法才能成功返回。

ConditionObject {

    public final void await() throws InterruptedException {
       if (Thread.interrupted())
                throw new InterruptedException();
        Node node = addConditionWaiter();
        //释放锁,记录资源
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        //重新获取锁,占用资源
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

    ...
}

await() 方法必须立即释放锁,否则临界区状态就不能被其它线程修改,condition_is_true() 返回的结果也就不会改变。 这也是为什么条件变量必须由锁对象来创建,条件变量需要持有锁对象的引用这样才可以释放锁以及被 signal 唤醒后重新加锁。创建条件变量的锁必须是排他锁,如果是共享锁被 await() 方法释放了并不能保证临界区的状态可以被其它线程来修改,可以修改临界区状态的只能是排他锁。

阻塞在条件变量上的线程可以有多个,这些阻塞线程会被串联成一个条件等待队列。当 signalAll() 被调用时,会唤醒所有的阻塞线程,让所有的阻塞线程重新开始争抢锁。如果调用的是 signal() 只会唤醒队列头部的线程,这样可以避免「惊群问题」。

条件等待队列

当多个线程 await() 在同一个条件变量上时,会形成一个条件等待队列。同一个锁可以创建多个条件变量,就会存在多个条件等待队列。这个队列和 AQS 的队列结构很接近,只不过它不是双向队列,而是单向队列。队列中的节点和 AQS 等待队列的节点是同一个类,但是节点指针不是 prev 和 next,而是 nextWaiter。

class AQS  {
    ...

    class Node {
        static final int CONDITION = -2;
        static final int SIGNAL = -1;
        Thread thread;  // 当前等待的线程
        Node nextWaiter;  // 指向下一个条件等待节点
  
        Node prev;
        Node next;
        int waitStatus;  // waitStatus = CONDITION
    }

    public class ConditionObject implements Condition {

        public ConditionObject() { }

        private transient Node firstWaiter;

        private transient Node lastWaiter;

        private Node addConditionWaiter() {
            ...
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            ...
            return node;
        }


        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                        throw new InterruptedException();
                Node node = addConditionWaiter();
                //释放锁,记录资源
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);//阻塞自己,等待唤醒
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                //重新获取锁,占用资源
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
        }

        public final void signal() {
            //修改临界区状态的只能是排他锁
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

        private void doSignal(Node first) {
            ...
            transferForSignal(first)
            ...
        }

        final boolean transferForSignal(Node node) {
            ...
            Node p = enq(node);// 进入 AQS 等待队列
            int ws = p.waitStatus;
            //更改节点类型,并尝试唤醒线程
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
        ...

    }

    ...
}