多线程生产者-消费者(Producer-Consumer)模式
package com.thread.ch13;public class Message {
private String data;
public Message(String data) {
this.data = data;
}
public String getData() {
return data;
}
}
package com.thread.ch13;import java.util.LinkedList;
/**
 * Bounded FIFO buffer shared between producer and consumer threads.
 *
 * <p>Every access to the backing list happens while holding the list's own
 * monitor. Producers block in {@link #put(Message)} while the buffer is full and
 * consumers block in {@link #take()} while it is empty.
 */
public class MessageQueue {
    /** Backing storage; also serves as the lock and the wait/notify condition object. */
    private final LinkedList<Message> queue;
    /** Capacity used by the no-argument constructor. */
    private static final int DEFAULT_MAX_LIMIT = 100;
    /** Maximum number of messages held at once. */
    private final int limit;

    /**
     * Creates a queue with the default capacity ({@value #DEFAULT_MAX_LIMIT}).
     */
    public MessageQueue() {
        this(DEFAULT_MAX_LIMIT);
    }

    /**
     * Creates a queue that holds at most {@code limit} messages.
     *
     * @param limit maximum number of buffered messages; must be positive
     * @throws IllegalArgumentException if {@code limit} is zero or negative, because
     *         such a queue could never accept a message and every {@code put} would
     *         block forever
     */
    public MessageQueue(int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive: " + limit);
        }
        this.queue = new LinkedList<Message>();
        this.limit = limit;
    }

    /**
     * Appends a message to the tail of the queue, waiting while the queue is full.
     *
     * @param message the message to enqueue; must not be {@code null}
     * @throws NullPointerException if {@code message} is {@code null}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     *         for free space
     */
    public void put(Message message) throws InterruptedException {
        if (message == null) {
            // Reject here so the failure is reported at the producer, not later in a consumer.
            throw new NullPointerException("message");
        }
        synchronized (queue) {
            // Re-check in a loop: a woken thread must confirm there is still room.
            while (queue.size() >= limit) {
                queue.wait();
            }
            queue.addLast(message);
            // Producers and consumers wait on the same monitor, so wake all of them.
            queue.notifyAll();
        }
    }

    /**
     * Removes and returns the message at the head of the queue, waiting while the
     * queue is empty.
     *
     * @return the oldest buffered message
     * @throws InterruptedException if the calling thread is interrupted while waiting
     *         for a message
     */
    public Message take() throws InterruptedException {
        synchronized (queue) {
            // Re-check in a loop: another consumer may have taken the message first.
            while (queue.isEmpty()) {
                queue.wait();
            }
            Message message = queue.removeFirst();
            // A slot was freed; wake any thread waiting on this monitor.
            queue.notifyAll();
            return message;
        }
    }

    /**
     * Returns the number of messages currently buffered.
     *
     * @return the queue size observed while holding the queue's monitor
     */
    public int getQueueSize() {
        synchronized (queue) {
            return queue.size();
        }
    }
}
package com.thread.ch13;import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Producer thread: repeatedly creates a {@code Message}, puts it on the shared
 * {@code MessageQueue}, prints what it did, then sleeps for a random time below
 * one second.
 *
 * <p>The message counter is per instance, so every producer numbers its own
 * messages starting at 0 and different producers emit the same payload text.
 *
 * <p>The thread terminates when it is interrupted.
 */
public class ProductThread extends Thread {
    /** Queue shared with the other producers and the consumers. */
    private final MessageQueue messageQueue;
    /** Sequence number of the next message created by this producer. */
    private final AtomicInteger count = new AtomicInteger(0);
    /**
     * Source of the random pause between messages. Uses the self-seeding constructor:
     * seeding with the current time would give every thread constructed in the same
     * millisecond the same sleep sequence.
     */
    private final Random random = new Random();

    /**
     * Creates a producer named {@code "Product-" + seq}.
     *
     * @param queue the shared queue to put messages on
     * @param seq   number appended to the thread name
     */
    public ProductThread(MessageQueue queue, int seq) {
        super("Product-" + seq);
        this.messageQueue = queue;
    }

    /**
     * Produces messages until the thread is interrupted.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Message message = new Message("Message-" + count.getAndIncrement());
                messageQueue.put(message);
                System.out.println(Thread.currentThread().getName() + "---put message" + message.getData());
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                // Interruption is the stop request: restore the flag for callers and exit
                // instead of looping forever.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}
package com.thread.ch13;import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Consumer thread: repeatedly takes a {@code Message} from the shared
 * {@code MessageQueue}, prints it, then sleeps for a random time below one second.
 *
 * <p>The thread terminates when it is interrupted.
 */
public class ConsumerThread extends Thread {
    /** Queue shared with the producers and the other consumers. */
    private final MessageQueue messageQueue;
    /**
     * Source of the random pause between messages. Uses the self-seeding constructor:
     * seeding with the current time would give every thread constructed in the same
     * millisecond the same sleep sequence.
     */
    private final Random random = new Random();

    /**
     * Creates a consumer named {@code "Consumer-" + seq}.
     *
     * @param queue the shared queue to take messages from
     * @param seq   number appended to the thread name
     */
    public ConsumerThread(MessageQueue queue, int seq) {
        super("Consumer-" + seq);
        this.messageQueue = queue;
    }

    /**
     * Consumes messages until the thread is interrupted.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Message message = messageQueue.take();
                System.out.println(Thread.currentThread().getName() + "---tak message" + message.getData());
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                // Catching InterruptedException clears the interrupt flag; set it again so
                // code observing this thread can still see that it was interrupted.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}
package com.thread.ch13;

/**
 * Demo entry point: wires three producer threads and two consumer threads to a
 * single shared queue with a capacity of 100 and starts them.
 */
public class ProductAndConsumerClient {
    /**
     * Starts the producers (numbered 1 to 3) and then the consumers (numbered 1 to 2).
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        final MessageQueue sharedQueue = new MessageQueue(100);

        // Producers first, in ascending sequence order.
        for (int seq = 1; seq <= 3; seq++) {
            new ProductThread(sharedQueue, seq).start();
        }

        // Then the consumers, in ascending sequence order.
        for (int seq = 1; seq <= 2; seq++) {
            new ConsumerThread(sharedQueue, seq).start();
        }
    }
}
//----------------打印结果
Product-1---put messageMessage-0
Product-2---put messageMessage-0
Product-3---put messageMessage-0
Consumer-1---tak messageMessage-0
Consumer-2---tak messageMessage-0
Consumer-2---tak messageMessage-0
Product-1---put messageMessage-1
Product-2---put messageMessage-1
Product-3---put messageMessage-1
Consumer-1---tak messageMessage-1
Product-1---put messageMessage-2
Product-3---put messageMessage-2
Product-2---put messageMessage-2
Consumer-2---tak messageMessage-1
Consumer-1---tak messageMessage-1
Product-2---put messageMessage-3
Product-1---put messageMessage-3
以上是 多线程productAndconsumer模式 的全部内容, 来源链接: utcz.com/z/513738.html