多线程生产者消费者模型
时间:2022-09-09 12:30:00
文章目录
- 1,wait和notifyAll实现
- 2.实现阻塞队列
1,wait和notifyAll实现
wait
:使当前线程处于睡眠状态,释放锁(sleep是抱锁睡眠)
notifyAll
:唤醒所有wait状态的线程
缓冲区满时,生产者停止执行线程,放弃锁,让自己处于等状态,让其他线程执行;
当缓冲区空时,消费者停止执行线程,放弃锁,让自己处于等状态,让其他线程执行。
当生产者将产品放入缓冲区时,向其他等待线程发出可执行的通知,同时放弃锁,使自己处于等待状态;
当消费者从缓冲区取出产品时,他们会向其他等待线程发出可执行的通知,放弃锁,让自己处于等待状态。
store.java(商店)
public class Store {
/// private int size; ////当前存储的产品数量 private int count = 0; // ///饿汉单例(线程安全) // private static Store store= new Store(); // public static Store getStore() {
// return store; // } //懒汉单例 private static Store store = null; public synchronized static Store getStore() {
if(store == null) {
store = new Store(); return store; } return store; } private Store(){
} public int getSize() {
return size; } public void setSize(int size) {
this.size = size; } //生产 public synchronized void produce() {
while(count
>= size
)
{
try
{
this
.
wait
(
)
;
}
catch
(
InterruptedException e
)
{
e
.
printStackTrace
(
)
;
}
}
System
.out
.
println
(
Thread
.
currentThread
(
)
.
getName
(
)
+
"--剩余数"
+
(
++count
)
)
;
this
.
notifyAll
(
)
;
}
//消费
public
synchronized
void
consume
(
)
{
while
(count
<=
0
)
{
try
{
this
.
wait
(
)
;
}
catch
(
InterruptedException e
)
{
e
.
printStackTrace
(
)
;
}
}
System
.out
.
println
(
Thread
.
currentThread
(
)
.
getName
(
)
+
"--剩余数"
+
(
--count
)
)
;
this
.
notifyAll
(
)
;
}
}
生产线程
public class Producer implements Runnable{
private Store store = Store.getStore();
@Override
public void run() {
while (true) {
try {
Thread.sleep(50);
store.produce();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
消费线程
public class Consumer implements Runnable{
private Store store = Store.getStore();
@Override
public void run() {
while(true) {
try {
Thread.sleep(80);
store.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
测试
public class Test {
public static void main(String[] args) {
//设置最大数量
Store.getStore().setSize(10);
Thread t1= new Thread(new Producer(),"生产者A");
Thread t2= new Thread(new Producer(),"生产者B");
Thread t3= new Thread(new Consumer(),"消费者a");
Thread t4= new Thread(new Consumer(),"消费者b");
t1.start();
t2.start();
t3.start();
t4.start();
}
}
2,阻塞队列实现
阻塞队列:增删改查方法均被lock锁住了,所以线程安全
因此可以使用阻塞队列做缓冲区,进行添加产品和消费产品
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iOuVPYtB-1653839801205)(https://s2.loli.net/2022/05/29/zmp3tUnxgkGQ5c7.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6erNtEU5-1653839801207)(https://s2.loli.net/2022/05/29/jUcEARwykDSTl5J.png)]
生产者
public class Producer extends Thread{
private BlockingQueue queue;
Producer(String name,BlockingQueue queue) {
super(name);
this.queue = queue;
}
@Override
public void run() {
synchronized (this) {
for(int i = 0 ; i < 5 ; i ++) {
try {
Thread.sleep(100);
queue.put(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this.getName()+"生产"+i);
}
}
}
}
生产者
public class Consumer extends Thread{
private BlockingQueue queue;
Consumer(String name,BlockingQueue queue) {
super(name);
this.queue = queue;
}
@Override
public void run() {
synchronized (this) {
for(int i = 0 ; i < 10 ; i++) {
try {
Thread.sleep(200);
Object o = queue.take();
System.out.println(this.getName()+"消费"+o);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
test
public class Test {
public static void main(String[] args) {
BlockingQueue store = new ArrayBlockingQueue(5);
Producer t1 = new Producer("A",store);
Producer t2 = new Producer("B",store);
Consumer t3 = new Consumer("C",store);
Consumer t4 = new Consumer("D",store);
t1.start();
t2.start();
t3.start();
t4.start();
}
}