Java之BlockingQueue

编程

LinkedBlockingQueue

       基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

DelayQueue

       DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
使用场景:
       DelayQueue使用场景较少,但都相当巧妙,常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。

PriorityBlockingQueue

       基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。

SynchronousQueue

       一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的BlockingQueue来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。
  声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的区别:
  如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
  但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。

4、秒杀demo

秒杀对列

package com.hsshy.beam.queue.jvm;

import com.hsshy.beam.queue.entity.SuccessKilled;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.LinkedBlockingQueue;

/**

* 秒杀队列(固定长度为100)

* @author 科帮网 By https://blog.52itstyle.com

* 创建时间 2018年5月10日

*/

publicclassSeckillQueue{

//队列大小

static final int QUEUE_MAX_SIZE =100;

/** 用于多线程间下单的队列 */

static BlockingQueue<SuccessKilled> blockingQueue =new LinkedBlockingQueue<SuccessKilled>(QUEUE_MAX_SIZE);

/**

* 私有的默认构造子,保证外界无法直接实例化

*/

privateSeckillQueue(){};

/**

* 类级的内部类,也就是静态的成员式内部类,该内部类的实例与外部类的实例

* 没有绑定关系,而且只有被调用到才会装载,从而实现了延迟加载

*/

privatestaticclassSingletonHolder{

/**

* 静态初始化器,由JVM来保证线程安全

*/

privatestatic SeckillQueue queue =newSeckillQueue();

}

//单例队列

publicstatic SeckillQueue getMailQueue(){

return SingletonHolder.queue;

}

/**

* 生产入队

* @param kill

* @throws InterruptedException

* add(e) 队列未满时,返回true;队列满则抛出IllegalStateException(“Queue full”)异常——AbstractQueue

* put(e) 队列未满时,直接插入没有返回值;队列满时会阻塞等待,一直等到队列未满时再插入。

* offer(e) 队列未满时,返回true;队列满时返回false。非阻塞立即返回。

* offer(e, time, unit) 设定等待的时间,如果在指定时间内还不能往队列中插入数据则返回false,插入成功返回true。

*/

public Boolean produce(SuccessKilled kill) throws InterruptedException {

return blockingQueue.offer(kill);

}

/**

* 消费出队

* poll() 获取并移除队首元素,在指定的时间内去轮询队列看有没有首元素有则返回,否者超时后返回null

* take() 与带超时时间的poll类似不同在于take时候如果当前队列空了它会一直等待其他线程调用notEmpty.signal()才会被唤醒

*/

public SuccessKilled consume() throws InterruptedException {

return blockingQueue.take();

}

// 获取队列大小

publicintsize(){

return blockingQueue.size();

}

}

springboot启动时执行类

packagecom.hsshy.beam.queue.jvm;

importcom.hsshy.beam.queue.entity.SuccessKilled;

importcom.hsshy.beam.queue.service.ISeckillService;

importorg.springframework.beans.factory.annotation.Autowired;

importorg.springframework.boot.ApplicationArguments;

importorg.springframework.boot.ApplicationRunner;

importorg.springframework.stereotype.Component;

/**

* 消费秒杀队列

* 创建者 科帮网

* 创建时间 2018年4月3日

*/

@Component

publicclassTaskRunnerimplementsApplicationRunner{

@Autowired

privateISeckillService seckillService;

@Override

publicvoidrun(ApplicationArgumentsvar)throwsException{

while(true){

//进程内队列

SuccessKilled kill =SeckillQueue.getMailQueue().consume();

if(kill!=null){

seckillService.startSeckil(kill.getId(), kill.getUserId());

}

}

}

}

controller层

@ApiOperation(value="秒杀柒(进程内队列)",nickname="科帮网")

@PostMapping("/startQueue")

publicRstartQueue(long seckillId){

seckillService.deleteSeckill(seckillId);

finallong killId = seckillId;

LOGGER.info("开始秒杀柒(正常)");

for(int i=0;i<1000;i++){

finallong userId = i;

Runnable task =newRunnable(){

@Override

publicvoidrun(){

SuccessKilled kill =newSuccessKilled();

kill.setId(killId);

kill.setUserId(userId);

try{

Boolean flag =SeckillQueue.getMailQueue().produce(kill);

if(flag){

LOGGER.info("用户:{}{}",kill.getUserId(),"秒杀成功");

}else{

LOGGER.info("用户:{}{}",userId,"秒杀失败");

}

}catch(InterruptedException e){

e.printStackTrace();

LOGGER.info("用户:{}{}",userId,"秒杀失败");

}

}

};

executor.execute(task);

}

try{

Thread.sleep(10000);

Long seckillCount = seckillService.getSeckillCount(seckillId);

LOGGER.info("一共秒杀出{}件商品",seckillCount);

}catch(InterruptedException e){

e.printStackTrace();

}

returnR.ok();

}

5、延时队列demo

消息体

packagecom.hsshy.beam.queue.delay;

importjava.util.concurrent.Delayed;

importjava.util.concurrent.TimeUnit;

/**

* 消息体定义 实现Delayed接口就是实现两个方法即compareTo 和 getDelay最重要的就是getDelay方法,这个方法用来判断是否到期……

*

*/

publicclassMessageimplementsDelayed{

privateint id;

privateString body;// 消息内容

privatelong excuteTime;// 延迟时长,这个是必须的属性因为要按照这个判断延时时长。

publicintgetId(){

return id;

}

publicStringgetBody(){

return body;

}

publiclonggetExcuteTime(){

return excuteTime;

}

publicMessage(int id,String body,long delayTime){

this.id = id;

this.body = body;

this.excuteTime =TimeUnit.NANOSECONDS.convert(delayTime,TimeUnit.MILLISECONDS)+System.nanoTime();

}

// 自定义实现比较方法返回 1 0 -1三个参数

@Override

publicintcompareTo(Delayed delayed){

Message msg =(Message) delayed;

returnInteger.valueOf(this.id)>Integer.valueOf(msg.id)?1

:(Integer.valueOf(this.id)<Integer.valueOf(msg.id)?-1:0);

}

// 延迟任务是否到时就是按照这个方法判断如果返回的是负数则说明到期否则还没到期

@Override

publiclonggetDelay(TimeUnit unit){

return unit.convert(this.excuteTime -System.nanoTime(),TimeUnit.NANOSECONDS);

}

}

消费者

package com.hsshy.beam.queue.delay;

import java.util.concurrent.DelayQueue;

publicclassConsumer implements Runnable {

// 延时队列 ,消费者从其中获取消息进行消费

private DelayQueue<Message> queue;

publicConsumer(DelayQueue<Message> queue){

this.queue = queue;

}

@Override

publicvoidrun(){

while(true){

try{

Message take = queue.take();

System.out.println("消费消息id:"+ take.getId()+" 消息体:"+ take.getBody());

}catch(InterruptedException e){

e.printStackTrace();

}

}

}

}

测试类

package com.hsshy.beam.queue.delay;

import java.util.concurrent.DelayQueue;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

publicclassDelayQueueTest{

publicstaticvoidmain(String[] args){

// 创建延时队列

DelayQueue<Message> queue =new DelayQueue<Message>();

// 添加延时消息,m1 延时3s

Message m1 =newMessage(1,"world",3000);

// 添加延时消息,m2 延时10s

Message m2 =newMessage(2,"hello",10000);

//将延时消息放到延时队列中

queue.offer(m2);

queue.offer(m1);

// 启动消费线程 消费添加到延时队列中的消息,前提是任务到了延期时间

ExecutorService exec = Executors.newFixedThreadPool(1);

exec.execute(newConsumer(queue));

exec.shutdown();

}

}

参考链接:

https://www.cnblogs.com/KingIceMou/p/8075343.html
http://www.cnblogs.com/WangHaiMing/p/8798709.html
https://gitee.com/52itstyle/spring-boot-seckill

以上是 Java之BlockingQueue 的全部内容, 来源链接: utcz.com/z/512941.html

回到顶部