菜鸟笔记
提升您的技术认知

一文让你彻底搞懂aqs(通俗易懂的aqs)-ag真人游戏

一、什么是aqs

  • aqs是一个用来构建锁和同步器的框架,使用aqs能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的reentrantlock,semaphore,其他的诸如reentrantreadwritelock,synchronousqueue,futuretask等等皆是基于aqs的。当然,我们自己也能利用aqs非常轻松容易地构造出符合我们自己需求的同步器。

二、前置知识

  • 学习aqs需要大家对同步锁有一定的概念。同时大家要知道locksupport的使用,可以参考我这篇文章。(locksupport从入门到深入理解)

三、aqs 的核心思想

  • aqs核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制aqs是用clh队列锁实现的,即将暂时获取不到锁的线程加入到队列中。 clh(craig,landin,and hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。aqs是将每条请求共享资源的线程封装成一个clh锁队列的一个结点(node)来实现锁的分配。 aqs使用一个int成员变量来表示同步状态,通过内置的fifo队列来完成获取资源线程的排队工作。aqs使用cas对该同步状态进行原子操作实现对其值的修改。 (图一为节点关系图)
private volatile int state;//共享变量,使用volatile修饰保证线程可见性

四、aqs 案例分析

上面讲述的原理还是太抽象了,那我我们上示例,结合案例来分析aqs 同步器的原理。以reentrantlock使用方式为例。
代码如下:
public class aqsdemo {
  
    private static int num;
    public static void main(string[] args) {
  
        reentrantlock lock = new reentrantlock();
        new thread(new runnable() {
  
            @override
            public void run() {
  
                lock.lock();
                try {
  
                        thread.sleep(1000);
                        num  = 1000;
                    system.out.println("a 线程执行了1秒,num = "  num);
                    } catch (interruptedexception e) {
  
                        e.printstacktrace();
                    }
                finally {
  
                    lock.unlock();
                }
            }
        },"a").start();
        new thread(new runnable() {
  
            @override
            public void run() {
  
                lock.lock();
                try {
  
                    thread.sleep(500);
                    num  = 500;
                    system.out.println("b 线程执行了0.5秒,num = "  num);
                } catch (interruptedexception e) {
  
                    e.printstacktrace();
                }
                finally {
  
                    lock.unlock();
                }
            }
        },"b").start();
        new thread(new runnable() {
  
            @override
            public void run() {
  
                lock.lock();
                try {
  
                    thread.sleep(100);
                    num  = 100;
                    system.out.println("c 线程执行了0.1秒,num = "  num);
                } catch (interruptedexception e) {
  
                    e.printstacktrace();
                }
                finally {
  
                    lock.unlock();
                }
            }
        },"c").start();
    }
}

执行的某一种结果! 这个代码超级简单,但是执行结果却是可能不一样,大家可以自行实验。



对比一下三种结果,大家会发现,无论什么样的结果,num最终的值总是1600,这说明我们加锁是成功的。

五、aqs 源码分析

  • 使用方法很简单,线程操纵资源类就行。主要方法有两个lock() 和unlock().我们深入代码去理解。我在源码的基础上加注释,希望大家也跟着调试源码。其实非常简单。

5.1 aqs 的数据结构

aqs 主要有三大属性分别是 head ,tail, state,其中state 表示同步状态,head为等待队列的头结点,tail 指向队列的尾节点。
    /**
     * head of the wait queue, lazily initialized.  except for
     * initialization, it is modified only via method sethead.  note:
     * if head exists, its waitstatus is guaranteed not to be
     * cancelled.
     */
    private transient volatile node head;
    /**
     * tail of the wait queue, lazily initialized.  modified only via
     * method enq to add new wait node.
     */
    private transient volatile node tail;
    /**
     * the synchronization state.
     */
    private volatile int state;
 还需要再去了解 node的数据结构,
在这里插入代码片
class node{
  
  //节点等待状态
  volatile int waitstatus;
  // 双向链表当前节点前节点
  volatile node prev;
  // 下一个节点
  volatile node next;
  // 当前节点存放的线程
  volatile thread thread;
  // condition条件等待的下一个节点
  node nextwaiter;
}

waitstatus 只有特定的几个常量,相应的值解释如下:

本次源码讲解,我们一reentranlock的非公平锁为例。我们主要关注的方法是lock(),和unlock()。

5.2 lock源码分析

首先我们看一下lock()方法源代码,直接进入非公平锁的lock方法:

final void lock() {
  
            //1、判断当前state 状态, 没有锁则当前线程抢占锁
            if (compareandsetstate(0, 1))
                // 独占锁
                setexclusiveownerthread(thread.currentthread());
            else
                // 2、锁被人占了,尝试获取锁,关键方法了
                acquire(1);
        }

进入 aqs的acquire() 方法:

  public final void acquire(int arg) {
  
        if (!tryacquire(arg) &&
            acquirequeued(addwaiter(node.exclusive), arg))
            selfinterrupt();
    }

总-分-总

  • lock方法主要由tryaquire()尝试获取锁,addwaiter(node.exclusive) 加入等待队列,acquirequeued(node,arg)等待队列尝试获取锁。示意图如下:

5.2.1 tryaquire 方法源码

  • 既然是非公平锁,那么我们一进来就想着去抢锁,不管三七二一,直接试试能不能抢到,抢不到再进队列。
  final boolean nonfairtryacquire(int acquires) {
  
            //1、获取当前线程
            final thread current = thread.currentthread();
            // 2、获取当前锁的状态,0 表示没有被线程占有,>0 表示锁被别的线程占有
            int c = getstate();
            // 3、如果锁没有被线程占有
            if (c == 0) {
  
                 // 3.1、 使用cas去获取锁,   为什么用case呢,防止在获取c之后 c的状态被修改了,保证原子性
                if (compareandsetstate(0, acquires)) {
  
                    // 3.2、设置独占锁
                    setexclusiveownerthread(current);
                    // 3.3、当前线程获取到锁后,直接发挥true
                    return true;
                }
            }
            // 4、判断当前占有锁的线程是不是自己
            else if (current == getexclusiveownerthread()) {
  
                // 4.1 可重入锁,加 1
                int nextc = c   acquires;
                if (nextc < 0) // overflow
                    throw new error("maximum lock count exceeded");
                 // 4.2 设置锁的状态
                setstate(nextc);
                return true;
            }
            return false;
        }

5.2.2 addwaiter() 方法的解析

  • private node addwaiter(node mode),当前线程没有货得锁的情况下,进入clh队列。
 private node addwaiter(node mode) {
  
 		// 1、初始化当前线程节点,虚拟节点
        node node = new node(thread.currentthread(), mode);
        // try the fast path of enq; backup to full enq on failure
        // 2、获取尾节点,初始进入节点是null
        node pred = tail;
        // 3、如果尾节点不为null,怎将当前线程节点放到队列尾部,并返回当前节点
        if (pred != null) {
  
            node.prev = pred;
            if (compareandsettail(pred, node)) {
  
                pred.next = node;
                return node;
            }
        }
        // 如果尾节点为null(其实是链表没有初始化),怎进入enq方法
        enq(node);
        return node;
    }
    
   // 这个方法可以认为是初始化链表
   private node enq(final node node) {
  
   		// 1、入队 : 为什么要用循环呢?  
        for (;;) {
  
           // 获取尾节点
            node t = tail;
           // 2、尾节点为null
            if (t == null) {
   // must initialize
               // 2.1 初始话头结点和尾节点
                if (compareandsethead(new node()))
                    tail = head;
            } 
            // 3、将当前节点加入链表尾部
            else {
  
                node.prev = t;
                if (compareandsettail(t, node)) {
  
                    t.next = node;
                    return t;
                }
            }
        }
    }

有人想明白为什么enq要用for(;;)吗? 咋一看最多只要循环2次啊! 答疑来了,这是对于单线程来说确实是这样的,但是对于多线程来说,有可能在第2部完成之后就被别的线程先执行入链表了,这时候第3步cas之后发现不成功了,怎么办?只能再一次循环去尝试加入链表,直到成功为止。

5.2.3 acquirequeued()方法详解

  • addwaiter 方法我们已经将没有获取锁的线程放在了等待链表中,但是这些线程并没有处于等待状态。acquirequeued的作用就是将线程设置为等待状态。
 final boolean acquirequeued(final node node, int arg) {
  
         // 失败标识
        boolean failed = true;
        try {
  
            // 中断标识
            boolean interrupted = false;
            for (;;) {
  
                // 获取当前节点的前一个节点
                final node p = node.predecessor();
                // 1、如果前节点是头结点,那么去尝试获取锁
                if (p == head && tryacquire(arg)) {
  
                    // 重置头结点
                    sethead(node);
                    p.next = null; // help gc
                    // 获得锁
                    failed = false;
                    // 返回false,节点获得锁,,,然后现在只有自己一个线程了这个时候就会自己唤醒自己
                    // 使用的是acquire中的selfinterrupt(); 
                    return interrupted;
                }
                // 2、如果线程没有获得锁,且节点waitstatus=0,shouldparkafterfailedacquire并将节点的waitstatus赋值为-1
                //parkandcheckinterrupt将线程park,进入等待模式,
                if (shouldparkafterfailedacquire(p, node) &&
                    parkandcheckinterrupt())
                    interrupted = true;
            }
        } finally {
  
            if (failed)
                cancelacquire(node);
        }
    }
private static boolean shouldparkafterfailedacquire(node pred, node node) {
  
        int ws = pred.waitstatus;
        if (ws == node.signal)
            /*
             * this node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
  
            /*
             * predecessor was cancelled. skip over predecessors and
             * indicate retry.
             */
            do {
  
                node.prev = pred = pred.prev;
            } while (pred.waitstatus > 0);
            pred.next = node;
        } else {
  
            /*
             * waitstatus must be 0 or propagate.  indicate that we
             * need a signal, but don't park yet.  caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareandsetwaitstatus(pred, ws, node.signal);
        }
        return false;
    }
  • 好了,这个源码的解释就结束了,大家是不是还是云里雾里,不得不承认,这个代码太优雅了。不愧大神!

我用白话给大家串起来讲一下吧! 我们以reentrantlock的非公平锁结合我们案例4来讲解。
当线程a 到lock()方法时,通过compareandsetstate(0,1)获得锁,并且获得独占锁。当b,c线程去争抢锁时,运行到acquire(1),c线程运行tryacquire(1),接着运行nonfairtryacquire(1)方法,未获取锁,最后返回false,运行addwaiter(),运行enq(node),初始化head节点,同时c进入队列;再进入acquirequeued(node,1)方法,初始化waitstatus= -1,自旋并park()进入等待。
接着b线程开始去抢锁,b线程运行tryacquire(1),运行nonfairtryacquire(1)方法,未获得锁最后返回false,运行addwaiter(),直接添加到队尾,同时b进入队列;在进入acquirequeued(node,1)方法,初始化waitstatus= -1,自旋并park()进入等待。

5.3 unlock源码分析

unlock释放锁。主要利用的是locksupport

  public final boolean release(int arg) {
  
         // 如果成功释放独占锁,
        if (tryrelease(arg)) {
  
            node h = head;
            // 如果头结点不为null,且后续有入队结点
            if (h != null && h.waitstatus != 0)
                //释放当前线程,并激活等待队里的第一个有效节点
                unparksuccessor(h);
            return true;
        }
        return false;
    }
    // 如果释放锁着返回true,否者返回false
    // 并且将sate 设置为0
 protected final boolean tryrelease(int releases) {
  
            int c = getstate() - releases;
            if (thread.currentthread() != getexclusiveownerthread())
                throw new illegalmonitorstateexception();
            boolean free = false;
            if (c == 0) {
  
                free = true;
                setexclusiveownerthread(null);
            }
            setstate(c);
            return free;
        }
  private void unparksuccessor(node node) {
  
        /*
         * if status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  it is ok if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitstatus;
        if (ws < 0)
            // 重置头结点的状态waitstatus
            compareandsetwaitstatus(node, ws, 0);
        /*
         * thread to unpark is held in successor, which is normally
         * just the next node.  but if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
         // 获取头结点的下一个节点
        node s = node.next;
        // s.waitstatus > 0 为取消状态 ,结点为空且被取消
        if (s == null || s.waitstatus > 0) {
  
            s = null;
            // 获取队列里没有cancel的最前面的节点
            for (node t = tail; t != null && t != node; t = t.prev)
                if (t.waitstatus <= 0)
                    s = t;
        }
        // 如果节点s不为null,则获得锁
        if (s != null)
            locksupport.unpark(s.thread);
    }

锁的释放这个还是很简单。

总结

这个源码的最好阅读方式是结合例子去自己一步步跟代码,把每一个步骤写在纸上,尝试一两遍你就会有非常清晰的认识。

大家多给些意见,写之前我信心满满觉得能写的让大家看懂,写完之后我觉得一坨屎。

网站地图