多个线程之间的协作,如同团队中的配合,需要一套清晰的规则和信号机制。在日常开发中,我们经常会遇到需要多个线程协同工作的场景,例如主线程需要等待几个工作线程都完成初始化后才能继续执行,或者需要控制同时访问某个资源的线程数量。
Java并发包中提供了三个强大的同步工具类:CountDownLatch、CyclicBarrier和Semaphore,它们就像是线程世界的交通信号灯,让混乱的并发执行变得有序可控。
1 CountDownLatch:倒计时门闩,等待多线程完成的利器
CountDownLatch可以理解为一个倒计时器,它允许一个或多个线程等待其他线程完成操作后再继续执行。这就像团队组织旅游,导游要等到所有游客都集合完毕后才能出发前往下一个景点。
1.1 核心概念与API
CountDownLatch的工作原理基于一个计数器。创建CountDownLatch时需指定一个正整数作为初始计数值,每当有线程完成自己的任务时,计数器减1。当计数器值为0时,表示所有线程已完成任务,此时等待的线程可以被唤醒继续执行。
其主要API包括:
CountDownLatch(int count):构造方法,初始化计数器值
await():等待计数器归零,会阻塞当前线程
await(long timeout, TimeUnit unit):带超时的等待
countDown():计数器减1
getCount():获取当前计数器的值
1.2 实战案例:多数据源加载
假设我们需要从多个数据源加载数据,只有所有数据都加载完成后,才能进行数据整合和展示:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class DataLoadingExample {
public static void main(String[] args) throws InterruptedException {
// 创建CountDownLatch,计数器设置为3(对应三个数据源)
CountDownLatch latch = new CountDownLatch(3);
ExecutorService executor = Executors.newFixedThreadPool(3);
// 记录开始时间
long start = System.currentTimeMillis();
// 模拟从三个不同数据源加载数据
executor.submit(() -> loadDataFromSource("数据库", 3000, latch));
executor.submit(() -> loadDataFromSource("Redis", 2000, latch));
executor.submit(() -> loadDataFromSource("API", 1500, latch));
System.out.println("等待所有数据源加载完成...");
// 等待所有数据源加载完成
latch.await();
// 记录完成时间
long end = System.currentTimeMillis();
System.out.println("所有数据源加载完成,总耗时: " + (end - start) + "ms");
System.out.println("开始处理整合的数据...");
executor.shutdown();
}
private static void loadDataFromSource(String source, int sleepTime, CountDownLatch latch) {
try {
System.out.println("开始从" + source + "加载数据...");
// 模拟耗时操作
Thread.sleep(sleepTime);
System.out.println(source + "数据加载完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 完成后计数器减1
latch.countDown();
System.out.println(source + "数据源加载完成,通知CountDownLatch计数器减1,当前计数:" + latch.getCount());
}
}
}
执行结果可以看到,尽管三个数据源的加载时间不同,但主线程会等待所有数据源都加载完成后才继续执行。
1.3 原理解析
CountDownLatch内部基于AQS(AbstractQueuedSynchronizer)实现,使用AQS的共享模式。当创建一个CountDownLatch实例时,传入的初始计数值会被保存在AQS的state变量中:
await()方法会先检查state是否为0,如果是则继续执行,否则将当前线程加入等待队列。
countDown()方法会将state值减1,当减到0时,会唤醒所有在await()上等待的线程。
1.4 注意事项
- 计数器不能重置:CountDownLatch的计数器无法重置,一旦计数到0,就不能再用了。
- 只能等待一次性事件:适合等待一次性事件,不适合周期性重复的场景。
- 注意处理中断异常:
await()方法会抛出InterruptedException。
- 可能导致永久等待:如果某个任务没有正确调用
countDown(),可能导致等待线程永远阻塞。
2 CyclicBarrier:循环屏障,多阶段任务协调的得力助手
CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会打开,所有被屏障拦截的线程才会继续运行。这就像几个朋友约定一起吃饭,必须所有人都到齐后才会开始点餐。
2.1 核心概念与API
与CountDownLatch不同,CyclicBarrier可以重复使用,特别适合分阶段任务的场景。主要API包括:
CyclicBarrier(int parties):创建一个屏障,等待指定数量的线程。
CyclicBarrier(int parties, Runnable barrierAction):创建一个屏障,并在所有线程到达时执行barrierAction。
await():等待所有线程到达屏障点。
await(long timeout, TimeUnit unit):带超时的等待。
reset():重置屏障。
getNumberWaiting():获取当前在屏障处等待的线程数。
isBroken():查询屏障是否被破坏。
2.2 实战案例:多阶段计算任务
假设我们有一个复杂的计算任务,可以分成三个阶段,每个阶段都需要多个线程协同计算,只有当所有线程完成当前阶段后,才能一起进入下一阶段:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MultiStageCalculation {
public static void main(String[] args) {
int workerCount = 3;
// 创建CyclicBarrier,所有线程到达屏障时会执行指定的操作
CyclicBarrier barrier = new CyclicBarrier(workerCount, () -> {
System.out.println("======= 所有线程完成当前阶段,准备进入下一阶段 =======");
});
ExecutorService executor = Executors.newFixedThreadPool(workerCount);
for (int i = 0; i < workerCount; i++) {
final int workerId = i;
executor.submit(() -> {
try {
// 第一阶段:数据准备
System.out.println("工作线程" + workerId + "开始准备数据...");
Thread.sleep(1000 + (int)(Math.random() * 1000));
System.out.println("工作线程" + workerId + "完成数据准备");
barrier.await(); // 等待所有线程完成数据准备
// 第二阶段:数据计算
System.out.println("工作线程" + workerId + "开始数据计算...");
Thread.sleep(2000 + (int)(Math.random() * 1000));
System.out.println("工作线程" + workerId + "完成数据计算");
barrier.await(); // 等待所有线程完成数据计算
// 第三阶段:结果汇总
System.out.println("工作线程" + workerId + "开始汇总结果...");
Thread.sleep(1000 + (int)(Math.random() * 1000));
System.out.println("工作线程" + workerId + "完成结果汇总");
barrier.await(); // 等待所有线程完成结果汇总
System.out.println("工作线程" + workerId + "所有任务完成!");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
在这个例子中,三个工作线程需要协同完成三个阶段的任务。每个阶段完成后,线程都会在屏障处等待其他线程,只有当所有线程都到达屏障点后,才会一起进入下一阶段。
2.3 原理解析
CyclicBarrier基于ReentrantLock和Condition实现,与CountDownLatch直接使用AQS不同:
- 内部使用ReentrantLock和Condition实现线程同步。
- 维护计数器,记录还未到达屏障的线程数。
- 当线程调用
await()时,计数器减1。
- 如果计数器不为0,当前线程进入等待状态。
- 当最后一个线程到达屏障点,计数器变为0。
- 执行屏障动作(如果有)。
- 重置计数器为初始值。
- 唤醒所有等待的线程。
2.4 注意事项
- 可以重复使用:与CountDownLatch不同,CyclicBarrier可以通过自动重置或手动调用
reset()方法重置。
- 必须所有线程都调用await():如果有线程没有调用
await(),可能导致其他线程永久等待。
- 处理中断和超时情况:
await()方法会抛出InterruptedException和BrokenBarrierException。
- 注意屏障破坏:当有线程中断或超时,屏障会被破坏(broken),需要调用
reset()重置。
3 Semaphore:信号量,控制并发访问的流量警察
Semaphore(信号量)用来控制同时访问特定资源的线程数量,通过协调各个线程以保证合理使用公共资源。可以把Semaphore想象成一个停车场的入口系统:停车场只有有限的车位(许可),当车位已满时,新来的车辆需要等待。
3.1 核心概念与API
Semaphore维护了一个许可证集合,线程在访问资源前必须获取许可,用完后释放。主要API包括:
Semaphore(int permits):创建指定许可数的信号量,默认非公平模式。
Semaphore(int permits, boolean fair):创建信号量,可指定是否公平。
acquire():获取一个许可,如果没有可用的许可则阻塞。
acquire(int permits):获取指定数量的许可。
tryAcquire():尝试获取许可,立即返回成功或失败。
tryAcquire(long timeout, TimeUnit unit):带超时的尝试获取。
release():释放一个许可。
release(int permits):释放指定数量的许可。
availablePermits():返回当前可用的许可数。
3.2 实战案例:数据库连接池
假设我们要设计一个简单的数据库连接池,限制同时活动的连接数,防止连接数过多导致数据库压力过大:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
public class DatabaseConnectionPool {
// 连接池大小
private final int MAX_CONNECTIONS = 10;
// 信号量,控制允许的并发连接数
private final Semaphore semaphore;
// 连接列表
private final List<JdbcConnection> connectionList = new ArrayList<>();
// 当前已创建的连接数
private final AtomicInteger createdConnections = new AtomicInteger(0);
public DatabaseConnectionPool() {
// 创建信号量,设置最大许可数为连接池大小
this.semaphore = new Semaphore(MAX_CONNECTIONS);
}
// 获取数据库连接
public JdbcConnection getConnection() throws InterruptedException {
// 获取许可
semaphore.acquire();
// 获取或创建连接
return getOrCreateConnection();
}
// 释放连接
public void releaseConnection(JdbcConnection connection) {
if (connection != null) {
// 将连接放回池中
returnConnectionToPool(connection);
// 释放许可
semaphore.release();
System.out.println("释放连接: " + connection.getId() +
", 释放许可后当前可用许可: " + semaphore.availablePermits());
}
}
// 获取或创建连接(简化示例,实际需要同步控制)
private synchronized JdbcConnection getOrCreateConnection() {
// 首先尝试从池中获取可用连接
for (JdbcConnection conn : connectionList) {
if (!conn.isInUse()) {
conn.setInUse(true);
System.out.println("复用连接: " + conn.getId() +
", 当前可用许可: " + semaphore.availablePermits());
return conn;
}
}
// 如果没有可用连接且未达到最大连接数,则创建新连接
if (createdConnections.get() < MAX_CONNECTIONS) {
JdbcConnection newConn = createNewConnection();
connectionList.add(newConn);
System.out.println("创建新连接: " + newConn.getId() +
", 当前可用许可: " + semaphore.availablePermits());
return newConn;
}
// 这种情况理论上不会发生,因为Semaphore控制了并发量
throw new IllegalStateException("无法获取连接");
}
// 创建新连接
private JdbcConnection createNewConnection() {
int id = createdConnections.incrementAndGet();
// 模拟创建JDBC连接
return new JdbcConnection(id);
}
// 将连接放回池中
private void returnConnectionToPool(JdbcConnection connection) {
connection.setInUse(false);
}
// 模拟数据库连接类
public static class JdbcConnection {
private final int id;
private boolean inUse;
public JdbcConnection(int id) {
this.id = id;
this.inUse = true;
}
public int getId() { return id; }
public boolean isInUse() { return inUse; }
public void setInUse(boolean inUse) { this.inUse = inUse; }
// 模拟执行SQL
public void executeQuery(String sql) {
System.out.println("连接" + id + "执行SQL: " + sql);
}
}
}
在这个连接池实现中,Semaphore用于控制同时获取数据库连接的线程数量,防止过多的连接导致数据库压力过大。
3.3 其他方法
Semaphore还提供了一些有用的方法:
int availablePermits():返回此信号量中当前可用的许可证数。
int getQueueLength():返回正在等待获取许可证的线程数。
boolean hasQueuedThreads():是否有线程正在等待获取许可证。
void reducePermits(int reduction):减少reduction个许可证(protected方法)。
Collection getQueuedThreads():返回所有等待获取许可证的线程集合(protected方法)。
3.4 公平性与性能优化
Semaphore有公平和非公平两种模式:
- 非公平模式:通常具有更高的吞吐量,因为它允许更多的线程尝试获取许可。
- 公平模式:保证先请求的线程先获得许可,避免线程饥饿。
在实际应用中,需要根据具体的场景和需求来选择。如果对执行顺序有严格要求,应选择公平模式;如果追求高吞吐量,非公平模式可能是更好的选择。
4 三大同步工具对比与选型指南
了解了这三个工具后,我们来总结一下它们的特点和适用场景。
4.1 核心区别对比
| 特性 |
CountDownLatch |
CyclicBarrier |
Semaphore |
| 是否可重用 |
不可重用,一次性 |
可重用,自动/手动重置 |
可重用,许可可重复获取释放 |
| 主要用途 |
等待一个或多个事件完成 |
多个线程相互等待到屏障点 |
控制同时访问资源的线程数 |
| 计数器变化 |
递减,到0释放等待线程 |
递增到设定值释放所有线程 |
许可数获取时减少,释放时增加 |
| 底层实现 |
基于AQS共享模式 |
基于ReentrantLock和Condition |
基于AQS共享模式 |
4.2 选型指南
- 选择CountDownLatch当:需要等待一个或多个事件完成后再继续执行,且事件是一次性的。
- 选择CyclicBarrier当:多个线程需要相互等待,到达一个公共屏障点后才能继续执行,特别是多阶段任务。
- 选择Semaphore当:需要控制同时访问特定资源的线程数量,进行流量控制。
5 总结
CountDownLatch、CyclicBarrier和Semaphore是Java并发编程中不可或缺的三个同步工具类,它们分别解决了不同的线程协调问题。
- CountDownLatch像是旅行团的导游,要等到所有游客(线程)都完成某个动作(如集合)后,才能继续下一步行程。
- CyclicBarrier像是一群朋友约定吃饭,必须所有人都到齐后才会开始点餐,而且这种约定可以多次进行(多阶段任务)。
- Semaphore像是停车场的入口系统,只有有空车位(许可)时才会允许车辆进入,车位满时需要等待。
在实际开发中,根据具体场景选择合适的同步工具,可以大大简化并发编程的复杂度,提高程序的可靠性和性能。