在 Flink 的旧线程模型中,开发者需要依赖 checkpointLock 来隔离不同线程对任务内部状态的并发修改。这种方式要求代码中出现大量的 synchronize(lock) 语句,不仅不利于阅读和调试,也容易因锁的误用而导致线程安全问题。
为了解决这些问题,Flink 社区引入了基于 Mailbox 的线程模型。其核心思想是通过单线程配合阻塞队列来处理所有事件。这样,对内部状态的修改都由同一个线程串行完成,从而避免了复杂的锁竞争。在旧模型中,checkpointLock 主要用于保护三个关键区域:
- 事件处理:包括数据事件(event)、水位线(watermark)、屏障(barrier)的处理与发送。
- 检查点:包括 Checkpoint 的触发和完成通知。
- 处理时间定时器:ProcessTime 定时器的回调通常会涉及状态修改。
而在 Mailbox 模型中,所有需要处理的事件都被封装成 Mail 投递到邮箱队列中,由指定的单线程按顺序取出并执行。
相关定义与核心类
下面我们深入 Mailbox 的具体实现,首先来了解几个核心的类与其作用。

我们来逐个剖析这些类的定义和职责。
Mail
Mail 是线程模型中最基础的单元,它封装了需要执行的动作。无论是 Checkpoint 的触发还是 ProcessTime 定时器的触发,最终都会转化为一个 Mail。Mail 包含以下关键属性:
// 选项,包括两个选项:isUrgent 和 deferrable
private final MailOptionsImpl mailOptions;
// 要执行的动作
private final ThrowingRunnable<? extends Exception> runnable;
// 优先级,这里的优先级不决定执行顺序,而是避免上下游之间的死锁问题
private final int priority;
// 描述信息
private final String descriptionFormat;
private final Object[] descriptionArgs;
// 用于执行 runnable 的执行器
private final StreamTaskActionExecutor actionExecutor;
TaskMailbox
有了 Mail,就需要一个容器来存储它,这就是 TaskMailbox。其具体实现类 TaskMailboxImpl 内部维护了一个 Deque 队列。当需要执行时,再从队列中取出 Mail。
// 内部对于 queue 和 state 的并发访问都需要被这个锁保护
private final ReentrantLock lock = new ReentrantLock();
// 实际存储 Mail 的队列
@GuardedBy("lock")
private final Deque<Mail> queue = new ArrayDeque<>();
// 与 lock 关联的 Condition,主要用于队列从空变为非空时唤醒等待获取 Mail 的线程
@GuardedBy("lock")
private final Condition notEmpty = lock.newCondition();
// 状态,包括 OPEN/QUIESCED/CLOSED
@GuardedBy("lock")
private State state = OPEN;
// 指定的邮箱线程的引用
@Nonnull
private final Thread taskMailboxThread;
// 用于性能优化的设计
private final Deque<Mail> batch = new ArrayDeque<>();
// queue队列是否为空,用于性能优化,避免频繁访问主队列
private volatile boolean hasNewMail = false;
// 是否有紧急邮件,同样用于性能优化,减少检查队列中是否有紧急邮件的次数
private volatile boolean hasNewUrgentMail = false;
从属性可以看出,TaskMailbox 底层使用 ArrayDeque 存储 Mail,并通过 lock 保护对队列和状态(state)的并发访问。状态分为三种:
- OPEN:邮箱正常工作,可以接收和取出 Mail。
- QUIESCED:静默状态,不再接收新的 Mail,但已有的 Mail 仍可被取出处理。
- CLOSED:关闭状态,不能进行任何操作。
为了提升性能,TaskMailbox 内部还设计了 batch 队列。处理 Mail 时,会先将一批 Mail 从主队列(queue)转移到 batch 队列,之后优先从 batch 中获取。这减少了直接访问主队列的次数,有效缓解了锁竞争压力。
MailboxProcessor
MailboxProcessor 可以看作是整个 Mailbox 模型的核心控制器。它的核心方法是运行一个事件循环,在这个循环中不断从 TaskMailbox 中取出 Mail 执行,并在没有 Mail 时执行一个默认动作(MailboxDefaultAction),例如处理输入数据。
此外,MailboxProcessor 对外提供了 MailboxExecutor,其他组件可以通过它来提交事件,实现与主线程的通信。这种设计巧妙地将并发控制内部化,对外提供清晰的执行接口,是理解 Flink 线程模型 并发机制的关键。
MailboxExecutor
MailboxExecutor 的实现类是 MailboxExecutorImpl,其主要职责是向 TaskMailbox 投递 Mail。它的核心属性包括:
// 实际存储的 mailbox 实例
@Nonnull
private final TaskMailbox mailbox;
// 优先级,MailboxExecutor 提供的默认优先级,提交 mail 时会带上这个字段
private final int priority;
// 执行器,运行 mail 的动作
private final StreamTaskActionExecutor actionExecutor;
// 执行 MailboxProcessor,主要用于 isIdle 方法
private final MailboxProcessor mailboxProcessor;
MailboxExecutor 的核心方法是 execute,它可以在任意线程中被调用,因为内部的 mailbox 已控制了并发。
public void execute(
MailOptions mailOptions,
final ThrowingRunnable<? extends Exception> command,
final String descriptionFormat,
final Object... descriptionArgs) {
try {
mailbox.put(
new Mail(
mailOptions,
command,
priority,
actionExecutor,
descriptionFormat,
descriptionArgs));
} catch (MailboxClosedException mbex) {
throw new RejectedExecutionException(mbex);
}
}
除了 execute,MailboxExecutor 还有一个重要的 yield 方法。
public void yield() throws InterruptedException {
Mail mail = mailbox.take(priority);
try {
mail.run();
} catch (Exception ex) {
throw WrappingRuntimeException.wrapIfNecessary(ex);
}
}
yield 方法的主要目的是让出当前事件的处理权,这有两个原因:
- 如果不考虑优先级,Mailbox 队列是 FIFO 的。如果当前事件的处理依赖于后续事件的完成,可能会造成逻辑上的“死锁”。
- 当前事件处理耗时过长,会阻塞其他事件的执行。让出执行权可以让相同或更高优先级的事件得到处理机会。
请注意:yield 方法只能由 mailbox 线程自身调用。Flink 也提供了非阻塞版本的 tryYield 方法。
执行流程剖析
主循环流程
在创建 StreamTask 时,会初始化 mailboxProcessor,并获取一个 mainMailboxExecutor。
new TaskMailboxImpl(Thread.currentThread()));
...
this.mailboxProcessor =
new MailboxProcessor(
this::processInput, mailbox, actionExecutor, mailboxMetricsControl);
...
this.mainMailboxExecutor = mailboxProcessor.getMainMailboxExecutor();
这里将 processInput 方法作为默认动作(MailboxDefaultAction)传入了 MailboxProcessor。当 StreamTask 启动时,会调用 MailboxProcessor 的核心方法 runMailboxLoop。
public final void invoke() throws Exception {
// ... 省略初始化与检查代码
runMailboxLoop();
// ... 省略收尾代码
}
public void runMailboxLoop() throws Exception {
mailboxProcessor.runMailboxLoop();
}
runMailboxLoop 的核心逻辑是一个 while 循环,循环中处理 Mail 并执行默认动作。
public void runMailboxLoop() throws Exception {
suspended = !mailboxLoopRunning;
final TaskMailbox localMailbox = mailbox;
checkState(
localMailbox.isMailboxThread(),
"Method must be executed by declared mailbox thread!");
assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";
final MailboxController mailboxController = new MailboxController(this);
while (isNextLoopPossible()) {
// The blocking `processMail` call will not return until default action is available.
processMail(localMailbox, false);
if (isNextLoopPossible()) {
mailboxDefaultAction.runDefaultAction(
mailboxController); // lock is acquired inside default action as needed
}
}
}
private boolean isNextLoopPossible() {
// 'Suspended' can be false only when 'mailboxLoopRunning' is true.
return !suspended;
}
代码首先进行前置检查:确保当前线程是指定的 mailbox 线程,且 TaskMailbox 处于 OPEN 状态。接着创建 MailboxController 用于默认动作与处理器的交互。
随后进入 while (isNextLoopPossible()) 循环。循环中先调用 processMail 处理邮箱中的待执行任务,然后如果条件允许,就执行默认动作 mailboxDefaultAction.runDefaultAction,即处理数据输入。
processMail 方法负责具体的 Mail 处理:
private boolean processMail(TaskMailbox mailbox, boolean singleStep) throws Exception {
// Doing this check is an optimization to only have a volatile read in the expected hot
// path, locks are only acquired after this point.
boolean isBatchAvailable = mailbox.createBatch();
// Take mails in a non-blockingly and execute them.
boolean processed = isBatchAvailable && processMailsNonBlocking(singleStep);
if (singleStep) {
return processed;
}
// If the default action is currently not available, we can run a blocking mailbox execution
// until the default action becomes available again.
processed |= processMailsWhenDefaultActionUnavailable();
return processed;
}
它先尝试创建 batch,然后非阻塞地处理这批 Mail(processMailsNonBlocking)。如果默认动作暂时不可用,则会进入 processMailsWhenDefaultActionUnavailable 方法,该方法会阻塞等待,直到有新的 Mail 到达或默认动作恢复可用。
processMailsNonBlocking 方法循环从 batch 中取 Mail 并执行:
private boolean processMailsNonBlocking(boolean singleStep) throws Exception {
long processedMails = 0;
Optional<Mail> maybeMail;
while (isNextLoopPossible() && (maybeMail = mailbox.tryTakeFromBatch()).isPresent()) {
if (processedMails++ == 0) {
maybePauseIdleTimer();
}
runMail(maybeMail.get());
if (singleStep) {
break;
}
}
if (processedMails > 0) {
maybeRestartIdleTimer();
return true;
} else {
return false;
}
}
private void runMail(Mail mail) throws Exception {
mailboxMetricsControl.getMailCounter().inc();
mail.run();
// ... 省略性能度量相关代码
}
最终,runMail 方法调用了 mail.run() 来执行 Mail 中封装的具体动作。
当默认动作可用时,就会执行它,也就是 Stream.processInput,这里是实际处理流记录(StreamRecord)的地方。
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
DataInputStatus status = inputProcessor.processInput();
switch (status) {
case MORE_AVAILABLE:
if (taskIsAvailable()) {
return;
}
break;
case NOTHING_AVAILABLE:
break;
case END_OF_RECOVERY:
throw new IllegalStateException("We should not receive this event here.");
case STOPPED:
endData(StopMode.NO_DRAIN);
return;
case END_OF_DATA:
endData(StopMode.DRAIN);
notifyEndOfData();
return;
case END_OF_INPUT:
// Suspend the mailbox processor...
controller.suspendDefaultAction();
mailboxProcessor.suspend();
return;
}
// ...
}
当状态为 MORE_AVAILABLE 时,表示还有数据可立即处理,若任务可用则直接返回,准备处理下一个数据。当状态为 END_OF_INPUT 时,表示所有输入已结束,此时会暂停默认动作和邮箱处理器的事件循环。
Checkpoint 触发流程
Checkpoint 的触发是通过调用 Stream.triggerCheckpointAsync 方法实现的。
public CompletableFuture<Boolean> triggerCheckpointAsync(
CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
checkForcedFullSnapshotSupport(checkpointOptions);
MailboxExecutor.MailOptions mailOptions =
CheckpointOptions.AlignmentType.UNALIGNED == checkpointOptions.getAlignment()
? MailboxExecutor.MailOptions.urgent()
: MailboxExecutor.MailOptions.options();
CompletableFuture<Boolean> result = new CompletableFuture<>();
mainMailboxExecutor.execute(
mailOptions,
() -> {
try {
boolean noUnfinishedInputGates =
Arrays.stream(getEnvironment().getAllInputGates())
.allMatch(InputGate::isFinished);
if (noUnfinishedInputGates) {
result.complete(
triggerCheckpointAsyncInMailbox(
checkpointMetaData, checkpointOptions));
} else {
result.complete(
triggerUnfinishedChannelsCheckpoint(
checkpointMetaData, checkpointOptions));
}
} catch (Exception ex) {
// Report the failure both via the Future result but also to the mailbox
result.completeExceptionally(ex);
throw ex;
}
},
"checkpoint %s with %s",
checkpointMetaData,
checkpointOptions);
return result;
}
可以看到,它通过 mainMailboxExecutor.execute 方法向 Mailbox 提交了一个 Mail。如果是非对齐检查点(UNALIGNED),则会使用紧急(urgent)选项。Checkpoint 完成的通知也是通过 Mailbox 执行的,并且通常会提交一个更高优先级的操作。
private Future<Void> notifyCheckpointOperation(
RunnableWithException runnable, String description) {
CompletableFuture<Void> result = new CompletableFuture<>();
mailboxProcessor
.getMailboxExecutor(TaskMailbox.MAX_PRIORITY)
.execute(
() -> {
try {
runnable.run();
} catch (Exception ex) {
result.completeExceptionally(ex);
throw ex;
}
result.complete(null);
},
description);
return result;
}
总结
本文梳理了 Apache Flink 中 Mailbox 线程模型的核心源码。该模型通过将事件封装为 Mail,并由单线程顺序处理,极大地简化了并发控制逻辑,替代了旧模型中繁琐的 checkpointLock。我们首先了解了 Mail、TaskMailbox、MailboxProcessor、MailboxExecutor 这几个核心类的设计与作用,然后详细剖析了事件处理的主循环流程以及 Checkpoint 等关键操作的触发机制。理解这套模型,对于深入掌握 Flink 的任务执行与并发控制原理至关重要,也是进行更高级 源码分析 和性能调优的基础。