Java并发编程:阻塞队列(BlockingQueue)

阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。并发包下很多高级同步类的实现都是基于BlockingQueue实现的。

BlockingQueue 的操作方法

BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:

四组不同的行为方式解释:

  • 抛异常:如果试图的操作无法立即执行,抛一个异常。
  • 特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
  • 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
  • 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是true / false)。

无法向一个 BlockingQueue 中插入 null。如果你试图插入 null,BlockingQueue 将会抛出一个 NullPointerException。

可以访问到 BlockingQueue 中的所有元素,而不仅仅是开始和结束的元素。比如说,你将一个对象放入队列之中以等待处理,但你的应用想要将其取消掉。那么你可以调用诸如 remove(o) 方法来将队列之中的特定对象进行移除。但是这么干效率并不高(译者注:基于队列的数据结构,获取除开始或结束位置的其他对象的效率不会太高),因此你尽量不要用这一类的方法,除非你确实不得不那么做。

BlockingQueue 的实现类

BlockingQueue 是个接口,你需要使用它的实现之一来使用BlockingQueue,java.util.concurrent包下具有以下 BlockingQueue 接口的实现类:

  • ArrayBlockingQueue:ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注:因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改)。

  • DelayQueue:DelayQueue 对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现 java.util.concurrent.Delayed 接口。

  • LinkedBlockingQueue:LinkedBlockingQueue 内部以一个链式结构(链接节点)对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限。

  • PriorityBlockingQueue:PriorityBlockingQueue 是一个无界的并发队列。它使用了和类 java.util.PriorityQueue 一样的排序规则。你无法向这个队列中插入 null 值。所有插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable 接口。因此该队列中元素的排序就取决于你自己的 Comparable 实现。

  • SynchronousQueue:SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。据此,把这个类称作一个队列显然是夸大其词了。它更多像是一个汇合点。

使用例子:

阻塞队列的最长使用的例子就是生产者消费者模式,也是各种实现生产者消费者模式方式中首选的方式。使用者不用关心什么阻塞生产,什么时候阻塞消费,使用非常方便,代码如下:

package MyThread;import java.util.Random;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.TimeUnit;public class BlockingQueueTest {    //生产者    public static class Producer implements Runnable{        private final BlockingQueue<Integer> blockingQueue;        private volatile boolean flag;        private Random random;        public Producer(BlockingQueue<Integer> blockingQueue) {            this.blockingQueue = blockingQueue;            flag=false;            random=new Random();        }        public void run() {            while(!flag){                int info=random.nextInt(100);                try {                    blockingQueue.put(info);                    System.out.println(Thread.currentThread().getName()+" produce "+info);                    Thread.sleep(50);                } catch (InterruptedException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }                           }        }        public void shutDown(){            flag=true;        }    }    //消费者    public static class Consumer implements Runnable{        private final BlockingQueue<Integer> blockingQueue;        private volatile boolean flag;        public Consumer(BlockingQueue<Integer> blockingQueue) {            this.blockingQueue = blockingQueue;        }        public void run() {            while(!flag){                int info;                try {                    info = blockingQueue.take();                    System.out.println(Thread.currentThread().getName()+" consumer "+info);                    Thread.sleep(50);                } catch (InterruptedException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }                           }        }        public void shutDown(){            flag=true;        }    }    public static void main(String[] args){        BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>(10);        Producer producer=new Producer(blockingQueue);        Consumer consumer=new Consumer(blockingQueue);        //创建5个生产者,5个消费者        for(int i=0;i<10;i++){            if(i<5){                new Thread(producer,"producer"+i).start();            }else{                new Thread(consumer,"consumer"+(i-5)).start();            }        }        try {            Thread.sleep(1000);        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }        producer.shutDown();        consumer.shutDown();    }}

阻塞队列原理:

其实阻塞队列实现阻塞同步的方式很简单,使用的就是是lock锁的多条件(condition)阻塞控制。使用BlockingQueue封装了根据条件阻塞线程的过程,而我们就不用关心繁琐的await/signal操作了。

下面是Jdk 1.7中ArrayBlockingQueue部分代码:

public ArrayBlockingQueue(int capacity, boolean fair) {        if (capacity <= 0)            throw new IllegalArgumentException();        //创建数组            this.items = new Object[capacity];        //创建锁和阻塞条件        lock = new ReentrantLock(fair);           notEmpty = lock.newCondition();        notFull =  lock.newCondition();    }//添加元素的方法public void put(E e) throws InterruptedException {        checkNotNull(e);        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            while (count == items.length)                notFull.await();            //如果队列不满就入队            enqueue(e);        } finally {            lock.unlock();        }    } //入队的方法 private void enqueue(E x) {        final Object[] items = this.items;        items[putIndex] = x;        if (++putIndex == items.length)            putIndex = 0;        count++;        notEmpty.signal();    } //移除元素的方法 public E take() throws InterruptedException {        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            while (count == 0)                notEmpty.await();            return dequeue();        } finally {            lock.unlock();        }    } //出队的方法 private E dequeue() {        final Object[] items = this.items;        @SuppressWarnings("unchecked")        E x = (E) items[takeIndex];        items[takeIndex] = null;        if (++takeIndex == items.length)            takeIndex = 0;        count--;        if (itrs != null)            itrs.elementDequeued();        notFull.signal();        return x;

双端阻塞队列(BlockingDeque)

concurrent包下还提供双端阻塞队列(BlockingDeque),和BlockingQueue是类似的,只不过BlockingDeque提供从任意一端插入或者抽取元素的队列。

其他文章:
Java并发编程-并发工具包(java.util.concurrent)使用指南(全)

(0)

相关推荐

  • Java高并发24-使用自定义锁生成一个消费模型

    一.使用自定义锁实现生成--消费模型 下面我们使用上节自定义的锁实现一个简单的生产--消费模型,代码如下: package com.ruigege.LockSourceAnalysis6; impor ...

  • JAVA中常见的阻塞队列详解

    在之前的线程池的介绍中我们看到了很多阻塞队列,这篇文章我们主要来说说阻塞队列的事. 阻塞队列也就是 BlockingQueue ,这个类是一个接 口,同时继承了 Queue 接口,这两个接口都是在JD ...

  • java开发技术之Executors创建线程池的弊端

    java开发技术之Executors创建线程池的弊端

  • Java并发队列和容器

    [前言:无论是大数据从业人员还是Java从业人员,掌握Java高并发和多线程是必备技能之一.本文主要阐述Java并发包下的阻塞队列和并发容器,其实研读过大数据相关技术如Spark.Storm等源码的, ...

  • Java并发编程之内置锁(synchronized)

    简介 synchronized在JDK5.0的早期版本中是重量级锁,效率很低,但从JDK6.0开始,JDK在关键字synchronized上做了大量的优化,如偏向锁.轻量级锁等,使它的效率有了很大的提 ...

  • Java并发编程之线程的创建

    简介 线程是基本的调度单位,它被包含在进程之中,是进程中的实际运作单位,它本身是不会独立存在.一个进程至少有一个线程,进程中的多个线程共享进程的资源. Java中创建线程的方式有多种如继承Thread ...

  • Java并发编程实战(5)- 线程生命周期

    在这篇文章中,我们来聊一下线程的生命周期. 目录 概述 操作系统中的线程生命周期 Java中的线程生命周期 Java线程状态转换 运行状态和阻塞状态之间的转换 运行状态和无时限等待状态的切换 运行状态 ...

  • Java并发编程实战(4)- 死锁

    概述 在上一篇文章中,我们讨论了如何使用一个互斥锁去保护多个资源,以银行账户转账为例,当时给出的解决方法是基于Class对象创建互斥锁. 这样虽然解决了同步的问题,但是能在现实中使用吗?答案是不可以, ...

  • 《Java并发编程:设计原则与模式(第二版)》.pdf

    回复"面试"获取全套面试资料 并发任务强调在一个时间段内同时执行,而一个时间段由多个单位时间累积而成,所以说并发的多个任务在单位时间内不一定同时在执行. 在单CPU的时代多个任务都 ...

  • 最强Java并发编程详解:知识点梳理,BAT面试题等

    来源:cnblogs.com/pengdai/p/12026959.html 知识体系系统性梳理 Java 并发之基础 A. Java进阶 - Java 并发之基础:首先全局的了解并发的知识体系,同时 ...

  • (8条消息) Java并发编程之原子性

    线程安全: 当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些进程将如何交替执行,并且在主调代码中不需要任何额外的同步或协调,这个类都能表现出正确的行为,那么就称这个类时线程安全的. 线程 ...

  • 《Java并发编程的艺术》.pdf

    回复"面试"获取全套面试资料 在当今互联网的时代,大量的互联网应用都面对着海量的访问请求. 并发编程在我们的应用中成为越来越不可或缺的一部分. 通过并发编程的形式,可以将多核CPU ...

  • Java并发多线程编程——Volatile原理与使用

    优质文章,第一时间送达 76套java从入门到精通实战课程分享 一.volitile的理解 Volatile称之为轻量级锁,被volatile修饰的变量,在线程之间是可见的. 可见即一个线程修改了这个 ...