php网站开发目录,网站cms相关知识,卖机械设备什么网站做推广好,电子行业网站建设上文中我们讲了Java库中自带的阻塞队列#xff0c;并且讲了如何用阻塞队列来实现生产者消费者模型
【Java】用Java库中自带的阻塞队列以及用阻塞队列实现生产者-消费者模型
下面我们来讲如何用代码实现一个阻塞队列
1、实现一个阻塞队列 阻塞队列 普通队列 线程安全 阻… 上文中我们讲了Java库中自带的阻塞队列并且讲了如何用阻塞队列来实现生产者消费者模型
【Java】用Java库中自带的阻塞队列以及用阻塞队列实现生产者-消费者模型
下面我们来讲如何用代码实现一个阻塞队列
1、实现一个阻塞队列 阻塞队列 普通队列 线程安全 阻塞 1首先实现一个普通队列
class MyBlockingQueue{private int head 0;private int tail 0;private int size 0;String[] array;public MyBlockingQueue(){array new String[1000];}//取出队首元素public String take() throws InterruptedException {//如果队列为空则返回nullif (size 0){return null;}//取出队首元素String elem array[head];//如果head已经到了队尾那么下一个置0if(head array.length){head 0;}head;size--;return elem;}//放入元素public void put(String elem) throws InterruptedException { if (size array.length){return;}array[tail] elem;if (tail array.length){tail 0;}tail;size;}
}
2线程安全
由于put()和take()方法中对各个变量都进行了多次修改因此我们在实现线程安全时直接对这两段代码加锁
public String take() throws InterruptedException {synchronized{if (size 0){return null;}String elem array[head];if(head array.length){head 0;}head;size--;return elem;}} public void put(String elem) throws InterruptedException { synchronized{if (size array.length){return;}array[tail] elem;if (tail array.length){tail 0;}tail;size;}}
并且为了防止内存可见性问题和指令重排序问题我们给三个变量加上volatile关键字进行修饰
什么是可见性问题和指令重排序问题
【Java】volatile-内存可见性问题
【Java】多线程-单例模式/volatile-指令重排序
private volatile int head 0;
private volatile int tail 0;
private volatile int size 0;
3阻塞 最后再加上阻塞
取队首元素时如果队列为空那么我们直接进行阻塞等到下一次在另一个线程放入元素时将其唤醒
放元素时如果队列满了我们将这个线程阻塞等到队列可用时我们在另一个线程唤醒 public String take() throws InterruptedException {synchronized (this){if (size 0){this.wait();}String elem array[head];if(head array.length){head 0;}head;size--;this.notify();return elem;}}public void put(String elem) throws InterruptedException {synchronized (this){if (size array.length){this.wait();}array[tail] elem;if (tail array.length){tail 0;}tail;size;this.notify();}}注意他们唤醒的对应关系 4while循环
这其中还存在一个问题那就是wait()的对象只能被notify()唤醒吗
答案是不。除了用notify()唤醒发生InterruptedException异常也可以将对象唤醒
假设队列为空的情况下发生了InterruptedException异常对象被唤醒代码继续往下执行再想取元素便会出错。因此这种情况下我们还要继续判断队列是否为空
为了解决这个问题我们将if判断改为while()循环判断就可以避免上面情况发生
//取出队首元素public String take() throws InterruptedException {synchronized (this){while (size 0){this.wait();}String elem array[head];if(head array.length){head 0;}head;size--;this.notify();return elem;}}//放入元素public void put(String elem) throws InterruptedException {synchronized (this){//判断队列是否满了如果满了则阻塞while (size array.length){this.wait();}array[tail] elem;if (tail array.length){tail 0;}tail;size;this.notify();}}
5完整代码
实现阻塞队列的完整代码如下
class MyBlockingQueue{private volatile int head 0;private volatile int tail 0;private volatile int size 0;String[] array;public MyBlockingQueue(){array new String[1000];}//取出队首元素public String take() throws InterruptedException {synchronized (this){while (size 0){this.wait();}String elem array[head];if(head array.length){head 0;}head;size--;this.notify();return elem;}}//放入元素public void put(String elem) throws InterruptedException {synchronized (this){//判断队列是否满了如果满了则阻塞while (size array.length){this.wait();}array[tail] elem;if (tail array.length){tail 0;}tail;size;this.notify();}}
}
2、实现生产者-消费者模型 代码如下
class MyBlockingQueue{private volatile int head 0;private volatile int tail 0;private volatile int size 0;String[] array;public MyBlockingQueue(){array new String[1000];}//取出队首元素public String take() throws InterruptedException {synchronized (this){while (size 0){this.wait();}String elem array[head];if(head array.length){head 0;}head;size--;this.notify();return elem;}}//放入元素public void put(String elem) throws InterruptedException {synchronized (this){//判断队列是否满了如果满了则阻塞while (size array.length){this.wait();}array[tail] elem;if (tail array.length){tail 0;}tail;size;this.notify();}}
}public class demo2 {public static void main(String[] args) {MyBlockingQueue myBlockingQueue new MyBlockingQueue();//生产者Thread thread1 new Thread(()-{int n 0;while (true){try {myBlockingQueue.put(n );} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(生产元素n);n;try {Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}}});//消费者Thread thread2 new Thread(()-{while (true){try {System.out.println(消费元素 myBlockingQueue.take());} catch (InterruptedException e) {throw new RuntimeException(e);}}});thread1.start();thread2.start();}
}运行结果如图