找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

1422

积分

0

好友

204

主题
发表于 6 天前 | 查看: 14| 回复: 0

在高并发与多线程应用中,协调不同线程之间的执行顺序和资源共享至关重要。java.util.concurrent(JUC)包提供了一系列强大的线程协作工具,它们基于底层并发框架构建,极大地简化了并发编程模型。本文将深入解析CountDownLatchCyclicBarrierPhaserSemaphoreExchanger这五大核心工具类的原理、使用场景及最佳实践。

CountDownLatch(JDK 5+):倒计时门闩

CountDownLatch 用于实现“一个或多个线程等待其他一组线程完成操作”的场景。它通过一个计数器进行同步,初始化时设定计数值,线程调用countDown()使计数减一,而调用await()的线程会阻塞直到计数器归零。

核心 API 与使用示例

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        int threadCount = 5;
        CountDownLatch latch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " 正在执行任务...");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown(); // 任务完成,计数减一
                }
            }).start();
        }

        latch.await(); // 主线程等待所有任务完成
        System.out.println("所有任务已完成,主线程继续执行!");
    }
}

典型应用场景

  • 服务启动检查:主线程等待数据库、缓存等核心组件全部初始化完成后,再启动对外服务。
  • 并行任务汇总:多个子任务并发执行,主任务需等待所有子任务完成后才能进行结果合并。

注意事项与原理

  1. 不可重用:计数器归零后,该CountDownLatch实例的使命就结束了,如需重用,请考虑CyclicBarrier
  2. 防死锁:确保所有线程最终都会调用countDown(),通常应将其置于finally块中。
  3. 可中断await()方法会响应中断。
  4. 实现原理:其核心基于AQS(AbstractQueuedSynchronizer)框架。内部状态state代表剩余计数值。await()对应获取共享锁,countDown()对应释放共享锁。想要深入理解Java并发锁机制,可以参考网络与系统底层原理相关的知识。

CyclicBarrier(JDK 5+):循环屏障

CyclicBarrier 让一组线程互相等待,直到所有线程都到达一个公共屏障点后,才能继续执行。它支持一个可选的“屏障动作”(Runnable),在所有线程到达后执行,并且可以重置后重复使用。

核心 API 与使用示例

public class CyclicBarrierExample {
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(3, () -> {
            System.out.println("所有线程已到达屏障,执行汇总操作!");
        });

        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " 第一阶段完成");
                    barrier.await(); // 等待其他线程
                    System.out.println(Thread.currentThread().getName() + " 第二阶段开始");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

典型应用场景

  • 多阶段计算:如并行计算中,每个线程完成一个阶段计算后,在屏障点同步并交换数据,然后进入下一阶段。
  • 模拟并发:在压力测试中,使用它让所有请求线程准备就绪后同时发起。

注意事项与原理

  1. 可重用:当所有等待线程被释放后,屏障会自动重置,可迎接下一轮同步。
  2. 屏障损坏:若等待线程被中断、超时或reset()被调用,屏障会进入“损坏”状态,后续调用await()将抛出BrokenBarrierException
  3. 实现原理:其内部使用ReentrantLockCondition实现,而非AQS。线程调用await()时在Condition上等待,最后一个线程到达后执行屏障动作并唤醒所有等待线程。

CyclicBarrier 与 CountDownLatch 对比

特性 CountDownLatch CyclicBarrier
计数方向 减到0 加到指定值
是否可重用
主要参与者 两类角色(等待线程、计数线程) 一类角色(所有线程相互等待)
核心方法 await(), countDown() await()

Phaser(JDK 7+):灵活的阶段同步器

Phaser 是功能更为强大的同步器,可以看作是CyclicBarrier的增强版。它支持动态注册/注销参与者多阶段(Phase)同步以及分层(Tiering) 结构,适用于复杂的多阶段任务协调。

核心概念与使用示例

public class PhaserExample {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(1); // 主线程注册自己,用于控制阶段

        for (int i = 0; i < 3; i++) {
            phaser.register(); // 为每个工作线程注册
            new Thread(() -> {
                for (int phase = 0; phase < 3; phase++) {
                    System.out.println(Thread.currentThread().getName() + " 完成阶段 " + phase);
                    // 到达并等待本阶段其他参与者
                    phaser.arriveAndAwaitAdvance();
                }
                // 任务结束,注销自己
                phaser.arriveAndDeregister();
            }).start();
        }

        // 主线程解除注册,并等待所有阶段完成
        phaser.arriveAndDeregister();
        while (!phaser.isTerminated()) {
            Thread.yield();
        }
        System.out.println("所有阶段已完成!");
    }
}

高级特性:分层结构

当参与者数量巨大时,单一的Phaser可能成为性能瓶颈。Phaser支持构建树状父子结构,子Phaser向父Phaser注册,组内线程先在子Phaser同步,再由子Phaser的代表与父Phaser同步,从而减少锁竞争。

Phaser 与 CyclicBarrier 对比

特性 CyclicBarrier Phaser
动态调整 参与者数量固定 支持register(), arriveAndDeregister()
阶段控制 隐式循环 显式阶段号(Phase),可获取当前阶段
分层支持 不支持 支持父子结构,适合大规模并发
终止机制 损坏后不可用 参与者数为0时自动终止,可查询状态

Semaphore(JDK 5+):信号量

Semaphore 用于控制同时访问特定资源的线程数量,它通过维护一组“许可”(permits)来实现流量控制或资源池化。

核心 API 与使用示例

public class SemaphoreExample {
    private static final Semaphore semaphore = new Semaphore(2); // 最多允许2个线程并发访问

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire(); // 获取许可
                    System.out.println(Thread.currentThread().getName() + " 获取许可,正在执行...");
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    semaphore.release(); // 释放许可
                    System.out.println(Thread.currentThread().getName() + " 释放许可");
                }
            }).start();
        }
    }
}

典型应用场景

  • 资源池管理:如数据库连接池,限制最大连接数。
  • 流量控制:对高并发接口进行限流,例如每秒最多处理N个请求。
  • 互斥锁:当许可数为1时,Semaphore可退化为一个互斥锁。

注意事项与原理

  1. 公平性:构造函数可指定是否为公平模式,公平模式下按申请顺序获取许可。
  2. 非阻塞获取tryAcquire()方法尝试获取许可,失败立即返回,不会阻塞。
  3. 实现原理:同样基于AQS框架。state值表示可用许可数。acquire()是获取共享锁,release()是释放共享锁。

Exchanger(JDK 5+):数据交换器

Exchanger 提供了一个同步点,用于配对的两个线程之间交换数据。每个线程在到达交换点后,调用exchange(V x)方法,将自己的数据传入,并阻塞直到配对线程也调用了此方法,然后接收对方的数据并返回。

使用示例

public class ExchangerExample {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();

        new Thread(() -> {
            try {
                String data = "Thread-A 的数据";
                System.out.println("A 准备交换: " + data);
                String received = exchanger.exchange(data);
                System.out.println("A 收到: " + received);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();

        new Thread(() -> {
            try {
                Thread.sleep(1000);
                String data = "Thread-B 的数据";
                System.out.println("B 准备交换: " + data);
                String received = exchanger.exchange(data);
                System.out.println("B 收到: " + received);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}

典型应用场景

  • 双缓冲技术:一个线程填充缓冲区A,另一个线程填充缓冲区B,填充完成后两者交换缓冲区,实现高效的读写分离。
  • 遗传算法:配对线程交换染色体数据进行交叉操作。

注意事项

  • 仅支持两两配对,如果三个线程调用exchange(),后到的线程将因为没有配对线程而一直等待。
  • 其底层实现采用了无锁的CAS操作和slot(槽位)机制,以实现高效的数据交换。

总结与选型建议

工具类 核心目的 是否可重用 适用场景
CountDownLatch 一等多 启动准备、并行任务汇总
CyclicBarrier 多等多 分阶段并行计算、多线程测试
Phaser 动态多阶段同步 复杂的、参与者可变的多阶段任务(如工作流)
Semaphore 控制并发数 限流、资源池(如连接池)
Exchanger 两线程交换数据 双缓冲、配对线程数据交换

在选择时,应首先明确同步需求:是等待完成(CountDownLatch)、阶段同步(CyclicBarrier/Phaser)、控制并发(Semaphore)还是数据交换(Exchanger)。对于复杂的、参与者动态变化的多阶段任务,Phaser是最佳选择。这些工具都是构建健壮、高效的Java高并发应用的基础构件,深入理解其原理和使用场景,能帮助开发者更好地驾驭并发编程。




上一篇:CentOS 7.9 部署 Keepalived 实现 Nginx 高可用集群配置
下一篇:云服务器部署Docker与K8s:主流Linux发行版兼容性与选型指南
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-24 20:53 , Processed in 0.417021 second(s), 40 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表