Skip to content

🔧 并发工具类

并发工具类概述

Java 5 引入的 java.util.concurrent 包提供了丰富的并发工具类,简化多线程编程。

ExecutorService

线程池

java
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(5);

// 提交任务
executor.submit(() -> {
    System.out.println("任务执行");
});

// 关闭线程池
executor.shutdown();

常用线程池类型

java
// 固定大小线程池
ExecutorService fixedPool = Executors.newFixedThreadPool(5);

// 单线程池
ExecutorService singlePool = Executors.newSingleThreadExecutor();

// 缓存线程池
ExecutorService cachedPool = Executors.newCachedThreadPool();

// 定时任务线程池
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(5);

线程池最佳实践(替代 Executors 默认)

java
ThreadPoolExecutor ioPool = new ThreadPoolExecutor(
    16, 64,
    60, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(10_000),
    new ThreadFactoryBuilder().setNameFormat("io-pool-%d").build(),
    new ThreadPoolExecutor.CallerRunsPolicy()
);
  • 命名线程便于排障;合理队列容量与拒绝策略,避免 OOM 与任务丢失。

Future 和 Callable

java
// 使用 Callable 提交任务
ExecutorService executor = Executors.newFixedThreadPool(5);

Future<Integer> future = executor.submit(() -> {
    Thread.sleep(1000);
    return 42;
});

// 获取结果
try {
    Integer result = future.get();  // 阻塞等待结果
    System.out.println("结果: " + result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

CompletableFuture(进阶)

java
CompletableFuture<String> user = CompletableFuture.supplyAsync(() -> getUser());
CompletableFuture<List<Order>> orders = CompletableFuture.supplyAsync(() -> getOrders());

String result = user.thenCombine(orders, (u, os) -> render(u, os))
    .orTimeout(500, TimeUnit.MILLISECONDS)
    .exceptionally(ex -> "fallback")
    .join();
  • 组合/聚合/异常链路与超时控制;allOf/anyOf 支持并发聚合。

CountDownLatch

java
// 倒计时门闩
CountDownLatch latch = new CountDownLatch(3);

// 多个线程
for (int i = 0; i < 3; i++) {
    new Thread(() -> {
        // 执行任务
        latch.countDown();  // 计数减1
    }).start();
}

// 等待所有线程完成
latch.await();
System.out.println("所有任务完成");

CyclicBarrier

java
// 循环屏障
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
    System.out.println("所有线程到达屏障");
});

// 多个线程
for (int i = 0; i < 3; i++) {
    new Thread(() -> {
        try {
            // 执行任务
            barrier.await();  // 等待所有线程到达
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }).start();
}

Semaphore

java
// 信号量
Semaphore semaphore = new Semaphore(3);  // 最多3个线程同时访问

// 多个线程
for (int i = 0; i < 10; i++) {
    new Thread(() -> {
        try {
            semaphore.acquire();  // 获取许可
            // 执行任务
            semaphore.release();  // 释放许可
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
}

Phaser(阶段同步器)

java
Phaser phaser = new Phaser(1); // 注册主线程
for (int i = 0; i < 3; i++) {
    phaser.register();
    int phaseNo = i;
    new Thread(() -> {
        // 阶段1
        phaser.arriveAndAwaitAdvance();
        // 阶段2
        phaser.arriveAndAwaitAdvance();
        phaser.arriveAndDeregister();
    }, "t-" + phaseNo).start();
}
phaser.arriveAndDeregister(); // 释放主线程

Exchanger(数据交换)

java
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
    try { exchanger.exchange("A"); } catch (InterruptedException ignored) {}
}).start();
new Thread(() -> {
    try { String v = exchanger.exchange("B"); } catch (InterruptedException ignored) {}
}).start();

并发集合与累加器

java
ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();
map.merge("k", 1L, Long::sum);

LongAdder adder = new LongAdder();
adder.increment();
long sum = adder.sum();

锁与同步器(Lock/StampedLock 概览)

  • ReentrantLock:可中断、公平锁、条件队列;
  • ReentrantReadWriteLock:读多写少;
  • StampedLock:乐观读,需校验戳;不支持重入与条件变量。
java
StampedLock lock = new StampedLock();
long stamp = lock.tryOptimisticRead();
int v = data;
if (!lock.validate(stamp)) {
    stamp = lock.readLock();
    try { v = data; } finally { lock.unlockRead(stamp); }
}

ForkJoin 与并行计算

java
class SumTask extends RecursiveTask<Long> {
    final long[] arr; final int lo, hi;
    SumTask(long[] a, int l, int h) { arr=a; lo=l; hi=h; }
    protected Long compute() {
        if (hi - lo <= 10_000) {
            long s=0; for (int i=lo;i<hi;i++) s+=arr[i]; return s;
        }
        int mid = (lo + hi) >>> 1;
        SumTask left = new SumTask(arr, lo, mid);
        SumTask right = new SumTask(arr, mid, hi);
        left.fork();
        long r = right.compute() + left.join();
        return r;
    }
}

RateLimiter(Guava 扩展)与限流思路

java
RateLimiter limiter = RateLimiter.create(100.0); // 每秒100个许可
limiter.acquire(); // 获取1个许可
  • 限流常与队列/拒绝策略/降级结合,保障系统稳态。

下一步

掌握了并发工具类后,可以继续学习:


💡 提示:并发工具类提供了更高级的并发控制,合理使用可以提高程序性能