生产者消费者模式
生产者消费者模式,这里介绍三种实现方法:
1.传统synchronized + wait/notify
2.Lock+Condition
3.BlockingQueue
一、synchronized + wait/notify
package com.yuxx.prodcons;

/**
 * Demo entry point for the {@code synchronized} + {@code wait}/{@code notify}
 * variant of the producer-consumer pattern.
 */
public class SyncWaitNotify {

    /** Number of threads started for each role. */
    private static final int WORKERS_PER_ROLE = 5;

    /**
     * Creates one shared {@link Clerk1}, then starts five producer threads
     * followed by five consumer threads that all operate on it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Clerk1 sharedClerk = new Clerk1();
        Runnable producerTask = new Producter1(sharedClerk);
        Runnable consumerTask = new Consumer1(sharedClerk);

        launch(producerTask, "生产者");
        launch(consumerTask, "消费者");
    }

    /**
     * Starts {@link #WORKERS_PER_ROLE} threads running the same task, named
     * with the given prefix followed by a 1-based index.
     */
    private static void launch(Runnable task, String namePrefix) {
        int index = 1;
        while (index <= WORKERS_PER_ROLE) {
            new Thread(task, namePrefix + index).start();
            index++;
        }
    }
}
/**
 * Producer task: restocks the shared {@link Clerk1} up to ten times, pausing
 * 400 ms before each delivery.
 *
 * <p>If the thread is interrupted during the pause, the interrupt status is
 * restored and the task stops producing instead of ignoring the request.
 */
class Producter1 implements Runnable {
    private final Clerk1 clerk1;

    /**
     * @param clerk1 the clerk whose stock this producer replenishes
     */
    public Producter1(Clerk1 clerk1) {
        this.clerk1 = clerk1;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            // Simulate network latency or a similar delay before each delivery.
            try {
                Thread.sleep(400);
            } catch (InterruptedException e) {
                // sleep() cleared the interrupt status when it threw; put it back
                // so code further up the stack can see the cancellation, and stop.
                Thread.currentThread().interrupt();
                return;
            }
            clerk1.add();
        }
    }
}
/**
 * Consumer task: asks the shared {@link Clerk1} to sell one product, ten times
 * in a row.
 */
class Consumer1 implements Runnable {
    private Clerk1 clerk1;

    /**
     * @param clerk1 the clerk this consumer buys from
     */
    public Consumer1(Clerk1 clerk1) {
        this.clerk1 = clerk1;
    }

    @Override
    public void run() {
        int remaining = 10;
        while (remaining > 0) {
            clerk1.sale();
            remaining--;
        }
    }
}
/**
 * Shop clerk that stocks and sells products, guarding a bounded stock counter
 * with its own monitor ({@code synchronized} + {@code wait}/{@code notifyAll}).
 *
 * <p>Producers and consumers wait on the same monitor, so a single
 * {@code notify()} may wake a thread of the same role, which would just go back
 * to waiting. {@code notifyAll()} is used so that correctness does not depend
 * on how the number of threads relates to the stock capacity.
 */
class Clerk1 {
    /** Stock level at which producers have to wait. */
    private static final int MAX_PRODUCT = 10;

    /** Current number of products in stock. */
    private int product = 0;

    /**
     * Adds one product, waiting while the stock is full.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt
     * status is restored and the method returns without adding anything.
     */
    public synchronized void add() {
        String threadName = Thread.currentThread().getName();
        // wait() sits inside a while loop to guard against spurious wakeups.
        while (product >= MAX_PRODUCT) {
            System.out.println(threadName + ":产品已满,停止进货!");
            try {
                this.wait();
            } catch (InterruptedException e) {
                // wait() cleared the flag when it threw; restore it and give up
                // rather than silently going back to waiting.
                Thread.currentThread().interrupt();
                return;
            }
        }
        System.out.println(threadName + ":" + ++product);
        this.notifyAll();
    }

    /**
     * Sells one product, waiting while the stock is empty.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt
     * status is restored and the method returns without selling anything.
     */
    public synchronized void sale() {
        String threadName = Thread.currentThread().getName();
        // wait() sits inside a while loop to guard against spurious wakeups.
        while (product <= 0) {
            System.out.println(threadName + ":产品缺货,停止卖货!");
            try {
                this.wait();
            } catch (InterruptedException e) {
                // Same handling as in add(): keep the interrupt visible and stop.
                Thread.currentThread().interrupt();
                return;
            }
        }
        System.out.println(threadName + ":" + --product);
        this.notifyAll();
    }
}
二、Lock+Condition
package com.yuxx.prodcons;import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Bounded stock counter guarded by a {@link ReentrantLock} with two
 * {@link Condition}s: one that producers wait on while the stock is full and
 * one that consumers wait on while it is empty.
 */
public class LockCondition {
    /** Stock level at which producers have to wait. */
    private static final int MAX_PRODUCT = 10;

    private final ReentrantLock lock = new ReentrantLock();
    /** Producers wait here while the stock is full. */
    private final Condition addCondition = lock.newCondition();
    /** Consumers wait here while the stock is empty. */
    private final Condition saleCondition = lock.newCondition();
    /** Current number of products in stock; accessed only while holding {@code lock}. */
    private int product = 0;

    /**
     * Adds one product, waiting while the stock is full, then signals one
     * waiting consumer.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt
     * status is restored and the method returns without adding anything.
     */
    public void add() {
        String threadName = Thread.currentThread().getName();
        lock.lock();
        // Everything after lock() lives inside try so the lock is always released.
        try {
            System.out.println(threadName + " in add.");
            while (product >= MAX_PRODUCT) {
                System.out.println(threadName + ":产品已满,停止进货!");
                addCondition.await();
            }
            System.out.println(threadName + "进货:" + ++product);
            saleCondition.signal();
        } catch (InterruptedException e) {
            // await() cleared the flag when it threw; restore it for the caller.
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes one product, waiting while the stock is empty, then signals one
     * waiting producer. After a successful sale the calling thread pauses for
     * 400 ms to simulate handling time.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt
     * status is restored and the method returns without selling anything.
     */
    public void sale() {
        String threadName = Thread.currentThread().getName();
        lock.lock();
        try {
            System.out.println(threadName + " in sale.");
            while (product <= 0) {
                System.out.println(threadName + ":产品缺货,停止售卖!");
                saleCondition.await();
            }
            System.out.println(threadName + "买货:" + --product);
            addCondition.signal();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        } finally {
            lock.unlock();
        }

        // The simulated delay runs after the lock is released, so other
        // producers and consumers are not blocked while this thread sleeps.
        try {
            Thread.sleep(400);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Demo driver for {@link LockCondition}: two producer threads and two consumer
 * threads, each performing ten operations on one shared instance.
 */
class LockConditionTest {

    /**
     * Starts the threads in the order producer A, consumer B, producer C,
     * consumer D.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        LockCondition shop = new LockCondition();

        // Both tasks are stateless, so each can back two threads.
        Runnable restock = () -> {
            int round = 0;
            while (round < 10) {
                shop.add();
                round++;
            }
        };
        Runnable purchase = () -> {
            int round = 0;
            while (round < 10) {
                shop.sale();
                round++;
            }
        };

        new Thread(restock, "生产者A").start();
        new Thread(purchase, "消费者B").start();
        new Thread(restock, "生产者C").start();
        new Thread(purchase, "消费者D").start();
    }
}
此处扩展一下Lock+Condition的另一个案例: https://my.oschina.net/alexjava/blog/4371065 。
三、BlockingQueue
package com.yuxx.prodcons;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Producer and consumer loops built on a caller-supplied {@link BlockingQueue}.
 * Both loops run while a shared volatile flag is set; the flag is cleared by
 * {@link #stop()} or by the consumer when a poll times out.
 */
public class BlockQueue {
    /** Shared run flag read by both loops. */
    private volatile boolean running = true;
    /** Source of the increasing numbers that are offered to the queue. */
    private AtomicInteger sequence = new AtomicInteger();
    BlockingQueue<String> blockingQueue = null;

    /**
     * @param blockingQueue the queue that carries items from producer to consumer
     */
    public BlockQueue(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    /**
     * Producer loop: while the run flag is set, offers the next sequence number
     * (waiting up to two seconds for space), reports the outcome, then sleeps
     * one second. Prints a closing message once the flag is cleared.
     *
     * @throws InterruptedException if interrupted while offering or sleeping
     */
    public void product() throws InterruptedException {
        while (running) {
            String item = String.valueOf(sequence.incrementAndGet());
            boolean accepted = blockingQueue.offer(item, 2L, TimeUnit.SECONDS);
            String outcome = accepted ? "成功!" : "失败!";
            System.out.println(currentName() + " 插入队列" + item + outcome);
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(currentName() + " FLAG=false,生产结束。");
    }

    /**
     * Consumer loop: while the run flag is set, polls the queue for up to two
     * seconds. A null or empty result clears the run flag, prints an exit
     * message followed by three blank lines, and ends the loop.
     *
     * @throws InterruptedException if interrupted while polling
     */
    public void consume() throws InterruptedException {
        while (running) {
            String item = blockingQueue.poll(2, TimeUnit.SECONDS);
            boolean gotData = item != null && !item.isEmpty();
            if (gotData) {
                System.out.println(currentName() + " 获取数据" + item + "成功!");
                continue;
            }
            running = false;
            System.out.println(currentName() + " 超过2秒没有获取到数据,消费退出。");
            for (int blank = 0; blank < 3; blank++) {
                System.out.println();
            }
            return;
        }
    }

    /** Clears the run flag so both loops finish after their current iteration. */
    public void stop() {
        running = false;
    }

    /** Name of the calling thread, used as the prefix of every log line. */
    private static String currentName() {
        return Thread.currentThread().getName();
    }
}
/**
 * Demo driver for {@link BlockQueue}: runs one producer thread and one consumer
 * thread against a queue of capacity 10, then stops them after five seconds.
 */
class BlockingQueueDemo {

    /**
     * Starts the "Product" and "Consumer" threads, sleeps five seconds on the
     * calling thread, prints a closing banner, and calls {@code stop()}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        BlockQueue pipeline = new BlockQueue(new ArrayBlockingQueue<String>(10));

        Runnable producerTask = () -> {
            System.out.println(Thread.currentThread().getName() + " 生产线程启动!");
            try {
                pipeline.product();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        Runnable consumerTask = () -> {
            System.out.println(Thread.currentThread().getName() + " 消费线程启动!");
            try {
                pipeline.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        Thread producerThread = new Thread(producerTask, "Product");
        producerThread.start();
        Thread consumerThread = new Thread(consumerTask, "Consumer");
        consumerThread.start();

        // Let the two threads run for five seconds before shutting down.
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // Three blank lines visually separate the closing banner.
        for (int blank = 0; blank < 3; blank++) {
            System.out.println();
        }
        System.out.println("5秒时间到,活动结束。");
        pipeline.stop();
    }
}
以上是 生产者消费者模式 的全部内容, 来源链接: utcz.com/z/518309.html