底层并发是构建高性能、高可靠Java应用的核心内功。理解其运作机制,能帮助开发者避免常见的并发陷阱,设计出更健壮的系统。
一、线程:Java并发的基本单位
1.1 什么是线程?
线程是程序执行的最小单位,可以类比为进程(一个应用程序)中独立工作的子任务。多个线程共享进程的内存和资源,但拥有独立的执行路径。
// 创建线程的两种方式
// 方式一:继承Thread类
class MyThread extends Thread {
@Override
public void run() {
System.out.println(“线程运行中...”);
}
}
// 方式二:实现Runnable接口
class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println(“Runnable线程运行中...”);
}
}
public class ThreadDemo {
public static void main(String[] args) {
// 方式一使用
MyThread thread1 = new MyThread();
thread1.start();
// 方式二使用
Thread thread2 = new Thread(new MyRunnable());
thread2.start();
}
}
为什么推荐实现Runnable接口?
- 避免单继承的限制(Java是单继承)
- 资源共享更方便
- 更符合面向对象的设计思想
1.2 最佳线程数:别贪多!
这是一个需要权衡的问题,通常遵循以下经验法则:
公式参考:最佳线程数 ≈ CPU核心数 × (1 + 等待时间/计算时间)
实践中更常用的经验值为:
- CPU密集型任务(计算为主):线程数 ≈ CPU核心数
- I/O密集型任务(等待为主):线程数 ≈ CPU核心数 × (1~2) 或更高
// 获取CPU核心数
int cpuCores = Runtime.getRuntime().availableProcessors();
System.out.println(“CPU核心数: “ + cpuCores);
反例:盲目创建大量线程
// 错误示范:创建1000个线程同时运行
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
// 做一些简单计算
int result = 1 + 1;
}).start();
}
问题:线程创建和上下文切换的开销远大于计算本身,可能导致性能严重下降。
1.3 我可以创建多少线程?
理论极限受限于以下因素:
- 操作系统限制:例如Linux默认每个进程约1024个线程。
- 内存限制:每个线程需要独立的栈内存(默认约1MB)。
- CPU限制:线程过多会导致频繁的上下文切换,消耗大量CPU资源。
// 测试最大线程数(谨慎运行!)
public class MaxThreadTest {
public static void main(String[] args) {
int count = 0;
try {
while (true) {
new Thread(() -> {
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
// 忽略
}
}).start();
count++;
System.out.println(“已创建线程数: “ + count);
}
} catch (OutOfMemoryError e) {
System.out.println(“最终创建线程数: “ + count);
}
}
}
二、异常捕获:别让异常“溜走”
线程内未捕获的异常会直接导致线程终止,且默认不会通知主线程。
2.1 错误的异常处理
// 错误示范:异常会“消失“
Thread thread = new Thread(() -> {
throw new RuntimeException(“线程异常了!”);
});
thread.start();
// 主线程继续执行,完全不知道子线程挂了
Thread.sleep(1000);
System.out.println(“主线程还在运行...”);
2.2 正确的异常捕获方式
方式一:设置UncaughtExceptionHandler
Thread thread = new Thread(() -> {
throw new RuntimeException(“线程异常了!”);
});
// 设置异常处理器
thread.setUncaughtExceptionHandler((t, e) -> {
System.out.println(“线程 “ + t.getName() + “ 抛出异常: “ + e.getMessage());
});
thread.start();
方式二:使用Future(推荐)
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(() -> {
throw new RuntimeException(“任务异常了!”);
});
try {
future.get(); // 这里会抛出ExecutionException
} catch (ExecutionException e) {
System.out.println(“捕获到异常: “ + e.getCause().getMessage());
} finally {
executor.shutdown();
}
三、共享资源与资源竞争
3.1 资源竞争:并发程序的典型问题
当多个线程同时读写同一个共享资源时,就会发生资源竞争,导致数据不一致。这是多线程环境下必须解决的核心问题。
class Counter {
private int count = 0;
public void increment() {
count++; // 这里存在竞态条件!
}
public int getCount() {
return count;
}
}
public class RaceConditionDemo {
public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter();
Thread t1 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
counter.increment();
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
counter.increment();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
// 结果不一定是20000!
System.out.println(“最终计数: “ + counter.getCount());
}
}
为什么count++不是原子操作?
它包含三个步骤:1. 读取count值;2. 将值加1;3. 写回新值。这三个步骤可能被其他线程打断。
3.2 解决资源竞争
方法一:synchronized关键字
class SafeCounter {
private int count = 0;
// 方法同步
public synchronized void increment() {
count++;
}
// 代码块同步
public void increment2() {
synchronized (this) {
count++;
}
}
}
方法二:使用Atomic原子类
import java.util.concurrent.atomic.AtomicInteger;
class AtomicCounter {
private AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet(); // 原子操作
}
public int getCount() {
return count.get();
}
}
3.3 复合操作的同步问题
class EvenProducer {
private int currentEven = 0;
// 非同步版本 – 有问题!
public int next() {
currentEven++;
currentEven++;
return currentEven;
}
// 同步版本 – 正确!
public synchronized int nextSync() {
currentEven++;
currentEven++;
return currentEven;
}
}
问题分析:非同步版本的两次递增操作可能被其他线程打断,导致方法返回奇数,违背其产生偶数的设计初衷。
四、volatile关键字:可见性与有序性的保证
4.1 字分裂(Word Tearing)
在32位JVM上,对long和double(64位)的读写可能不是原子的,因为它可能需要两次32位操作。使用volatile可以保证其读写操作的原子性。
class WordTearing {
private volatile long value = 0L; // 添加volatile保证原子读写
public void setValue(long v) {
value = v;
}
public long getValue() {
return value;
}
}
4.2 可见性(Visibility)
没有volatile修饰,一个线程对变量的修改可能对其他线程不可见,因为变量可能被缓存在CPU寄存器或各级缓存中。
class VisibilityProblem {
private boolean flag = false; // 没有volatile!
public void writer() {
flag = true; // 步骤1
}
public void reader() {
while (!flag) { // 可能永远循环,读不到flag的变化!
// 空循环
}
System.out.println(“Flag is true now!”);
}
}
解决方案:为flag添加volatile修饰:private volatile boolean flag = false;。
4.3 禁止指令重排序(Reordering)
编译器和处理器为了优化性能可能对指令进行重排序。volatile通过内存屏障(Memory Barrier)来禁止这种重排,保证有序性。
class ReorderingExample {
private int x = 0;
private int y = 0;
private volatile boolean ready = false;
public void writer() {
x = 42; // 步骤1
y = 42; // 步骤2
ready = true; // 步骤3 – volatile写,建立happens-before关系
}
public void reader() {
if (ready) { // volatile读
// 由于happens-before规则,这里看到的x和y一定都是42
System.out.println(“x: “ + x + “, y: “ + y);
}
}
}
4.4 何时使用volatile?
适用场景:
- 状态标志位(如开关控制)。
- 一次性安全发布(如Double-Checked Locking模式)。
- 独立观察:定期发布的、多个线程只读的变量。
不适用场景:
- 需要原子性的复合操作(如
i++)。
- 多个变量之间存在约束关系,需要一起原子更新的情况。
五、原子性(Atomicity)
5.1 volatile无法保证复合原子性
class SequenceGenerator {
private volatile int value = 0;
// 这个方法是线程不安全的!
public int getNext() {
return value++; // 即使value是volatile,这个操作也不是原子的!
}
}
正确做法:使用原子类
import java.util.concurrent.atomic.AtomicInteger;
class SafeSequenceGenerator {
private AtomicInteger value = new AtomicInteger(0);
public int getNext() {
return value.getAndIncrement(); // 原子操作!
}
}
5.2 Java并发原子类
Java并发包提供了丰富的原子类,用于解决特定类型的原子操作:
AtomicInteger, AtomicLong
AtomicBoolean
AtomicReference
AtomicIntegerArray等
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
class AtomicDemo {
// 原子整数
private AtomicInteger counter = new AtomicInteger(0);
// 原子引用
private AtomicReference<String> latestMessage = new AtomicReference<>(“”);
public void updateCounter() {
// 经典的CAS(Compare-And-Swap)操作循环
int oldValue, newValue;
do {
oldValue = counter.get();
newValue = oldValue + 1;
} while (!counter.compareAndSet(oldValue, newValue));
}
public void updateMessage(String msg) {
latestMessage.set(msg);
}
}
六、临界区与锁优化
6.1 使用专用锁对象
class SeparateLockObject {
private final Object lock = new Object(); // 专用锁对象
private int count = 0;
public void increment() {
synchronized (lock) { // 使用专门的锁对象,而非this
count++;
}
}
}
优点:细化锁粒度,减少不同方法间因使用同一把锁(如this)而导致的不必要阻塞,提升并发性能。
6.2 使用显式Lock对象
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.TimeUnit;
class ExplicitLockDemo {
private final Lock lock = new ReentrantLock();
private int count = 0;
public void increment() {
lock.lock(); // 获取锁
try {
count++;
} finally {
lock.unlock(); // 必须在finally块中释放锁!
}
}
// 尝试获取锁(带超时)
public boolean tryIncrement() {
boolean acquired = false;
try {
acquired = lock.tryLock(1, TimeUnit.SECONDS);
if (acquired) {
count++;
return true;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 恢复中断状态
} finally {
if (acquired) {
lock.unlock();
}
}
return false;
}
}
ReentrantLock vs synchronized: |
特性 |
synchronized |
ReentrantLock |
| 可重入 |
✓ |
✓ |
| 公平锁 |
✗ |
可选 |
| 尝试获取锁 |
✗ |
✓ (tryLock) |
| 可中断获取 |
✗ |
✓ (lockInterruptibly) |
| 超时机制 |
✗ |
✓ |
| 条件变量 |
✗ |
✓ (newCondition) |
七、并发库组件应用
7.1 延迟队列(DelayQueue)
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
class DelayedTask implements Delayed {
private final String name;
private final long startTime;
public DelayedTask(String name, long delayInMillis) {
this.name = name;
this.startTime = System.currentTimeMillis() + delayInMillis;
}
@Override
public long getDelay(TimeUnit unit) {
long diff = startTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed other) {
return Long.compare(this.startTime, ((DelayedTask) other).startTime);
}
@Override
public String toString() {
return name;
}
}
public class DelayQueueDemo {
public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayedTask> queue = new DelayQueue<>();
// 添加延迟任务
queue.put(new DelayedTask(“Task1”, 3000)); // 3秒后到期
queue.put(new DelayedTask(“Task2”, 1000)); // 1秒后到期
// 按延迟时间顺序取出,take()会阻塞直到有元素到期
while (!queue.isEmpty()) {
DelayedTask task = queue.take();
System.out.println(“执行任务: “ + task);
}
}
}
使用场景:缓存过期清理、定时任务调度、游戏技能冷却等。
7.2 优先级阻塞队列(PriorityBlockingQueue)
import java.util.concurrent.PriorityBlockingQueue;
class PriorityTask implements Comparable<PriorityTask> {
private final String name;
private final int priority;
public PriorityTask(String name, int priority) {
this.name = name;
this.priority = priority;
}
@Override
public int compareTo(PriorityTask other) {
// 数值小的优先级高(先出队)
return Integer.compare(this.priority, other.priority);
}
@Override
public String toString() {
return name + “ (优先级: “ + priority + “)”;
}
}
public class PriorityQueueDemo {
public static void main(String[] args) throws InterruptedException {
PriorityBlockingQueue<PriorityTask> queue = new PriorityBlockingQueue<>();
// 添加任务(乱序)
queue.put(new PriorityTask(“普通任务”, 3));
queue.put(new PriorityTask(“紧急任务”, 1));
queue.put(new PriorityTask(“中等任务”, 2));
// 按优先级顺序取出
while (!queue.isEmpty()) {
PriorityTask task = queue.take();
System.out.println(“处理: “ + task);
}
}
}
7.3 并发集合(Concurrent Collections)
Java提供了多种线程安全的并发集合,其中很多采用了无锁或细粒度锁技术来实现高性能。
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
public class ConcurrentCollectionsDemo {
public static void main(String[] args) {
// 1. ConcurrentHashMap – 分段锁(JDK8后改为CAS+synchronized)
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put(“key1”, 1);
map.putIfAbsent(“key1”, 2); // 原子操作
// 2. ConcurrentLinkedQueue – 无锁(CAS)实现的并发队列
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer(“item1”);
queue.poll();
// 3. CopyOnWriteArrayList – 写时复制,适合读多写少场景
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add(“element”);
// 遍历时安全,但遍历的是迭代器创建时的数组快照
// 4. 使用AtomicReference实现简单的无锁栈(Treiber Stack)
AtomicReference<Node<String>> stackTop = new AtomicReference<>();
// 无锁push操作
Node<String> newHead = new Node<>(“data”);
Node<String> oldHead;
do {
oldHead = stackTop.get();
newHead.next = oldHead;
} while (!stackTop.compareAndSet(oldHead, newHead));
}
static class Node<T> {
T data;
Node<T> next;
Node(T data) {
this.data = data;
}
}
}
无锁/细粒度锁集合的优点:
- 高并发性:减少了锁竞争,吞吐量更高。
- 避免死锁:无锁算法本身不会产生死锁。
- 更好的扩展性:随着线程数增加,性能衰减较平缓。
需要注意的缺点:
- 实现复杂。
- 存在ABA问题(部分原子类如
AtomicStampedReference已解决)。
- 可能消耗更多内存(如
CopyOnWriteArrayList)。
八、总结
掌握Java并发底层原理是构建稳健高并发应用的基石。核心要点回顾:
- 线程管理:线程数量需根据任务类型(CPU/IO密集型)合理设置,并非越多越好。
- 异常处理:务必为工作线程设置未捕获异常处理器,或使用
Future/ExecutorService来捕获异常。
- 共享数据保护:正确使用
synchronized、显式Lock及原子类来保护共享资源,避免竞态条件。
- 内存可见性:
volatile关键字解决了变量修改的可见性与防止指令重排序,但不保证复合操作的原子性。
- 原子操作:对于
i++这类“读-改-写”复合操作,应使用AtomicInteger等原子类。
- 锁粒度:尽量减小临界区范围,使用专用锁对象,以提高并发度。
- 善用工具:优先使用
DelayQueue、ConcurrentHashMap等成熟的并发容器,而非自己重复实现。