找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

2297

积分

0

好友

331

主题
发表于 16 小时前 | 查看: 2| 回复: 0

在 Flink 的旧线程模型中,开发者需要依赖 checkpointLock 来隔离不同线程对任务内部状态的并发修改。这种方式要求代码中出现大量的 synchronize(lock) 语句,不仅不利于阅读和调试,也容易因锁的误用而导致线程安全问题。

为了解决这些问题,Flink 社区引入了基于 Mailbox 的线程模型。其核心思想是通过单线程配合阻塞队列来处理所有事件。这样,对内部状态的修改都由同一个线程串行完成,从而避免了复杂的锁竞争。在旧模型中,checkpointLock 主要用于保护三个关键区域:

  • 事件处理:包括数据事件(event)、水位线(watermark)、屏障(barrier)的处理与发送。
  • 检查点:包括 Checkpoint 的触发和完成通知。
  • 处理时间定时器:ProcessTime 定时器的回调通常会涉及状态修改。

而在 Mailbox 模型中,所有需要处理的事件都被封装成 Mail 投递到邮箱队列中,由指定的单线程按顺序取出并执行。

相关定义与核心类

下面我们深入 Mailbox 的具体实现,首先来了解几个核心的类与其作用。

Mailbox线程模型核心类图

我们来逐个剖析这些类的定义和职责。

Mail

Mail 是线程模型中最基础的单元,它封装了需要执行的动作。无论是 Checkpoint 的触发还是 ProcessTime 定时器的触发,最终都会转化为一个 Mail。Mail 包含以下关键属性:

// 选项,包括两个选项:isUrgent 和 deferrable
private final MailOptionsImpl mailOptions;

// 要执行的动作
private final ThrowingRunnable<? extends Exception> runnable;

// 优先级,这里的优先级不决定执行顺序,而是避免上下游之间的死锁问题
private final int priority;

// 描述信息
private final String descriptionFormat;
private final Object[] descriptionArgs;

// 用于执行 runnable 的执行器
private final StreamTaskActionExecutor actionExecutor;

TaskMailbox

有了 Mail,就需要一个容器来存储它,这就是 TaskMailbox。其具体实现类 TaskMailboxImpl 内部维护了一个 Deque 队列。当需要执行时,再从队列中取出 Mail。

// 内部对于 queue 和 state 的并发访问都需要被这个锁保护
private final ReentrantLock lock = new ReentrantLock();

// 实际存储 Mail 的队列
@GuardedBy("lock")
private final Deque<Mail> queue = new ArrayDeque<>();

// 与 lock 关联的 Condition,主要用于队列从空变为非空时唤醒等待获取 Mail 的线程
@GuardedBy("lock")
private final Condition notEmpty = lock.newCondition();

// 状态,包括 OPEN/QUIESCED/CLOSED
@GuardedBy("lock")
private State state = OPEN;

// 指定的邮箱线程的引用
@Nonnull
private final Thread taskMailboxThread;

// 用于性能优化的设计
private final Deque<Mail> batch = new ArrayDeque<>();

// queue队列是否为空,用于性能优化,避免频繁访问主队列
private volatile boolean hasNewMail = false;

// 是否有紧急邮件,同样用于性能优化,减少检查队列中是否有紧急邮件的次数
private volatile boolean hasNewUrgentMail = false;

从属性可以看出,TaskMailbox 底层使用 ArrayDeque 存储 Mail,并通过 lock 保护对队列和状态(state)的并发访问。状态分为三种:

  • OPEN:邮箱正常工作,可以接收和取出 Mail。
  • QUIESCED:静默状态,不再接收新的 Mail,但已有的 Mail 仍可被取出处理。
  • CLOSED:关闭状态,不能进行任何操作。

为了提升性能,TaskMailbox 内部还设计了 batch 队列。处理 Mail 时,会先将一批 Mail 从主队列(queue)转移到 batch 队列,之后优先从 batch 中获取。这减少了直接访问主队列的次数,有效缓解了锁竞争压力。

MailboxProcessor

MailboxProcessor 可以看作是整个 Mailbox 模型的核心控制器。它的核心方法是运行一个事件循环,在这个循环中不断从 TaskMailbox 中取出 Mail 执行,并在没有 Mail 时执行一个默认动作(MailboxDefaultAction),例如处理输入数据。

此外,MailboxProcessor 对外提供了 MailboxExecutor,其他组件可以通过它来提交事件,实现与主线程的通信。这种设计巧妙地将并发控制内部化,对外提供清晰的执行接口,是理解 Flink 线程模型 并发机制的关键。

MailboxExecutor

MailboxExecutor 的实现类是 MailboxExecutorImpl,其主要职责是向 TaskMailbox 投递 Mail。它的核心属性包括:

// 实际存储的 mailbox 实例
@Nonnull
private final TaskMailbox mailbox;

// 优先级,MailboxExecutor 提供的默认优先级,提交 mail 时会带上这个字段
private final int priority;

// 执行器,运行 mail 的动作
private final StreamTaskActionExecutor actionExecutor;

// 执行 MailboxProcessor,主要用于 isIdle 方法
private final MailboxProcessor mailboxProcessor;

MailboxExecutor 的核心方法是 execute,它可以在任意线程中被调用,因为内部的 mailbox 已控制了并发。

public void execute(
        MailOptions mailOptions,
        final ThrowingRunnable<? extends Exception> command,
        final String descriptionFormat,
        final Object... descriptionArgs) {
    try {
        mailbox.put(
                new Mail(
                        mailOptions,
                        command,
                        priority,
                        actionExecutor,
                        descriptionFormat,
                        descriptionArgs));
    } catch (MailboxClosedException mbex) {
        throw new RejectedExecutionException(mbex);
    }
}

除了 executeMailboxExecutor 还有一个重要的 yield 方法。

public void yield() throws InterruptedException {
    Mail mail = mailbox.take(priority);
    try {
        mail.run();
    } catch (Exception ex) {
        throw WrappingRuntimeException.wrapIfNecessary(ex);
    }
}

yield 方法的主要目的是让出当前事件的处理权,这有两个原因:

  1. 如果不考虑优先级,Mailbox 队列是 FIFO 的。如果当前事件的处理依赖于后续事件的完成,可能会造成逻辑上的“死锁”。
  2. 当前事件处理耗时过长,会阻塞其他事件的执行。让出执行权可以让相同或更高优先级的事件得到处理机会。
    请注意yield 方法只能由 mailbox 线程自身调用。Flink 也提供了非阻塞版本的 tryYield 方法。

执行流程剖析

主循环流程

在创建 StreamTask 时,会初始化 mailboxProcessor,并获取一个 mainMailboxExecutor

new TaskMailboxImpl(Thread.currentThread()));

...
this.mailboxProcessor =
        new MailboxProcessor(
                this::processInput, mailbox, actionExecutor, mailboxMetricsControl);

...

this.mainMailboxExecutor = mailboxProcessor.getMainMailboxExecutor();

这里将 processInput 方法作为默认动作(MailboxDefaultAction)传入了 MailboxProcessor。当 StreamTask 启动时,会调用 MailboxProcessor 的核心方法 runMailboxLoop

public final void invoke() throws Exception {
    // ... 省略初始化与检查代码

    runMailboxLoop();

    // ... 省略收尾代码
}

public void runMailboxLoop() throws Exception {
    mailboxProcessor.runMailboxLoop();
}

runMailboxLoop 的核心逻辑是一个 while 循环,循环中处理 Mail 并执行默认动作。

public void runMailboxLoop() throws Exception {
    suspended = !mailboxLoopRunning;

    final TaskMailbox localMailbox = mailbox;

    checkState(
            localMailbox.isMailboxThread(),
            "Method must be executed by declared mailbox thread!");

    assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";

    final MailboxController mailboxController = new MailboxController(this);

    while (isNextLoopPossible()) {
        // The blocking `processMail` call will not return until default action is available.
        processMail(localMailbox, false);
        if (isNextLoopPossible()) {
            mailboxDefaultAction.runDefaultAction(
                    mailboxController); // lock is acquired inside default action as needed
        }
    }
}

private boolean isNextLoopPossible() {
    // 'Suspended' can be false only when 'mailboxLoopRunning' is true.
    return !suspended;
}

代码首先进行前置检查:确保当前线程是指定的 mailbox 线程,且 TaskMailbox 处于 OPEN 状态。接着创建 MailboxController 用于默认动作与处理器的交互。

随后进入 while (isNextLoopPossible()) 循环。循环中先调用 processMail 处理邮箱中的待执行任务,然后如果条件允许,就执行默认动作 mailboxDefaultAction.runDefaultAction,即处理数据输入。

processMail 方法负责具体的 Mail 处理:

private boolean processMail(TaskMailbox mailbox, boolean singleStep) throws Exception {
    // Doing this check is an optimization to only have a volatile read in the expected hot
    // path, locks are only acquired after this point.
    boolean isBatchAvailable = mailbox.createBatch();

    // Take mails in a non-blockingly and execute them.
    boolean processed = isBatchAvailable && processMailsNonBlocking(singleStep);
    if (singleStep) {
        return processed;
    }

    // If the default action is currently not available, we can run a blocking mailbox execution
    // until the default action becomes available again.
    processed |= processMailsWhenDefaultActionUnavailable();

    return processed;
}

它先尝试创建 batch,然后非阻塞地处理这批 Mail(processMailsNonBlocking)。如果默认动作暂时不可用,则会进入 processMailsWhenDefaultActionUnavailable 方法,该方法会阻塞等待,直到有新的 Mail 到达或默认动作恢复可用。

processMailsNonBlocking 方法循环从 batch 中取 Mail 并执行:

private boolean processMailsNonBlocking(boolean singleStep) throws Exception {
    long processedMails = 0;
    Optional<Mail> maybeMail;

    while (isNextLoopPossible() && (maybeMail = mailbox.tryTakeFromBatch()).isPresent()) {
        if (processedMails++ == 0) {
            maybePauseIdleTimer();
        }
        runMail(maybeMail.get());
        if (singleStep) {
            break;
        }
    }
    if (processedMails > 0) {
        maybeRestartIdleTimer();
        return true;
    } else {
        return false;
    }
}

private void runMail(Mail mail) throws Exception {
    mailboxMetricsControl.getMailCounter().inc();
    mail.run();
    // ... 省略性能度量相关代码
}

最终,runMail 方法调用了 mail.run() 来执行 Mail 中封装的具体动作。

当默认动作可用时,就会执行它,也就是 Stream.processInput,这里是实际处理流记录(StreamRecord)的地方。

protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
    DataInputStatus status = inputProcessor.processInput();
    switch (status) {
        case MORE_AVAILABLE:
            if (taskIsAvailable()) {
                return;
            }
            break;
        case NOTHING_AVAILABLE:
            break;
        case END_OF_RECOVERY:
            throw new IllegalStateException("We should not receive this event here.");
        case STOPPED:
            endData(StopMode.NO_DRAIN);
            return;
        case END_OF_DATA:
            endData(StopMode.DRAIN);
            notifyEndOfData();
            return;
        case END_OF_INPUT:
            // Suspend the mailbox processor...
            controller.suspendDefaultAction();
            mailboxProcessor.suspend();
            return;
    }
    // ...
}

当状态为 MORE_AVAILABLE 时,表示还有数据可立即处理,若任务可用则直接返回,准备处理下一个数据。当状态为 END_OF_INPUT 时,表示所有输入已结束,此时会暂停默认动作和邮箱处理器的事件循环。

Checkpoint 触发流程

Checkpoint 的触发是通过调用 Stream.triggerCheckpointAsync 方法实现的。

public CompletableFuture<Boolean> triggerCheckpointAsync(
        CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
    checkForcedFullSnapshotSupport(checkpointOptions);

    MailboxExecutor.MailOptions mailOptions =
            CheckpointOptions.AlignmentType.UNALIGNED == checkpointOptions.getAlignment()
                    ? MailboxExecutor.MailOptions.urgent()
                    : MailboxExecutor.MailOptions.options();

    CompletableFuture<Boolean> result = new CompletableFuture<>();
    mainMailboxExecutor.execute(
            mailOptions,
            () -> {
                try {
                    boolean noUnfinishedInputGates =
                            Arrays.stream(getEnvironment().getAllInputGates())
                                    .allMatch(InputGate::isFinished);

                    if (noUnfinishedInputGates) {
                        result.complete(
                                triggerCheckpointAsyncInMailbox(
                                        checkpointMetaData, checkpointOptions));
                    } else {
                        result.complete(
                                triggerUnfinishedChannelsCheckpoint(
                                        checkpointMetaData, checkpointOptions));
                    }
                } catch (Exception ex) {
                    // Report the failure both via the Future result but also to the mailbox
                    result.completeExceptionally(ex);
                    throw ex;
                }
            },
            "checkpoint %s with %s",
            checkpointMetaData,
            checkpointOptions);
    return result;
}

可以看到,它通过 mainMailboxExecutor.execute 方法向 Mailbox 提交了一个 Mail。如果是非对齐检查点(UNALIGNED),则会使用紧急(urgent)选项。Checkpoint 完成的通知也是通过 Mailbox 执行的,并且通常会提交一个更高优先级的操作。

private Future<Void> notifyCheckpointOperation(
        RunnableWithException runnable, String description) {
    CompletableFuture<Void> result = new CompletableFuture<>();
    mailboxProcessor
            .getMailboxExecutor(TaskMailbox.MAX_PRIORITY)
            .execute(
                    () -> {
                        try {
                            runnable.run();
                        } catch (Exception ex) {
                            result.completeExceptionally(ex);
                            throw ex;
                        }
                        result.complete(null);
                    },
                    description);
    return result;
}

总结

本文梳理了 Apache Flink 中 Mailbox 线程模型的核心源码。该模型通过将事件封装为 Mail,并由单线程顺序处理,极大地简化了并发控制逻辑,替代了旧模型中繁琐的 checkpointLock。我们首先了解了 MailTaskMailboxMailboxProcessorMailboxExecutor 这几个核心类的设计与作用,然后详细剖析了事件处理的主循环流程以及 Checkpoint 等关键操作的触发机制。理解这套模型,对于深入掌握 Flink 的任务执行与并发控制原理至关重要,也是进行更高级 源码分析 和性能调优的基础。




上一篇:RPC vs HTTP:微服务架构下远程调用技术选型深度解析
下一篇:Lazarus C2基础设施深度分析:从VSCode漏洞到多层加密后门
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-1-16 19:54 , Processed in 0.302909 second(s), 38 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表