生产者消费者模式

编程

本文介绍生产者消费者模式的三种实现方法:

1.传统的 synchronized + wait/notify

2.Lock+Condition

3.BlockingQueue

一、Synchronized

package com.yuxx.prodcons;

/**
 * Entry point for the {@code synchronized} + {@code wait}/{@code notify} demo.
 *
 * <p>One {@code Clerk1} is shared by a single {@code Producter1} task and a single
 * {@code Consumer1} task; each task is run by five separately named threads.
 */
public class SyncWaitNotify {

    public static void main(String[] args) {
        final Clerk1 sharedClerk = new Clerk1();
        final Runnable producerTask = new Producter1(sharedClerk);
        final Runnable consumerTask = new Consumer1(sharedClerk);

        // All producer threads are started before any consumer thread.
        startFiveWorkers(producerTask, "生产者");
        startFiveWorkers(consumerTask, "消费者");
    }

    /** Starts five threads running {@code task}, named {@code namePrefix} followed by 1..5. */
    private static void startFiveWorkers(Runnable task, String namePrefix) {
        int index = 1;
        while (index <= 5) {
            new Thread(task, namePrefix + index).start();
            index++;
        }
    }
}

/**
 * Producer task: performs up to ten restocking rounds against a shared {@code Clerk1}.
 *
 * <p>Each round sleeps 400 ms (simulating network latency or similar) and then calls
 * {@code Clerk1.add()}. If the thread is interrupted during the pause, the interrupt
 * status is restored and the task stops producing instead of silently carrying on.
 */
class Producter1 implements Runnable {

    private final Clerk1 clerk1;

    /**
     * @param clerk1 the shared clerk that receives the produced goods
     */
    public Producter1(Clerk1 clerk1) {
        this.clerk1 = clerk1;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            // Simulate network latency etc. before each delivery.
            try {
                Thread.sleep(400);
            } catch (InterruptedException e) {
                // sleep() cleared the flag when it threw; set it again so whoever owns
                // this thread can still observe the interruption, then stop producing.
                Thread.currentThread().interrupt();
                return;
            }
            clerk1.add();
        }
    }
}

/**
 * Consumer task: asks the shared {@code Clerk1} to sell one product, ten times in a row.
 */
class Consumer1 implements Runnable {

    private final Clerk1 clerk1;

    /**
     * @param clerk1 the shared clerk to buy from
     */
    public Consumer1(Clerk1 clerk1) {
        this.clerk1 = clerk1;
    }

    @Override
    public void run() {
        int remaining = 10;
        while (remaining-- > 0) {
            clerk1.sale();
        }
    }
}

/**
 * Shop clerk that restocks and sells a single kind of product.
 *
 * <p>The stock counter is guarded by this object's monitor. Producers wait while the
 * stock is at the capacity of 10; consumers wait while it is empty.
 *
 * <p>Both kinds of threads wait on the same monitor, so every state change wakes
 * <em>all</em> waiters with {@code notifyAll()}. A single {@code notify()} may wake a
 * thread of the same kind, which immediately goes back to waiting, and with several
 * producers and consumers every thread can end up parked forever.
 */
class Clerk1 {

    /** Maximum number of products kept in stock. */
    private static final int CAPACITY = 10;

    /** Current number of products in stock. */
    private int product = 0;

    /**
     * Restocks one product, blocking while the stock is full.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt status is
     * restored and the method returns without adding a product.
     */
    public synchronized void add() {
        String threadName = Thread.currentThread().getName();
        // wait() sits inside a while loop so the condition is re-checked after every
        // wake-up, which also guards against spurious wake-ups.
        while (product >= CAPACITY) {
            System.out.println(threadName + ":产品已满,停止进货!");
            try {
                this.wait();
            } catch (InterruptedException e) {
                // Re-assert the flag that wait() cleared and give up this restock;
                // looping back into wait() would discard the interruption.
                Thread.currentThread().interrupt();
                return;
            }
        }
        System.out.println(threadName + ":" + ++product);
        this.notifyAll();
    }

    /**
     * Sells one product, blocking while the stock is empty.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt status is
     * restored and the method returns without selling a product.
     */
    public synchronized void sale() {
        String threadName = Thread.currentThread().getName();
        // Same while-loop guard as in add(): re-check the condition after each wake-up.
        while (product <= 0) {
            System.out.println(threadName + ":产品缺货,停止卖货!");
            try {
                this.wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        System.out.println(threadName + ":" + --product);
        this.notifyAll();
    }
}

二、Lock+Condition

package com.yuxx.prodcons;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.ReentrantLock;

/**
 * Producer/consumer store built on a {@link ReentrantLock} with two {@link Condition}s.
 *
 * <p>Producers wait on {@code addCondition} while the stock is at the capacity of 10;
 * consumers wait on {@code saleCondition} while it is empty. Each successful operation
 * signals one thread waiting on the opposite condition.
 *
 * <p>If a thread is interrupted while waiting (or, in {@link #sale()}, while sleeping),
 * the operation is abandoned and the thread's interrupt status is restored.
 */
public class LockCondition {

    /** Maximum number of products kept in stock. */
    private static final int CAPACITY = 10;

    private final ReentrantLock lock = new ReentrantLock();

    /** Producers wait here until there is room for another product. */
    private final Condition addCondition = lock.newCondition();

    /** Consumers wait here until at least one product is available. */
    private final Condition saleCondition = lock.newCondition();

    /** Current number of products in stock; guarded by {@code lock}. */
    private int product = 0;

    /**
     * Restocks one product, blocking while the stock is full.
     */
    public void add() {
        lock.lock();
        // Everything after lock() runs inside try so the finally block always unlocks.
        try {
            String threadName = Thread.currentThread().getName();
            System.out.println(threadName + " in add.");
            // Re-check the condition after every wake-up.
            while (product >= CAPACITY) {
                System.out.println(threadName + ":产品已满,停止进货!");
                addCondition.await();
            }
            System.out.println(threadName + "进货:" + ++product);
            saleCondition.signal();
        } catch (InterruptedException e) {
            // await() cleared the flag when it threw; restore it for the caller.
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Sells one product, blocking while the stock is empty.
     */
    public void sale() {
        lock.lock();
        try {
            String threadName = Thread.currentThread().getName();
            System.out.println(threadName + " in sale.");
            // Re-check the condition after every wake-up.
            while (product <= 0) {
                System.out.println(threadName + ":产品缺货,停止售卖!");
                saleCondition.await();
            }
            // This pause runs while the lock is held, so every other thread calling
            // add() or sale() is blocked for its duration.
            Thread.sleep(400);
            System.out.println(threadName + "买货:" + --product);
            addCondition.signal();
        } catch (InterruptedException e) {
            // Raised by await() or sleep(); restore the flag for the caller.
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }
}

/**
 * Demo driver for {@code LockCondition}: two producer threads and two consumer threads
 * each perform ten operations on one shared instance.
 */
class LockConditionTest {

    public static void main(String[] args) {
        final LockCondition shared = new LockCondition();

        final Runnable restockTenTimes = () -> repeatTenTimes(shared::add);
        final Runnable buyTenTimes = () -> repeatTenTimes(shared::sale);

        // Start order alternates producer and consumer: A, B, C, D.
        new Thread(restockTenTimes, "生产者A").start();
        new Thread(buyTenTimes, "消费者B").start();
        new Thread(restockTenTimes, "生产者C").start();
        new Thread(buyTenTimes, "消费者D").start();
    }

    /** Runs {@code action} ten times on the calling thread. */
    private static void repeatTenTimes(Runnable action) {
        int round = 0;
        while (round < 10) {
            action.run();
            round++;
        }
    }
}

此处扩展一下Lock+Condition的另一个案例: https://my.oschina.net/alexjava/blog/4371065 。

三、BlockingQueue

package com.yuxx.prodcons;

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Producer/consumer pair that exchanges string payloads through a {@link BlockingQueue}.
 *
 * <p>Both loops run while a shared volatile flag is {@code true}. The flag is cleared by
 * {@link #stop()}, or by {@link #consume()} itself when a poll times out.
 */
public class BlockQueue {

    /** Loop switch shared by producer and consumer; {@code false} ends both loops. */
    private volatile boolean running = true;

    /** Source of the increasing numbers used as payloads. */
    private AtomicInteger sequence = new AtomicInteger();

    BlockingQueue<String> blockingQueue;

    /**
     * @param blockingQueue the queue through which producer and consumer exchange data
     */
    public BlockQueue(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    /**
     * Produces payloads until the flag is cleared: takes the next number, offers it to
     * the queue with a 2-second timeout, reports the outcome, then sleeps for 1 second.
     *
     * @throws InterruptedException if interrupted while offering or sleeping
     */
    public void product() throws InterruptedException {
        while (running) {
            final String payload = String.valueOf(sequence.incrementAndGet());
            final boolean accepted = blockingQueue.offer(payload, 2L, TimeUnit.SECONDS);
            final String outcome = accepted ? "成功!" : "失败!";
            System.out.println(Thread.currentThread().getName() + " 插入队列" + payload + outcome);
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName() + " FLAG=false,生产结束。");
    }

    /**
     * Consumes payloads until the flag is cleared. Each poll waits up to 2 seconds; a
     * {@code null} or empty result clears the flag, prints a notice followed by three
     * blank lines, and returns.
     *
     * @throws InterruptedException if interrupted while polling
     */
    public void consume() throws InterruptedException {
        while (running) {
            final String item = blockingQueue.poll(2, TimeUnit.SECONDS);
            if (item != null && !item.isEmpty()) {
                System.out.println(Thread.currentThread().getName() + " 获取数据" + item + "成功!");
                continue;
            }

            // Nothing usable arrived in time: clear the flag (which also ends the
            // producer loop) and leave.
            running = false;
            System.out.println(Thread.currentThread().getName() + " 超过2秒没有获取到数据,消费退出。");
            for (int blank = 0; blank < 3; blank++) {
                System.out.println();
            }
            return;
        }
    }

    /** Clears the flag so both loops end at their next condition check. */
    public void stop() {
        running = false;
    }
}

/**
 * Demo driver for {@code BlockQueue}: runs one producer thread and one consumer thread
 * over a queue of capacity 10, then stops the exchange after five seconds.
 */
class BlockingQueueDemo {

    public static void main(String[] args) {
        final BlockQueue exchange = new BlockQueue(new ArrayBlockingQueue<String>(10));

        final Thread producer = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " 生产线程启动!");
            try {
                exchange.product();
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }, "Product");
        producer.start();

        final Thread consumer = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " 消费线程启动!");
            try {
                exchange.consume();
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }, "Consumer");
        consumer.start();

        // Let both workers run for five seconds before stopping them.
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }

        // Three blank lines separate the worker output from the closing message.
        for (int blank = 0; blank < 3; blank++) {
            System.out.println();
        }
        System.out.println("5秒时间到,活动结束。");
        exchange.stop();
    }
}

 

以上是 生产者消费者模式 的全部内容, 来源链接: utcz.com/z/518309.html

回到顶部