【Java】Java高并发BlockingQueue重要的实现类二

Java高并发BlockingQueue重要的实现类二

入门小站发布于 6 分钟前

DelayQueue

  • compareTo(Delayed o):Delayed接口继承了Comparable接口,因此有了这个方法,DelayQueue用它按到期时间对元素排序
  • getDelay(TimeUnit unit):这个方法返回到激活日期的剩余时间,时间单位由单位参数指定

DelayQueue使用场景

  • 关闭空闲连接。服务器中,有很多客户端连接,空闲一段时间后需要关闭。
  • 缓存超过了缓存时间,就需要从缓存中移除。

DelayQueue超时订单处理案例

package com.rumenz.learn.delayqueue;

import java.util.concurrent.Delayed;

import java.util.concurrent.TimeUnit;

//DelayQueue里面的元素必须实现Delayed

/**
 * Element wrapper for a {@code DelayQueue}: pairs a payload with the instant at
 * which it becomes available.
 *
 * <p>The constructor takes a <em>relative</em> delay in milliseconds and stores the
 * absolute deadline (epoch milliseconds). {@link #setExpireTime(Long)} stores its
 * argument as-is, i.e. it expects an absolute deadline.
 *
 * @param <T> type of the wrapped payload
 */
public class Item<T> implements Delayed {

    /** Absolute deadline in epoch milliseconds. */
    private Long expireTime;

    /** Payload carried through the queue. */
    private T data;

    /**
     * @param expireTime delay from now, in milliseconds, after which the item expires
     * @param data       payload to carry
     */
    public Item(Long expireTime, T data) {
        this.expireTime = expireTime + System.currentTimeMillis();
        this.data = data;
    }

    /**
     * Returns the remaining delay in the requested unit; zero or negative once expired.
     *
     * @param unit unit the caller wants the remaining delay expressed in
     * @return remaining delay converted from milliseconds to {@code unit}
     */
    @Override
    public long getDelay(TimeUnit unit) {
        long remainingMillis = this.expireTime - System.currentTimeMillis();
        // The difference is in milliseconds, so that must be the SOURCE unit.
        // Converting from `unit` to `unit` is an identity and returns a value in
        // the wrong unit (e.g. DelayQueue asks for nanoseconds).
        return unit.convert(remainingMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Orders items by deadline, earliest first.
     *
     * @param o the other delayed element
     * @return negative, zero or positive as this item expires before, at the same
     *         time as, or after {@code o}
     */
    @Override
    public int compareTo(Delayed o) {
        if (o == this) {
            return 0;
        }
        if (o instanceof Item) {
            // Compare stored deadlines directly: reading the clock twice could make
            // two equal deadlines compare as different.
            return Long.compare(this.expireTime, ((Item<?>) o).expireTime);
        }
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }

    /** @return the absolute deadline in epoch milliseconds */
    public Long getExpireTime() {
        return expireTime;
    }

    /** @param expireTime absolute deadline in epoch milliseconds (stored unchanged) */
    public void setExpireTime(Long expireTime) {
        this.expireTime = expireTime;
    }

    /** @return the wrapped payload */
    public T getData() {
        return data;
    }

    /** @param data the payload to wrap */
    public void setData(T data) {
        this.data = data;
    }
}

// 订单实体类

package com.rumenz.learn.delayqueue;

/**
 * Order entity used by the delay-queue demo: amount, order number and payment status.
 */
public class OrderItem {

    /** Order amount. */
    private Double orderAmount;

    /** Order number; also used as the key in the demo's pending-order map. */
    private String orderNo;

    /**
     * Payment status: 0 = unpaid, 1 = paid.
     *
     * <p>Declared {@code volatile} because the demo writes it from the payment thread
     * and reads it from the expiry thread without any other synchronization between
     * the two, so the update would otherwise not be guaranteed to become visible.
     */
    private volatile Integer orderStatus;

    /**
     * @param orderAmount order amount
     * @param orderNo     order number
     * @param orderStatus initial payment status (0 = unpaid, 1 = paid)
     */
    public OrderItem(Double orderAmount, String orderNo, Integer orderStatus) {
        this.orderAmount = orderAmount;
        this.orderNo = orderNo;
        this.orderStatus = orderStatus;
    }

    /** @return the order amount */
    public Double getOrderAmount() {
        return orderAmount;
    }

    /** @param orderAmount the order amount */
    public void setOrderAmount(Double orderAmount) {
        this.orderAmount = orderAmount;
    }

    /** @return the order number */
    public String getOrderNo() {
        return orderNo;
    }

    /** @param orderNo the order number */
    public void setOrderNo(String orderNo) {
        this.orderNo = orderNo;
    }

    /** @return the payment status (0 = unpaid, 1 = paid) */
    public Integer getOrderStatus() {
        return orderStatus;
    }

    /** @param orderStatus the payment status (0 = unpaid, 1 = paid) */
    public void setOrderStatus(Integer orderStatus) {
        this.orderStatus = orderStatus;
    }
}

//

package com.rumenz.learn.delayqueue;

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.Iterator;

import java.util.Map;

import java.util.Random;

import java.util.concurrent.*;

/**
 * Demo of order-payment timeouts built on a {@code DelayQueue}.
 *
 * <p>Three worker threads share one pool: one places an order every 3 seconds, one
 * pays a pending order after a random wait, and one closes orders whose 10-second
 * payment window has elapsed.
 */
public class DelayQueueExample {

    /** Pattern for every timestamp printed by the demo. */
    private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /** Payment window for a new order, in milliseconds. */
    private static final long PAY_TIMEOUT_MILLIS = 10 * 1000L;

    /**
     * Starts the three workers. The workers loop until their thread is interrupted,
     * so the JVM keeps running after {@code main} returns.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        DelayQueue<Item<OrderItem>> delayeds = new DelayQueue<>();
        ConcurrentMap<String, OrderItem> map = new ConcurrentHashMap<>();

        // Order-placing thread
        executorService.execute(() -> placeOrders(delayeds, map));
        // Payment thread
        executorService.execute(() -> payOrders(map));
        // Thread that closes expired orders
        executorService.execute(() -> closeExpiredOrders(delayeds, map));

        // No further tasks are accepted; the already-submitted workers keep running.
        executorService.shutdown();
    }

    /** Creates a new unpaid order every 3 seconds and registers it in the map and the delay queue. */
    private static void placeOrders(DelayQueue<Item<OrderItem>> delayeds,
                                    ConcurrentMap<String, OrderItem> map) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat(TIME_PATTERN);
        int orderNo = 100;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(3000);
                int amount = ThreadLocalRandom.current().nextInt(1000);
                OrderItem orderItem = new OrderItem((double) amount, String.valueOf(orderNo), 0);
                Item<OrderItem> item = new Item<>(PAY_TIMEOUT_MILLIS, orderItem);
                Date expiresAt = new Date(item.getExpireTime());
                System.out.println("=======================下单==========================");
                System.out.println("生成订单时间:" + simpleDateFormat.format(new Date()));
                System.out.println("订单编号:" + orderNo);
                System.out.println("订单金额:" + orderItem.getOrderAmount());
                System.out.println("支付过期时间:" + simpleDateFormat.format(expiresAt));
                System.out.println("========================下单=========================");
                map.put(String.valueOf(orderNo), orderItem);
                orderNo++;
                delayeds.offer(item);
            } catch (InterruptedException e) {
                // Restore the flag and stop, otherwise the worker could never be shut down.
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                e.printStackTrace();
            }
        }
    }

    /** After a random wait of 0-14 seconds, marks the first pending order as paid and removes it from the map. */
    private static void payOrders(ConcurrentMap<String, OrderItem> map) {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Wait a random time before paying
                Thread.sleep(ThreadLocalRandom.current().nextInt(15) * 1000L);
                Iterator<Map.Entry<String, OrderItem>> iterator = map.entrySet().iterator();
                if (!iterator.hasNext()) {
                    // Nothing pending: skip instead of removing a dummy "" key.
                    continue;
                }
                OrderItem orderItem = iterator.next().getValue();
                orderItem.setOrderStatus(1);
                String orderNo = orderItem.getOrderNo();
                System.out.println("-----------------------支付订单-----------------------");
                System.out.println("订单支付" + orderNo);
                System.out.println("支付金额" + orderItem.getOrderAmount());
                System.out.println("-----------------------支付订单-----------------------");
                map.remove(orderNo);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                e.printStackTrace();
            }
        }
    }

    /** Blocks on the delay queue; reports each order that is still unpaid when its deadline passes. */
    private static void closeExpiredOrders(DelayQueue<Item<OrderItem>> delayeds,
                                           ConcurrentMap<String, OrderItem> map) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat(TIME_PATTERN);
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Item<OrderItem> item = delayeds.take();
                OrderItem data = item.getData();
                if (data.getOrderStatus() == 0) {
                    Date expiredAt = new Date(item.getExpireTime());
                    System.out.println("########################过期订单########################");
                    System.out.println("订单编号:" + data.getOrderNo());
                    System.out.println("订单金额:" + data.getOrderAmount());
                    System.out.println("订单到期支付时间:" + simpleDateFormat.format(expiredAt));
                    System.out.println("########################过期订单########################");
                }
                // Paid or expired, the order is no longer pending.
                map.remove(data.getOrderNo());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                e.printStackTrace();
            }
        }
    }
}

SynchronousQueue

// NOTE(review): appears to be an abridged excerpt of the JDK's SynchronousQueue with
// bodies and field declarations omitted (Transferer / transferer are not declared
// here) - illustrative only, not compilable as shown.
public class SynchronousQueue<E> extends AbstractQueue<E>

implements BlockingQueue<E>, java.io.Serializable {

// Internal stack (selected when fair == false)

static final class TransferStack<E> extends Transferer<E> {}

// Internal queue (selected when fair == true)

static final class TransferQueue<E> extends Transferer<E> {}

// The no-arg constructor delegates with fair = false.
public SynchronousQueue() {this(false);}

// Chooses the transfer structure: TransferQueue when fair is true, TransferStack otherwise.
public SynchronousQueue(boolean fair) {

transferer = fair ?

new TransferQueue<E>() : new TransferStack<E>();

}

}

SynchronousQueue代码演示

package com.rumenz.learn.synchronousqueue;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.SynchronousQueue;

import java.util.concurrent.TimeUnit;

/**
 * Demo of {@code SynchronousQueue} hand-off: one task puts "1".."4", the other takes
 * one element per second, so each {@code put} blocks until the matching {@code take}.
 */
public class SynchronousQueueExample {

    public static void main(String[] args) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // Producer: announces each value, then blocks in put() until it is taken.
        Runnable producer = () -> {
            try {
                for (int value = 1; value <= 4; value++) {
                    System.out.println(Thread.currentThread().getName() + "put " + value);
                    queue.put(String.valueOf(value));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        };

        // Consumer: waits one second before each of its four takes.
        Runnable consumer = () -> {
            try {
                for (int round = 0; round < 4; round++) {
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println("获取数据:" + queue.take());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        };

        executorService.execute(producer);
        executorService.execute(consumer);
        executorService.shutdown();
    }
}

关注微信公众号:【入门小站】,关注更多知识点

【Java】Java高并发BlockingQueue重要的实现类二

java多线程

阅读 4发布于 6 分钟前

本作品系原创,采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议

avatar

入门小站

rumenz.com

41 声望

2 粉丝

0 条评论

得票时间

avatar

入门小站

rumenz.com

41 声望

2 粉丝

宣传栏

DelayQueue

  • compareTo(Delayed o):Delayed接口继承了Comparable接口,因此有了这个方法,DelayQueue用它按到期时间对元素排序
  • getDelay(TimeUnit unit):这个方法返回到激活日期的剩余时间,时间单位由单位参数指定

DelayQueue使用场景

  • 关闭空闲连接。服务器中,有很多客户端连接,空闲一段时间后需要关闭。
  • 缓存超过了缓存时间,就需要从缓存中移除。

DelayQueue超时订单处理案例

package com.rumenz.learn.delayqueue;

import java.util.concurrent.Delayed;

import java.util.concurrent.TimeUnit;

//DelayQueue里面的元素必须实现Delayed

/**
 * Element wrapper for a {@code DelayQueue}: pairs a payload with the instant at
 * which it becomes available.
 *
 * <p>The constructor takes a <em>relative</em> delay in milliseconds and stores the
 * absolute deadline (epoch milliseconds). {@link #setExpireTime(Long)} stores its
 * argument as-is, i.e. it expects an absolute deadline.
 *
 * @param <T> type of the wrapped payload
 */
public class Item<T> implements Delayed {

    /** Absolute deadline in epoch milliseconds. */
    private Long expireTime;

    /** Payload carried through the queue. */
    private T data;

    /**
     * @param expireTime delay from now, in milliseconds, after which the item expires
     * @param data       payload to carry
     */
    public Item(Long expireTime, T data) {
        this.expireTime = expireTime + System.currentTimeMillis();
        this.data = data;
    }

    /**
     * Returns the remaining delay in the requested unit; zero or negative once expired.
     *
     * @param unit unit the caller wants the remaining delay expressed in
     * @return remaining delay converted from milliseconds to {@code unit}
     */
    @Override
    public long getDelay(TimeUnit unit) {
        long remainingMillis = this.expireTime - System.currentTimeMillis();
        // The difference is in milliseconds, so that must be the SOURCE unit.
        // Converting from `unit` to `unit` is an identity and returns a value in
        // the wrong unit (e.g. DelayQueue asks for nanoseconds).
        return unit.convert(remainingMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Orders items by deadline, earliest first.
     *
     * @param o the other delayed element
     * @return negative, zero or positive as this item expires before, at the same
     *         time as, or after {@code o}
     */
    @Override
    public int compareTo(Delayed o) {
        if (o == this) {
            return 0;
        }
        if (o instanceof Item) {
            // Compare stored deadlines directly: reading the clock twice could make
            // two equal deadlines compare as different.
            return Long.compare(this.expireTime, ((Item<?>) o).expireTime);
        }
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }

    /** @return the absolute deadline in epoch milliseconds */
    public Long getExpireTime() {
        return expireTime;
    }

    /** @param expireTime absolute deadline in epoch milliseconds (stored unchanged) */
    public void setExpireTime(Long expireTime) {
        this.expireTime = expireTime;
    }

    /** @return the wrapped payload */
    public T getData() {
        return data;
    }

    /** @param data the payload to wrap */
    public void setData(T data) {
        this.data = data;
    }
}

// 订单实体类

package com.rumenz.learn.delayqueue;

/**
 * Order entity used by the delay-queue demo: amount, order number and payment status.
 */
public class OrderItem {

    /** Order amount. */
    private Double orderAmount;

    /** Order number; also used as the key in the demo's pending-order map. */
    private String orderNo;

    /**
     * Payment status: 0 = unpaid, 1 = paid.
     *
     * <p>Declared {@code volatile} because the demo writes it from the payment thread
     * and reads it from the expiry thread without any other synchronization between
     * the two, so the update would otherwise not be guaranteed to become visible.
     */
    private volatile Integer orderStatus;

    /**
     * @param orderAmount order amount
     * @param orderNo     order number
     * @param orderStatus initial payment status (0 = unpaid, 1 = paid)
     */
    public OrderItem(Double orderAmount, String orderNo, Integer orderStatus) {
        this.orderAmount = orderAmount;
        this.orderNo = orderNo;
        this.orderStatus = orderStatus;
    }

    /** @return the order amount */
    public Double getOrderAmount() {
        return orderAmount;
    }

    /** @param orderAmount the order amount */
    public void setOrderAmount(Double orderAmount) {
        this.orderAmount = orderAmount;
    }

    /** @return the order number */
    public String getOrderNo() {
        return orderNo;
    }

    /** @param orderNo the order number */
    public void setOrderNo(String orderNo) {
        this.orderNo = orderNo;
    }

    /** @return the payment status (0 = unpaid, 1 = paid) */
    public Integer getOrderStatus() {
        return orderStatus;
    }

    /** @param orderStatus the payment status (0 = unpaid, 1 = paid) */
    public void setOrderStatus(Integer orderStatus) {
        this.orderStatus = orderStatus;
    }
}

//

package com.rumenz.learn.delayqueue;

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.Iterator;

import java.util.Map;

import java.util.Random;

import java.util.concurrent.*;

/**
 * Demo of order-payment timeouts built on a {@code DelayQueue}.
 *
 * <p>Three worker threads share one pool: one places an order every 3 seconds, one
 * pays a pending order after a random wait, and one closes orders whose 10-second
 * payment window has elapsed.
 */
public class DelayQueueExample {

    /** Pattern for every timestamp printed by the demo. */
    private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /** Payment window for a new order, in milliseconds. */
    private static final long PAY_TIMEOUT_MILLIS = 10 * 1000L;

    /**
     * Starts the three workers. The workers loop until their thread is interrupted,
     * so the JVM keeps running after {@code main} returns.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        DelayQueue<Item<OrderItem>> delayeds = new DelayQueue<>();
        ConcurrentMap<String, OrderItem> map = new ConcurrentHashMap<>();

        // Order-placing thread
        executorService.execute(() -> placeOrders(delayeds, map));
        // Payment thread
        executorService.execute(() -> payOrders(map));
        // Thread that closes expired orders
        executorService.execute(() -> closeExpiredOrders(delayeds, map));

        // No further tasks are accepted; the already-submitted workers keep running.
        executorService.shutdown();
    }

    /** Creates a new unpaid order every 3 seconds and registers it in the map and the delay queue. */
    private static void placeOrders(DelayQueue<Item<OrderItem>> delayeds,
                                    ConcurrentMap<String, OrderItem> map) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat(TIME_PATTERN);
        int orderNo = 100;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(3000);
                int amount = ThreadLocalRandom.current().nextInt(1000);
                OrderItem orderItem = new OrderItem((double) amount, String.valueOf(orderNo), 0);
                Item<OrderItem> item = new Item<>(PAY_TIMEOUT_MILLIS, orderItem);
                Date expiresAt = new Date(item.getExpireTime());
                System.out.println("=======================下单==========================");
                System.out.println("生成订单时间:" + simpleDateFormat.format(new Date()));
                System.out.println("订单编号:" + orderNo);
                System.out.println("订单金额:" + orderItem.getOrderAmount());
                System.out.println("支付过期时间:" + simpleDateFormat.format(expiresAt));
                System.out.println("========================下单=========================");
                map.put(String.valueOf(orderNo), orderItem);
                orderNo++;
                delayeds.offer(item);
            } catch (InterruptedException e) {
                // Restore the flag and stop, otherwise the worker could never be shut down.
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                e.printStackTrace();
            }
        }
    }

    /** After a random wait of 0-14 seconds, marks the first pending order as paid and removes it from the map. */
    private static void payOrders(ConcurrentMap<String, OrderItem> map) {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Wait a random time before paying
                Thread.sleep(ThreadLocalRandom.current().nextInt(15) * 1000L);
                Iterator<Map.Entry<String, OrderItem>> iterator = map.entrySet().iterator();
                if (!iterator.hasNext()) {
                    // Nothing pending: skip instead of removing a dummy "" key.
                    continue;
                }
                OrderItem orderItem = iterator.next().getValue();
                orderItem.setOrderStatus(1);
                String orderNo = orderItem.getOrderNo();
                System.out.println("-----------------------支付订单-----------------------");
                System.out.println("订单支付" + orderNo);
                System.out.println("支付金额" + orderItem.getOrderAmount());
                System.out.println("-----------------------支付订单-----------------------");
                map.remove(orderNo);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                e.printStackTrace();
            }
        }
    }

    /** Blocks on the delay queue; reports each order that is still unpaid when its deadline passes. */
    private static void closeExpiredOrders(DelayQueue<Item<OrderItem>> delayeds,
                                           ConcurrentMap<String, OrderItem> map) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat(TIME_PATTERN);
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Item<OrderItem> item = delayeds.take();
                OrderItem data = item.getData();
                if (data.getOrderStatus() == 0) {
                    Date expiredAt = new Date(item.getExpireTime());
                    System.out.println("########################过期订单########################");
                    System.out.println("订单编号:" + data.getOrderNo());
                    System.out.println("订单金额:" + data.getOrderAmount());
                    System.out.println("订单到期支付时间:" + simpleDateFormat.format(expiredAt));
                    System.out.println("########################过期订单########################");
                }
                // Paid or expired, the order is no longer pending.
                map.remove(data.getOrderNo());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                e.printStackTrace();
            }
        }
    }
}

SynchronousQueue

// NOTE(review): appears to be an abridged excerpt of the JDK's SynchronousQueue with
// bodies and field declarations omitted (Transferer / transferer are not declared
// here) - illustrative only, not compilable as shown.
public class SynchronousQueue<E> extends AbstractQueue<E>

implements BlockingQueue<E>, java.io.Serializable {

// Internal stack (selected when fair == false)

static final class TransferStack<E> extends Transferer<E> {}

// Internal queue (selected when fair == true)

static final class TransferQueue<E> extends Transferer<E> {}

// The no-arg constructor delegates with fair = false.
public SynchronousQueue() {this(false);}

// Chooses the transfer structure: TransferQueue when fair is true, TransferStack otherwise.
public SynchronousQueue(boolean fair) {

transferer = fair ?

new TransferQueue<E>() : new TransferStack<E>();

}

}

SynchronousQueue代码演示

package com.rumenz.learn.synchronousqueue;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.SynchronousQueue;

import java.util.concurrent.TimeUnit;

/**
 * Demo of {@code SynchronousQueue} hand-off: one task puts "1".."4", the other takes
 * one element per second, so each {@code put} blocks until the matching {@code take}.
 */
public class SynchronousQueueExample {

    public static void main(String[] args) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // Producer: announces each value, then blocks in put() until it is taken.
        Runnable producer = () -> {
            try {
                for (int value = 1; value <= 4; value++) {
                    System.out.println(Thread.currentThread().getName() + "put " + value);
                    queue.put(String.valueOf(value));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        };

        // Consumer: waits one second before each of its four takes.
        Runnable consumer = () -> {
            try {
                for (int round = 0; round < 4; round++) {
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println("获取数据:" + queue.take());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        };

        executorService.execute(producer);
        executorService.execute(consumer);
        executorService.shutdown();
    }
}

关注微信公众号:【入门小站】,关注更多知识点

【Java】Java高并发BlockingQueue重要的实现类二

以上是 【Java】Java高并发BlockingQueue重要的实现类二 的全部内容, 来源链接: utcz.com/a/107207.html

回到顶部