詳解java中的阻塞隊列
阻塞隊列(BlockingQueue)首先是一個支持先進先出的隊列,與普通的隊列完全相同;其次是一個支持阻塞操作的隊列,即:
當隊列滿時,會阻塞執行插入操作的線程,直到隊列不滿。 當隊列為空時,會阻塞執行獲取操作的線程,直到隊列不為空。阻塞隊列用在多線程的場景下,因此阻塞隊列使用了鎖機制來保證同步,這里使用的可重入鎖;而對于阻塞與喚醒機制則有與鎖綁定的Condition實現
應用場景:生產者消費者模式
java中的阻塞隊列java中的阻塞隊列根據容量可以分為有界隊列和無界隊列:
有界隊列:隊列中只能存儲有限個元素,超出后存放元素線程會被阻塞或者失敗。 無界隊列:隊列中可以存儲無限個元素。java8中提供了7種阻塞隊列阻塞隊列供開發者使用,如下表:類名 描述 ArrayBlockingQueue 一個由數組結構組成的有界阻塞隊列 LinkedBlockingQueue 由鏈表結構組成的有界阻塞隊列(默認大小Integer.MAX_VALUE) PriorityBlockingQueue 支持優先級排序的無界阻塞隊列 DelayQueue 使用優先級隊列實現的延遲無界阻塞隊列 SynchronousQueue 不存儲元素的阻塞隊列,即單個元素的隊列 LinkedTransferQueue 由鏈表結構組成的無界阻塞隊列 LinkedBlockingDeque 由鏈表結構組成的雙向阻塞隊列
另外還有一個在ScheduledThreadPoolExecutor中實現的DelayedWorkQueue阻塞隊列,但這個阻塞隊列開發者不能使用。它們之間的UML類圖如下圖:
BlockingQueue接口是阻塞隊列對外的訪問接口,所有的阻塞隊列都實現了BlockQueue中的方法
BlockQueue中方法作為一個隊列的核心方法就是入隊和出隊。由于存在阻塞策略,BlockQueue將出隊入隊的情況分為了四組,每組提供不同的方法:
拋出異常:當隊列滿時,如果再往隊列中插入元素,則拋出IllegalStateException異常;當隊列為空時,從隊列中獲取元素則拋出NoSuchElementException異常。 返回特定值(布爾值):當隊列滿時,如果再往隊列中插入元素,則返回false;當隊列為空時,從隊列中獲取元素則返回null。 一直阻塞:當隊列滿時,如果再往隊列中插入元素,阻塞當前線程直到隊列中至少一個被移除或者響應中斷退出;當隊列為空時,則阻塞當前線程直到至少一個元素元素入隊或者響應中斷退出。 超時退出:當隊列滿時,如果再往隊列中插入元素,阻塞當前線程直到隊列中至少一個被移除或者達到指定的等待時間退出或者響應中斷退出;當隊列為空時,則阻塞當前線程直到至少一個元素元素入隊或者達到指定的等待時間退出或者響應中斷退出。對于每種情況BlockingQueue提供的方法如下表:
方法處理方式 拋出異常 返回特定值(布爾值) 一直阻塞 超時退出 插入 add(e) offer(e) put(e) offer(e,time,unit) 移除 remove() poll() take() poll(time.unit) 檢查 element() peek() 不可用 不可用
上述方法一般用于生產者-消費者模型中,是其中的生產和消費操作隊列的核心方法。除了這些方法,BlockingQueue還提供了一些其他的方法如下表:
方法名稱 描述 remove(Object o) 從隊列中移除一個指定值 size() 獲取隊列中元素的個數 contains(Object o) 判斷隊列是否包含指定的元素,但是這個元素在這次判斷完可能就會被消費 drainTo(Collection<? super E> c) 將隊列中元素放在給定的集合中,并返回添加的元素個數 drainTo(Collection<? super E> c, int maxElements) 將隊列中元素取maxElements(不超過隊列中元素個數)個放在給定的集合中,并返回添加的元素個數 remainingCapacity() 計算隊列中還可以存放的元素個數 toArray() 以objetc數組的形式獲取隊列中所有的元素 toArray(T[] a) 以給定類型數組的方式獲取隊列中所有的元素 clear() 清空隊列,危險的操作
阻塞隊列的實現原理阻塞隊列的實現依靠通知模式實現:當生產者向滿了的隊列中添加元素時,會阻塞住生產者,直到消費者消費了一個隊列中的元素后會通知消費者隊列可用,此時再由生產者向隊列中添加元素。反之亦然。
阻塞隊列的阻塞喚醒依靠Condition——條件隊列來實現。
以ArrayBlockingQueue為例說明:
ArrayBlockingQueue的定義:
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** The queued items */ //以數組的結構存儲隊列的元素,采用的是循環數組 final Object[] items; /** items index for next take, poll, peek or remove */ //隊列的隊頭索引 int takeIndex; /** items index for next put, offer, or add */ //隊列的隊尾索引 int putIndex; /** Number of elements in the queue */ //隊列中元素的個數 int count; /** Main lock guarding all access */ //對于ArrayBlockingQueue所有的操作都需要加鎖, final ReentrantLock lock; /** Condition for waiting takes */ //條件隊列,當隊列為空時阻塞消費者并在生產者生產后喚醒消費者 private final Condition notEmpty; /** Condition for waiting puts */ //條件隊列,當隊列滿時阻塞生產者,并在消費者消費隊列后喚醒生產者 private final Condition notFull;}
根據類的定義字段可以看到,有兩個Condition條件隊列,猜測以下過程
當隊列為空,消費者試圖消費時應該調用notEmpty.await()方法阻塞,并在生產者生產后調用notEmpty.single()方法 當隊列已滿,生產者試圖放入元素應調用notFull.await()方法阻塞,并在消費者消費隊列后調用notFull.single()方法向隊向隊列中添加元素put()方法的添加過程。
/** * 向隊列中添加元素 * 當隊列已滿時需要阻塞當前線程 * 放入元素后喚醒因隊列為空阻塞的消費者 */ public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //當隊列已滿時需要notFull.await()阻塞當前線程 //offer(e,time,unit)方法就是阻塞的時候加了超時設定 while (count == items.length) notFull.await(); //放入元素的過程 enqueue(e); } finally { lock.unlock(); } } /**enqueue實際添加元素的方法*/ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; //如果條件隊列中存在等待的線程 //喚醒 notEmpty.signal(); }
從隊列中獲取元素take()方法的獲取過程。
/** * 從隊列中獲取元素 * 當隊列已空時阻塞當前線程 * 從隊列中消費元素后喚醒等待的生產線程 */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //隊列為空需要阻塞當前線程 while (count == 0) notEmpty.await(); //獲取元素的過程 return dequeue(); } finally { lock.unlock(); } } /**dequeue實際消費元素的方法*/ private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings('unchecked') E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); //消費元素后從喚醒阻塞的生產者線程 notFull.signal(); return x; }總結
阻塞隊列提供了不同于普通隊列的增加、刪除元素的方法,核心在與隊列滿時阻塞生產者和隊列空時阻塞消費者。這一阻塞過程依靠與鎖綁定的Condition對象實現。Condition接口的實現在AQS中實現,具體的實現類是ConditionObject
以上就是詳解java中的阻塞隊列的詳細內容,更多關于java 阻塞隊列的資料請關注好吧啦網其它相關文章!
相關文章:
