Java并发容器使用
Java并发容器使用
ArrayBlockingQueue使用
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(8);
// 阻塞添加
queue.add("1");
queue.add("2");
queue.add("3");
System.out.println(queue);
// 非阻塞添加
queue.put("4");
queue.put("5");
System.out.println(queue);
// 阻塞添加,但队列满不会进入阻塞
queue.offer("6");
queue.offer("7");
System.out.println(queue);
// 指定时间内进入阻塞
queue.offer("8", 3, TimeUnit.SECONDS);
queue.offer("9", 3, TimeUnit.SECONDS);// 8个队列满,不会进入阻塞
System.out.println(queue);
// 从头部获取数据并移出,如果队列为空进入阻塞直到有数据添加
String take = queue.take();
System.out.println(take);
// 获取数据并移出队列第一个元素,如果队列为空将会阻塞指定的时间,直到在此期间有新数据写入,或者当前线程被其他线程中断,当线程超时会返回null
String poll = queue.poll(3, TimeUnit.SECONDS);
System.out.println(poll);
// 从头部获取数据并移出,如果队列为空不会阻塞
String poll1 = queue.poll();
System.out.println(poll1);
// 从头部获取数据不会移除,队列为空不会阻塞直接返回null
String peek = queue.peek();
System.out.println(peek);
PriorityBlockingQueue使用
// 无边界队列,可以定义初始容量并不是最大容量
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(2);
System.out.println(queue);
// 队列的添加方法都等于offer方法
queue.offer(1);
queue.offer(10);
queue.offer(3);
System.out.println(queue);
LinkedBlockingQueue使用
// 属于可选边界
LinkedBlockingQueue<Integer> integers = new LinkedBlockingQueue<>(10);
boolean b = integers.remainingCapacity() == 10;
System.out.println(b);
// 等于定义的
LinkedBlockingQueue<Integer> integers1 = new LinkedBlockingQueue<>();
boolean b1 = integers1.remainingCapacity() == Integer.MAX_VALUE;
System.out.println(b1);
InterruptedException使用
public class AtomicExample08 {
public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayedEntry> queue = new DelayQueue<>();
queue.add(new DelayedEntry("A", 1000L));
queue.add(new DelayedEntry("B", 1000L));
queue.add(new DelayedEntry("C", 1000L));
queue.add(new DelayedEntry("D", 1000L));
queue.add(new DelayedEntry("E", 1000L));
queue.add(new DelayedEntry("F", 1000L));
queue.add(new DelayedEntry("G", 1000L));
queue.add(new DelayedEntry("H", 1000L));
queue.add(new DelayedEntry("I", 1000L));
// 非阻塞读取,立即返回但不移除头部元素,队列为空返回null
assert queue.peek() != null;
System.out.println(queue.peek().value);
// 非阻塞读取,当队列为空或者头部元素未达到过期时间返回值为null
System.out.println(Objects.requireNonNull(queue.poll()).value);
// 最大阻塞单位时间,到达阻塞单位时间后,此刻为空或者头部元素未达到过期时间返回值为null,否则立即移出头部元素
System.out.println(Objects.requireNonNull(queue.poll(3, TimeUnit.SECONDS)).value);
// 会一直阻塞到队列中有元素,并且队列头部元素达到过期时间,之后从队列中移除并返回
System.out.println(queue.take().value);
}
// 首先要实现Delayed接口
@Getter
static class DelayedEntry implements Delayed {
private final String value;
private final long time;
private DelayedEntry(String value, long time) {
this.time = time;
this.value = value;
}
@Override
public long getDelay(TimeUnit unit) {
long delta = time - System.currentTimeMillis();
return unit.convert(delta, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
if (this.time < ((DelayedEntry) o).time) {
return -1;
} else if (this.time > ((DelayedEntry) o).time) {
return 1;
}
return 0;
}
}
}
ConcurrentLinkedQueue(并发队列)性能
ConcurrentLinkedQueue要优秀很多
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@Measurement(iterations = 10)
@Warmup(iterations = 10)
@BenchmarkMode(Mode.AverageTime)
@State(Scope.Thread)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public class AtomicExample09 {
private final static String DATA = "TEST";
private final static Object LOCK = new Object();
private SynchronizedLinkedList synchronizedLinkedList;
private ConcurrentLinkedQueue<String> concurrentLinkedQueue;
// 如果使用不当反而会降低性能
public static void main(String[] args) throws RunnerException {
Options options = new OptionsBuilder()
.include(AtomicExample09.class.getSimpleName())
.forks(1)
.build();
new Runner(options).run();
}
@Setup(Level.Iteration)
public void setUp() {
synchronizedLinkedList = new SynchronizedLinkedList();
concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
}
// 测试 SynchronizedLinkedList
@Group("sync")
@Benchmark
@GroupThreads(5)
public void synchronizedListAdd() {
synchronizedLinkedList.addLast(DATA);
}
@Group("sync")
@Benchmark
@GroupThreads(5)
public String synchronizedListGet() {
return synchronizedLinkedList.removeFirst();
}
// 测试 ConcurrentLinkedQueue
@Group("concurrent")
@Benchmark
@GroupThreads(5)
public void concurrentLinkedQueueAdd() {
concurrentLinkedQueue.offer(DATA);
}
@Group("concurrent")
@Benchmark
@GroupThreads(5)
public String concurrentLinkedQueueGet() {
return concurrentLinkedQueue.poll();
}
private static class SynchronizedLinkedList {
private final LinkedList<String> list = new LinkedList<>();
void addLast(String element) {
synchronized (LOCK) {
list.addLast(element);
}
}
String removeFirst() {
synchronized (LOCK) {
if (list.isEmpty()) {
return null;
}
return list.removeLast();
}
}
}
}
// 测试结果
// Benchmark Mode Cnt Score Error Units
// AtomicExample09.concurrent avgt 10 0.221 ± 0.649 us/op
// AtomicExample09.concurrent:concurrentLinkedQueueAdd avgt 10 0.407 ± 1.296 us/op
// AtomicExample09.concurrent:concurrentLinkedQueueGet avgt 10 0.034 ± 0.048 us/op
// AtomicExample09.sync avgt 10 0.240 ± 0.039 us/op
// AtomicExample09.sync:synchronizedListAdd avgt 10 0.232 ± 0.034 us/op
// AtomicExample09.sync:synchronizedListGet avgt 10 0.248 ± 0.044 us/op
测试需要下载 pom 包
<dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.36</version> </dependency> <!-- slf4j --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>2.0.16</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>2.0.16</version> </dependency> <!-- jmh --> <dependency> <groupId>org.openjdk.jmh</groupId> <artifactId>jmh-core</artifactId> <version>1.19</version> </dependency> <dependency> <groupId>org.openjdk.jmh</groupId> <artifactId>jmh-generator-annprocess</artifactId> <version>1.19</version> </dependency> </dependencies>
原文地址:https://blog.csdn.net/weixin_46533577/article/details/145286524
免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!