国产成人精品久久免费动漫-国产成人精品天堂-国产成人精品区在线观看-国产成人精品日本-a级毛片无码免费真人-a级毛片毛片免费观看久潮喷

您的位置:首頁(yè)技術(shù)文章
文章詳情頁(yè)

Java多線程之ReentrantReadWriteLock源碼解析

瀏覽:71日期:2022-08-13 11:18:10
一、介紹1.1 ReentrantReadWriteLock

ReentrantReadWriteLock 是一個(gè)讀寫鎖,允許多個(gè)讀或者一個(gè)寫線程在執(zhí)行。

內(nèi)部的 Sync 繼承自 AQS,這個(gè) Sync 包含一個(gè)共享讀鎖 ReadLock 和一個(gè)獨(dú)占寫鎖 WriteLock。

該鎖可以設(shè)置公平和非公平,默認(rèn)非公平。

一個(gè)持有寫鎖的線程可以獲取讀鎖。如果該線程先持有寫鎖,再持有讀鎖并釋放寫鎖,稱為鎖降級(jí)。

WriteLock支持Condition并且與ReentrantLock語(yǔ)義一致,而ReadLock則不能使用Condition,否則拋出UnsupportedOperationException異常。

public class ReentrantReadWriteLock implements ReadWriteLock { /** 讀鎖 */ private final ReentrantReadWriteLock.ReadLock readerLock; /** 寫鎖 */ private final ReentrantReadWriteLock.WriteLock writerLock; /** 持有的AQS子類對(duì)象 */ final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer {} static final class NonfairSync extends Sync {} static final class FairSync extends Sync {} public static class ReadLock implements Lock {} public static class WriteLock implements Lock {} //默認(rèn)非公平 public ReentrantReadWriteLock() {this(false); } public ReentrantReadWriteLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();readerLock = new ReadLock(this);writerLock = new WriteLock(this); } public static class ReadLock implements Lock { private final Sync sync;protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync;} } public static class WriteLock implements Lock { private final Sync sync;protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync;} }}1.2 state

Sync 繼承了 AQS,其中有一個(gè) int 的成員變量 state,int 共32位,這里將其視為兩部分,高16位表示讀的數(shù)量,低16位表示寫的數(shù)量,這里的數(shù)量表示線程重入后的總數(shù)量。

abstract static class Sync extends AbstractQueuedSynchronizer { //繼承的一個(gè)int的成員變量,將其拆分為高16位和低16位 //private volatile int state; static final int SHARED_SHIFT = 16; //讀一次,鎖增加的值 static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; //讀的數(shù)量 static int sharedCount(int c) { return c >>> SHARED_SHIFT; } //寫的數(shù)量 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }}1.3 HoldCounter

讀鎖使用了一個(gè) ThreadLocal<HoldCounter> 讓每個(gè)線程有一個(gè)線程私有的 HoldCounter,HoldCounter包含一個(gè)線程 id 以及讀重入的次數(shù)。

查找對(duì)應(yīng)線程的HoldCounter 其實(shí)只用一個(gè) ThreadLocalHoldCounter 也足夠了。這里為了加快查詢,用了兩個(gè)額外的緩存,即 cachedHoldCounter、firstReader 和 firstReaderHoldCount(后兩個(gè)組合起來(lái)相當(dāng)于一個(gè) HoldCounter)。

在讀鎖的相關(guān)操作中,先檢查 firstReader 是否為當(dāng)前線程,否則檢查 cachedHoldCounter 內(nèi)部的線程是否為當(dāng)前線程,如果失敗最后會(huì)通過(guò) readHolds 來(lái)獲取當(dāng)前線程的 HoldCounter。

static final class HoldCounter { int count = 0; // 使用線程id,而不是線程的引用。這樣可以防止垃圾不被回收 final long tid = getThreadId(Thread.currentThread());}static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() {return new HoldCounter(); }}//使用的ThreadLocalprivate transient ThreadLocalHoldCounter readHolds;//一個(gè)緩存private transient HoldCounter cachedHoldCounter;//組合起來(lái)相當(dāng)于一個(gè)緩存private transient Thread firstReader = null;private transient int firstReaderHoldCount;二、讀鎖2.1 讀鎖的獲取

下面講解 tryAcquireShared 和 tryReadLock,tryReadLock 是一種直接搶占的非公平獲取,和 tryAcquireShared 中的非公平獲取有所不同。

2.1.1 tryAcquireShared

根據(jù)注釋

1.檢查是否存在其他線程持有的寫鎖,是的話失敗,返回 -1;

2.判斷在當(dāng)前公平狀態(tài)下能否讀,以及是否超過(guò)讀的最大數(shù)量,滿足條件則嘗試 CAS 修改狀態(tài),讓 state 加一個(gè)單位的讀 SHARED_UNIT;修改成功后會(huì)根據(jù)三種情況,即首次讀、firstReader 是當(dāng)前線程,以及其他情況分別進(jìn)行處理,成功,返回1;

3.前面未返回結(jié)果,會(huì)執(zhí)行 fullTryAcquireShared。

可以將該方法視為 fullTryAcquireShared 的一次快速嘗試,如果嘗試失敗,會(huì)在 fullTryAcquireShared 的自旋中一直執(zhí)行,直到返回成功或者失敗。

//ReadLockpublic void lock() { sync.acquireShared(1);} //AQSpublic final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0)doAcquireShared(arg);} //Syncprotected final int tryAcquireShared(int unused) { /* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */ Thread current = Thread.currentThread(); int c = getState(); // 如果寫的數(shù)量不是0,且寫線程不是當(dāng)前線程,失敗 if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1; // 獲取讀的個(gè)數(shù) int r = sharedCount(c); // 如果當(dāng)前線程想要讀,沒(méi)有被堵塞 // 當(dāng)前讀的數(shù)量未超過(guò)最大允許的讀的個(gè)數(shù) // CAS執(zhí)行成功 if (!readerShouldBlock() &&r < MAX_COUNT &&compareAndSetState(c, c + SHARED_UNIT)) { // 第一次讀,修改firstReader和firstReaderHoldCount if (r == 0) { firstReader = current; firstReaderHoldCount = 1; // 如果當(dāng)前線程正好是firstReader} else if (firstReader == current) { firstReaderHoldCount++; // 其他情況,經(jīng)過(guò)一系列處理后,使得rh為當(dāng)前線程的HoldCounter // 對(duì)rh的記數(shù)加一} else { HoldCounter rh = cachedHoldCounter; // 如果cached為null或者不是當(dāng)前線程 if (rh == null || rh.tid != getThreadId(current)) // 從readHolds中g(shù)et,并修改cachedcachedHoldCounter = rh = readHolds.get(); // 如果cached不是null,但記數(shù)為null // 這種情況表示當(dāng)前線程的HoldCounter已經(jīng)被刪除,即為null, // 但cached仍然保留著null之前的那個(gè)HoldCounter, // 為了方便,直接將cached設(shè)置給ThreadLocal即可 else if (rh.count == 0)readHolds.set(rh); //執(zhí)行到這里,rh表示當(dāng)前線程的HoldCounter,記數(shù)加1 rh.count++;}return 1; } // 前面未返回結(jié)果,執(zhí)行第三步 return fullTryAcquireShared(current);}2.1.2 fullTryAcquireShared

在上述的簡(jiǎn)單嘗試 tryAcquireShared 未能確定結(jié)果后,執(zhí)行第三步 fullTryAcquireShared 自旋來(lái)不斷嘗試獲取讀鎖,直到成功獲取鎖返回1,或者滿足相應(yīng)條件認(rèn)定失敗返回-1。

1.其他線程持有寫鎖,失敗

2.當(dāng)前線程讀的嘗試滿足堵塞條件表示當(dāng)前線程排在其他線程后面,且當(dāng)前線程沒(méi)有持有鎖即非重入的情況,失敗

3.其他情況則不斷自旋CAS,達(dá)到最大讀的數(shù)量會(huì)拋出異常,其他情況在成功后返回1。

final int fullTryAcquireShared(Thread current) { /* * This code is in part redundant with that in * tryAcquireShared but is simpler overall by not * complicating tryAcquireShared with interactions between * retries and lazily reading hold counts. */ HoldCounter rh = null; for (;;) {int c = getState();if (exclusiveCount(c) != 0) { // 存在其他線程持有寫鎖,返回-1 if (getExclusiveOwnerThread() != current)return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. //沒(méi)有寫鎖,且該線程排在其他線程后面,應(yīng)該被堵塞 //如果已經(jīng)持有讀鎖,此次獲取是重入,可以執(zhí)行else if 之后的操作; //否則,會(huì)被堵塞,返回-1。} else if (readerShouldBlock()) { // Make sure we’re not acquiring read lock reentrantly //檢查firstReader if (firstReader == current) {// assert firstReaderHoldCount > 0; } else {if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { //執(zhí)行到下一步rh是cached或者readHolds.get(),檢查rhrh = readHolds.get(); //在get時(shí),如果不存在,會(huì)產(chǎn)生一個(gè)新的HoldCounter //記數(shù)為0表示不是重入鎖,會(huì)刪除讓其重新為nullif (rh.count == 0) readHolds.remove(); }} //返回失敗if (rh.count == 0) return -1; }} //達(dá)到最大值,不允許繼續(xù)增加if (sharedCount(c) == MAX_COUNT) throw new Error('Maximum lock count exceeded'); //和2.1.1中相似if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) {firstReader = current;firstReaderHoldCount = 1; } else if (firstReader == current) {firstReaderHoldCount++; } else {if (rh == null) rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get();else if (rh.count == 0) readHolds.set(rh);rh.count++;cachedHoldCounter = rh; // cache for release } return 1;} }}2.1.3 readerShouldBlock

該方法返回當(dāng)前線程請(qǐng)求獲得讀鎖是否應(yīng)該被堵塞,在公平鎖和非公平鎖中的實(shí)現(xiàn)不同

在公平鎖中,返回在排隊(duì)的隊(duì)列中當(dāng)前線程之前是否存在其他線程,是的話返回 true,當(dāng)前線程在隊(duì)列頭部或者隊(duì)列為空返回 false。

// FairSyncfinal boolean readerShouldBlock() { return hasQueuedPredecessors();}// AQSpublic final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());}

在非公平鎖中,隊(duì)列中存在兩個(gè)節(jié)點(diǎn),且第二個(gè)節(jié)點(diǎn)是獨(dú)占的寫節(jié)點(diǎn),會(huì)返回 true,使得新來(lái)的讀線程堵塞。

這種方式只能在第二個(gè)節(jié)點(diǎn)是請(qǐng)求寫鎖的情況下返回 true,避免寫鎖的無(wú)限等待;如果寫鎖的請(qǐng)求節(jié)點(diǎn)在隊(duì)列的其他位置,返回 false,不影響新來(lái)的讀線程獲取讀鎖。

如果不按照這種方式處理,而按照隊(duì)列中的順序進(jìn)行處理,則只要存在其他線程在讀,每次來(lái)一個(gè)新的線程請(qǐng)求讀鎖,總是成功,寫鎖會(huì)一直等待下去。

// NonfairSyncfinal boolean readerShouldBlock() { /* As a heuristic to avoid indefinite writer starvation, * block if the thread that momentarily appears to be head * of queue, if one exists, is a waiting writer. This is * only a probabilistic effect since a new reader will not * block if there is a waiting writer behind other enabled * readers that have not yet drained from the queue. */ return apparentlyFirstQueuedIsExclusive();}// AQSfinal boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null &&(s = h.next) != null &&!s.isShared() &&s.thread != null;}2.1.4 tryReadLock

和 fullTryAcquireShared 有相似之處,該方法總是直接去搶占鎖,直到其他線程獲取寫鎖返回失敗,或者當(dāng)前當(dāng)前線程獲取讀鎖返回成功。

//ReadLockpublic boolean tryLock() { return sync.tryReadLock();}//Syncfinal boolean tryReadLock() { Thread current = Thread.currentThread(); for (;;) {int c = getState();if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return false;int r = sharedCount(c);if (r == MAX_COUNT) throw new Error('Maximum lock count exceeded');if (compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) {firstReader = current;firstReaderHoldCount = 1; } else if (firstReader == current) {firstReaderHoldCount++; } else {HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0) readHolds.set(rh);rh.count++; } return true;} }}2.2 讀鎖的釋放

tryReleaseShared 在 if/else 中實(shí)現(xiàn)了通過(guò) first/cached/readHolds 獲取相應(yīng)的 HoldCounter,并修改其中的記數(shù),記數(shù)為0則刪除;在 for 中,不斷自旋實(shí)現(xiàn) CAS 修改狀態(tài) c,如果修改后的狀態(tài)為0,表示讀寫鎖全部釋放,返回 true,否則是 false。

// ReadLockpublic void unlock() { sync.releaseShared(1);}// AQSpublic final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) {doReleaseShared();return true; } return false;}// Syncprotected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); // 先檢查 firstReader是否是當(dāng)前線程 if (firstReader == current) {// assert firstReaderHoldCount > 0;if (firstReaderHoldCount == 1) firstReader = null;else firstReaderHoldCount--; //否則,處理 cached/readHolds中的HoldCounter } else {HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get();int count = rh.count;if (count <= 1) { readHolds.remove(); if (count <= 0)throw unmatchedUnlockException();}--rh.count; } //自旋修改 state for (;;) {int c = getState();int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. //只有讀寫鎖均釋放干凈,才返回true return nextc == 0; }}三、寫鎖3.1 寫鎖的獲取

下面講解 tryAcquire 和 tryWriteLock,tryWriteLock 是一種非公平的獲取。

3.1.1 tryAcquire

根據(jù)注釋,tryAcquire 分為三步

1.如果讀記數(shù)非0,或者寫記數(shù)非0且寫線程不是當(dāng)前線程,失敗

2.寫鎖的獲取應(yīng)該被堵塞或者CAS失敗,失敗

3.其他情況,寫重入和新來(lái)的寫線程,均成功

//WriteLockpublic void lock() { sync.acquire(1);}//AQSpublic final void acquire(int arg) { if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}//Syncprotected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); //c分為兩部分,寫和讀 if (c != 0) {// (Note: if c != 0 and w == 0 then shared count != 0) // c非0,w是0,則讀記數(shù)非0 || 獨(dú)占的寫線程不是當(dāng)前線程 // 返回 falseif (w == 0 || current != getExclusiveOwnerThread()) return false;if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error('Maximum lock count exceeded');// Reentrant acquire // 重入的情況setState(c + acquires);return true; } // 寫應(yīng)該被堵塞或者CAS失敗,返回false if (writerShouldBlock() ||!compareAndSetState(c, c + acquires))return false; // 非重入,在CAS成功后,設(shè)定獨(dú)占寫線程為當(dāng)前線程,返回true setExclusiveOwnerThread(current); return true;}3.1.2 writerShouldBlock

在公平鎖中,檢查隊(duì)列前面是否有其他線程在排隊(duì),在非公平鎖中,總是返回false,即總是不堵塞。

//FairSyncfinal boolean writerShouldBlock() { return hasQueuedPredecessors();}//NonfairSyncfinal boolean writerShouldBlock() { return false; // writers can always barge}3.1.3 tryWriteLock

和 tryAcquire 在非公平鎖的寫法基本一樣。

final boolean tryWriteLock() { Thread current = Thread.currentThread(); int c = getState(); if (c != 0) {int w = exclusiveCount(c);if (w == 0 || current != getExclusiveOwnerThread()) return false;if (w == MAX_COUNT) throw new Error('Maximum lock count exceeded'); } if (!compareAndSetState(c, c + 1))return false; setExclusiveOwnerThread(current); return true;}3.2 寫鎖的釋放

在 tryRelease 中,修改相應(yīng)的狀態(tài),如果修改后寫鎖記數(shù)為0,則返回 true。

//WriteLockpublic void unlock() { sync.release(1);}//AQSpublic final boolean release(int arg) { if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0) unparkSuccessor(h);return true; } return false;}//Syncprotected final boolean tryRelease(int releases) { // 首先檢查當(dāng)前線程是否持有寫鎖 if (!isHeldExclusively())throw new IllegalMonitorStateException(); int nextc = getState() - releases; // 根據(jù)修改后的寫記數(shù)來(lái)確定free boolean free = exclusiveCount(nextc) == 0; // 此時(shí),寫鎖完全釋放,設(shè)定寫?yīng)氄季€程為null if (free)setExclusiveOwnerThread(null); setState(nextc); // 返回 free return free;}四、鎖降級(jí)

如果一個(gè)線程已經(jīng)持有寫鎖,再去獲取讀鎖并釋放寫鎖,這個(gè)過(guò)程稱為鎖降級(jí)。

持有寫鎖的時(shí)候去獲取讀鎖,只有該持有寫鎖的線程能夠成功獲取讀鎖,然后再釋放寫鎖,保證此時(shí)當(dāng)前線程是有讀鎖的;如果有寫鎖,先釋放寫鎖,再獲取讀鎖,可能暫時(shí)不能獲取讀鎖,會(huì)在隊(duì)列中排隊(duì)等待。

到此這篇關(guān)于Java基礎(chǔ)之ReentrantReadWriteLock源碼解析的文章就介紹到這了,更多相關(guān)Java ReentrantReadWriteLock源碼解析內(nèi)容請(qǐng)搜索好吧啦網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持好吧啦網(wǎng)!

標(biāo)簽: Java
相關(guān)文章:
主站蜘蛛池模板: 我看毛片 | 国产午夜亚洲精品第一区 | 国产日韩亚洲不卡高清在线观看 | 性做久久久久久久免费看 | 久久在线视频 | 欧美国产成人免费观看永久视频 | 国产一区二区三区亚洲综合 | 午夜精品亚洲 | 国产精品美女一级在线观看 | 中文字幕视频在线观看 | videos欧美丰满肥婆 | 成人免费福利片在线观看 | 成年午夜一级毛片视频 | 国内精品视频成人一区二区 | 最新国产区 | 在线视频观看免费视频18 | 99久久免费午夜国产精品 | 久久综合久美利坚合众国 | 男人天堂av网 | 毛片免费全部播放一级 | 一a一级片| 欧美九九视频 | 国产亚洲欧美另类久久久 | 日韩精品中文字幕视频一区 | 久久国产美女免费观看精品 | 国产成人一区二区三区在线视频 | 国产欧美一区二区精品性色 | 国产精品美女一级在线观看 | 一本色道久久88综合亚洲精品高清 | 国产美女一区二区三区 | 国产a级一级久久毛片 | 精品久久久久久中文字幕 | 成人毛片免费观看视频 | 国产精品一区伦免视频播放 | 美女的让男人桶到爽软件 | 999国产精品亚洲77777 | 亚洲综合99| 狠狠色狠狠色综合久久一 | 韩国美女豪爽一级毛片 | 久久久日韩精品国产成人 | 成人欧美一区二区三区在线 |