多线程之阻塞队列

多线程之阻塞队列

阻塞队列再并发编程中经常被用于“生成者-消费者”问题中。

阻塞队列

  1. 当队列为空,获取元素线程会阻塞等待队列变为不为空
  2. 当队列不为空时,存储元素的线程会等待队列可用(NOT Full)
// 插入元素
add(E e) :往队列插入数据,当队列满时,插入元素时会抛出IllegalStateException异常;
offer(E e):当往队列插入数据时,插入成功返回true,否则则返回false。当队列满时不会抛出异常;

// 删除元素
remove(Object o):从队列中删除数据,成功则返回true,否则为false
poll:删除数据,当队列为空时,返回null

// 查看元素
element:获取队头元素,如果队列为空时则抛出NoSuchElementException异常;
peek:获取队头元素,如果队列为空则抛出NoSuchElementException异常

// 插入数据:
put():当阻塞队列容量已经满时,往阻塞队列插入数据的线程会被阻塞,直至阻塞队列已经有空余的容量可供使用;
offer(E e, long timeout, TimeUnit unit):若阻塞队列已经满时,同样会阻塞插入数据的线程,直至阻塞队列已经有空余的地方,与put方法不同的是,该方法会有一个超时时间,若超过当前给定的超时时间,插入数据的线程会退出;

// 删除数据:
take():当阻塞队列为空时,获取队头数据的线程会被阻塞;
poll(long timeout, TimeUnit unit):当阻塞队列为空时,获取数据的线程会被阻塞,另外,如果被阻塞的线程超过了给定的时长,该线程会退出

核心方法为 put 和 take

常用阻塞队列

Screen Shot 2020-08-17 at 10.57.47 PM

ArrayBlockingQueue

数组实现的有界队列,通过ReentrantLock 和 Condition实现阻塞。

final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

put: 获取锁,当队列满时notFull await,队列不满时enqueue,signal唤醒notEmpty

take: 获取锁,当队列为空时notEmpty await,当元素不为空dequeue,signal唤醒notFull

LinkedBlockingQueue

用链表实现的有界阻塞队列(如果不设置容量,默认为Integer.MAX_VALUE), 使用2个锁,可以使得读和写线程并发执行。

  • 锁takeLock保证删除数据的安全性,队列为空时读操作线程阻塞并加入takeLock锁的notEmpty条件等待队列。
  • 锁putLock保证添加数据的安全性,队列满时写操作线程阻塞并加入putLock锁的notFull条件等待队列。
/ take, poll, peek 等读操作的方法需要获取到这个锁
private final ReentrantLock takeLock = new ReentrantLock();
// 如果读操作的时候队列是空的,加入notEmpty等待队列
private final Condition notEmpty = takeLock.newCondition();

// put, offer 等写操作的方法需要获取到这个锁
private final ReentrantLock putLock = new ReentrantLock();
// 如果写操作的时候队列是满的,加入notFull等待队列
private final Condition notFull = putLock.newCondition();

put: putlock, 当队列满时notFull await,队列不满enqueue,如果入队成功后队列还没有满(添加删除可同时进行),唤醒notFull队列中等待添加元素的线程; 释放锁,如果添加元素前队列为空,可能会有读线程阻塞,入队后唤醒阻塞的读线程

take:takeLock, 如果队列为空notEmpty await,队列不为空,dequeue,出队后如果还有元素,唤醒notempty等待删除元素的线程;释放锁,如果删除元素前队列是满的,可能有写线程阻塞等待,所以唤醒写线程

SynchronousQueue

SynchronousQueue的同步指的是读线程和写线程需要同步,一个读线程匹配一个写线程。当一个线程往队列中写入一个元素时,写入操作不会立即返回,需要等待另一个线程来将这个元素拿走;当一个读线程做读操作的时候,同样需要一个相匹配的写线程的写操作。

SynchronousQueue 实际不存储元素,数据必须从某个写线程交给某个读线程,而不是写到某个队列中等待被消费。

SynchronousQueue 执行put/take操作时,如果队列是空的,或者队列中的节点和当前的线程操作类型一致(如当前操作是 put 操作,而队列中的元素也都是写线程),则将当前线程加入到等待队列。如果队列中有等待节点,而且与当前操作可以匹配(如队列中都是读操作线程,当前线程是写操作线程,反之亦然),则匹配等待队列的队头,出队,返回相应数据。

PriorityBlockingQueue

PriorityBlockingQueue队列为无界队列,只能指定初始的队列大小

PriorityBlockingQueue其实是 PriorityQueue 的线程安全版本,插入队列的对象必须是可比较大小的(comparable)。PriorityBlockingQueue/PriorityQueue 通过堆实现

PriorityBlockingQueue put 方法不会 block,因为它是无界队列;take 方法在队列为空的时候会阻塞。

DelayQueue

  1. DelayQueue是一个支持延时获取元素的无界阻塞队列。
  2. DelayQueue中的元素都是可延期的,因为必须实现Delayed接口。
  3. 插入元素时,会根据延期时间对元素排序,队头的元素是最先到期的;取出元素时,只有在队头元素到期时才能够从队列中取元素。如果队头元素还有t时间到期,则将取出元素线程阻塞t时间,t时间到后再次尝试取出队头元素。

使用 ReentrantLock + Condition

put: 获取锁,添加元素,插入元素为队首时,唤醒take线程尝试take (因为更新了队首元素,所以要重新检查队首元素是否到期。)

take: 获取锁,如果队列为空,阻塞take;如果队首到期则出队,如果队首元素没到期,阻塞take(阻塞时间为到期剩余时间),时间到后阻塞唤醒,重新尝试(无限循环)

总结

阻塞队列是一个比普通队列多出两个附加操作的队列。两个操作分别是:

  • 在队列为空时,获取元素的线程会等待队列变为非空。
  • 当队列满时,存储元素的线程会等待队列可用。
ArrayBlockingQueue
  1. ArrayBlockingQueue是由数组实现的有界队列,通过ReentrantLock锁保证队列数据的安全性,通过ReentrantLock的条件Condition是实现阻塞。
  2. 添加元素时,如果队列满了不能添加元素,就将添加元素的线程阻塞并加入notFull条件队列;当成功删除元素后,队列就可以添加元素了,唤醒notFull条件队列中阻塞的线程,添加元素。
  3. 删除元素时,如果队列空了不能删除元素,就将删除元素的线程阻塞并加入notEmpty条件队列;当成功添加元素后,队列就可以删除元素了,唤醒notEmpty条件队列中阻塞的线程,删除元素。
LinkedBlockingQueue
  1. LinkedBlockingQueue用链表实现的有界阻塞队列。(不设置容量,默认为Integer.MAX_VALUE)
  • 锁takeLock保证删除数据的安全性,队列为空时读操作线程阻塞并加入takeLock锁的notEmpty条件等待队列。
  • 锁putLock保证添加数据的安全性,队列满时写操作线程阻塞并加入putLock锁的notFull条件等待队列。
  1. ArrayBlockingQueue的读写使用同一个锁来保证数据安全。LinkedBlockingQueue的读写分别用不同的锁来保证数据安全,采用不同的锁可以使读线程和写线程并发执行,提高了吞吐量,但也增加了编程的复杂度。
SynchronousQueue
  1. SynchronousQueue的同步指的是读线程和写线程需要同步,一个读线程匹配一个写线程。当一个线程往队列中写入一个元素时,写入操作不会立即返回,需要等待另一个线程来将这个元素拿走;当一个读线程做读操作的时候,同样需要一个相匹配的写线程的写操作。
  2. SynchronousQueue 实际不存储元素,数据必须从某个写线程交给某个读线程,而不是写到某个队列中等待被消费。
  3. SynchronousQueue 执行put/take操作时,如果队列是空的,或者队列中的节点和当前的线程操作类型一致(如当前操作是 put 操作,而队列中的元素也都是写线程),则将当前线程加入到等待队列。如果队列中有等待节点,而且与当前操作可以匹配(如队列中都是读操作线程,当前线程是写操作线程,反之亦然),则匹配等待队列的队头,出队,返回相应数据。
PriorityBlockingQueue
  1. PriorityBlockingQueue队列为无界队列,只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容。
  2. PriorityBlockingQueue其实是 PriorityQueue 的线程安全版本,插入队列的对象必须是可比较大小的(comparable)。PriorityBlockingQueue/PriorityQueue 通过堆实现,这里不再详细介绍数据结构,重点讲解阻塞原理。

PriorityQueue 优先级队列的元素按照其自然顺序进行排序或者构造队列时提供的 Comparator 进行排序,插入元素是根据排序规则找到新元素在堆中位置插入。

  1. PriorityBlockingQueue put 方法不会 block,因为它是无界队列;take 方法在队列为空的时候会阻塞。
DelayQueue
  1. DelayQueue是一个支持延时获取元素的无界阻塞队列。
  2. DelayQueue中的元素都是可延期的,因为必须实现Delayed接口。
  3. 插入元素时,会根据延期时间对元素排序,队头的元素是最先到期的;取出元素时,只有在队头元素到期时才能够从队列中取元素。如果队头元素还有t时间到期,则将取出元素线程阻塞t时间,t时间到后再次尝试取出队头元素。

阻塞队列基于数组的简易实现

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BlockQueue<T> {

private int size;
private Object[] queue;

private Lock lock = new ReentrantLock();
private Condition full = lock.newCondition();
private Condition empty = lock.newCondition();

private int index;
private int removeIndex;
private int currLen;

public BlockQueue() {
this(10);
}

public BlockQueue(int size) {
this.index = 0;
this.removeIndex = 0;
this.currLen = 0;
this.size = size;
queue = new Object[size];
}

public void push(T element) throws InterruptedException {
lock.lock();
try {
while (currLen == size) {
System.out.println("队列满。。。");
full.await();
}
queue[index] = element;
if (++index == size) {
index = 0;
}
currLen++;
empty.signal();
} finally {
lock.unlock();
}
}

public T pop() throws InterruptedException {
lock.lock();
try {
while (currLen == 0) {
System.out.println("队列空。。。");
empty.await();
}
Object obj = queue[removeIndex];
if (++removeIndex == size) {
removeIndex = 0;
}
currLen--;
full.signal();
return (T) obj;
} finally {
lock.unlock();
}
}


}

参考

https://mp.weixin.qq.com/s?__biz=MzAxMjEwMzQ5MA==&mid=2448892563&idx=2&sn=7cec5844d3c23e6e346a5e4d40ca2cf2&chksm=8fb578beb8c2f1a8371cca5c6cf4aa07435254cdc69dff280977c6c4fcb176df01668e89d7e4&scene=158#rd

https://mp.weixin.qq.com/s?__biz=MzAxMjEwMzQ5MA==&mid=2448892660&idx=2&sn=2fdad62bc5017261860ca82935cce7ba&chksm=8fb578d9b8c2f1cfd903c191ea83408314c46680156a6057dc2d4fa218e6e0556ead55115e11&scene=158#rd