【Java】Java高并发BlockingQueue重要的实现类

Java高并发BlockingQueue重要的实现类

入门小站发布于 21 分钟前

ArrayBlockingQueue

public class ArrayBlockingQueue<E> extends AbstractQueue<E>

implements BlockingQueue<E>, java.io.Serializable {

/** 队列元素 */

final Object[] items;

/** 下一次读取操作的位置, poll, peek or remove */

int takeIndex;

/** 下一次写入操作的位置, offer, or add */

int putIndex;

/** 元素数量 */

int count;

/*

* Concurrency control uses the classic two-condition algorithm

* found in any textbook.

* 它采用一个 ReentrantLock 和相应的两个 Condition 来实现。

*/

/** Main lock guarding all access */

final ReentrantLock lock;

/** Condition for waiting takes */

private final Condition notEmpty;

/** Condition for waiting puts */

private final Condition notFull;

/** 指定大小 */

public ArrayBlockingQueue(int capacity) {

this(capacity, false);

}

/**

* 指定容量大小与指定访问策略

* @param fair 指定独占锁是公平锁还是非公平锁。非公平锁的吞吐量比较高,公平锁可以保证每次都是等待最久的线程获取到锁;

*/

public ArrayBlockingQueue(int capacity, boolean fair) {}

/**

* 指定容量大小、指定访问策略与最初包含给定集合中的元素

* @param c 将此集合中的元素在构造方法期间就先添加到队列中

*/

public ArrayBlockingQueue(int capacity, boolean fair,

Collection<? extends E> c) {}

}

  • ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用一个锁对象,由此也意味着两者无法真正并行运行。按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。然而事实上并没有如此,因为ArrayBlockingQueue的数据写入已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。

  • 通过构造函数得知,参数fair控制对象内部是否采用公平锁,默认采用非公平锁。
  • items、takeIndex、putIndex、count等属性并没有使用volatile修饰,这是因为访问这些变量(通过方法获取)使用都在锁内,并不存在可见性问题,如size()

  • 另外有个独占锁lock用来对出入对操作加锁,这导致同时只有一个线程可以访问入队出队。

Put源码分析

/** 进行入队操作 */

public void put(E e) throws InterruptedException {

//e为null,则抛出NullPointerException异常

checkNotNull(e);

//获取独占锁

final ReentrantLock lock = this.lock;

/**

* lockInterruptibly()

* 获取锁定,除非当前线程为interrupted

* 如果锁没有被另一个线程占用并且立即返回,则将锁定计数设置为1。

* 如果当前线程已经保存此锁,则保持计数将递增1,该方法立即返回。

* 如果锁被另一个线程保持,则当前线程将被禁用以进行线程调度,并且处于休眠状态

*

*/

lock.lockInterruptibly();

try {

//空队列

while (count == items.length)

//进行条件等待处理

notFull.await();

//入队操作

enqueue(e);

} finally {

//释放锁

lock.unlock();

}

}

/** 真正的入队 */

private void enqueue(E x) {

// assert lock.getHoldCount() == 1;

// assert items[putIndex] == null;

//获取当前元素

final Object[] items = this.items;

//按下一个插入索引进行元素添加

items[putIndex] = x;

// 计算下一个元素应该存放的下标,可以理解为循环队列

if (++putIndex == items.length)

putIndex = 0;

count++;

//唤起消费者

notEmpty.signal();

}

Take源码分析

public E take() throws InterruptedException {

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {

while (count == 0)

notEmpty.await();

return dequeue();

} finally {

lock.unlock();

}

}

private E dequeue() {

// assert lock.getHoldCount() == 1;

// assert items[takeIndex] != null;

final Object[] items = this.items;

@SuppressWarnings("unchecked")

E x = (E) items[takeIndex];

items[takeIndex] = null;

if (++takeIndex == items.length)

takeIndex = 0;

count--;

//这里有些特殊

if (itrs != null)

//保持队列中的元素和迭代器的元素一致

itrs.elementDequeued();

notFull.signal();

return x;

}

//该类的迭代器,所有的迭代器共享数据,队列改变会影响所有的迭代器

transient Itrs itrs = null; //其存放了目前所创建的所有迭代器。

/**

* 迭代器和它们的队列之间的共享数据,允许队列元素被删除时更新迭代器的修改。

*/

class Itrs {

void elementDequeued() {

// assert lock.getHoldCount() == 1;

if (count == 0)

//队列中数量为0的时候,队列就是空的,会将所有迭代器进行清理并移除

queueIsEmpty();

//takeIndex的下标是0,意味着队列从尾中取完了,又回到头部获取

else if (takeIndex == 0)

takeIndexWrapped();

}

/**

* 当队列为空的时候做的事情

* 1. 通知所有迭代器队列已经为空

* 2. 清空所有的弱引用,并且将迭代器置空

*/

void queueIsEmpty() {}

/**

* 将takeIndex包装成0

* 并且通知所有的迭代器,并且删除已经过期的任何对象(个人理解是置空对象)

* 也直接的说就是在Blocking队列进行出队的时候,进行迭代器中的数据同步,保持队列中的元素和迭代器的元素是一致的。

*/

void takeIndexWrapped() {}

}

Itrs迭代器创建的时机

//从这里知道,在ArrayBlockingQueue对象中调用此方法,才会生成这个对象

//那么就可以理解为,只要并未调用此方法,则ArrayBlockingQueue对象中的Itrs对象则为空

public Iterator<E> iterator() {

return new Itr();

}

private class Itr implements Iterator<E> {

Itr() {

//这里就是生产它的地方

//count等于0的时候,创建的这个迭代器是个无用的迭代器,可以直接移除,进入detach模式。

//否则就把当前队列的读取位置给迭代器当做下一个元素,cursor存储下个元素的位置。

if (count == 0) {

// assert itrs == null;

cursor = NONE;

nextIndex = NONE;

prevTakeIndex = DETACHED;

} else {

final int takeIndex = ArrayBlockingQueue.this.takeIndex;

prevTakeIndex = takeIndex;

nextItem = itemAt(nextIndex = takeIndex);

cursor = incCursor(takeIndex);

if (itrs == null) {

itrs = new Itrs(this);

} else {

itrs.register(this); // in this order

itrs.doSomeSweeping(false);

}

prevCycles = itrs.cycles;

// assert takeIndex >= 0;

// assert prevTakeIndex == takeIndex;

// assert nextIndex >= 0;

// assert nextItem != null;

}

}

}

代码演示

package com.rumenz.task;

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

/**

* @className: BlockingQuqueExample

* @description: TODO 类描述

* @author: mac

* @date: 2021/1/20

**/

public class BlockingQueueExample {

private static volatile Boolean flag=false;

public static void main(String[] args) {

BlockingQueue blockingQueue=new ArrayBlockingQueue(1024);

ExecutorService executorService = Executors.newFixedThreadPool(2);

executorService.execute(()->{

try{

blockingQueue.put(1);

Thread.sleep(2000);

blockingQueue.put(3);

flag=true;

}catch (Exception e){

e.printStackTrace();

}

});

executorService.execute(()->{

try {

while (!flag){

Integer i = (Integer) blockingQueue.take();

System.out.println(i);

}

}catch (Exception e){

e.printStackTrace();

}

});

executorService.shutdown();

}

}

LinkedBlockingQueue

public class LinkedBlockingQueue<E> extends AbstractQueue<E>

implements BlockingQueue<E>, java.io.Serializable {

//队列的容量,指定大小或为默认值Integer.MAX_VALUE

private final int capacity;

//元素的数量

private final AtomicInteger count = new AtomicInteger();

//队列头节点,始终满足head.item==null

transient Node<E> head;

//队列的尾节点,始终满足last.next==null

private transient Node<E> last;

/** Lock held by take, poll, etc */

//出队的锁:take, poll, peek 等读操作的方法需要获取到这个锁

private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */

//当队列为空时,保存执行出队的线程:如果读操作的时候队列是空的,那么等待 notEmpty 条件

private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */

//入队的锁:put, offer 等写操作的方法需要获取到这个锁

private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */

//当队列满时,保存执行入队的线程:如果写操作的时候队列是满的,那么等待 notFull 条件

private final Condition notFull = putLock.newCondition();

//传说中的无界队列

public LinkedBlockingQueue() {}

//传说中的有界队列

public LinkedBlockingQueue(int capacity) {

if (capacity <= 0) throw new IllegalArgumentException();

this.capacity = capacity;

last = head = new Node<E>(null);

}

//传说中的无界队列

public LinkedBlockingQueue(Collection<? extends E> c){}

/**

* 链表节点类

*/

static class Node<E> {

E item;

/**

* One of:

* - 真正的继任者节点

* - 这个节点,意味着继任者是head.next

* - 空,意味着没有后继者(这是最后一个节点)

*/

Node<E> next;

Node(E x) { item = x; }

}

}

  • 如果需要获取(take)一个元素,需要获取takeLock锁,但是获取了锁还不够,如果队列此时为空,还需要队列不为空(notEmpty)这个条件(Condition)。
  • 如果要插入(put)一个元素,需要获取putLock锁,但是获取了锁还不够,如果队列此时已满,还是需要队列不满(notFull)的这个条件(Condition)。

Put源码分析

public class LinkedBlockingQueue<E> extends AbstractQueue<E>

implements BlockingQueue<E>, java.io.Serializable {

/**

* 将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。

*/

public void put(E e) throws InterruptedException {

if (e == null) throw new NullPointerException();

// 如果你纠结这里为什么是 -1,可以看看 offer 方法。这就是个标识成功、失败的标志而已。

int c = -1;

//包装成node节点

Node<E> node = new Node<E>(e);

final ReentrantLock putLock = this.putLock;

final AtomicInteger count = this.count;

//获取锁定

putLock.lockInterruptibly();

try {

/** 如果队列满,等待 notFull 的条件满足。 */

while (count.get() == capacity) {

notFull.await();

}

//入队

enqueue(node);

//原子性自增

c = count.getAndIncrement();

// 如果这个元素入队后,还有至少一个槽可以使用,调用 notFull.signal() 唤醒等待线程。

// 哪些线程会等待在 notFull 这个 Condition 上呢?

if (c + 1 < capacity)

notFull.signal();

} finally {

//解锁

putLock.unlock();

}

// 如果 c == 0,那么代表队列在这个元素入队前是空的(不包括head空节点),

// 那么所有的读线程都在等待 notEmpty 这个条件,等待唤醒,这里做一次唤醒操作

if (c == 0)

signalNotEmpty();

}

/** 链接节点在队列末尾 */

private void enqueue(Node<E> node) {

// assert putLock.isHeldByCurrentThread();

// assert last.next == null;

// 入队的代码非常简单,就是将 last 属性指向这个新元素,并且让原队尾的 next 指向这个元素

//last.next = node;

//last = node;

// 这里入队没有并发问题,因为只有获取到 putLock 独占锁以后,才可以进行此操作

last = last.next = node;

}

/**

* 等待PUT信号

* 仅在 take/poll 中调用

* 也就是说:元素入队后,如果需要,则会调用这个方法唤醒读线程来读

*/

private void signalNotFull() {

final ReentrantLock putLock = this.putLock;

putLock.lock();

try {

notFull.signal();//唤醒

} finally {

putLock.unlock();

}

}

}

Take源码分析

public class LinkedBlockingQueue<E> extends AbstractQueue<E>

implements BlockingQueue<E>, java.io.Serializable {

public E take() throws InterruptedException {

E x;

int c = -1;

final AtomicInteger count = this.count;

final ReentrantLock takeLock = this.takeLock;

//首先,需要获取到 takeLock 才能进行出队操作

takeLock.lockInterruptibly();

try {

// 如果队列为空,等待 notEmpty 这个条件满足再继续执行

while (count.get() == 0) {

notEmpty.await();

}

//// 出队

x = dequeue();

//count 进行原子减 1

c = count.getAndDecrement();

// 如果这次出队后,队列中至少还有一个元素,那么调用 notEmpty.signal() 唤醒其他的读线程

if (c > 1)

notEmpty.signal();

} finally {

takeLock.unlock();

}

if (c == capacity)

signalNotFull();

return x;

}

/**

* 出队

*/

private E dequeue() {

// assert takeLock.isHeldByCurrentThread();

// assert head.item == null;

Node<E> h = head;

Node<E> first = h.next;

h.next = h; // help GC

head = first;

E x = first.item;

first.item = null;

return x;

}

/**

* Signals a waiting put. Called only from take/poll.

*/

private void signalNotFull() {

final ReentrantLock putLock = this.putLock;

putLock.lock();

try {

notFull.signal();

} finally {

putLock.unlock();

}

}

}

ArrayBlockingQueue对比

LinkedBlockingQueue实现一个线程添加文件对象,四个线程读取文件对象

package concurrent;

import java.io.File;

import java.io.FileFilter;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.LinkedBlockingQueue;

import java.util.concurrent.atomic.AtomicInteger;

public class TestBlockingQueue {

static long randomTime() {

return (long) (Math.random() * 1000);

}

public static void main(String[] args) {

// 能容纳100个文件

final BlockingQueue<File> queue = new LinkedBlockingQueue<File>(100);

// 线程池

final ExecutorService exec = Executors.newFixedThreadPool(5);

final File root = new File("F:\\JavaLib");

// 完成标志

final File exitFile = new File("");

// 读个数

final AtomicInteger rc = new AtomicInteger();

// 写个数

final AtomicInteger wc = new AtomicInteger();

// 读线程

Runnable read = new Runnable() {

public void run() {

scanFile(root);

scanFile(exitFile);

}

public void scanFile(File file) {

if (file.isDirectory()) {

File[] files = file.listFiles(new FileFilter() {

public boolean accept(File pathname) {

return pathname.isDirectory()

|| pathname.getPath().endsWith(".java");

}

});

for (File one : files)

scanFile(one);

} else {

try {

int index = rc.incrementAndGet();

System.out.println("Read0: " + index + " "

+ file.getPath());

queue.put(file);

} catch (InterruptedException e) {

}

}

}

};

exec.submit(read);

// 四个写线程

for (int index = 0; index < 4; index++) {

// write thread

final int NO = index;

Runnable write = new Runnable() {

String threadName = "Write" + NO;

public void run() {

while (true) {

try {

Thread.sleep(randomTime());

int index = wc.incrementAndGet();

File file = queue.take();

// 队列已经无对象

if (file == exitFile) {

// 再次添加"标志",以让其他线程正常退出

queue.put(exitFile);

break;

}

System.out.println(threadName + ": " + index + " "

+ file.getPath());

} catch (InterruptedException e) {

}

}

}

};

exec.submit(write);

}

exec.shutdown();

}

}

关注微信公众号:【入门小站】,解锁更多知识点。

【Java】Java高并发BlockingQueue重要的实现类

java多线程

阅读 10发布于 21 分钟前

本作品系原创,采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议

avatar

入门小站

rumenz.com

41 声望

2 粉丝

0 条评论

得票时间

avatar

入门小站

rumenz.com

41 声望

2 粉丝

宣传栏

ArrayBlockingQueue

public class ArrayBlockingQueue<E> extends AbstractQueue<E>

implements BlockingQueue<E>, java.io.Serializable {

/** 队列元素 */

final Object[] items;

/** 下一次读取操作的位置, poll, peek or remove */

int takeIndex;

/** 下一次写入操作的位置, offer, or add */

int putIndex;

/** 元素数量 */

int count;

/*

* Concurrency control uses the classic two-condition algorithm

* found in any textbook.

* 它采用一个 ReentrantLock 和相应的两个 Condition 来实现。

*/

/** Main lock guarding all access */

final ReentrantLock lock;

/** Condition for waiting takes */

private final Condition notEmpty;

/** Condition for waiting puts */

private final Condition notFull;

/** 指定大小 */

public ArrayBlockingQueue(int capacity) {

this(capacity, false);

}

/**

* 指定容量大小与指定访问策略

* @param fair 指定独占锁是公平锁还是非公平锁。非公平锁的吞吐量比较高,公平锁可以保证每次都是等待最久的线程获取到锁;

*/

public ArrayBlockingQueue(int capacity, boolean fair) {}

/**

* 指定容量大小、指定访问策略与最初包含给定集合中的元素

* @param c 将此集合中的元素在构造方法期间就先添加到队列中

*/

public ArrayBlockingQueue(int capacity, boolean fair,

Collection<? extends E> c) {}

}

  • ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用一个锁对象,由此也意味着两者无法真正并行运行。按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。然而事实上并没有如此,因为ArrayBlockingQueue的数据写入已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。

  • 通过构造函数得知,参数fair控制对象内部是否采用公平锁,默认采用非公平锁。
  • items、takeIndex、putIndex、count等属性并没有使用volatile修饰,这是因为访问这些变量(通过方法获取)使用都在锁内,并不存在可见性问题,如size()

  • 另外有个独占锁lock用来对出入对操作加锁,这导致同时只有一个线程可以访问入队出队。

Put源码分析

/** 进行入队操作 */

public void put(E e) throws InterruptedException {

//e为null,则抛出NullPointerException异常

checkNotNull(e);

//获取独占锁

final ReentrantLock lock = this.lock;

/**

* lockInterruptibly()

* 获取锁定,除非当前线程为interrupted

* 如果锁没有被另一个线程占用并且立即返回,则将锁定计数设置为1。

* 如果当前线程已经保存此锁,则保持计数将递增1,该方法立即返回。

* 如果锁被另一个线程保持,则当前线程将被禁用以进行线程调度,并且处于休眠状态

*

*/

lock.lockInterruptibly();

try {

//空队列

while (count == items.length)

//进行条件等待处理

notFull.await();

//入队操作

enqueue(e);

} finally {

//释放锁

lock.unlock();

}

}

/** 真正的入队 */

private void enqueue(E x) {

// assert lock.getHoldCount() == 1;

// assert items[putIndex] == null;

//获取当前元素

final Object[] items = this.items;

//按下一个插入索引进行元素添加

items[putIndex] = x;

// 计算下一个元素应该存放的下标,可以理解为循环队列

if (++putIndex == items.length)

putIndex = 0;

count++;

//唤起消费者

notEmpty.signal();

}

Take源码分析

public E take() throws InterruptedException {

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {

while (count == 0)

notEmpty.await();

return dequeue();

} finally {

lock.unlock();

}

}

private E dequeue() {

// assert lock.getHoldCount() == 1;

// assert items[takeIndex] != null;

final Object[] items = this.items;

@SuppressWarnings("unchecked")

E x = (E) items[takeIndex];

items[takeIndex] = null;

if (++takeIndex == items.length)

takeIndex = 0;

count--;

//这里有些特殊

if (itrs != null)

//保持队列中的元素和迭代器的元素一致

itrs.elementDequeued();

notFull.signal();

return x;

}

//该类的迭代器,所有的迭代器共享数据,队列改变会影响所有的迭代器

transient Itrs itrs = null; //其存放了目前所创建的所有迭代器。

/**

* 迭代器和它们的队列之间的共享数据,允许队列元素被删除时更新迭代器的修改。

*/

class Itrs {

void elementDequeued() {

// assert lock.getHoldCount() == 1;

if (count == 0)

//队列中数量为0的时候,队列就是空的,会将所有迭代器进行清理并移除

queueIsEmpty();

//takeIndex的下标是0,意味着队列从尾中取完了,又回到头部获取

else if (takeIndex == 0)

takeIndexWrapped();

}

/**

* 当队列为空的时候做的事情

* 1. 通知所有迭代器队列已经为空

* 2. 清空所有的弱引用,并且将迭代器置空

*/

void queueIsEmpty() {}

/**

* 将takeIndex包装成0

* 并且通知所有的迭代器,并且删除已经过期的任何对象(个人理解是置空对象)

* 也直接的说就是在Blocking队列进行出队的时候,进行迭代器中的数据同步,保持队列中的元素和迭代器的元素是一致的。

*/

void takeIndexWrapped() {}

}

Itrs迭代器创建的时机

//从这里知道,在ArrayBlockingQueue对象中调用此方法,才会生成这个对象

//那么就可以理解为,只要并未调用此方法,则ArrayBlockingQueue对象中的Itrs对象则为空

public Iterator<E> iterator() {

return new Itr();

}

private class Itr implements Iterator<E> {

Itr() {

//这里就是生产它的地方

//count等于0的时候,创建的这个迭代器是个无用的迭代器,可以直接移除,进入detach模式。

//否则就把当前队列的读取位置给迭代器当做下一个元素,cursor存储下个元素的位置。

if (count == 0) {

// assert itrs == null;

cursor = NONE;

nextIndex = NONE;

prevTakeIndex = DETACHED;

} else {

final int takeIndex = ArrayBlockingQueue.this.takeIndex;

prevTakeIndex = takeIndex;

nextItem = itemAt(nextIndex = takeIndex);

cursor = incCursor(takeIndex);

if (itrs == null) {

itrs = new Itrs(this);

} else {

itrs.register(this); // in this order

itrs.doSomeSweeping(false);

}

prevCycles = itrs.cycles;

// assert takeIndex >= 0;

// assert prevTakeIndex == takeIndex;

// assert nextIndex >= 0;

// assert nextItem != null;

}

}

}

代码演示

package com.rumenz.task;

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

/**

* @className: BlockingQuqueExample

* @description: TODO 类描述

* @author: mac

* @date: 2021/1/20

**/

public class BlockingQueueExample {

private static volatile Boolean flag=false;

public static void main(String[] args) {

BlockingQueue blockingQueue=new ArrayBlockingQueue(1024);

ExecutorService executorService = Executors.newFixedThreadPool(2);

executorService.execute(()->{

try{

blockingQueue.put(1);

Thread.sleep(2000);

blockingQueue.put(3);

flag=true;

}catch (Exception e){

e.printStackTrace();

}

});

executorService.execute(()->{

try {

while (!flag){

Integer i = (Integer) blockingQueue.take();

System.out.println(i);

}

}catch (Exception e){

e.printStackTrace();

}

});

executorService.shutdown();

}

}

LinkedBlockingQueue

public class LinkedBlockingQueue<E> extends AbstractQueue<E>

implements BlockingQueue<E>, java.io.Serializable {

//队列的容量,指定大小或为默认值Integer.MAX_VALUE

private final int capacity;

//元素的数量

private final AtomicInteger count = new AtomicInteger();

//队列头节点,始终满足head.item==null

transient Node<E> head;

//队列的尾节点,始终满足last.next==null

private transient Node<E> last;

/** Lock held by take, poll, etc */

//出队的锁:take, poll, peek 等读操作的方法需要获取到这个锁

private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */

//当队列为空时,保存执行出队的线程:如果读操作的时候队列是空的,那么等待 notEmpty 条件

private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */

//入队的锁:put, offer 等写操作的方法需要获取到这个锁

private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */

//当队列满时,保存执行入队的线程:如果写操作的时候队列是满的,那么等待 notFull 条件

private final Condition notFull = putLock.newCondition();

//传说中的无界队列

public LinkedBlockingQueue() {}

//传说中的有界队列

public LinkedBlockingQueue(int capacity) {

if (capacity <= 0) throw new IllegalArgumentException();

this.capacity = capacity;

last = head = new Node<E>(null);

}

//传说中的无界队列

public LinkedBlockingQueue(Collection<? extends E> c){}

/**

* 链表节点类

*/

static class Node<E> {

E item;

/**

* One of:

* - 真正的继任者节点

* - 这个节点,意味着继任者是head.next

* - 空,意味着没有后继者(这是最后一个节点)

*/

Node<E> next;

Node(E x) { item = x; }

}

}

  • 如果需要获取(take)一个元素,需要获取takeLock锁,但是获取了锁还不够,如果队列此时为空,还需要队列不为空(notEmpty)这个条件(Condition)。
  • 如果要插入(put)一个元素,需要获取putLock锁,但是获取了锁还不够,如果队列此时已满,还是需要队列不满(notFull)的这个条件(Condition)。

Put源码分析

public class LinkedBlockingQueue<E> extends AbstractQueue<E>

implements BlockingQueue<E>, java.io.Serializable {

/**

* 将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。

*/

public void put(E e) throws InterruptedException {

if (e == null) throw new NullPointerException();

// 如果你纠结这里为什么是 -1,可以看看 offer 方法。这就是个标识成功、失败的标志而已。

int c = -1;

//包装成node节点

Node<E> node = new Node<E>(e);

final ReentrantLock putLock = this.putLock;

final AtomicInteger count = this.count;

//获取锁定

putLock.lockInterruptibly();

try {

/** 如果队列满,等待 notFull 的条件满足。 */

while (count.get() == capacity) {

notFull.await();

}

//入队

enqueue(node);

//原子性自增

c = count.getAndIncrement();

// 如果这个元素入队后,还有至少一个槽可以使用,调用 notFull.signal() 唤醒等待线程。

// 哪些线程会等待在 notFull 这个 Condition 上呢?

if (c + 1 < capacity)

notFull.signal();

} finally {

//解锁

putLock.unlock();

}

// 如果 c == 0,那么代表队列在这个元素入队前是空的(不包括head空节点),

// 那么所有的读线程都在等待 notEmpty 这个条件,等待唤醒,这里做一次唤醒操作

if (c == 0)

signalNotEmpty();

}

/** 链接节点在队列末尾 */

private void enqueue(Node<E> node) {

// assert putLock.isHeldByCurrentThread();

// assert last.next == null;

// 入队的代码非常简单,就是将 last 属性指向这个新元素,并且让原队尾的 next 指向这个元素

//last.next = node;

//last = node;

// 这里入队没有并发问题,因为只有获取到 putLock 独占锁以后,才可以进行此操作

last = last.next = node;

}

/**

* 等待PUT信号

* 仅在 take/poll 中调用

* 也就是说:元素入队后,如果需要,则会调用这个方法唤醒读线程来读

*/

private void signalNotFull() {

final ReentrantLock putLock = this.putLock;

putLock.lock();

try {

notFull.signal();//唤醒

} finally {

putLock.unlock();

}

}

}

Take源码分析

public class LinkedBlockingQueue<E> extends AbstractQueue<E>

implements BlockingQueue<E>, java.io.Serializable {

public E take() throws InterruptedException {

E x;

int c = -1;

final AtomicInteger count = this.count;

final ReentrantLock takeLock = this.takeLock;

//首先,需要获取到 takeLock 才能进行出队操作

takeLock.lockInterruptibly();

try {

// 如果队列为空,等待 notEmpty 这个条件满足再继续执行

while (count.get() == 0) {

notEmpty.await();

}

//// 出队

x = dequeue();

//count 进行原子减 1

c = count.getAndDecrement();

// 如果这次出队后,队列中至少还有一个元素,那么调用 notEmpty.signal() 唤醒其他的读线程

if (c > 1)

notEmpty.signal();

} finally {

takeLock.unlock();

}

if (c == capacity)

signalNotFull();

return x;

}

/**

* 出队

*/

private E dequeue() {

// assert takeLock.isHeldByCurrentThread();

// assert head.item == null;

Node<E> h = head;

Node<E> first = h.next;

h.next = h; // help GC

head = first;

E x = first.item;

first.item = null;

return x;

}

/**

* Signals a waiting put. Called only from take/poll.

*/

private void signalNotFull() {

final ReentrantLock putLock = this.putLock;

putLock.lock();

try {

notFull.signal();

} finally {

putLock.unlock();

}

}

}

ArrayBlockingQueue对比

LinkedBlockingQueue实现一个线程添加文件对象,四个线程读取文件对象

package concurrent;

import java.io.File;

import java.io.FileFilter;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.LinkedBlockingQueue;

import java.util.concurrent.atomic.AtomicInteger;

public class TestBlockingQueue {

static long randomTime() {

return (long) (Math.random() * 1000);

}

public static void main(String[] args) {

// 能容纳100个文件

final BlockingQueue<File> queue = new LinkedBlockingQueue<File>(100);

// 线程池

final ExecutorService exec = Executors.newFixedThreadPool(5);

final File root = new File("F:\\JavaLib");

// 完成标志

final File exitFile = new File("");

// 读个数

final AtomicInteger rc = new AtomicInteger();

// 写个数

final AtomicInteger wc = new AtomicInteger();

// 读线程

Runnable read = new Runnable() {

public void run() {

scanFile(root);

scanFile(exitFile);

}

public void scanFile(File file) {

if (file.isDirectory()) {

File[] files = file.listFiles(new FileFilter() {

public boolean accept(File pathname) {

return pathname.isDirectory()

|| pathname.getPath().endsWith(".java");

}

});

for (File one : files)

scanFile(one);

} else {

try {

int index = rc.incrementAndGet();

System.out.println("Read0: " + index + " "

+ file.getPath());

queue.put(file);

} catch (InterruptedException e) {

}

}

}

};

exec.submit(read);

// 四个写线程

for (int index = 0; index < 4; index++) {

// write thread

final int NO = index;

Runnable write = new Runnable() {

String threadName = "Write" + NO;

public void run() {

while (true) {

try {

Thread.sleep(randomTime());

int index = wc.incrementAndGet();

File file = queue.take();

// 队列已经无对象

if (file == exitFile) {

// 再次添加"标志",以让其他线程正常退出

queue.put(exitFile);

break;

}

System.out.println(threadName + ": " + index + " "

+ file.getPath());

} catch (InterruptedException e) {

}

}

}

};

exec.submit(write);

}

exec.shutdown();

}

}

关注微信公众号:【入门小站】,解锁更多知识点。

【Java】Java高并发BlockingQueue重要的实现类

以上是 【Java】Java高并发BlockingQueue重要的实现类 的全部内容, 来源链接: utcz.com/a/106754.html

回到顶部