找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

567

积分

0

好友

49

主题
发表于 昨天 17:58 | 查看: 3| 回复: 0

多个线程之间的协作,如同团队中的配合,需要一套清晰的规则和信号机制。在日常开发中,我们经常会遇到需要多个线程协同工作的场景,例如主线程需要等待几个工作线程都完成初始化后才能继续执行,或者需要控制同时访问某个资源的线程数量。

Java并发包中提供了三个强大的同步工具类:CountDownLatch、CyclicBarrier和Semaphore,它们就像是线程世界的交通信号灯,让混乱的并发执行变得有序可控。

1 CountDownLatch:倒计时门闩,等待多线程完成的利器

CountDownLatch可以理解为一个倒计时器,它允许一个或多个线程等待其他线程完成操作后再继续执行。这就像团队组织旅游,导游要等到所有游客都集合完毕后才能出发前往下一个景点。

1.1 核心概念与API

CountDownLatch的工作原理基于一个计数器。创建CountDownLatch时需指定一个正整数作为初始计数值,每当有线程完成自己的任务时,计数器减1。当计数器值为0时,表示所有线程已完成任务,此时等待的线程可以被唤醒继续执行。

其主要API包括:

  • CountDownLatch(int count):构造方法,初始化计数器值
  • await():等待计数器归零,会阻塞当前线程
  • await(long timeout, TimeUnit unit):带超时的等待
  • countDown():计数器减1
  • getCount():获取当前计数器的值

1.2 实战案例:多数据源加载

假设我们需要从多个数据源加载数据,只有所有数据都加载完成后,才能进行数据整合和展示:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DataLoadingExample {
    public static void main(String[] args) throws InterruptedException {
        // 创建CountDownLatch,计数器设置为3(对应三个数据源)
        CountDownLatch latch = new CountDownLatch(3);
        ExecutorService executor = Executors.newFixedThreadPool(3);

        // 记录开始时间
        long start = System.currentTimeMillis();

        // 模拟从三个不同数据源加载数据
        executor.submit(() -> loadDataFromSource("数据库", 3000, latch));
        executor.submit(() -> loadDataFromSource("Redis", 2000, latch));
        executor.submit(() -> loadDataFromSource("API", 1500, latch));

        System.out.println("等待所有数据源加载完成...");
        // 等待所有数据源加载完成
        latch.await();

        // 记录完成时间
        long end = System.currentTimeMillis();
        System.out.println("所有数据源加载完成,总耗时: " + (end - start) + "ms");
        System.out.println("开始处理整合的数据...");

        executor.shutdown();
    }

    private static void loadDataFromSource(String source, int sleepTime, CountDownLatch latch) {
        try {
            System.out.println("开始从" + source + "加载数据...");
            // 模拟耗时操作
            Thread.sleep(sleepTime);
            System.out.println(source + "数据加载完成");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 完成后计数器减1
            latch.countDown();
            System.out.println(source + "数据源加载完成,通知CountDownLatch计数器减1,当前计数:" + latch.getCount());
        }
    }
}

执行结果可以看到,尽管三个数据源的加载时间不同,但主线程会等待所有数据源都加载完成后才继续执行。

1.3 原理解析

CountDownLatch内部基于AQS(AbstractQueuedSynchronizer)实现,使用AQS的共享模式。当创建一个CountDownLatch实例时,传入的初始计数值会被保存在AQS的state变量中:

  • await()方法会先检查state是否为0,如果是则继续执行,否则将当前线程加入等待队列。
  • countDown()方法会将state值减1,当减到0时,会唤醒所有在await()上等待的线程。

1.4 注意事项

  • 计数器不能重置:CountDownLatch的计数器无法重置,一旦计数到0,就不能再用了。
  • 只能等待一次性事件:适合等待一次性事件,不适合周期性重复的场景。
  • 注意处理中断异常await()方法会抛出InterruptedException。
  • 可能导致永久等待:如果某个任务没有正确调用countDown(),可能导致等待线程永远阻塞。

2 CyclicBarrier:循环屏障,多阶段任务协调的得力助手

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会打开,所有被屏障拦截的线程才会继续运行。这就像几个朋友约定一起吃饭,必须所有人都到齐后才会开始点餐。

2.1 核心概念与API

与CountDownLatch不同,CyclicBarrier可以重复使用,特别适合分阶段任务的场景。主要API包括:

  • CyclicBarrier(int parties):创建一个屏障,等待指定数量的线程。
  • CyclicBarrier(int parties, Runnable barrierAction):创建一个屏障,并在所有线程到达时执行barrierAction。
  • await():等待所有线程到达屏障点。
  • await(long timeout, TimeUnit unit):带超时的等待。
  • reset():重置屏障。
  • getNumberWaiting():获取当前在屏障处等待的线程数。
  • isBroken():查询屏障是否被破坏。

2.2 实战案例:多阶段计算任务

假设我们有一个复杂的计算任务,可以分成三个阶段,每个阶段都需要多个线程协同计算,只有当所有线程完成当前阶段后,才能一起进入下一阶段:

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MultiStageCalculation {
    public static void main(String[] args) {
        int workerCount = 3;

        // 创建CyclicBarrier,所有线程到达屏障时会执行指定的操作
        CyclicBarrier barrier = new CyclicBarrier(workerCount, () -> {
            System.out.println("======= 所有线程完成当前阶段,准备进入下一阶段 =======");
        });

        ExecutorService executor = Executors.newFixedThreadPool(workerCount);

        for (int i = 0; i < workerCount; i++) {
            final int workerId = i;
            executor.submit(() -> {
                try {
                    // 第一阶段:数据准备
                    System.out.println("工作线程" + workerId + "开始准备数据...");
                    Thread.sleep(1000 + (int)(Math.random() * 1000));
                    System.out.println("工作线程" + workerId + "完成数据准备");
                    barrier.await(); // 等待所有线程完成数据准备

                    // 第二阶段:数据计算
                    System.out.println("工作线程" + workerId + "开始数据计算...");
                    Thread.sleep(2000 + (int)(Math.random() * 1000));
                    System.out.println("工作线程" + workerId + "完成数据计算");
                    barrier.await(); // 等待所有线程完成数据计算

                    // 第三阶段:结果汇总
                    System.out.println("工作线程" + workerId + "开始汇总结果...");
                    Thread.sleep(1000 + (int)(Math.random() * 1000));
                    System.out.println("工作线程" + workerId + "完成结果汇总");
                    barrier.await(); // 等待所有线程完成结果汇总

                    System.out.println("工作线程" + workerId + "所有任务完成!");
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }
        executor.shutdown();
    }
}

在这个例子中,三个工作线程需要协同完成三个阶段的任务。每个阶段完成后,线程都会在屏障处等待其他线程,只有当所有线程都到达屏障点后,才会一起进入下一阶段。

2.3 原理解析

CyclicBarrier基于ReentrantLock和Condition实现,与CountDownLatch直接使用AQS不同:

  • 内部使用ReentrantLock和Condition实现线程同步。
  • 维护计数器,记录还未到达屏障的线程数。
  • 当线程调用await()时,计数器减1。
  • 如果计数器不为0,当前线程进入等待状态。
  • 当最后一个线程到达屏障点,计数器变为0。
  • 执行屏障动作(如果有)。
  • 重置计数器为初始值。
  • 唤醒所有等待的线程。

2.4 注意事项

  • 可以重复使用:与CountDownLatch不同,CyclicBarrier可以通过自动重置或手动调用reset()方法重置。
  • 必须所有线程都调用await():如果有线程没有调用await(),可能导致其他线程永久等待。
  • 处理中断和超时情况await()方法会抛出InterruptedException和BrokenBarrierException。
  • 注意屏障破坏:当有线程中断或超时,屏障会被破坏(broken),需要调用reset()重置。

3 Semaphore:信号量,控制并发访问的流量警察

Semaphore(信号量)用来控制同时访问特定资源的线程数量,通过协调各个线程以保证合理使用公共资源。可以把Semaphore想象成一个停车场的入口系统:停车场只有有限的车位(许可),当车位已满时,新来的车辆需要等待。

3.1 核心概念与API

Semaphore维护了一个许可证集合,线程在访问资源前必须获取许可,用完后释放。主要API包括:

  • Semaphore(int permits):创建指定许可数的信号量,默认非公平模式。
  • Semaphore(int permits, boolean fair):创建信号量,可指定是否公平。
  • acquire():获取一个许可,如果没有可用的许可则阻塞。
  • acquire(int permits):获取指定数量的许可。
  • tryAcquire():尝试获取许可,立即返回成功或失败。
  • tryAcquire(long timeout, TimeUnit unit):带超时的尝试获取。
  • release():释放一个许可。
  • release(int permits):释放指定数量的许可。
  • availablePermits():返回当前可用的许可数。

3.2 实战案例:数据库连接池

假设我们要设计一个简单的数据库连接池,限制同时活动的连接数,防止连接数过多导致数据库压力过大:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class DatabaseConnectionPool {
    // 连接池大小
    private final int MAX_CONNECTIONS = 10;
    // 信号量,控制允许的并发连接数
    private final Semaphore semaphore;
    // 连接列表
    private final List<JdbcConnection> connectionList = new ArrayList<>();
    // 当前已创建的连接数
    private final AtomicInteger createdConnections = new AtomicInteger(0);

    public DatabaseConnectionPool() {
        // 创建信号量,设置最大许可数为连接池大小
        this.semaphore = new Semaphore(MAX_CONNECTIONS);
    }

    // 获取数据库连接
    public JdbcConnection getConnection() throws InterruptedException {
        // 获取许可
        semaphore.acquire();
        // 获取或创建连接
        return getOrCreateConnection();
    }

    // 释放连接
    public void releaseConnection(JdbcConnection connection) {
        if (connection != null) {
            // 将连接放回池中
            returnConnectionToPool(connection);
            // 释放许可
            semaphore.release();
            System.out.println("释放连接: " + connection.getId() +
                               ", 释放许可后当前可用许可: " + semaphore.availablePermits());
        }
    }

    // 获取或创建连接(简化示例,实际需要同步控制)
    private synchronized JdbcConnection getOrCreateConnection() {
        // 首先尝试从池中获取可用连接
        for (JdbcConnection conn : connectionList) {
            if (!conn.isInUse()) {
                conn.setInUse(true);
                System.out.println("复用连接: " + conn.getId() +
                                   ", 当前可用许可: " + semaphore.availablePermits());
                return conn;
            }
        }

        // 如果没有可用连接且未达到最大连接数,则创建新连接
        if (createdConnections.get() < MAX_CONNECTIONS) {
            JdbcConnection newConn = createNewConnection();
            connectionList.add(newConn);
            System.out.println("创建新连接: " + newConn.getId() +
                               ", 当前可用许可: " + semaphore.availablePermits());
            return newConn;
        }

        // 这种情况理论上不会发生,因为Semaphore控制了并发量
        throw new IllegalStateException("无法获取连接");
    }

    // 创建新连接
    private JdbcConnection createNewConnection() {
        int id = createdConnections.incrementAndGet();
        // 模拟创建JDBC连接
        return new JdbcConnection(id);
    }

    // 将连接放回池中
    private void returnConnectionToPool(JdbcConnection connection) {
        connection.setInUse(false);
    }

    // 模拟数据库连接类
    public static class JdbcConnection {
        private final int id;
        private boolean inUse;

        public JdbcConnection(int id) {
            this.id = id;
            this.inUse = true;
        }

        public int getId() { return id; }
        public boolean isInUse() { return inUse; }
        public void setInUse(boolean inUse) { this.inUse = inUse; }

        // 模拟执行SQL
        public void executeQuery(String sql) {
            System.out.println("连接" + id + "执行SQL: " + sql);
        }
    }
}

在这个连接池实现中,Semaphore用于控制同时获取数据库连接的线程数量,防止过多的连接导致数据库压力过大。

3.3 其他方法

Semaphore还提供了一些有用的方法:

  • int availablePermits():返回此信号量中当前可用的许可证数。
  • int getQueueLength():返回正在等待获取许可证的线程数。
  • boolean hasQueuedThreads():是否有线程正在等待获取许可证。
  • void reducePermits(int reduction):减少reduction个许可证(protected方法)。
  • Collection getQueuedThreads():返回所有等待获取许可证的线程集合(protected方法)。

3.4 公平性与性能优化

Semaphore有公平和非公平两种模式:

  • 非公平模式:通常具有更高的吞吐量,因为它允许更多的线程尝试获取许可。
  • 公平模式:保证先请求的线程先获得许可,避免线程饥饿。

在实际应用中,需要根据具体的场景和需求来选择。如果对执行顺序有严格要求,应选择公平模式;如果追求高吞吐量,非公平模式可能是更好的选择。

4 三大同步工具对比与选型指南

了解了这三个工具后,我们来总结一下它们的特点和适用场景。

4.1 核心区别对比

特性 CountDownLatch CyclicBarrier Semaphore
是否可重用 不可重用,一次性 可重用,自动/手动重置 可重用,许可可重复获取释放
主要用途 等待一个或多个事件完成 多个线程相互等待到屏障点 控制同时访问资源的线程数
计数器变化 递减,到0释放等待线程 递增到设定值释放所有线程 许可数获取时减少,释放时增加
底层实现 基于AQS共享模式 基于ReentrantLock和Condition 基于AQS共享模式

4.2 选型指南

  • 选择CountDownLatch当:需要等待一个或多个事件完成后再继续执行,且事件是一次性的。
  • 选择CyclicBarrier当:多个线程需要相互等待,到达一个公共屏障点后才能继续执行,特别是多阶段任务。
  • 选择Semaphore当:需要控制同时访问特定资源的线程数量,进行流量控制。

5 总结

CountDownLatch、CyclicBarrier和Semaphore是Java并发编程中不可或缺的三个同步工具类,它们分别解决了不同的线程协调问题。

  • CountDownLatch像是旅行团的导游,要等到所有游客(线程)都完成某个动作(如集合)后,才能继续下一步行程。
  • CyclicBarrier像是一群朋友约定吃饭,必须所有人都到齐后才会开始点餐,而且这种约定可以多次进行(多阶段任务)。
  • Semaphore像是停车场的入口系统,只有有空车位(许可)时才会允许车辆进入,车位满时需要等待。

在实际开发中,根据具体场景选择合适的同步工具,可以大大简化并发编程的复杂度,提高程序的可靠性和性能。




上一篇:Bili.Copilot:Windows原生B站客户端,集成AI助手总结与对话视频内容
下一篇:Python设计模式实战:单一职责原则SRP核心解析与代码重构指南
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-11 04:57 , Processed in 0.084620 second(s), 40 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表