【Java】Java高并发BlockingQueue重要的实现类二
Java高并发BlockingQueue重要的实现类二
入门小站发布于 6 分钟前
DelayQueue
- compareTo(Delayed o):Delayed接口继承了Comparable接口,因此有了这个方法
- getDelay(TimeUnit unit):这个方法返回到激活日期的剩余时间,时间单位由单位参数指定
DelayQueue使用场景
- 关闭空闲连接。服务器中,有很多客户端连接,空闲一段时间后需要关闭。
- 缓存超过了缓存时间,就需要从缓存中移除。
DelayQueue超时订单处理案例
package com.rumenz.learn.delayqueue;import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
//DelayQueue里面的元素必须实现Delayed
/**
 * Wrapper that allows an arbitrary payload to be stored in a {@code DelayQueue}.
 *
 * <p>The constructor receives a <em>relative</em> delay in milliseconds and turns it into an
 * absolute expiry timestamp based on {@link System#currentTimeMillis()}.
 *
 * @param <T> type of the wrapped payload
 */
public class Item<T> implements Delayed {
    /** Absolute expiry instant in epoch milliseconds. */
    private Long expireTime;
    /** Payload carried by this queue element. */
    private T data;

    /**
     * @param expireTime delay from now, in milliseconds, after which the item expires
     * @param data       payload to carry
     */
    public Item(Long expireTime, T data) {
        this.expireTime = expireTime + System.currentTimeMillis();
        this.data = data;
    }

    /**
     * Returns the remaining delay expressed in the requested unit.
     *
     * @param unit unit the caller wants the result in
     * @return remaining delay; zero or negative once the item has expired
     */
    @Override
    public long getDelay(TimeUnit unit) {
        long remainingMillis = this.expireTime - System.currentTimeMillis();
        // The remainder is in milliseconds, so that must be the SOURCE unit of the conversion.
        // Converting from `unit` to `unit` would hand the raw millisecond count back unchanged.
        return unit.convert(remainingMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Orders items by expiry, earliest first.
     *
     * @param o the other delayed element
     * @return negative, zero or positive when this item expires before, together with or after
     *         {@code o}
     */
    @Override
    public int compareTo(Delayed o) {
        if (o == this) {
            return 0;
        }
        if (o instanceof Item) {
            // Comparing absolute timestamps is stable; comparing two getDelay() results would
            // sample the clock twice and could order equal expiries inconsistently.
            return Long.compare(this.expireTime, ((Item<?>) o).expireTime);
        }
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }

    /** @return the absolute expiry instant in epoch milliseconds */
    public Long getExpireTime() {
        return expireTime;
    }

    /**
     * Replaces the expiry instant. Unlike the constructor, the value is stored as given and is
     * not offset by the current time.
     *
     * @param expireTime absolute expiry instant in epoch milliseconds
     */
    public void setExpireTime(Long expireTime) {
        this.expireTime = expireTime;
    }

    /** @return the wrapped payload */
    public T getData() {
        return data;
    }

    /** @param data the payload to carry */
    public void setData(T data) {
        this.data = data;
    }
}
// 订单实体类
package com.rumenz.learn.delayqueue;
/**
 * Order entity: an amount, an order number and a payment status.
 */
public class OrderItem {
    private Double orderAmount;
    private String orderNo;
    /**
     * Payment status: 0 = unpaid, 1 = paid.
     *
     * <p>Declared {@code volatile} because instances are handed between worker threads and the
     * status may be written by one thread and read by another without other synchronization.
     */
    private volatile Integer orderStatus;

    /**
     * @param orderAmount order amount
     * @param orderNo     order number
     * @param orderStatus payment status, 0 = unpaid, 1 = paid
     */
    public OrderItem(Double orderAmount, String orderNo, Integer orderStatus) {
        this.orderAmount = orderAmount;
        this.orderNo = orderNo;
        this.orderStatus = orderStatus;
    }

    /** @return the order amount */
    public Double getOrderAmount() {
        return orderAmount;
    }

    /** @param orderAmount the new order amount */
    public void setOrderAmount(Double orderAmount) {
        this.orderAmount = orderAmount;
    }

    /** @return the order number */
    public String getOrderNo() {
        return orderNo;
    }

    /** @param orderNo the new order number */
    public void setOrderNo(String orderNo) {
        this.orderNo = orderNo;
    }

    /** @return the payment status, 0 = unpaid, 1 = paid */
    public Integer getOrderStatus() {
        return orderStatus;
    }

    /** @param orderStatus the new payment status, 0 = unpaid, 1 = paid */
    public void setOrderStatus(Integer orderStatus) {
        this.orderStatus = orderStatus;
    }
}
//
package com.rumenz.learn.delayqueue;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.*;
/**
 * Order-timeout demo built on {@code DelayQueue}.
 *
 * <p>Three workers run on a fixed pool: one places an order every 3 seconds, one pays a pending
 * order after a random wait, and one reports orders whose 10-second payment window elapsed.
 * The map of pending orders is the single source of truth: whichever worker removes an order
 * from it first decides whether that order counts as paid or expired.
 */
public class DelayQueueExample {

    /** Payment window granted to every order, in milliseconds. */
    private static final long PAY_TIMEOUT_MILLIS = 10 * 1000L;

    /** Timestamp layout used in the console output. */
    private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /**
     * Starts the three workers. They loop until interrupted, so the JVM keeps running after
     * {@code shutdown()} has been requested.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        DelayQueue<Item<OrderItem>> delayeds = new DelayQueue<>();
        ConcurrentMap<String, OrderItem> map = new ConcurrentHashMap<>();
        executorService.execute(() -> placeOrders(delayeds, map));
        executorService.execute(() -> payOrders(map));
        executorService.execute(() -> closeExpiredOrders(delayeds, map));
        executorService.shutdown();
    }

    /** Creates a new unpaid order every 3 seconds and registers it in the map and the queue. */
    private static void placeOrders(DelayQueue<Item<OrderItem>> delayeds,
                                    ConcurrentMap<String, OrderItem> map) {
        SimpleDateFormat timeFormat = new SimpleDateFormat(TIME_PATTERN);
        Random random = new Random();
        int orderNo = 100;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(3000);
                int amount = random.nextInt(1000);
                OrderItem orderItem = new OrderItem((double) amount, String.valueOf(orderNo), 0);
                Item<OrderItem> item = new Item<>(PAY_TIMEOUT_MILLIS, orderItem);
                System.out.println("=======================下单==========================");
                System.out.println("生成订单时间:" + timeFormat.format(new Date()));
                System.out.println("订单编号:" + orderNo);
                System.out.println("订单金额:" + orderItem.getOrderAmount());
                System.out.println("支付过期时间:" + timeFormat.format(new Date(item.getExpireTime())));
                System.out.println("========================下单=========================");
                // Register as pending before the expiry worker can possibly see the item.
                map.put(String.valueOf(orderNo), orderItem);
                delayeds.offer(item);
                orderNo++;
            } catch (InterruptedException e) {
                // Restore the flag and stop instead of spinning on a swallowed interrupt.
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                e.printStackTrace();
            }
        }
    }

    /** Waits a random 0-14 seconds, then pays one pending order if there is any. */
    private static void payOrders(ConcurrentMap<String, OrderItem> map) {
        Random random = new Random();
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Random wait before paying.
                Thread.sleep(random.nextInt(15) * 1000L);
                Iterator<String> pending = map.keySet().iterator();
                if (!pending.hasNext()) {
                    continue;
                }
                // remove() is the atomic claim: a null result means the expiry worker already
                // took this order, so it must not be reported as paid as well.
                OrderItem orderItem = map.remove(pending.next());
                if (orderItem == null) {
                    continue;
                }
                orderItem.setOrderStatus(1);
                System.out.println("-----------------------支付订单-----------------------");
                System.out.println("订单支付" + orderItem.getOrderNo());
                System.out.println("支付金额" + orderItem.getOrderAmount());
                System.out.println("-----------------------支付订单-----------------------");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                e.printStackTrace();
            }
        }
    }

    /** Blocks on the delay queue and reports every order that expired without being paid. */
    private static void closeExpiredOrders(DelayQueue<Item<OrderItem>> delayeds,
                                           ConcurrentMap<String, OrderItem> map) {
        SimpleDateFormat timeFormat = new SimpleDateFormat(TIME_PATTERN);
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Item<OrderItem> item = delayeds.take();
                OrderItem data = item.getData();
                // Paid orders were already removed by the payment worker, so a successful
                // removal here means the order is still unpaid and has now timed out.
                boolean unpaid = map.remove(data.getOrderNo()) != null;
                if (unpaid) {
                    System.out.println("########################过期订单########################");
                    System.out.println("订单编号:" + data.getOrderNo());
                    System.out.println("订单金额:" + data.getOrderAmount());
                    System.out.println("订单到期支付时间:" + timeFormat.format(new Date(item.getExpireTime())));
                    System.out.println("########################过期订单########################");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                e.printStackTrace();
            }
        }
    }
}
SynchronousQueue
/**
 * Abridged excerpt showing how SynchronousQueue picks its internal transfer strategy.
 * Note: Transferer and the transferer field are not declared in this excerpt.
 */
public class SynchronousQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {
// Internal stack-based transferer; chosen when fair == false (see the constructor below)
static final class TransferStack<E> extends Transferer<E> {}
// Internal queue-based transferer; chosen when fair == true
static final class TransferQueue<E> extends Transferer<E> {}
// The no-arg constructor delegates with fair = false, so the stack variant is the default
public SynchronousQueue() {this(false);}
// fair selects which Transferer implementation backs this queue
public SynchronousQueue(boolean fair) {
transferer = fair ?
new TransferQueue<E>() : new TransferStack<E>();
}
}
SynchronousQueue
代码演示
package com.rumenz.learn.synchronousqueue;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
 * Hand-off demo for {@code SynchronousQueue} with one producer and one consumer.
 *
 * <p>The producer announces and puts the strings "1" to "4"; the consumer sleeps one second
 * before each of its four takes and prints what it received.
 */
public class SynchronousQueueExample {
    public static void main(String[] args) {
        SynchronousQueue<String> handoff = new SynchronousQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // Producer: log the intent first, then hand the element over.
        Runnable producer = () -> {
            try {
                for (int i = 1; i <= 4; i++) {
                    System.out.println(Thread.currentThread().getName() + "put " + i);
                    handoff.put(String.valueOf(i));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        };

        // Consumer: pause one second before every take.
        Runnable consumer = () -> {
            try {
                for (int round = 0; round < 4; round++) {
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println("获取数据:" + handoff.take());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        };

        pool.execute(producer);
        pool.execute(consumer);
        pool.shutdown();
    }
}
关注微信公众号:【入门小站】,关注更多知识点
java多线程
阅读 4发布于 6 分钟前
本作品系原创,采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议
入门小站
rumenz.com
41 声望
2 粉丝
入门小站
rumenz.com
41 声望
2 粉丝
宣传栏
DelayQueue
- compareTo(Delayed o):Delayed接口继承了Comparable接口,因此有了这个方法
- getDelay(TimeUnit unit):这个方法返回到激活日期的剩余时间,时间单位由单位参数指定
DelayQueue使用场景
- 关闭空闲连接。服务器中,有很多客户端连接,空闲一段时间后需要关闭。
- 缓存超过了缓存时间,就需要从缓存中移除。
DelayQueue超时订单处理案例
package com.rumenz.learn.delayqueue;import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
//DelayQueue里面的元素必须实现Delayed
/**
 * Wrapper that allows an arbitrary payload to be stored in a {@code DelayQueue}.
 *
 * <p>The constructor receives a <em>relative</em> delay in milliseconds and turns it into an
 * absolute expiry timestamp based on {@link System#currentTimeMillis()}.
 *
 * @param <T> type of the wrapped payload
 */
public class Item<T> implements Delayed {
    /** Absolute expiry instant in epoch milliseconds. */
    private Long expireTime;
    /** Payload carried by this queue element. */
    private T data;

    /**
     * @param expireTime delay from now, in milliseconds, after which the item expires
     * @param data       payload to carry
     */
    public Item(Long expireTime, T data) {
        this.expireTime = expireTime + System.currentTimeMillis();
        this.data = data;
    }

    /**
     * Returns the remaining delay expressed in the requested unit.
     *
     * @param unit unit the caller wants the result in
     * @return remaining delay; zero or negative once the item has expired
     */
    @Override
    public long getDelay(TimeUnit unit) {
        long remainingMillis = this.expireTime - System.currentTimeMillis();
        // The remainder is in milliseconds, so that must be the SOURCE unit of the conversion.
        // Converting from `unit` to `unit` would hand the raw millisecond count back unchanged.
        return unit.convert(remainingMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Orders items by expiry, earliest first.
     *
     * @param o the other delayed element
     * @return negative, zero or positive when this item expires before, together with or after
     *         {@code o}
     */
    @Override
    public int compareTo(Delayed o) {
        if (o == this) {
            return 0;
        }
        if (o instanceof Item) {
            // Comparing absolute timestamps is stable; comparing two getDelay() results would
            // sample the clock twice and could order equal expiries inconsistently.
            return Long.compare(this.expireTime, ((Item<?>) o).expireTime);
        }
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }

    /** @return the absolute expiry instant in epoch milliseconds */
    public Long getExpireTime() {
        return expireTime;
    }

    /**
     * Replaces the expiry instant. Unlike the constructor, the value is stored as given and is
     * not offset by the current time.
     *
     * @param expireTime absolute expiry instant in epoch milliseconds
     */
    public void setExpireTime(Long expireTime) {
        this.expireTime = expireTime;
    }

    /** @return the wrapped payload */
    public T getData() {
        return data;
    }

    /** @param data the payload to carry */
    public void setData(T data) {
        this.data = data;
    }
}
// 订单实体类
package com.rumenz.learn.delayqueue;
/**
 * Order entity: an amount, an order number and a payment status.
 */
public class OrderItem {
    private Double orderAmount;
    private String orderNo;
    /**
     * Payment status: 0 = unpaid, 1 = paid.
     *
     * <p>Declared {@code volatile} because instances are handed between worker threads and the
     * status may be written by one thread and read by another without other synchronization.
     */
    private volatile Integer orderStatus;

    /**
     * @param orderAmount order amount
     * @param orderNo     order number
     * @param orderStatus payment status, 0 = unpaid, 1 = paid
     */
    public OrderItem(Double orderAmount, String orderNo, Integer orderStatus) {
        this.orderAmount = orderAmount;
        this.orderNo = orderNo;
        this.orderStatus = orderStatus;
    }

    /** @return the order amount */
    public Double getOrderAmount() {
        return orderAmount;
    }

    /** @param orderAmount the new order amount */
    public void setOrderAmount(Double orderAmount) {
        this.orderAmount = orderAmount;
    }

    /** @return the order number */
    public String getOrderNo() {
        return orderNo;
    }

    /** @param orderNo the new order number */
    public void setOrderNo(String orderNo) {
        this.orderNo = orderNo;
    }

    /** @return the payment status, 0 = unpaid, 1 = paid */
    public Integer getOrderStatus() {
        return orderStatus;
    }

    /** @param orderStatus the new payment status, 0 = unpaid, 1 = paid */
    public void setOrderStatus(Integer orderStatus) {
        this.orderStatus = orderStatus;
    }
}
//
package com.rumenz.learn.delayqueue;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.*;
/**
 * Order-timeout demo built on {@code DelayQueue}.
 *
 * <p>Three workers run on a fixed pool: one places an order every 3 seconds, one pays a pending
 * order after a random wait, and one reports orders whose 10-second payment window elapsed.
 * The map of pending orders is the single source of truth: whichever worker removes an order
 * from it first decides whether that order counts as paid or expired.
 */
public class DelayQueueExample {

    /** Payment window granted to every order, in milliseconds. */
    private static final long PAY_TIMEOUT_MILLIS = 10 * 1000L;

    /** Timestamp layout used in the console output. */
    private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /**
     * Starts the three workers. They loop until interrupted, so the JVM keeps running after
     * {@code shutdown()} has been requested.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        DelayQueue<Item<OrderItem>> delayeds = new DelayQueue<>();
        ConcurrentMap<String, OrderItem> map = new ConcurrentHashMap<>();
        executorService.execute(() -> placeOrders(delayeds, map));
        executorService.execute(() -> payOrders(map));
        executorService.execute(() -> closeExpiredOrders(delayeds, map));
        executorService.shutdown();
    }

    /** Creates a new unpaid order every 3 seconds and registers it in the map and the queue. */
    private static void placeOrders(DelayQueue<Item<OrderItem>> delayeds,
                                    ConcurrentMap<String, OrderItem> map) {
        SimpleDateFormat timeFormat = new SimpleDateFormat(TIME_PATTERN);
        Random random = new Random();
        int orderNo = 100;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(3000);
                int amount = random.nextInt(1000);
                OrderItem orderItem = new OrderItem((double) amount, String.valueOf(orderNo), 0);
                Item<OrderItem> item = new Item<>(PAY_TIMEOUT_MILLIS, orderItem);
                System.out.println("=======================下单==========================");
                System.out.println("生成订单时间:" + timeFormat.format(new Date()));
                System.out.println("订单编号:" + orderNo);
                System.out.println("订单金额:" + orderItem.getOrderAmount());
                System.out.println("支付过期时间:" + timeFormat.format(new Date(item.getExpireTime())));
                System.out.println("========================下单=========================");
                // Register as pending before the expiry worker can possibly see the item.
                map.put(String.valueOf(orderNo), orderItem);
                delayeds.offer(item);
                orderNo++;
            } catch (InterruptedException e) {
                // Restore the flag and stop instead of spinning on a swallowed interrupt.
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                e.printStackTrace();
            }
        }
    }

    /** Waits a random 0-14 seconds, then pays one pending order if there is any. */
    private static void payOrders(ConcurrentMap<String, OrderItem> map) {
        Random random = new Random();
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Random wait before paying.
                Thread.sleep(random.nextInt(15) * 1000L);
                Iterator<String> pending = map.keySet().iterator();
                if (!pending.hasNext()) {
                    continue;
                }
                // remove() is the atomic claim: a null result means the expiry worker already
                // took this order, so it must not be reported as paid as well.
                OrderItem orderItem = map.remove(pending.next());
                if (orderItem == null) {
                    continue;
                }
                orderItem.setOrderStatus(1);
                System.out.println("-----------------------支付订单-----------------------");
                System.out.println("订单支付" + orderItem.getOrderNo());
                System.out.println("支付金额" + orderItem.getOrderAmount());
                System.out.println("-----------------------支付订单-----------------------");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                e.printStackTrace();
            }
        }
    }

    /** Blocks on the delay queue and reports every order that expired without being paid. */
    private static void closeExpiredOrders(DelayQueue<Item<OrderItem>> delayeds,
                                           ConcurrentMap<String, OrderItem> map) {
        SimpleDateFormat timeFormat = new SimpleDateFormat(TIME_PATTERN);
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Item<OrderItem> item = delayeds.take();
                OrderItem data = item.getData();
                // Paid orders were already removed by the payment worker, so a successful
                // removal here means the order is still unpaid and has now timed out.
                boolean unpaid = map.remove(data.getOrderNo()) != null;
                if (unpaid) {
                    System.out.println("########################过期订单########################");
                    System.out.println("订单编号:" + data.getOrderNo());
                    System.out.println("订单金额:" + data.getOrderAmount());
                    System.out.println("订单到期支付时间:" + timeFormat.format(new Date(item.getExpireTime())));
                    System.out.println("########################过期订单########################");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                e.printStackTrace();
            }
        }
    }
}
SynchronousQueue
/**
 * Abridged excerpt showing how SynchronousQueue picks its internal transfer strategy.
 * Note: Transferer and the transferer field are not declared in this excerpt.
 */
public class SynchronousQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {
// Internal stack-based transferer; chosen when fair == false (see the constructor below)
static final class TransferStack<E> extends Transferer<E> {}
// Internal queue-based transferer; chosen when fair == true
static final class TransferQueue<E> extends Transferer<E> {}
// The no-arg constructor delegates with fair = false, so the stack variant is the default
public SynchronousQueue() {this(false);}
// fair selects which Transferer implementation backs this queue
public SynchronousQueue(boolean fair) {
transferer = fair ?
new TransferQueue<E>() : new TransferStack<E>();
}
}
SynchronousQueue
代码演示
package com.rumenz.learn.synchronousqueue;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
 * Hand-off demo for {@code SynchronousQueue} with one producer and one consumer.
 *
 * <p>The producer announces and puts the strings "1" to "4"; the consumer sleeps one second
 * before each of its four takes and prints what it received.
 */
public class SynchronousQueueExample {
    public static void main(String[] args) {
        SynchronousQueue<String> handoff = new SynchronousQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // Producer: log the intent first, then hand the element over.
        Runnable producer = () -> {
            try {
                for (int i = 1; i <= 4; i++) {
                    System.out.println(Thread.currentThread().getName() + "put " + i);
                    handoff.put(String.valueOf(i));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        };

        // Consumer: pause one second before every take.
        Runnable consumer = () -> {
            try {
                for (int round = 0; round < 4; round++) {
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println("获取数据:" + handoff.take());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        };

        pool.execute(producer);
        pool.execute(consumer);
        pool.shutdown();
    }
}
关注微信公众号:【入门小站】,关注更多知识点
以上是 【Java】Java高并发BlockingQueue重要的实现类二 的全部内容, 来源链接: utcz.com/a/107207.html
得票时间