首页 > Java开发 > java并发锁ReentrantReadWriteLock读写锁源码分析

java并发锁ReentrantReadWriteLock读写锁源码分析

1、ReentrantReadWriterLock基础
所谓读写锁,是对访问资源共享锁和排斥锁,一般的重入性语义为 如果对资源加了写锁,其他线程无法再获得写锁与读锁,但是持有写锁的线程,可以对资源加读锁(锁降级);如果一个线程对资源加了读锁,其他线程可以继续加读锁。
java.util.concurrent.locks中关于多写锁的接口:ReadWriteLock
[java] view plain copy

  1. public interface ReadWriteLock {  
  2.     /** 
  3.      * Returns the lock used for reading. 
  4.      * 
  5.      * @return the lock used for reading. 
  6.      */  
  7.     Lock readLock();  
  8.   
  9.     /** 
  10.      * Returns the lock used for writing. 
  11.      * 
  12.      * @return the lock used for writing. 
  13.      */  
  14.     Lock writeLock();  
  15. }  
提一个问题,是否觉得ReentrantReadWriteLock会实现Lock接口吗?与ReentrantLock有什么关系?
答案是否定的,ReentrantReadWriterLock通过两个内部类实现Lock接口,分别是ReadLock,WriterLock类。与ReentrantLock一样,ReentrantReadWriterLock同样使用自己的内部类Sync(继承AbstractQueuedSynchronizer)实现CLH算法。为了方便对读写锁获取机制的了解,先介绍一下Sync内部类中几个属性,采用了位运算:
[java] view plain copy

  1. /* 
  2.          * Read vs write count extraction constants and functions. 
  3.          * Lock state is logically divided into two unsigned shorts: 
  4.          * The lower one representing the exclusive (writer) lock hold count, 
  5.          * and the upper the shared (reader) hold count. 
  6.          */  
  7.   
  8.         static final int SHARED_SHIFT   = 16;  
  9.         static final int SHARED_UNIT    = (1 << SHARED_SHIFT);  
  10.         static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;  
  11.         static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;  
  12.         /** Returns the number of shared holds represented in count  */  
  13.         static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }  
  14.         /** Returns the number of exclusive holds represented in count  */  
  15.         static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }  
首先ReentrantReadWriterLock使用一个32位的int类型来表示锁被占用的线程数(ReentrantLock中的state),用所以,采取的办法是,高16位用来表示读锁占有的线程数量,用低16位表示写锁被同一个线程申请的次数。
        SHARED_SHIFT,表示读锁占用的位数,常量16
        SHARED_UNIT,   增加一个读锁,按照上述设计,就相当于增加 SHARED_UNIT;
        MAX_COUNT    ,表示申请读锁最大的线程数量,为65535
        EXCLUSIVE_MASK  :表示计算写锁的具体值时,该值为 15个1,用 getState & EXCLUSIVE_MASK算出写锁的
                    线程数,大于1表示重入。
          static int sharedCount(int c)    { return c >>> SHARED_SHIFT; } 
          static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
举例说明,比如,现在当前,申请读锁的线程数为13个,写锁一个,那state怎么表示?
上文说过,用一个32位的int类型的高16位表示读锁线程数,13的二进制为 1101,那state的二进制表示为
00000000 00001101 00000000 00000001,十进制数为851969, 接下在具体获取锁时,需要根据这个851968这个值得出上文中的 13 与 1。要算成13,只需要将state 无符号向左移位16位置,得出00000000 00001101,就出13,根据851969要算成低16位置,只需要用该00000000 00001101 00000000 00000001 & 111111111111111(15位),就可以得出00000001,就是利用了1&1得1,1&0得0这个技巧。
移位元素,如果一个数值向左移(<)一位,在没越界(超过该类型表示的最大值)的情况下,想当于操作数 * 2
如果一个数值向右(>) 移动移位,在没有越界的情况下,想到于操作数 除以2。
        然后再关注如下几个与线程本地变量相关的属性:
        

[java] view plain copy

  1. /** 
  2.          * The number of reentrant read locks held by current thread. 
  3.          * Initialized only in constructor and readObject. 
  4.          * Removed whenever a thread's read hold count drops to 0. 
  5.          */  
  6.         private transient ThreadLocalHoldCounter readHolds;  
  7.   
  8.         /** 
  9.          * The hold count of the last thread to successfully acquire 
  10.          * readLock. This saves ThreadLocal lookup in the common case 
  11.          * where the next thread to release is the last one to 
  12.          * acquire. This is non-volatile since it is just used 
  13.          * as a heuristic, and would be great for threads to cache. 
  14.          * 
  15.          * <p>Can outlive the Thread for which it is caching the read 
  16.          * hold count, but avoids garbage retention by not retaining a 
  17.          * reference to the Thread. 
  18.          * 
  19.          * <p>Accessed via a benign data race; relies on the memory 
  20.          * model's final field and out-of-thin-air guarantees. 
  21.          */  
  22.         private transient HoldCounter cachedHoldCounter;  
  23.   
  24.         /** 
  25.          * firstReader is the first thread to have acquired the read lock. 
  26.          * firstReaderHoldCount is firstReader's hold count. 
  27.          * 
  28.          * <p>More precisely, firstReader is the unique thread that last 
  29.          * changed the shared count from 0 to 1, and has not released the 
  30.          * read lock since then; null if there is no such thread. 
  31.          * 
  32.          * <p>Cannot cause garbage retention unless the thread terminated 
  33.          * without relinquishing its read locks, since tryReleaseShared 
  34.          * sets it to null. 
  35.          * 
  36.          * <p>Accessed via a benign data race; relies on the memory 
  37.          * model's out-of-thin-air guarantees for references. 
  38.          * 
  39.          * <p>This allows tracking of read holds for uncontended read 
  40.          * locks to be very cheap. 
  41.          */  
  42.         private transient Thread firstReader = null;  
  43.         private transient int firstReaderHoldCount;  
   
        上述这4个变量,其实就是完成一件事情,将获取读锁的线程放入线程本地变量(ThreadLocal),方便从整个上 下文,根据当前线程获取持有锁的次数信息。其实 firstReader,firstReaderHoldCount ,cachedHoldCounter 这三个变量就是为readHolds变量服务的,是一个优化手段,尽量减少直接使用readHolds.get方法的次数,firstReader与firstReadHoldCount保存第一个获取读锁的线程,也就是readHolds中并不会保存第一个获取读锁的线程;cachedHoldCounter 缓存的是最后一个获取线程的HolderCount信息,该变量主要是在如果当前线程多次获取读锁时,减少从readHolds中获取HoldCounter的次数。请结合如下代码理解上述观点:
[java] view plain copy

  1. if (r == 0) {  
  2.     firstReader = current;  
  3.     firstReaderHoldCount = 1;  
  4. else if (firstReader == current) {  
  5.     firstReaderHoldCount++;  
  6. else {  
  7.     HoldCounter rh = cachedHoldCounter;  
  8.     if (rh == null || rh.tid != current.getId())  
  9.         cachedHoldCounter = rh = readHolds.get();  
  10.     else if (rh.count == 0)  
  11.         readHolds.set(rh);  
  12.     rh.count++;  
  13. }  
2、ReentrantReadWriterLock源码分析
2.1 ReadLock 源码分析
2.1.1 lock方法
[java] view plain copy

  1. /** 
  2.          * Acquires the read lock. 
  3.          * 
  4.          * <p>Acquires the read lock if the write lock is not held by 
  5.          * another thread and returns immediately. 
  6.          * 
  7.          * <p>If the write lock is held by another thread then 
  8.          * the current thread becomes disabled for thread scheduling 
  9.          * purposes and lies dormant until the read lock has been acquired. 
  10.          */  
  11.         public void lock() {  
  12.             sync.acquireShared(1);  
  13.         }  

sync.acquireShared方法存在于AbstractQueuedSynchronizer类中,

[java] view plain copy

  1. /** 
  2.      * Acquires in shared mode, ignoring interrupts.  Implemented by 
  3.      * first invoking at least once {@link #tryAcquireShared}, 
  4.      * returning on success.  Otherwise the thread is queued, possibly 
  5.      * repeatedly blocking and unblocking, invoking {@link 
  6.      * #tryAcquireShared} until success. 
  7.      * 
  8.      * @param arg the acquire argument.  This value is conveyed to 
  9.      *        {@link #tryAcquireShared} but is otherwise uninterpreted 
  10.      *        and can represent anything you like. 
  11.      */  
  12.     public final void acquireShared(int arg) {  
  13.         if (tryAcquireShared(arg) < 0)    //@1  
  14.             doAcquireShared(arg);           //@2  
  15.     }  

根据常识,具体获取锁的过程在子类中实现,果不其然,tryAcquireShared方法在ReentrantReadWriterLock的Sync类中实现

[java] view plain copy

  1. protected final int tryAcquireShared(int unused) {  
  2.             /* 
  3.              * Walkthrough: 
  4.              * 1. If write lock held by another thread, fail. 
  5.              * 2. Otherwise, this thread is eligible for 
  6.              *    lock wrt state, so ask if it should block 
  7.              *    because of queue policy. If not, try 
  8.              *    to grant by CASing state and updating count. 
  9.              *    Note that step does not check for reentrant 
  10.              *    acquires, which is postponed to full version 
  11.              *    to avoid having to check hold count in 
  12.              *    the more typical non-reentrant case. 
  13.              * 3. If step 2 fails either because thread 
  14.              *    apparently not eligible or CAS fails or count 
  15.              *    saturated, chain to version with full retry loop. 
  16.              */  
  17.             Thread current = Thread.currentThread();    //@1 start  
  18.             int c = getState();  
  19.             if (exclusiveCount(c) != 0 &&  
  20.                 getExclusiveOwnerThread() != current)  
  21.                 return -1;                                                     // @1 end  
  22.             int r = sharedCount(c);  
  23.             if (!readerShouldBlock() &&                            
  24.                 r < MAX_COUNT &&  
  25.                 compareAndSetState(c, c + SHARED_UNIT)) {    // @2  
  26.                 if (r == 0) {                                      //@21                                 
  27.                     firstReader = current;  
  28.                     firstReaderHoldCount = 1;  
  29.                 } else if (firstReader == current) {  //@22  
  30.                     firstReaderHoldCount++;  
  31.                 } else {                                            // @23  
  32.                     HoldCounter rh = cachedHoldCounter;  
  33.                     if (rh == null || rh.tid != current.getId())  
  34.                         cachedHoldCounter = rh = readHolds.get();  
  35.                     else if (rh.count == 0)  
  36.                         readHolds.set(rh);  
  37.                     rh.count++;  
  38.                 }  
  39.                 return 1;  
  40.             }  
  41.             return fullTryAcquireShared(current);      // @3  
  42.         }  
尝试获取共享锁代码解读:
@1 start--end ,如果有线程已经抢占了写锁,并且不是当前线程,则直接返回-1,通过排队获取锁。
@2,如果线程不需要阻塞,并且获取读锁的线程数没有超过最大值,并且使用 CAS更新共享锁线程数量成功的话;表示成获取读锁,然后进行内部变量的相关更新操作;先关注一下,成功获取读锁后,内部变量的更新操作:
@21,如果r=0, 表示,当前线程为第一个获取读锁的线程
@22,如果第一个获取读锁的对象为当前对象,将firstReaderHoldCount 加一
@23,成功获取锁后,如果不是第一个获取多锁的线程,将该线程持有锁的次数信息,放入线程本地变量中,方便在整个请求上下文(请求锁、释放锁等过程中)使用持有锁次数信息。
@3 在讲解代码@3之前,我们先重点分析@2处的第一个条件,是否需要阻塞方法:readerShouldBlock,在具体的子类中,现在查看的是NonfairSync中的方法:
[java] view plain copy

  1. final boolean readerShouldBlock() {  
  2.             /* As a heuristic to avoid indefinite writer starvation, 
  3.              * block if the thread that momentarily appears to be head 
  4.              * of queue, if one exists, is a waiting writer.  This is 
  5.              * only a probabilistic effect since a new reader will not 
  6.              * block if there is a waiting writer behind other enabled 
  7.              * readers that have not yet drained from the queue. 
  8.              */  
  9.             return apparentlyFirstQueuedIsExclusive();   //该方法,具体又是在 AbstractQueuedSynchronizer中  
  10.         }  
  11. /** 
  12.      * Returns {@code true} if the apparent first queued thread, if one 
  13.      * exists, is waiting in exclusive mode.  If this method returns 
  14.      * {@code true}, and the current thread is attempting to acquire in 
  15.      * shared mode (that is, this method is invoked from {@link 
  16.      * #tryAcquireShared}) then it is guaranteed that the current thread 
  17.      * is not the first queued thread.  Used only as a heuristic in 
  18.      * ReentrantReadWriteLock. 
  19.      */  
  20.     final boolean apparentlyFirstQueuedIsExclusive() {  
  21.         Node h, s;  
  22.         return (h = head) != null &&  
  23.             (s = h.next)  != null &&  
  24.             !s.isShared()         &&  
  25.             s.thread != null;  
  26.     }  
    该方法如果头节点不为空,并头节点的下一个节点不为空,并且不是共享模式【独占模式,写锁】、并且线程不为空。则返回true,说明有当前申请读锁的线程占有写锁,并有其他写锁在申请。为什么要判断head节点的下一个节点不为空,或是thread不为空呢?因为第一个节点head节点是当前持有写锁的线程,也就是当前申请读锁的线程,这里,也就是锁降级的关键所在,如果占有的写锁不是当前线程,那线程申请读锁会直接失败。
现在继续回到@3,讲解如果第一次尝试获取读锁失败后,该如何处理。首先,进入该方法的条件如下:
1),没有写锁被占用时,尝试通过一次CAS去获取锁时,更新失败(说明有其他读锁在申请)。
2),当前线程占有写锁,并且没有有其他写锁在当前线程的下一个节点等待获取写锁。;其实如果是这种情况,除非当前线程占有锁的下个线程取消,否则进入fullTryAcquireShared方法也无法获取锁。
[java] view plain copy

  1. /** 
  2.          * Full version of acquire for reads, that handles CAS misses 
  3.          * and reentrant reads not dealt with in tryAcquireShared. 
  4.          */  
  5.         final int fullTryAcquireShared(Thread current) {  
  6.             /* 
  7.              * This code is in part redundant with that in 
  8.              * tryAcquireShared but is simpler overall by not 
  9.              * complicating tryAcquireShared with interactions between 
  10.              * retries and lazily reading hold counts. 
  11.              */  
  12.             HoldCounter rh = null;  
  13.             for (;;) {  
  14.                 int c = getState();  
  15.                 if (exclusiveCount(c) != 0) {                                     //@31  
  16.                     if (getExclusiveOwnerThread() != current)  
  17.                         return -1;  
  18.                     // else we hold the exclusive lock; blocking here  
  19.                     // would cause deadlock.  
  20.                 } else if (readerShouldBlock()) {                             //@32  
  21.                     // Make sure we're not acquiring read lock reentrantly  
  22.                     if (firstReader == current) {                              //@33  
  23.                         // assert firstReaderHoldCount > 0;  
  24.                     } else {                                                              //@34  
  25.                         if (rh == null) {  
  26.                             rh = cachedHoldCounter;  
  27.                             if (rh == null || rh.tid != current.getId()) {  
  28.                                 rh = readHolds.get();  
  29.                                 if (rh.count == 0)  
  30.                                     readHolds.remove();  
  31.                             }  
  32.                         }  
  33.                         if (rh.count == 0)  
  34.                             return -1;  
  35.                     }  
  36.                 }  
  37.                 if (sharedCount(c) == MAX_COUNT)                             
  38.                     throw new Error("Maximum lock count exceeded");  
  39.                 if (compareAndSetState(c, c + SHARED_UNIT)) {     // @35  
  40.                     if (sharedCount(c) == 0) {  
  41.                         firstReader = current;  
  42.                         firstReaderHoldCount = 1;  
  43.                     } else if (firstReader == current) {  
  44.                         firstReaderHoldCount++;  
  45.                     } else {  
  46.                         if (rh == null)  
  47.                             rh = cachedHoldCounter;  
  48.                         if (rh == null || rh.tid != current.getId())  
  49.                             rh = readHolds.get();  
  50.                         else if (rh.count == 0)  
  51.                             readHolds.set(rh);  
  52.                         rh.count++;  
  53.                         cachedHoldCounter = rh; // cache for release  
  54.                     }  
  55.                     return 1;  
  56.                 }  
  57.             }  
  58.         }  
代码@31,首先再次判断,如果当前线程不是写锁的持有者,直接返回-1,结束尝试获取读锁,需要排队去申请读锁。
代码@32,如果需要阻塞,说明除了当前线程持有写锁外,还有其他线程已经排队在申请写锁,故,即使申请读锁的线程已经持有写锁(写锁内部再次申请读锁,俗称锁降级)还是会失败,因为有其他线程也在申请写锁,此时,只能结束本次申请读锁的请求,转而去排队,否则,将造成死锁。代码@34,就是从readHolds中移除当前线程的持有数,然后返回-1,结束尝试获取锁步骤(结束tryAcquireShared 方法)然后去排队获取。
代码@33,因为,如果当前线程是第一个获取了写锁,那其他线程无法申请写锁(该部分在分析完,读写锁的队列机制后,才回来做更详细的解答。)
代码@35,表示成功获取读锁,后续就是更新readHolds等内部变量,该部分在上文中已有讲解。如果是通过@35尝试获取锁成功,这就是写锁内部--》再次申请读锁(锁降级)的原理。
至此,完成尝试获取锁步骤 tryAcquireShared 方法,我们再次回到 acquireShared,如果返回-1,那么需要排队申请,具体请看 doAcquireShared(arg);
[java]

  1.    public final void acquireShared(int arg) {  
  2.         if (tryAcquireShared(arg) < 0)    //@1  
  3.             doAcquireShared(arg);           //@2  
  4.     }  
  5. /** 
  6.      * Acquires in shared uninterruptible mode. 
  7.      * @param arg the acquire argument 
  8.      */  
  9.     private void doAcquireShared(int arg) {  
  10.         final Node node = addWaiter(Node.SHARED);   //@1  
  11.         boolean failed = true;  
  12.         try {  
  13.             boolean interrupted = false;  
  14.             for (;;) { // @2,开始自旋重试  
  15.                 final Node p = node.predecessor();   // @3  
  16.                 if (p == head) {                                   // @4  
  17.                     int r = tryAcquireShared(arg);           
  18.                     if (r >= 0) {  
  19.                         setHeadAndPropagate(node, r);    //@5  
  20.                         p.next = null// help GC  
  21.                         if (interrupted)  
  22.                             selfInterrupt();  
  23.                         failed = false;  
  24.                         return;  
  25.                     }  
  26.                 }  
  27.                 if (shouldParkAfterFailedAcquire(p, node) &&  
  28.                     parkAndCheckInterrupt())                                              // @6  
  29.                     interrupted = true;  
  30.             }  
  31.         } finally {  
  32.             if (failed)  
  33.                 cancelAcquire(node);  
  34.         }  
  35.     }  
获取共享锁解读:
代码@1,在队列尾部增加一个节点。锁模式为共享模式。
代码@3,获取该节点的前置节点。
代码@4,如果该节点的前置节点为head(头部),为什么前置节点是head时,可以再次尝试呢?在讲解ReentrantLock时,也讲过,head节点的初始化在第一次产生锁争用时初始化,刚开始初始化的head节点是不代表线程的,故可以尝试获取锁。如果获取失败,则将进入到shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法中,线程阻塞,等待被唤醒。
重点分析一下获取锁后的操作:setHeadAndPropagate
[java]

  1. /** 
  2.      * Sets head of queue, and checks if successor may be waiting 
  3.      * in shared mode, if so propagating if either propagate > 0 or 
  4.      * PROPAGATE status was set. 
  5.      * 
  6.      * @param node the node 
  7.      * @param propagate the return value from a tryAcquireShared 
  8.      */  
  9.     private void setHeadAndPropagate(Node node, int propagate) {  
  10.         Node h = head; // Record old head for check below   
  11.         setHead(node);  
  12.         /* 
  13.          * Try to signal next queued node if: 
  14.          *   Propagation was indicated by caller, 
  15.          *     or was recorded (as h.waitStatus) by a previous operation 
  16.          *     (note: this uses sign-check of waitStatus because 
  17.          *      PROPAGATE status may transition to SIGNAL.) 
  18.          * and 
  19.          *   The next node is waiting in shared mode, 
  20.          *     or we don't know, because it appears null 
  21.          * 
  22.          * The conservatism in both of these checks may cause 
  23.          * unnecessary wake-ups, but only when there are multiple 
  24.          * racing acquires/releases, so most need signals now or soon 
  25.          * anyway. 
  26.          */  
  27.         if (propagate > 0 || h == null || h.waitStatus < 0) {   // @1  
  28.             Node s = node.next;  
  29.             if (s == null || s.isShared())    // @2  
  30.                 doReleaseShared();          //@3  
  31.         }  
  32.     }  
  33.   
  34. /** 
  35.      * Sets head of queue to be node, thus dequeuing. Called only by 
  36.      * acquire methods.  Also nulls out unused fields for sake of GC 
  37.      * and to suppress unnecessary signals and traversals. 
  38.      * 
  39.      * @param node the node 
  40.      */  
  41.     private void setHead(Node node) {  
  42.         head = node;  
  43.         node.thread = null;  
  44.         node.prev = null;  
  45.     }  
  46.   
  47. /** 
  48.      * Release action for shared mode -- signal successor and ensure 
  49.      * propagation. (Note: For exclusive mode, release just amounts 
  50.      * to calling unparkSuccessor of head if it needs signal.) 
  51.      */  
  52.     private void doReleaseShared() {  
  53.         /* 
  54.          * Ensure that a release propagates, even if there are other 
  55.          * in-progress acquires/releases.  This proceeds in the usual 
  56.          * way of trying to unparkSuccessor of head if it needs 
  57.          * signal. But if it does not, status is set to PROPAGATE to 
  58.          * ensure that upon release, propagation continues. 
  59.          * Additionally, we must loop in case a new node is added 
  60.          * while we are doing this. Also, unlike other uses of 
  61.          * unparkSuccessor, we need to know if CAS to reset status 
  62.          * fails, if so rechecking. 
  63.          */  
  64.         for (;;) {  
  65.             Node h = head;  
  66.             if (h != null && h != tail) {   //@4  
  67.                 int ws = h.waitStatus;  
  68.                 if (ws == Node.SIGNAL) {   //@5  
  69.                     if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))  
  70.                         continue;            // loop to recheck cases  
  71.                     unparkSuccessor(h);  
  72.                 }  
  73.                 else if (ws == 0 &&  
  74.                          !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))   //@6  
  75.                     continue;                // loop on failed CAS  
  76.             }  
  77.             if (h == head)                   // loop if head changed      //@7  
  78.                 break;  
  79.         }  
  80.     }  
释放共享锁的步骤:
代码@1,如果读锁(共享锁)获取成功,或头部节点为空,或头节点取消,或刚获取读锁的线程的下一个节点为空,或在节点的下个节点也在申请读锁,则在CLH队列中传播下去唤醒线程,怎么理解这个传播呢,就是只要获取成功到读锁,那就要传播到下一个节点(如果一下个节点继续是读锁的申请,只要成功获取,就再下一个节点,直到队列尾部或为写锁的申请,停止传播)。具体请看doReleaseShared方法。
代码@4,从队列的头部开始遍历每一个节点。
代码@5,如果节点状态为 Node.SIGNAL,将状态设置为0,设置成功,唤醒线程。为什么会设置不成功,可能改节点被取消;还有一种情况就是有多个线程在运行该代码段,这就是PROPAGATE的含义吧,传播,请看代码@7的理解。
代码@6,如果状态为0,则设置为Node.PROPAGATE,设置为传播,该值然后会在什么时候变化呢?在判断该节点的下一个节点是否需要阻塞时,会判断,如果状态不是Node.SIGNAL或取消状态,为了保险起见,会将前置节点状态设置为Node.SIGNAL,然后再次判断,是否需要阻塞。
代码@7,如果处理过一次 unparkSuccessor 方法后,头节点没有发生变化,就退出该方法,那head在什么时候会改变呢?当然在是抢占锁成功的时候,head节点代表获取锁的节点。一旦获取锁成功,则又会进入setHeadAndPropagate方法,当然又会触发doReleaseShared方法,传播特性应该就是表现在这里吧。再想一下,同一时间,可以有多个多线程占有锁,那在锁释放时,写锁的释放比较简单,就是从头部节点下的第一个非取消节点,唤醒线程即可,为了在释放读锁的上下文环境中获取代表读锁的线程,将信息存入在 readHolds ThreadLocal变量中。
到这里为止,读锁的申请就讲解完毕了,先给出如下流程图:

                                                                        尝试获取读锁过程
从队列中获取读锁的流程如下:

2.1.2 ReadLock 的 unlock方法详解

[java]

  1. public  void unlock() {  
  2.         sync.releaseShared(1);  
  3. }  
  4. //AbstractQueuedSynchronizer的  realseShared方法  
  5. public final boolean releaseShared(int arg) {  
  6.         if (tryReleaseShared(arg)) {  
  7.             doReleaseShared();  
  8.             return true;  
  9.         }  
  10.         return false;  
  11.     }  
  12.   
  13. // ReentrantReadWriterLock.Sync tryReleaseShared  
  14. protected final boolean tryReleaseShared(int unused) {   
  15.             Thread current = Thread.currentThread();  
  16.             if (firstReader == current) {                               // @1 start                 
  17.                 // assert firstReaderHoldCount > 0;  
  18.                 if (firstReaderHoldCount == 1)  
  19.                     firstReader = null;  
  20.                 else  
  21.                     firstReaderHoldCount--;  
  22.             } else {  
  23.                 HoldCounter rh = cachedHoldCounter;  
  24.                 if (rh == null || rh.tid != current.getId())  
  25.                     rh = readHolds.get();  
  26.                 int count = rh.count;  
  27.                 if (count <= 1) {  
  28.                     readHolds.remove();  
  29.                     if (count <= 0)  
  30.                         throw unmatchedUnlockException();  
  31.                 }  
  32.                 --rh.count;                                                            // @1 end  
  33.             }  
  34.             for (;;) {                                                               // @2  
  35.                 int c = getState();  
  36.                 int nextc = c - SHARED_UNIT;  
  37.                 if (compareAndSetState(c, nextc))  
  38.                     // Releasing the read lock has no effect on readers,  
  39.                     // but it may allow waiting writers to proceed if  
  40.                     // both read and write locks are now free.  
  41.                     return nextc == 0;  
  42.             }  
  43.         }  
  44.   
  45. AbstractQueuedSynchronizer的doReleaseShared  
  46. /** 
  47.      * Release action for shared mode -- signal successor and ensure 
  48.      * propagation. (Note: For exclusive mode, release just amounts 
  49.      * to calling unparkSuccessor of head if it needs signal.) 
  50.      */  
  51.     private void doReleaseShared() {  
  52.         /* 
  53.          * Ensure that a release propagates, even if there are other 
  54.          * in-progress acquires/releases.  This proceeds in the usual 
  55.          * way of trying to unparkSuccessor of head if it needs 
  56.          * signal. But if it does not, status is set to PROPAGATE to 
  57.          * ensure that upon release, propagation continues. 
  58.          * Additionally, we must loop in case a new node is added 
  59.          * while we are doing this. Also, unlike other uses of 
  60.          * unparkSuccessor, we need to know if CAS to reset status 
  61.          * fails, if so rechecking. 
  62.          */  
  63.         for (;;) {  
  64.             Node h = head;  
  65.             if (h != null && h != tail) {  
  66.                 int ws = h.waitStatus;  
  67.                 if (ws == Node.SIGNAL) {  
  68.                     if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))  
  69.                         continue;            // loop to recheck cases  
  70.                     unparkSuccessor(h);  
  71.                 }  
  72.                 else if (ws == 0 &&  
  73.                          !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))  
  74.                     continue;                // loop on failed CAS  
  75.             }  
  76.             if (h == head)                   // loop if head changed  
  77.                 break;  
  78.         }  
  79.     }  
锁的释放,比较简单,代码@1,主要是将当前线程所持有的锁的数量信息得到(从firstReader或cachedHoldCounter,或readHolds中获取 ),然后将数量减少1,如果持有数为1,则直接将该线程变量从readHolds ThreadLocal变量中移除,避免垃圾堆积。
代码@2,就是在无限循环中将共享锁的数量减少一,在释放锁阶段,只有当所有的读锁,写锁被占有,才会去执行doReleaseShared方法。
2.2 WriterLock 源码分析
2.2.1 lock方法详解
[java]

  1. public void lock() {  
  2.             sync.acquire(1);  
  3.         }  
  4. public final void acquire(int arg) {  
  5.         if (!tryAcquire(arg) &&  
  6.             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  
  7.             selfInterrupt();  
  8. }  
  9. 对上述代码是不是似曾相识,对了,在学习ReentrantLock时候,看到的一样,acquire是在AbstractQueuedSynchronizer中,关键是在 tryAcquire方法,是在不同的子类中实现的。那我们将目光移到ReentrantReadWriterLock.Sync中  
  10. protected final boolean tryAcquire(int acquires) {  
  11.             /* 
  12.              * Walkthrough: 
  13.              * 1. If read count nonzero or write count nonzero 
  14.              *    and owner is a different thread, fail. 
  15.              * 2. If count would saturate, fail. (This can only 
  16.              *    happen if count is already nonzero.) 
  17.              * 3. Otherwise, this thread is eligible for lock if 
  18.              *    it is either a reentrant acquire or 
  19.              *    queue policy allows it. If so, update state 
  20.              *    and set owner. 
  21.              */  
  22.             Thread current = Thread.currentThread();  
  23.             int c = getState();  
  24.             int w = exclusiveCount(c);  
  25.             if (c != 0) {                                   // @1  
  26.                 // (Note: if c != 0 and w == 0 then shared count != 0)  
  27.                 if (w == 0 || current != getExclusiveOwnerThread())                //@2  
  28.                     return false;  
  29.                 if (w + exclusiveCount(acquires) > MAX_COUNT)                
  30.                     throw new Error("Maximum lock count exceeded");  
  31.                 // Reentrant acquire  
  32.                 setState(c + acquires);                                                             //@3  
  33.                 return true;  
  34.             }  
  35.             if (writerShouldBlock() ||                                                                 
  36.                 !compareAndSetState(c, c + acquires))                                   //@4  
  37.                 return false;  
  38.             setExclusiveOwnerThread(current);                                             //@5  
  39.             return true;  
  40.         }  
代码@1,如果锁的state不为0,说明有写锁,或读锁,或两种锁持有
代码@2,如果写锁为0,再加上c!=0,说明此时有读锁,自然返回false,表示只能排队去获取写锁
               如果写锁不为0,如果持有写锁的线程不为当前线程,自然返回false,排队去获取写锁。
代码@3,表示,当前线程持有写锁,现在是重入,所以只需要修改锁的额数量即可。
代码@4,表示,表示通过一次CAS去获取锁的时候失败,说明被别的线程抢去了,也返回false,排队去重试获取锁。
代码@5,成获取写锁后,将当前线程设置为占有写锁的线程。尝试获取锁方法结束。如果该方法返回false,则进入到acquireQueue方法去排队获取写锁,写锁的获取过程,与ReentrantLock获取方法一样,就不过多的解读了。
    读写锁的实现原理就分析到这了,走过路过的朋友,欢迎拍砖讨论。

本文固定链接: http://www.devba.com/index.php/archives/6550.html | 开发吧

报歉!评论已关闭.