自学内容网 自学内容网

Java并发容器使用

Java并发容器使用

ArrayBlockingQueue使用

ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(8);

// 阻塞添加
queue.add("1");
queue.add("2");
queue.add("3");
System.out.println(queue);

// 非阻塞添加
queue.put("4");
queue.put("5");
System.out.println(queue);

// 阻塞添加,但队列满不会进入阻塞
queue.offer("6");
queue.offer("7");
System.out.println(queue);

// 指定时间内进入阻塞
queue.offer("8", 3, TimeUnit.SECONDS);
queue.offer("9", 3, TimeUnit.SECONDS);// 8个队列满,不会进入阻塞
System.out.println(queue);

// 从头部获取数据并移出,如果队列为空进入阻塞直到有数据添加
String take = queue.take();
System.out.println(take);

// 获取数据并移出队列第一个元素,如果队列为空将会阻塞指定的时间,直到在此期间有新数据写入,或者当前线程被其他线程中断,当线程超时会返回null
String poll = queue.poll(3, TimeUnit.SECONDS);
System.out.println(poll);

// 从头部获取数据并移出,如果队列为空不会阻塞
String poll1 = queue.poll();
System.out.println(poll1);

// 从头部获取数据不会移除,队列为空不会阻塞直接返回null
String peek = queue.peek();
System.out.println(peek);

PriorityBlockingQueue使用

// 无边界队列,可以定义初始容量并不是最大容量
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(2);
System.out.println(queue);

// 队列的添加方法都等于offer方法
queue.offer(1);
queue.offer(10);
queue.offer(3);
System.out.println(queue);

LinkedBlockingQueue使用

// 属于可选边界
LinkedBlockingQueue<Integer> integers = new LinkedBlockingQueue<>(10);
boolean b = integers.remainingCapacity() == 10;
System.out.println(b);

// 等于定义的
LinkedBlockingQueue<Integer> integers1 = new LinkedBlockingQueue<>();
boolean b1 = integers1.remainingCapacity() == Integer.MAX_VALUE;
System.out.println(b1);

InterruptedException使用

public class AtomicExample08 {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedEntry> queue = new DelayQueue<>();
        queue.add(new DelayedEntry("A", 1000L));
        queue.add(new DelayedEntry("B", 1000L));
        queue.add(new DelayedEntry("C", 1000L));
        queue.add(new DelayedEntry("D", 1000L));
        queue.add(new DelayedEntry("E", 1000L));
        queue.add(new DelayedEntry("F", 1000L));
        queue.add(new DelayedEntry("G", 1000L));
        queue.add(new DelayedEntry("H", 1000L));
        queue.add(new DelayedEntry("I", 1000L));

        // 非阻塞读取,立即返回但不移除头部元素,队列为空返回null
        assert queue.peek() != null;
        System.out.println(queue.peek().value);

        // 非阻塞读取,当队列为空或者头部元素未达到过期时间返回值为null
        System.out.println(Objects.requireNonNull(queue.poll()).value);

        // 最大阻塞单位时间,到达阻塞单位时间后,此刻为空或者头部元素未达到过期时间返回值为null,否则立即移出头部元素
        System.out.println(Objects.requireNonNull(queue.poll(3, TimeUnit.SECONDS)).value);

        // 会一直阻塞到队列中有元素,并且队列头部元素达到过期时间,之后从队列中移除并返回
        System.out.println(queue.take().value);
    }

    // 首先要实现Delayed接口
    @Getter
    static class DelayedEntry implements Delayed {
        private final String value;
        private final long time;

        private DelayedEntry(String value, long time) {
            this.time = time;
            this.value = value;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            long delta = time - System.currentTimeMillis();
            return unit.convert(delta, TimeUnit.MILLISECONDS);
        }


        @Override
        public int compareTo(Delayed o) {
            if (this.time < ((DelayedEntry) o).time) {
                return -1;
            } else if (this.time > ((DelayedEntry) o).time) {
                return 1;
            }
            return 0;
        }
    }
}

ConcurrentLinkedQueue(并发队列)性能

ConcurrentLinkedQueue要优秀很多

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

@Measurement(iterations = 10)
@Warmup(iterations = 10)
@BenchmarkMode(Mode.AverageTime)
@State(Scope.Thread)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public class AtomicExample09 {
    private final static String DATA = "TEST";
    private final static Object LOCK = new Object();
    private SynchronizedLinkedList synchronizedLinkedList;
    private ConcurrentLinkedQueue<String> concurrentLinkedQueue;

    // 如果使用不当反而会降低性能
    public static void main(String[] args) throws RunnerException {
        Options options = new OptionsBuilder()
                .include(AtomicExample09.class.getSimpleName())
                .forks(1)
                .build();
        new Runner(options).run();
    }

    @Setup(Level.Iteration)
    public void setUp() {
        synchronizedLinkedList = new SynchronizedLinkedList();
        concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
    }

    // 测试 SynchronizedLinkedList
    @Group("sync")
    @Benchmark
    @GroupThreads(5)
    public void synchronizedListAdd() {
        synchronizedLinkedList.addLast(DATA);
    }

    @Group("sync")
    @Benchmark
    @GroupThreads(5)
    public String synchronizedListGet() {
        return synchronizedLinkedList.removeFirst();
    }

    // 测试 ConcurrentLinkedQueue
    @Group("concurrent")
    @Benchmark
    @GroupThreads(5)
    public void concurrentLinkedQueueAdd() {
        concurrentLinkedQueue.offer(DATA);
    }

    @Group("concurrent")
    @Benchmark
    @GroupThreads(5)
    public String concurrentLinkedQueueGet() {
        return concurrentLinkedQueue.poll();
    }

    private static class SynchronizedLinkedList {
        private final LinkedList<String> list = new LinkedList<>();

        void addLast(String element) {
            synchronized (LOCK) {
                list.addLast(element);
            }
        }

        String removeFirst() {
            synchronized (LOCK) {
                if (list.isEmpty()) {
                    return null;
                }
                return list.removeLast();
            }
        }
    }
}
// 测试结果
// Benchmark                                            Mode  Cnt  Score   Error  Units
// AtomicExample09.concurrent                           avgt   10  0.221 ± 0.649  us/op
// AtomicExample09.concurrent:concurrentLinkedQueueAdd  avgt   10  0.407 ± 1.296  us/op
// AtomicExample09.concurrent:concurrentLinkedQueueGet  avgt   10  0.034 ± 0.048  us/op
// AtomicExample09.sync                                 avgt   10  0.240 ± 0.039  us/op
// AtomicExample09.sync:synchronizedListAdd             avgt   10  0.232 ± 0.034  us/op
// AtomicExample09.sync:synchronizedListGet             avgt   10  0.248 ± 0.044  us/op

测试需要下载 pom 包

<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.36</version>
    </dependency>
    <!-- slf4j -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.16</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>2.0.16</version>
    </dependency>
    <!-- jmh -->
    <dependency>
        <groupId>org.openjdk.jmh</groupId>
        <artifactId>jmh-core</artifactId>
        <version>1.19</version>
    </dependency>
    <dependency>
        <groupId>org.openjdk.jmh</groupId>
        <artifactId>jmh-generator-annprocess</artifactId>
        <version>1.19</version>
    </dependency>
</dependencies>

原文地址:https://blog.csdn.net/weixin_46533577/article/details/145286524

免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!