线程池02LinkedBlockingQueue阻塞队列 [数据库教程]

database

首先,我们先了解一下什么是阻塞队列:

  • 当队列满了时,队列会阻塞插入元素的线程,直到队列不满;

  • 当队列为空时,获取元素的线程会等待队列变成非空。

常用到的方法

上面是对阻塞队列的简单了解,下面重点分析一下LinkedBlockingQueue。

源码分析

Node节点

  • 可以看出是单向的链表结构

static class Node<E> {

E item;

Node<E> next;

Node(E x) { item = x; }

}

构造方法和参数

  • 如果未设置初始容量,则默认是Integer.MAX_VALUE;

   /** 队列容量 */

private final int capacity;

/** 目前元素数量 */

private final AtomicInteger count = new AtomicInteger();

/** 头节点 */

transient Node<E> head;

/** 末尾节点 */

private transient Node<E> last;

/** take, poll 方法的锁 */

private final ReentrantLock takeLock = new ReentrantLock();

/** 等待获取 条件队列*/

private final Condition notEmpty = takeLock.newCondition();

/** put, offer 方法的锁 */

private final ReentrantLock putLock = new ReentrantLock();

/** 等待存入的 条件队列 */

private final Condition notFull = putLock.newCondition();

public LinkedBlockingQueue() {

this(Integer.MAX_VALUE);

}

public LinkedBlockingQueue(int capacity) {

if (capacity <= 0) throw new IllegalArgumentException();

this.capacity = capacity;

last = head = new Node<E>(null);//初始化的时候设置头节点和尾节点为两个空节点

}

插入

put 方法

  • 如果队列已经满了,则放入到条件队列中。

public void put(E e) throws InterruptedException {

if (e == null) throw new NullPointerException();

int c = -1;

Node<E> node = new Node<E>(e);//创建新节点

final ReentrantLock putLock = this.putLock;

final AtomicInteger count = this.count;

putLock.lockInterruptibly();//获取put锁

try {

//判断存入的元素个数和配置的数量是否相等,如果相等。那么将当前线程放入到条件队列中

while (count.get() == capacity) {

notFull.await();

}

enqueue(node);//将节点插入到末尾

c = count.getAndIncrement();//元素数量+1

if (c + 1 < capacity)//当前元素数量小于容量的时候,唤醒“存入条件队列”的头节点到同步队列

notFull.signal();

} finally {

putLock.unlock();释放put锁

}

// 唤醒获取条件队列的头节点

if (c == 0)

signalNotEmpty();

}

//将节点设置为尾节点

private void enqueue(Node<E> node) {

last = last.next = node;

}

// 唤醒“获取条件队列”中的首节点

private void signalNotEmpty() {

final ReentrantLock takeLock = this.takeLock;

takeLock.lock();//t获取ake锁

try {

notEmpty.signal();//唤醒“获取条件队列”中的首节点

} finally {

takeLock.unlock();

}

}

offer 方法

  • 如果超过容量就无法插入

  public boolean offer(E e) {

if (e == null) throw new NullPointerException();

final AtomicInteger count = this.count;

if (count.get() == capacity)//如果超过容量直接返回false,表示不能再插入数据

return false;

int c = -1;

Node<E> node = new Node<E>(e);//新建节点

final ReentrantLock putLock = this.putLock;

putLock.lock();//获取put锁

try {

if (count.get() < capacity) {//小于容量时增加节点,并唤醒“存入条件队列”头节点到同步队列

enqueue(node);

c = count.getAndIncrement();

if (c + 1 < capacity)//当前元素数量小于容量的时候,唤醒“存入条件队列”的头节点到同步队列

notFull.signal();

}

} finally {

putLock.unlock();

}

// 唤醒获取条件队列的头节点

if (c == 0)

signalNotEmpty();

return c >= 0;

}

take 方法

  • 获取链表中的头节点。如果不存在元素,则将当前线程放入到条件队列中。

public E take() throws InterruptedException {

E x;

int c = -1;

final AtomicInteger count = this.count;

final ReentrantLock takeLock = this.takeLock;

takeLock.lockInterruptibly();//获取锁

try {

while (count.get() == 0) {//如果为空则放入“获取条件队列”

notEmpty.await();

}

x = dequeue();//将节点加入到链表最后

c = count.getAndDecrement();//数量减1

if (c > 1)//如果元素书大于1,则调用“获取条件队列”中的元素放入同步队列

notEmpty.signal();

} finally {

takeLock.unlock();

}

// 唤醒存入条件队列的头节点

if (c == capacity)

signalNotFull();

return x;//返回头节点

}

// 获取头节点元素

private E dequeue() {

Node<E> h = head;

Node<E> first = h.next;

h.next = h; // help GC

head = first;

E x = first.item;

first.item = null;

return x;

}

// 唤醒“存入条件队列”的头节点到同步队列

private void signalNotFull() {

final ReentrantLock putLock = this.putLock;

putLock.lock();

try {

notFull.signal();

} finally {

putLock.unlock();

}

}

poll方法

  • 如果没有数据立即返回null

   public E poll() {

final AtomicInteger count = this.count;

if (count.get() == 0)

return null;

E x = null;

int c = -1;

final ReentrantLock takeLock = this.takeLock;//获取锁

takeLock.lock();

try {

if (count.get() > 0) {//当前容器数量大于0时

x = dequeue();

c = count.getAndDecrement();

if (c > 1)

notEmpty.signal();

}

} finally {

takeLock.unlock();

}

// 唤醒存入条件队列的头节点

if (c == capacity)

signalNotFull();

return x;

}

总结

1.如何保证当队列没有消息或者消息满了的时候,进行监听?

上面看代码的时候,两段代码刚开始是有点懵的。

1.存入的方法

// 唤醒获取条件队列的头节点

if (c == 0) signalNotEmpty();

2.获取的方法

// 唤醒存入条件队列的头节点

if (c == capacity) signalNotFull();

其实这就监听的重要环节。

逻辑是这样的。以存入为例:

1.如果当前节点为0,说明队列中没有任务;

2.唤醒“获取条件队列”的头节点,去尝试获取元素。如果获取到则执行,如果没有,则依然放入到“获取条件队列”的末尾;

3.这样就可以保证在存入数据的时候,实时监听获取节点元素了。

线程池02-LinkedBlockingQueue 阻塞队列

以上是 线程池02LinkedBlockingQueue阻塞队列 [数据库教程] 的全部内容, 来源链接: utcz.com/z/535228.html

回到顶部