Java阻塞队列的简单实现

编程

来源:

https://blog.biezhi.me/2019/01/simple-blocking-queue.html

Java 并发常用的组件中有一种队列叫阻塞队列(BlockingQueue),当队列为空时,获取元素的线程会阻塞等待直到队列有数据;当队列满时,想要存储元素的线程会阻塞等待直到队列有空间。我们经常会用这种数据结构可以实现生产者、消费者模型。

本文会通过两种方式来实现简单的有界阻塞队列,在最后分别测试不同实现的性能差异。

Monitor 和 Condition

看过 Java 并发相关书籍的同学应该都见过 Monitor 这个词,有人称为监视器也有人叫它管程,不过都是一个意思:一个同步工具,相当于操作系统中的互斥量(mutex),即值为 1 的信号量。

synchronized 关键词的背后就是靠 monitor 实现的,monitor 的重要特点是,同一个时刻,只有一个线程能进入 monitor 中定义的临界区,这使得 monitor 能够达到互斥的效果。但仅仅有互斥的作用是不够的,无法进入 monitor 临界区的线程应该被阻塞,在必要的时候可以被唤醒,所以 Java 提供了 waitnotifynotifyAll 的 API 给我们使用。

  • wait(): 让当前线程进入等待队列,同时会释放锁,直到被唤醒。
  • notify(): 从条件队列中随机唤醒一个线程,让它去参与锁竞争
  • notifyAll(): 唤醒条件队列中的所有线程,让它们去参与锁竞争


实现同步的一种方式是使用 synchronized 关键字,还可以使用 Lock 接口下的实现来完成,比如 ReentrantLock,它是一把重入锁(synchronized 也是),基于 AQS 并发框架实现。我们可以使用它来进行加锁和释放锁,如果遇到有条件需要阻塞可以使用 Condition API。

  • Lock#newCondition(): 创建一个新的条件
  • Condition#await(): 让当前线程等待
  • Condition#signal(): 唤醒一个等待线程

条件和锁总是息息相关,在没有 Lock 接口的时候你会发现 monitor 机制有一个严重的问题:一把锁只能对应一个条件(也就是只可以做一次 wait),那么在唤醒的时候就可能出现唤醒丢失。举个例子,在两个方法上有不同的条件会导致阻塞,它们持有一把锁,唤醒时候如果用 notify 只会从条件队列选择一个,使用 notifyAll 会带来大量的 CPU 上下文切换和锁竞争,伪代码如下:

 1synchronized void foo() {

2 while(CONDITION1){

3 wait();

4 }

5 notifyAll();

6}

7synchronized void bar() {

8 while(CONDITION2){

9 wait();

10 }

11 notifyAll();

12}

具体实现

实现思路

我们通过定义一个 Queue 接口来实现两种队列,该队列是有界队列,使用数组的方式实现,如果你有兴趣也可以使用链表或栈来实现这个队列。提供 put 方法添加元素(满了则阻塞),take 方法弹出元素(没有元素则阻塞)。

定义接口

 1public interface Queue<E> {

2

3 // 添加新元素,当队列满则阻塞

4 void put(E e) throws InterruptedException;

5

6 // 弹出队头元素,当队列空则阻塞

7 E take() throws InterruptedException;

8

9 // 队列元素个数

10 int size();

11

12 // 队列是否为空

13 boolean isEmpty();

14

15}

基于 synchronized 的实现

核心思路:

  • 添加元素时队列满则阻塞
  • 弹出元素时队列空则阻塞
  • 添加元素后唤醒消费者
  • 弹出元素后唤醒生产者

 1public class BlockingQueueWithSync<E> implements Queue<E> {

2

3 private E[] array;

4 private int head; // 队头指针

5 private int tail; // 队尾指针

6

7 private volatile int size; // 队列元素个数

8

9 public BlockingQueueWithSync(int capacity) {

10 array = (E[]) new Object[capacity];

11 }

12

13 @Override

14 public synchronized void put(E e) throws InterruptedException {

15 // 当队列满的时候阻塞

16 while (size == array.length) {

17 this.wait();

18 }

19

20 array[tail] = e;

21 // 队列装满后索引归零

22 if (++tail == array.length) {

23 tail = 0;

24 }

25 ++size;

26 // 通知其他消费端有数据了

27 this.notifyAll();

28 }

29

30 @Override

31 public synchronized E take() throws InterruptedException {

32 // 当队列空的时候阻塞

33 while (isEmpty()) {

34 this.wait();

35 }

36

37 E element = array[head];

38 // 消费完后从0开始

39 if (++head == array.length) {

40 head = 0;

41 }

42 --size;

43 // 通知其他生产者可以生产了

44 this.notifyAll();

45 return element;

46 }

47

48 @Override

49 public synchronized boolean isEmpty() {

50 return size == 0;

51 }

52

53 @Override

54 public synchronized int size() {

55 return size;

56 }

57

58}

基于 ReentrantLock 的实现

 1public class BlockingQueueWithLock<E> implements Queue<E> {

2

3 private E[] array;

4 private int head;

5 private int tail;

6

7 private volatile int size;

8

9 private Lock lock = new ReentrantLock();

10 private Condition notFull = lock.newCondition();

11 private Condition notEmpty = lock.newCondition();

12

13 public BlockingQueueWithLock(int capacity) {

14 array = (E[]) new Object[capacity];

15 }

16

17 @Override

18 public void put(E e) throws InterruptedException {

19 lock.lockInterruptibly();

20 try {

21 // 队列满,阻塞

22 while (size == array.length) {

23 notFull.await();

24 }

25 array[tail] = e;

26 if (++tail == array.length) {

27 tail = 0;

28 }

29 ++size;

30 notEmpty.signal();

31 } finally {

32 lock.unlock();

33 }

34 }

35

36 @Override

37 public E take() throws InterruptedException {

38 lock.lockInterruptibly();

39 try {

40 // 队列空,阻塞

41 while (isEmpty()) {

42 notEmpty.await();

43 }

44 E element = array[head];

45 if (++head == array.length) {

46 head = 0;

47 }

48 --size;

49 // 通知isFull条件队列有元素出去

50 notFull.signal();

51 return element;

52 } finally {

53 lock.unlock();

54 }

55 }

56

57 @Override

58 public boolean isEmpty() {

59 lock.lock();

60 try {

61 return size == 0;

62 } finally {

63 lock.unlock();

64 }

65 }

66

67 @Override

68 public int size() {

69 lock.lock();

70 try {

71 return size;

72 } finally {

73 lock.unlock();

74 }

75 }

76

77}

对比性能

 1public class Benchmark {

2

3 @Test

4 public void testWithMonitor() {

5 Queue<Integer> queue = new BlockingQueueWithSync<>(5);

6 execute(queue);

7 }

8

9 @Test

10 public void testWithCondition() {

11 Queue<Integer> queue = new BlockingQueueWithLock<>(5);

12 execute(queue);

13 }

14

15 private void execute(Queue<Integer> queue) {

16 ExecutorService executorService = Executors.newCachedThreadPool();

17 for (int i = 1; i <= 1000; i++) {

18 final int finalNum = i;

19 executorService.execute(() -> {

20 try {

21 queue.put(finalNum);

22 Integer take = queue.take();

23 System.out.println("item: " + take);

24 } catch (InterruptedException e) {

25 e.printStackTrace();

26 }

27 });

28 }

29 executorService.shutdown();

30 }

31

32}

这个测试程序让 2 个队列的可存储的元素数都为 5,开启 1000 个线程进行 puttake 操作,运行后查看总耗时。

可以看出,使用 synchronized 的方式性能较差。

● java匠人手法-优雅的处理空值

● REST API 的安全基础

● 深入浅出 CAS

● Spring Boot Devtools热部署

● Spring Boot AOP记录用户操作日志

● Spring Boot整合Mongo DB

● 【图文讲解】你一定能看懂的HTTPS原理剖析!

● 基础面试,为什么面试官总喜欢问String?

● Spring Boot Admin 2.2.0发布,支持最新Spring Boot/Cloud之外,新增中文展示!

● 你应该知道的 @ConfigurationProperties 注解的使用姿势,这一篇就够了

如有收获,请帮忙转发,您的鼓励是作者最大的动力,谢谢!

本文由博客一文多发平台 OpenWrite 发布!

以上是 Java阻塞队列的简单实现 的全部内容, 来源链接: utcz.com/z/511753.html

回到顶部