亲宝软件园·资讯

展开

Java多线程阻塞队列

Moon Bay 人气:0

一.阻塞队列介绍

1.1阻塞队列特性

阻塞队列特性:

一.安全性

二.产生阻塞效果

阻塞队列是一种特殊的队列. 也遵守 “先进先出” 的原则.阻塞队列能是一种线程安全的数据结构, 并且具有以下特性:

阻塞队列的一个典型应用场景就是 “生产者消费者模型”. 这是一种非常典型的开发模型.

1.2阻塞队列的优点

我们可以将阻塞队列比做成"生产者"和"消费者"的"交易平台".

我们可以把这个模型来比做成"包饺子"

A 的作用是擀饺子皮,也就是"生产者"

B 的作用是包饺子,也就是"消费者"

X 的作用一个当作放擀好饺子皮的一个盘中,也就是阻塞队列

这样我们根据A,B,X可以想象以下场景

场景一:

当A擀饺子皮的速度过快,X被A的杆好饺子皮放满了,这样A就需要停止擀饺子皮这一个操作,这时只能等待B来利用A提供的饺子皮包饺子后X所空出的空间,来给A提供生产的环境

场景二:

当B包饺子的速度过快,X被B的包饺子所用的饺子皮用空,这样B就需要停止包饺子这一个操作,这时只能等待A提供的饺子皮包饺子后X所存在饺子皮,来给B提供消费的环境

二.生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题

生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取

(1) 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力.

比如在 “秒杀” 场景下, 服务器同一时刻可能会收到大量的支付请求. 如果直接处理这些支付请求,服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程). 这个时候就可以把这些请求都放到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求.这样做可以有效进行 “削峰”, 防止服务器被突然到来的一波请求直接冲垮.

(2) 阻塞队列也能使生产者和消费者之间 解耦.

比如过年一家人一起包饺子. 一般都是有明确分工, 比如一个人负责擀饺子皮, 其他人负责包. 擀饺子皮的人就是 “生产者”, 包饺子的人就是 “消费者”.擀饺子皮的人不关心包饺子的人是谁(能包就行, 无论是手工包, 借助工具, 还是机器包), 包饺子的人也不关心擀饺子皮的人是谁(有饺子皮就行, 无论是用擀面杖擀的, 还是拿罐头瓶擀, 还是直接从超市买的).

2.1阻塞队列对生产者的优化

优化一:能够让多个服务器程序之间更充分的解耦合:

如果不使用生产者和消费者模型,此时A和B的耦合性比较强,如果A线程出现一些状况B就会挂,B线程出现一些状况A就会挂,这时当我们引入阻塞队列后我们就可以将A,B线程分开,如果A,B线程挂了有阻塞队列的存在下,是不会影响别的线程

优化二:能够对于请求进行"削峰填谷":

我们可以联想到我国的三峡大坝,三峡大坝就相当于阻塞队列,当我们遇到雨水大的季节,我们就可以关闭三峡大坝,利用三峡大坝来存水;当我们遇到干旱期,我们就可以打开三峡大坝的门,来放水解决干旱问题

三.标准库中的阻塞队列

3.1Java提供阻塞队列实现的标准类

java官方也提供了阻塞队列的标准类,主要有下面几个:

标准类说明
ArrayBlockingQueue一个由数组结构组成的有界阻塞队列
LinkedBlockingQueue一个由链表结构组成的有界阻塞队列
PriorityBlockingQueue一个支持优先级排序的无界阻塞队列
DelayQueue一个使用优先级队列实现的无界阻塞队列
SynchronousQueue一个不存储元素的阻塞队列
LinkedTransferQueue一个由链表结构组成的无界阻塞队列
LinkedBlockingDeque一个由链表结构组成的双向阻塞队列
BlockingQueue接口单向阻塞队列实现了该接口
BlockingDeque接口双向阻塞队列实现了该接口

3.2Blockingqueue基本使用

在 Java 标准库中内置了阻塞队列. 如果我们需要在一些程序中使用阻塞队列, 直接使用标准库中的即可.

BlockingQueue 是一个接口. 真正实现的类是 LinkedBlockingQueue.

put 方法用于阻塞式的入队列, take 用于阻塞式的出队列.

BlockingQueue 也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性.

BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 入队列
queue.put("abc");
// 出队列. 如果没有 put 直接 take, 就会阻塞.
String elem = queue.take();

四.阻塞队列实现

4.1阻塞队列的代码实现

我们通过 “循环队列” 的方式来实现

使用 synchronized 进行加锁控制.put 插入元素的时候, 判定如果队列满了, 就进行 wait. (注意, 要在循环中进行 wait. 被唤醒时不一定队列就不满了, 因为同时可能是唤醒了多个线程).take 取出元素的时候, 判定如果队列为空, 就进行 wait. (也是循环 wait)

我们在设计阻塞队列的时候可以将队列联想成一个圆

class BlockingQueue{
    //队列里存放的个数
   volatile private int size = 0;
    //队列的头节点
    private int head = 0;
    //队列的尾节点
    private int prov = 0;
    //创建一个数组,我们来给这个数组的容量设置为100
    private int[] array = new int[100];
    //创建一个专业的锁对象
    private Object object = new Object();
    //实现阻塞队列中的put方法
    public void put(int value) throws InterruptedException {
        synchronized (object) {
            //当数组已经满了
            if (size == array.length) {
                object.wait();
            } else {
                //我们可以优化成prov = (prov + 1) % items.length
                array[prov] = value;
                prov ++;
               if (prov >= array.length) {
                   prov = 0;
               }
            }
            size++;
            object.notify();
        }
    }
    //实现阻塞队列中的take方法
    public int take() throws InterruptedException {
        synchronized (object) {
            if (size == 0) {
                object.wait();
            }
            int x = array[head];
            head++;
            if (head >= array.length) {
                head = 0;
            }
            size--;
            object.notify();
            return x;
        }
    }
}

4.2阻塞队列搭配生产者与消费者的代码实现

class BlockingQueue{
    //队列里存放的个数
   volatile private int size = 0;
    //队列的头节点
    private int head = 0;
    //队列的尾节点
    private int prov = 0;
    //创建一个数组,我们来给这个数组的容量设置为100
    private int[] array = new int[100];
    //创建一个专业的锁对象
    private Object object = new Object();
    //实现阻塞队列中的put方法
    public void put(int value) throws InterruptedException {
        synchronized (object) {
            //当数组已经满了
            if (size == array.length) {
                object.wait();
            } else {
                //我们可以优化成prov = (prov + 1) % items.length
                array[prov] = value;
                prov ++;
               if (prov >= array.length) {
                   prov = 0;
               }
            }
            size++;
            object.notify();
        }
    }
    //实现阻塞队列中的take方法
    public int take() throws InterruptedException {
        synchronized (object) {
            if (size == 0) {
                object.wait();
            }
            int x = array[head];
            head++;
            if (head >= array.length) {
                head = 0;
            }
            size--;
            object.notify();
            return x;
        }
    }
}
public class Test {
    public static void main(String[] args) {
        BlockingQueue blockingQueue = new BlockingQueue();
        Thread thread1 = new Thread(()-> {
            while (true) {
                for (int i = 0; i < 100; i++) {
                    try {
                        blockingQueue.put(i);
                        System.out.println("生产了"+i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        Thread thread2 = new Thread(()->{
                while (true) {
                    try {
                        int b = blockingQueue.take();
                        System.out.println("消耗了"+b);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
        });
        thread1.start();
        thread2.start();
    }
}

加载全部内容

相关教程
猜你喜欢
用户评论