多线程productAndconsumer模式

编程

package com.thread.ch13;

public class Message {

private String data;

public Message(String data) {

this.data = data;

}

public String getData() {

return data;

}

}

package com.thread.ch13;

import java.util.LinkedList;

/**

* 共享變量

*/

public class MessageQueue {

private final LinkedList<Message> queue;

private static final int DEFAULT_MAX_LIMIT = 100;

private final int limit;

public MessageQueue(int limit) {

this.queue = new LinkedList<Message>();

this.limit = limit;

}

public void put(Message message) throws InterruptedException {

synchronized (queue) {

while ( queue.size()>=limit) {

queue.wait();

}

queue.addLast(message);

queue.notifyAll();

}

}

public Message take() throws InterruptedException {

synchronized (queue) {

while (queue.isEmpty()) {

queue.wait();

}

Message message = queue.removeFirst();

queue.notifyAll();

return message;

}

}

public int getQueueSize() {

synchronized (queue) {

return queue.size();

}

}

}

package com.thread.ch13;

import java.util.Random;

import java.util.concurrent.atomic.AtomicInteger;

public class ProductThread extends Thread {

private final MessageQueue messageQueue;

private AtomicInteger count = new AtomicInteger(0);

private Random random = new Random(System.currentTimeMillis());

public ProductThread(MessageQueue queue, int seq) {

super("Product-" + seq);

this.messageQueue = queue;

}

@Override

public void run() {

while (true) {

try {

Message message = new Message("Message-" + count.getAndIncrement());

messageQueue.put(message);

System.out.println(Thread.currentThread().getName() + "---put message" + message.getData());

Thread.sleep(random.nextInt(1000));

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

package com.thread.ch13;

import java.util.Random;

import java.util.concurrent.atomic.AtomicInteger;

public class ConsumerThread extends Thread {

private final MessageQueue messageQueue;

private AtomicInteger count = new AtomicInteger(0);

private Random random = new Random(System.currentTimeMillis());

public ConsumerThread(MessageQueue queue, int seq) {

super("Consumer-" + seq);

this.messageQueue = queue;

}

@Override

public void run() {

while (true) {

try {

Message message = messageQueue.take();

System.out.println(Thread.currentThread().getName() + "---tak message" + message.getData());

Thread.sleep(random.nextInt(1000));

} catch (InterruptedException e) {

e.printStackTrace();

break;

}

}

}

}

package com.thread.ch13;

public class ProductAndConsumerClient {

public static void main(String[] args) {

MessageQueue messageQueue = new MessageQueue(100);

new ProductThread(messageQueue, 1).start();

new ProductThread(messageQueue, 2).start();

new ProductThread(messageQueue, 3).start();

new ConsumerThread(messageQueue, 1).start();

new ConsumerThread(messageQueue, 2).start();

}

}

//----------------打印结果

Product-1---put messageMessage-0
Product-2---put messageMessage-0
Product-3---put messageMessage-0
Consumer-1---tak messageMessage-0
Consumer-2---tak messageMessage-0
Consumer-2---tak messageMessage-0
Product-1---put messageMessage-1
Product-2---put messageMessage-1
Product-3---put messageMessage-1
Consumer-1---tak messageMessage-1
Product-1---put messageMessage-2
Product-3---put messageMessage-2
Product-2---put messageMessage-2
Consumer-2---tak messageMessage-1
Consumer-1---tak messageMessage-1
Product-2---put messageMessage-3
Product-1---put messageMessage-3

 

以上是 多线程productAndconsumer模式 的全部内容, 来源链接: utcz.com/z/513738.html

回到顶部