Appearance
🔧 并发工具类
并发工具类概述
Java 5 引入的 java.util.concurrent 包提供了丰富的并发工具类,简化多线程编程。
ExecutorService
线程池
java
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(5);
// 提交任务
executor.submit(() -> {
System.out.println("任务执行");
});
// 关闭线程池
executor.shutdown();常用线程池类型
java
// 固定大小线程池
ExecutorService fixedPool = Executors.newFixedThreadPool(5);
// 单线程池
ExecutorService singlePool = Executors.newSingleThreadExecutor();
// 缓存线程池
ExecutorService cachedPool = Executors.newCachedThreadPool();
// 定时任务线程池
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(5);线程池最佳实践(替代 Executors 默认)
java
ThreadPoolExecutor ioPool = new ThreadPoolExecutor(
16, 64,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10_000),
new ThreadFactoryBuilder().setNameFormat("io-pool-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy()
);- 命名线程便于排障;合理队列容量与拒绝策略,避免 OOM 与任务丢失。
Future 和 Callable
java
// 使用 Callable 提交任务
ExecutorService executor = Executors.newFixedThreadPool(5);
Future<Integer> future = executor.submit(() -> {
Thread.sleep(1000);
return 42;
});
// 获取结果
try {
Integer result = future.get(); // 阻塞等待结果
System.out.println("结果: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}CompletableFuture(进阶)
java
CompletableFuture<String> user = CompletableFuture.supplyAsync(() -> getUser());
CompletableFuture<List<Order>> orders = CompletableFuture.supplyAsync(() -> getOrders());
String result = user.thenCombine(orders, (u, os) -> render(u, os))
.orTimeout(500, TimeUnit.MILLISECONDS)
.exceptionally(ex -> "fallback")
.join();- 组合/聚合/异常链路与超时控制;
allOf/anyOf支持并发聚合。
CountDownLatch
java
// 倒计时门闩
CountDownLatch latch = new CountDownLatch(3);
// 多个线程
for (int i = 0; i < 3; i++) {
new Thread(() -> {
// 执行任务
latch.countDown(); // 计数减1
}).start();
}
// 等待所有线程完成
latch.await();
System.out.println("所有任务完成");CyclicBarrier
java
// 循环屏障
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("所有线程到达屏障");
});
// 多个线程
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
// 执行任务
barrier.await(); // 等待所有线程到达
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}Semaphore
java
// 信号量
Semaphore semaphore = new Semaphore(3); // 最多3个线程同时访问
// 多个线程
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
semaphore.acquire(); // 获取许可
// 执行任务
semaphore.release(); // 释放许可
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}Phaser(阶段同步器)
java
Phaser phaser = new Phaser(1); // 注册主线程
for (int i = 0; i < 3; i++) {
phaser.register();
int phaseNo = i;
new Thread(() -> {
// 阶段1
phaser.arriveAndAwaitAdvance();
// 阶段2
phaser.arriveAndAwaitAdvance();
phaser.arriveAndDeregister();
}, "t-" + phaseNo).start();
}
phaser.arriveAndDeregister(); // 释放主线程Exchanger(数据交换)
java
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
try { exchanger.exchange("A"); } catch (InterruptedException ignored) {}
}).start();
new Thread(() -> {
try { String v = exchanger.exchange("B"); } catch (InterruptedException ignored) {}
}).start();并发集合与累加器
java
ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();
map.merge("k", 1L, Long::sum);
LongAdder adder = new LongAdder();
adder.increment();
long sum = adder.sum();锁与同步器(Lock/StampedLock 概览)
ReentrantLock:可中断、公平锁、条件队列;ReentrantReadWriteLock:读多写少;StampedLock:乐观读,需校验戳;不支持重入与条件变量。
java
StampedLock lock = new StampedLock();
long stamp = lock.tryOptimisticRead();
int v = data;
if (!lock.validate(stamp)) {
stamp = lock.readLock();
try { v = data; } finally { lock.unlockRead(stamp); }
}ForkJoin 与并行计算
java
class SumTask extends RecursiveTask<Long> {
final long[] arr; final int lo, hi;
SumTask(long[] a, int l, int h) { arr=a; lo=l; hi=h; }
protected Long compute() {
if (hi - lo <= 10_000) {
long s=0; for (int i=lo;i<hi;i++) s+=arr[i]; return s;
}
int mid = (lo + hi) >>> 1;
SumTask left = new SumTask(arr, lo, mid);
SumTask right = new SumTask(arr, mid, hi);
left.fork();
long r = right.compute() + left.join();
return r;
}
}RateLimiter(Guava 扩展)与限流思路
java
RateLimiter limiter = RateLimiter.create(100.0); // 每秒100个许可
limiter.acquire(); // 获取1个许可- 限流常与队列/拒绝策略/降级结合,保障系统稳态。
下一步
掌握了并发工具类后,可以继续学习:
- JVM 基础 - 学习 JVM 原理
- Java 8+ 新特性 - 学习 Java 新特性
💡 提示:并发工具类提供了更高级的并发控制,合理使用可以提高程序性能
