自学内容网 自学内容网

【Java 并发编程】阻塞队列与仿真餐厅

前言


        阻塞队列 (BlockingQueue) 顾名思义是一种支持阻塞操作的队列,因为内部机制中添加了 wait 和 notify 方法,所以阻塞队列具备了线程之前相互协调的功能。阻塞队列主要运用于两个场景,一是生产者与消费者模型,二是线程池。本章节的内容主要是围绕第一种场景进行展开。

        举个生产者与消费者栗子,假设没有阻塞队列,如果生产者与消费者两个线程之间需要通信协调,那么这两个线程的方法中必然有对方的 “影子”。生产者线程方法的 wait 需要消费者线程方法的 notify。那么这两个线程的耦合度就很高,如果生产者这边的系统升级了,可能会间接导致消费者这边用不了阻塞队列将生产者和消费者完全解耦,使它们不需要了解对方的存在,从而简化了系统设计和维护。


前期回顾:【Java 并发编程】单例模式

代码地址:仿真餐厅


目录

前言

阻塞队列

BlockingQueue简介

有界与无界

有界

 无界

阻塞队列作用

ArrayBlockingQueue 使用与实现

简单使用

简单实现

生产者与消费者概念

仿真餐厅

 

阻塞队列


BlockingQueue简介

以上是线程 1 往阻塞队列中添加元素,而线程 2 从阻塞队列中移除元素。

        阻塞队列,首先它是一个队列,不仅继承了队列的所有方法,而且提供了 put 与 take 两种方法。

take 当阻塞队列是空时,从队列中获取元素并弹出元素的操作将会被阻塞。
put  当阻塞队列是满时,从队列中添加元素的操作将会被阻塞。

有界与无界

        首先阻塞队列有很多实例,如 LinkedBlockingQueue、ArrayBlockingQueue 等。而阻塞队列中能容纳的元素个数通常情况下是有限的。

有界

        比如说,我们去实例化一个 ArrayBlockingQueue 的阻塞队列,就可以在构造方法中传入一个整型的数字,表示这个基于数组的阻塞队列最大能够容纳的元素个数。这种我们称之为 “有界队列”。

 无界
    public final class Integer extends Number implements Comparable<Integer>, Constable, ConstantDesc {
    public static final int MAX_VALUE = 2147483647;
    ...
    }

    public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable {
    ...
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    ...
    }

        无界队列是没有设置固定大小的队列,但是它并不像我们理解的那样没有任何限制,而是它的元素存储范围很大。像 LinkedBlockingQueue 内部默认长度是 MAX_VALUE,所以我们感觉不到它的长度限制。注意,无界队列潜在一定的风险,如果在并发量比较大的情况下,线程池中几乎可以无限制的添加任务,容易导致内存溢出问题。


阻塞队列作用

        在传统的生产者与消费者模式下,假设存在多个生产者线程和消费者线程,它们共享一个有限容量的缓冲池(阻塞队列)。生产者线程负责生成资源并将其存入缓冲池,而消费者线程则从缓冲池取出资源进行消费。如果直接使用普通的非同步队列,在多线程环境下进行资源的存取操作时,可能会出现以下问题:

  1. 线程安全问题:当多个线程同时访问同一个队列时,可能出现竞态条件导致的数据不一致,例如重复消费、丢失数据或者数据状态错乱。

  2. 死锁与活跃性问题:在没有正确同步机制的情况下,生产者和消费者线程可能陷入互相等待对方释放资源的状态,从而导致死锁;或者当缓冲区已满/空时,线程因无法继续执行而进入无限期等待状态,影响系统整体的效率和响应性。

  3. 自定义同步逻辑复杂:为了解决上述问题,开发者需要自行编写复杂的等待、通知逻辑,即当队列满时阻止生产者添加元素,唤醒消费者消费;反之,当队列空时阻止消费者获取元素,唤醒生产者填充资源。这些逻辑容易出错且不易维护。


ArrayBlockingQueue 使用与实现

简单使用

        通过上述可知 ArrayBlockingQueue 是一个数组实现的阻塞队列需要指定存储大小。我们可以写个简单的生产者与消费者案例:

class ArrayBlockingQueueTest {
    private static ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(10);
    static class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                try {
                    blockingQueue.put(i);
                    System.out.println("生产者生产数据:" + i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                try {
                    Integer data = blockingQueue.take();
                    System.out.println("消费者消费数据:" + data);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

class Test{
    public static void main(String[] args) {
        Thread t1 = new Thread(new ArrayBlockingQueueTest.Producer());
        Thread t2 = new Thread(new ArrayBlockingQueueTest.Consumer());
        t1.start();
        t2.start();
    }
}

运行结果:

消费者消费数据:0
生产者生产数据:0
生产者生产数据:1
生产者生产数据:2
生产者生产数据:3
...
生产者生产数据:98
消费者消费数据:98
生产者生产数据:99
消费者消费数据:99

        以上是生产者频繁生产数据与消费者频繁消费数据的例子。

ArrayBlockingQueue 核心源码:

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable {
 /* 使用数组存储队列中的元素 */
 final Object[] items;

 /* 使用独占锁ReetrantLock */
 final ReentrantLock lock;

 /* 等待出队的条件 */
 private final Condition notEmpty;

 /* 等待入队的条件 */
 private final Condition notFull;

 /* 初始化时,需要指定队列大小 */
 public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    /* 初始化时,也指出指定是否为公平锁 */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    /*入队操作*/
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    /*出队操作*/
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
}

        接下来我们模拟实现一个属于自己的 ArrayBlockingQueue 的代码。

简单实现

class MyArrayBlockingQueue<E>{
    private E[] elem;
    private volatile int head;
    private volatile int tail;
    private volatile int size;
    private final int capacity;
    
    public MyArrayBlockingQueue(int InitCapacity){
        this.capacity = InitCapacity;
        elem = (E[]) new Object[InitCapacity];
    }
    
    public synchronized void put(E e) throws InterruptedException {
        while (size == capacity) {
            this.wait();
        }
        elem[tail] = e;
        size++;
        tail = (tail + 1) % capacity;
        this.notify();
    }
    
    public synchronized E take() throws InterruptedException {
        while (size == 0) {
            this.wait();
        }
        E e = elem[head];
        size--;
        head = (head + 1) % capacity;
        this.notify();
        return e;
    }
}

代码测试:

class Producer extends Thread {

    private MyArrayBlockingQueue container;

    public Producer(MyArrayBlockingQueue container) {
        this.container = container;
    }

    private void Func() throws InterruptedException {
        HashMap<Integer,String> FoodMenu = new HashMap<>();
        FoodMenu.put(0,"红烧牛肉");
        FoodMenu.put(1,"茄子青椒");
        FoodMenu.put(2,"荔浦芋头");
        for (int i = 0; i < 3; i++) {
            container.put(FoodMenu.get(i));
            System.out.println("生产了"+FoodMenu.get(i));
        }
    }

    @Override
    public void run() {
        try {
            Func();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}


class Consumer extends Thread {

    private MyArrayBlockingQueue container;

    public Consumer(MyArrayBlockingQueue container) {
        this.container = container;
    }

    @Override
    public void run() {
        for (int i = 0; i < 3; i++) {
            try {
                System.out.println("消费了"+container.take());
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

class Test{
    public static void main(String[] args) {
        MyArrayBlockingQueue<String> mab = new MyArrayBlockingQueue<>(10);
        Producer producer = new Producer(mab);
        Consumer consumer = new Consumer(mab);
        producer.start();
        consumer.start();
    }
}

运行结果: 

生产了红烧牛肉
生产了茄子青椒
消费了红烧牛肉
消费了茄子青椒
生产了荔浦芋头
消费了荔浦芋头

生产者与消费者概念

        生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

如果缓冲区已经满了,则生产者线程阻塞。
如果缓冲区已经空了,则消费者线程阻塞。

仿真餐厅

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// 下单
class Course {
    private static Random rand = new Random();
    private static int foodNum = 10;
    public static Food[] foods = new Food[10];
    static {
        HashMap<Integer,String> FoodMenu = new HashMap<>();
        FoodMenu.put(0,"红烧牛肉");
        FoodMenu.put(1,"茄子青椒");
        FoodMenu.put(2,"荔浦芋头");
        FoodMenu.put(3,"红烧排骨");
        FoodMenu.put(4,"宫保鸡丁");
        FoodMenu.put(5,"鱼香肉丝");
        FoodMenu.put(6,"回锅肉饭");
        FoodMenu.put(7,"青椒炒肉");
        FoodMenu.put(8,"爆炒鱿鱼");
        FoodMenu.put(9,"油焖大虾");
        for (int i = 0; i < foodNum; i++) {
            foods[i] = new Food(FoodMenu.get(i));
        }
    }
    public static Food randomSelection() {
        return foods[rand.nextInt(foodNum)];
    }
}

// 食物
class Food {
    String foodName;
    public Food(String name) {
        foodName = name;
    }
    @Override
    public String toString() {
        return "食物 " + foodName + " ";
    }
}

//订单类,记录点菜的Customer,点的Food,服务员是谁,由顾客产生
class Order {
    private Food food;
    private Customer customer;
    private WaitPerson waitPerson;
    public Order(Food food, Customer customer, WaitPerson waitPerson) {
        this.food = food;
        this.customer = customer;
        this.waitPerson = waitPerson;
    }
    public Food getFood() {
        return food;
    }
    public void setFood(Food food) {
        this.food = food;
    }
    public Customer getCustomer() {
        return customer;
    }
    public void setCustomer(Customer customer) {
        this.customer = customer;
    }
    public WaitPerson getWaitPerson() {
        return waitPerson;
    }
    public void setWaitPerson(WaitPerson waitPerson) {
        this.waitPerson = waitPerson;
    }
    @Override
    public String toString() {
        return customer + "点了" + food + "由服务员"
                + waitPerson + "服务";
    }
}

//用来装食物的碟子,被waitPerson所使用  由Chef产生,放入WaitPerson的队列中
class Plate {
    public Plate(Food food, Order order) {
        this.food = food;
        this.order = order;
    }
    private Food food;
    private Order order;
    public Food getFood() {
        return food;
    }
    public void setFood(Food food) {
        this.food = food;
    }
    public Order getOrder() {
        return order;
    }
    public void setOrder(Order order) {
        this.order = order;
    }
}

//顾客类,由餐厅产生,点餐,等餐,吃饭,走人
class Customer implements Runnable {
    private static Random rand = new Random();
    private static int counter = 0;
    // 借助共享的静态变量counter,构造出id自增的食客对象
    private final int id = counter++;
    // 从中取出食物吃
    private final SynchronousQueue<Plate> plate;
    //Customer 最关心的是waitPerson,因为他下单或者桌子上的食物都是由WaitPerson服务的
    private final WaitPerson waitPerson;
    public Customer(WaitPerson waitPerson) {
        plate = new SynchronousQueue<>();
        this.waitPerson = waitPerson;
    }
    public void putPlate(Plate p) throws InterruptedException {
        this.plate.put(p);
    }
    public void placeOrder() throws InterruptedException {
    //模拟下单时间
        TimeUnit.MILLISECONDS.sleep(100 + rand.nextInt(300));
        Order order = new Order(Course.randomSelection(), this, waitPerson);
        System.out.println(order);
        waitPerson.plateOrder(order);;
    }
    @Override
    public void run() {
        try {
            do {
                placeOrder();
                Plate p = this.plate.take();
                // 模拟吃饭的时间
                TimeUnit.MILLISECONDS.sleep(200 + rand.nextInt(1000));
                System.out.println(this + "正在吃 " + p.getFood());
                // 有1/2的机会还想再吃一个,没吃饱
            } while (rand.nextBoolean());
        } catch (InterruptedException e) {
            System.out.println(this + "吃饭被终止!");
        }
        System.out.println(this + "吃完买单");
    }
    @Override
    public String toString() {
        return "顾客" + id + "号 ";
    }
}

//向BlockingQueue这种东西是不应该暴露给别的类的,最好只是暴露接口给别人
//服务员,从Chef手中拿食物给对应的Customer,每一个waitPerson都维护着自己的一个盘子队列
//不停的取食物然后送给对应的customer
class WaitPerson implements Runnable {
    private static int counter = 0;
    private final int id = counter++;
    private final BlockingQueue<Plate> plates;
    private final Restaurant restaurant;
    public WaitPerson(Restaurant restaurant) {
        this.plates = new LinkedBlockingQueue<>();
        this.restaurant = restaurant;
    }
    public void plateOrder(Order order){
        restaurant.addOrder(order);
    }

    public void putPlate(Plate plate) throws InterruptedException {
        this.plates.put(plate);
    }
    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                Plate plate = plates.take();
                //在盘子取出订单
                Order order = plate.getOrder();
                //模拟移动到顾客旁边的时间
                TimeUnit.MILLISECONDS.sleep(300);
                //在订单中找到Customer
                Customer customer = order.getCustomer();
                //然后把食物给Plate给Customer
                customer.putPlate(plate);
            }
        } catch (InterruptedException e) {
            System.out.println(this+ "服务被终止!");
        }
    }
    @Override
    public String toString() {
        return " Waiter" + id + "号 ";
    }
}
//厨师类, 接收order产生食物并且装plate,然后给WaitPerson
class Chef implements Runnable {
    private static Random rand = new Random();
    private static int counter = 0;
    private final int id = counter++;
    private final Restaurant restaurant;
    public Chef(Restaurant restaurant) {
        this.restaurant = restaurant;
    }
    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                Order order = this.restaurant.takeOrder();
                Food food = order.getFood();
                //模拟做菜的时间
                TimeUnit.MILLISECONDS.sleep(rand.nextInt(800) + 100);
                Plate plate = new Plate(food, order);
                order.getWaitPerson().putPlate(plate);
            }
        } catch (InterruptedException e) {
            System.out.println(this + "炒菜被终止!");
        }
        System.out.println(this + "下班!");
    }
    @Override
    public String toString() {
        return "厨师:" + id + "号 ";
    }

}

//餐厅类,负责协调管理WaitPerson,Chef,Customer队伍, 并且让他们都run起来
//同时还自动每隔一段时间产生一名Customer
class Restaurant implements Runnable {
    private ArrayList<WaitPerson> waitPersons = new ArrayList<>();
    private ArrayList<Chef> chefs = new ArrayList<>();
    private BlockingQueue<Order> orders = new LinkedBlockingQueue<>();
    private ExecutorService exec;
    private static Random rand = new Random();
    private int nWaitPerson;
    //让WaitPerson Chef都工作起来
    public Restaurant(int nWaitPersons, int nChefs, ExecutorService exec) {
        this.exec = exec;
        this.nWaitPerson = nWaitPersons;
        for (int i = 0; i < nWaitPersons; i++) {
            WaitPerson waitPerson = new WaitPerson(this);
            waitPersons.add(waitPerson);
            exec.execute(waitPerson);
        }
        for (int i = 0; i < nChefs; i++) {
            Chef chef = new Chef(this);
            chefs.add(chef);
            exec.execute(chef);
        }
    }
    //接单
    public void addOrder(Order order) {
        orders.add(order);
    }
    //厨师取单
    public Order takeOrder() throws InterruptedException {
        return this.orders.take();
    }
    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                Customer customer = new Customer(
                        this.waitPersons.get(rand.nextInt(nWaitPerson)));
                TimeUnit.MILLISECONDS.sleep(300 + rand.nextInt(400));
                exec.execute(customer);
            }
        } catch (InterruptedException e) {
            System.out.println("餐厅运营被终止!");
        }
        System.out.println("餐厅打样~");
    }

}

class RestaurantWithQueues {
    public static void main(String[] args) throws IOException {
        ExecutorService exec = Executors.newCachedThreadPool();
        Restaurant restaurant = new Restaurant(10, 10, exec);
        exec.execute(restaurant);
        System.out.println("按回车键退出>:");
        System.in.read();
        exec.shutdownNow();
    }
}

运行结果:

按回车键退出>:
顾客0号 点了食物 荔浦芋头 由服务员 Waiter7号 服务
顾客1号 点了食物 茄子青椒 由服务员 Waiter2号 服务
顾客2号 点了食物 宫保鸡丁 由服务员 Waiter9号 服务
顾客0号 正在吃 食物 荔浦芋头 
顾客0号 点了食物 青椒炒肉 由服务员 Waiter7号 服务
顾客3号 点了食物 爆炒鱿鱼 由服务员 Waiter0号 服务
顾客1号 正在吃 食物 茄子青椒 
顾客2号 正在吃 食物 宫保鸡丁 
顾客1号 点了食物 红烧排骨 由服务员 Waiter2号 服务
顾客4号 点了食物 宫保鸡丁 由服务员 Waiter3号 服务
顾客0号 正在吃 食物 青椒炒肉 
顾客2号 点了食物 宫保鸡丁 由服务员 Waiter9号 服务
顾客0号 点了食物 爆炒鱿鱼 由服务员 Waiter7号 服务
顾客5号 点了食物 爆炒鱿鱼 由服务员 Waiter6号 服务
顾客1号 正在吃 食物 红烧排骨 


原文地址:https://blog.csdn.net/2301_79201049/article/details/142990620

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!