多线程之阻塞队列
多线程之阻塞队列
阻塞队列再并发编程中经常被用于“生成者-消费者”问题中。
阻塞队列
- 当队列为空,获取元素线程会阻塞等待队列变为不为空
- 当队列不为空时,存储元素的线程会等待队列可用(NOT Full)
// 插入元素 |
核心方法为 put 和 take
常用阻塞队列
ArrayBlockingQueue
数组实现的有界队列,通过ReentrantLock 和 Condition实现阻塞。
final ReentrantLock lock; |
put: 获取锁,当队列满时notFull await,队列不满时enqueue,signal唤醒notEmpty
take: 获取锁,当队列为空时notEmpty await,当元素不为空dequeue,signal唤醒notFull
LinkedBlockingQueue
用链表实现的有界阻塞队列(如果不设置容量,默认为Integer.MAX_VALUE), 使用2个锁,可以使得读和写线程并发执行。
- 锁takeLock保证删除数据的安全性,队列为空时读操作线程阻塞并加入takeLock锁的notEmpty条件等待队列。
- 锁putLock保证添加数据的安全性,队列满时写操作线程阻塞并加入putLock锁的notFull条件等待队列。
/ take, poll, peek 等读操作的方法需要获取到这个锁 |
put: putlock, 当队列满时notFull await,队列不满enqueue,如果入队成功后队列还没有满(添加删除可同时进行),唤醒notFull队列中等待添加元素的线程; 释放锁,如果添加元素前队列为空,可能会有读线程阻塞,入队后唤醒阻塞的读线程
take:takeLock, 如果队列为空notEmpty await,队列不为空,dequeue,出队后如果还有元素,唤醒notempty等待删除元素的线程;释放锁,如果删除元素前队列是满的,可能有写线程阻塞等待,所以唤醒写线程
SynchronousQueue
SynchronousQueue的同步指的是读线程和写线程需要同步,一个读线程匹配一个写线程。当一个线程往队列中写入一个元素时,写入操作不会立即返回,需要等待另一个线程来将这个元素拿走;当一个读线程做读操作的时候,同样需要一个相匹配的写线程的写操作。
SynchronousQueue 实际不存储元素,数据必须从某个写线程交给某个读线程,而不是写到某个队列中等待被消费。
SynchronousQueue 执行put/take操作时,如果队列是空的,或者队列中的节点和当前的线程操作类型一致(如当前操作是 put 操作,而队列中的元素也都是写线程),则将当前线程加入到等待队列。如果队列中有等待节点,而且与当前操作可以匹配(如队列中都是读操作线程,当前线程是写操作线程,反之亦然),则匹配等待队列的队头,出队,返回相应数据。
PriorityBlockingQueue
PriorityBlockingQueue队列为无界队列,只能指定初始的队列大小
PriorityBlockingQueue其实是 PriorityQueue 的线程安全版本,插入队列的对象必须是可比较大小的(comparable)。PriorityBlockingQueue/PriorityQueue 通过堆实现
PriorityBlockingQueue put 方法不会 block,因为它是无界队列;take 方法在队列为空的时候会阻塞。
DelayQueue
- DelayQueue是一个支持延时获取元素的无界阻塞队列。
- DelayQueue中的元素都是可延期的,因为必须实现Delayed接口。
- 插入元素时,会根据延期时间对元素排序,队头的元素是最先到期的;取出元素时,只有在队头元素到期时才能够从队列中取元素。如果队头元素还有t时间到期,则将取出元素线程阻塞t时间,t时间到后再次尝试取出队头元素。
使用 ReentrantLock + Condition
put: 获取锁,添加元素,插入元素为队首时,唤醒take线程尝试take (因为更新了队首元素,所以要重新检查队首元素是否到期。)
take: 获取锁,如果队列为空,阻塞take;如果队首到期则出队,如果队首元素没到期,阻塞take(阻塞时间为到期剩余时间),时间到后阻塞唤醒,重新尝试(无限循环)
总结
阻塞队列是一个比普通队列多出两个附加操作的队列。两个操作分别是:
- 在队列为空时,获取元素的线程会等待队列变为非空。
- 当队列满时,存储元素的线程会等待队列可用。
ArrayBlockingQueue
- ArrayBlockingQueue是由数组实现的有界队列,通过ReentrantLock锁保证队列数据的安全性,通过ReentrantLock的条件Condition是实现阻塞。
- 添加元素时,如果队列满了不能添加元素,就将添加元素的线程阻塞并加入notFull条件队列;当成功删除元素后,队列就可以添加元素了,唤醒notFull条件队列中阻塞的线程,添加元素。
- 删除元素时,如果队列空了不能删除元素,就将删除元素的线程阻塞并加入notEmpty条件队列;当成功添加元素后,队列就可以删除元素了,唤醒notEmpty条件队列中阻塞的线程,删除元素。
LinkedBlockingQueue
- LinkedBlockingQueue用链表实现的有界阻塞队列。(不设置容量,默认为Integer.MAX_VALUE)
- 锁takeLock保证删除数据的安全性,队列为空时读操作线程阻塞并加入takeLock锁的notEmpty条件等待队列。
- 锁putLock保证添加数据的安全性,队列满时写操作线程阻塞并加入putLock锁的notFull条件等待队列。
- ArrayBlockingQueue的读写使用同一个锁来保证数据安全。LinkedBlockingQueue的读写分别用不同的锁来保证数据安全,采用不同的锁可以使读线程和写线程并发执行,提高了吞吐量,但也增加了编程的复杂度。
SynchronousQueue
- SynchronousQueue的同步指的是读线程和写线程需要同步,一个读线程匹配一个写线程。当一个线程往队列中写入一个元素时,写入操作不会立即返回,需要等待另一个线程来将这个元素拿走;当一个读线程做读操作的时候,同样需要一个相匹配的写线程的写操作。
- SynchronousQueue 实际不存储元素,数据必须从某个写线程交给某个读线程,而不是写到某个队列中等待被消费。
- SynchronousQueue 执行put/take操作时,如果队列是空的,或者队列中的节点和当前的线程操作类型一致(如当前操作是 put 操作,而队列中的元素也都是写线程),则将当前线程加入到等待队列。如果队列中有等待节点,而且与当前操作可以匹配(如队列中都是读操作线程,当前线程是写操作线程,反之亦然),则匹配等待队列的队头,出队,返回相应数据。
PriorityBlockingQueue
- PriorityBlockingQueue队列为无界队列,只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容。
- PriorityBlockingQueue其实是 PriorityQueue 的线程安全版本,插入队列的对象必须是可比较大小的(comparable)。PriorityBlockingQueue/PriorityQueue 通过堆实现,这里不再详细介绍数据结构,重点讲解阻塞原理。
PriorityQueue 优先级队列的元素按照其自然顺序进行排序或者构造队列时提供的 Comparator 进行排序,插入元素是根据排序规则找到新元素在堆中位置插入。
- PriorityBlockingQueue put 方法不会 block,因为它是无界队列;take 方法在队列为空的时候会阻塞。
DelayQueue
- DelayQueue是一个支持延时获取元素的无界阻塞队列。
- DelayQueue中的元素都是可延期的,因为必须实现Delayed接口。
- 插入元素时,会根据延期时间对元素排序,队头的元素是最先到期的;取出元素时,只有在队头元素到期时才能够从队列中取元素。如果队头元素还有t时间到期,则将取出元素线程阻塞t时间,t时间到后再次尝试取出队头元素。
阻塞队列基于数组的简易实现
import java.util.concurrent.locks.Condition; |
参考