找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

3552

积分

0

好友

488

主题
发表于 11 小时前 | 查看: 1| 回复: 0

Kotlin 协程与 Java 虚拟线程基准测试对比图

随着 Java Project Loom(即虚拟线程)的正式亮相,开发者社区里掀起了一场热烈的讨论:我们是该继续信赖久经考验的 Kotlin 协程,还是拥抱Java语言本身进化的成果?

为了探究这个问题,我决定用这两种技术分别构建一个相同的服务,并在模拟高负载的场景下进行严格的性能基准测试。结果有些出乎我的意料——和我最初的猜想并不一致。

任务目标:构建一个 UserProfileBuilder

我们计划创建一个服务,用于通过以下步骤构建完整的用户资料:

  1. 根据 id 获取用户基本信息。
  2. 获取用户信息后,并发获取其相关数据:
    • 订单历史
    • 个人偏好设置
    • 通知消息
  3. 将以上所有数据聚合成一个完整的 UserProfile 对象。

我们将基于以下定义的接口,分别利用 Kotlin 协程Java 虚拟线程 来实现构建逻辑。

Kotlin 接口定义

interface UserService {
    suspend fun fetchUser(userId: Int): User
    suspend fun fetchOrders(userId: Int): List<Order>
    suspend fun fetchPreferences(userId: Int): Preferences
    suspend fun fetchNotifications(userId: Int): List<Notification>
}

interface UserProfileBuilder {
    suspend fun buildUserProfile(id: Int): UserProfile
}

Java 接口定义

interface UserService {
    User fetchUser(int id);
    List<Order> fetchOrders(int userId);
    Preferences fetchPreferences(int userId);
    List<Notification> fetchNotifications(int userId);
}

interface UserProfileBuilder {
    UserProfile buildUserProfile(int id);
}

领域模型

两种实现共享相似的领域模型,只是语法上有所不同。

Kotlin 数据类

data class UserProfile(
    val user: User,
    val orders: List<Order>,
    val preferences: Preferences,
    val notifications: List<Notification>,
)

data class User(val id: Int, val name: String)
data class Order(val id: String, val item: String)
data class Preferences(val darkMode: Boolean, val language: String)
data class Notification(val id: String, val message: String)

Java Record 类

public record UserProfile(User user, List<Order> orders, Preferences preferences, List<Notification> notifications) {
}
public record User(int id, String name) {
}
public record Order(String id, String item) {
}
public record Preferences(Boolean darkMode, String language) {
}
public record Notification(String id, String message) {
}

服务实现

Kotlin 协程实现

在 Kotlin 中,模拟 I/O 操作(如 API 或数据库调用)的函数使用 suspend 关键字标记。这使得它们能够挂起而不阻塞底层线程,非常适合并发操作。

为了并行运行任务,我们使用 async 启动协程,并通过 await 非阻塞地等待结果,从而编写出高效、可扩展的代码。

class MockUserService : UserService {
    override suspend fun fetchUser(userId: Int) = delayAndReturn(500) { User(userId, "User- $userId") }
    override suspend fun fetchOrders(userId: Int) =
        delayAndReturn(500) { listOf(Order("O1", "Item A"), Order("O2", "Item B")) }
    override suspend fun fetchPreferences(userId: Int) = delayAndReturn(500) { Preferences(true, "en-US") }
    override suspend fun fetchNotifications(userId: Int) =
        delayAndReturn(500) { listOf(Notification("N1", "Welcome"), Notification("N2", "Discount")) }

    private suspend fun <T> delayAndReturn(ms: Long, block: () -> T): T {
        delay(ms)
        return block()
    }
}

class CoroutineUserProfileBuilder(private val userService: UserService) : UserProfileBuilder {
    override suspend fun buildUserProfile(id: Int): UserProfile = coroutineScope {
        val user = userService.fetchUser(id)
        val orders = async { userService.fetchOrders(user.id) }
        val prefs = async { userService.fetchPreferences(user.id) }
        val notifications = async { userService.fetchNotifications(user.id) }
        UserProfile(user, orders.await(), prefs.await(), notifications.await())
    }
}

Java 虚拟线程实现

在 Java 21+ 中,我们可以使用 虚拟线程 来实现高并发,而无需将代码重写为非阻塞风格。

public class UserServiceImpl implements UserService {
    @Override
    public User fetchUser(int id) {
        delay(500);
        return new User(id, "User-%d".formatted(id));
    }

    @Override
    public List<Order> fetchOrders(int id) {
        delay(500);
        return List.of(new Order("O1", "Item A"), new Order("O2", "Item B"));
    }

    @Override
    public Preferences fetchPreferences(int id) {
        delay(500);
        return new Preferences(true, "en-US");
    }

    @Override
    public List<Notification> fetchNotifications(int id) {
        delay(500);
        return List.of(new Notification("N1", "Welcome"), new Notification("N2", "Discount"));
    }

    private void delay(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

使用虚拟线程构建器有两种实现方式:

使用 ExecutorService (稳定版)

class UserProfileBuilderImpl implements UserProfileBuilder {
    private final UserService userService;
    private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

    UserProfileBuilderImpl(UserService service) {
        this.userService = service;
    }

    @Override
    public UserProfile buildUserProfile(int id) throws Exception {
        var user = userService.fetchUser(id);
        var orders = executor.submit(() -> userService.fetchOrders(user.id()));
        var prefs = executor.submit(() -> userService.fetchPreferences(user.id()));
        var notifs = executor.submit(() -> userService.fetchNotifications(user.id()));
        return new UserProfile(user, orders.get(), prefs.get(), notifs.get());
    }
}

使用 StructuredTaskScope (预览版)

@Override
public UserProfile buildUserProfile(int id) {
    var user = userService.fetchUser(id);
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        var orders = scope.fork(() -> userService.fetchOrders(user.id()));
        var prefs = scope.fork(() -> userService.fetchPreferences(user.id()));
        var notifs = scope.fork(() -> userService.fetchNotifications(user.id()));
        scope.join(); // 等待所有任务
        return new UserProfile(user, orders.get(), prefs.get(), notifs.get());
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
    }
}

基准测试设置与实现

基准测试模拟了规模递增的并发请求:1,000、10,000、100,000 乃至 1,000,000 个。我们关注两个核心指标:

  • 完成所有用户资料构建所需的总体时间。
  • 系统活跃的线程数量(包括总活跃线程数和非守护线程数)。

我们分别为 Kotlin 协程和 Java 虚拟线程编写了测试运行器,以模拟并发请求并测量性能。

Kotlin 基准测试代码

class BenchmarkRunner(
    private val builder: UserProfileBuilder,
    private val name: String,
    private val threadDump: Boolean = true
) {
    private val logger: Logger = LoggerFactory.getLogger(BenchmarkRunner::class.java)

    suspend fun runBenchmark(userIds: List<Int>) = coroutineScope {
        logger.info("🚀 Running benchmark:  $name on  ${userIds.size} Concurrent Users")
        val start = System.nanoTime()
        val userProfiles = userIds.map { id -> async {
                try {
                    builder.buildUserProfile(id)
                } catch (ex: Exception) {
                    logger.warn("❌ User  $id failed:  ${ex.message}")
                    null
                }
            }
        }.awaitAll().filterNotNull()
        val durationMs = (System.nanoTime() - start) / 1_000_000
        logger.info("✅ Completed  ${userProfiles.count()} profiles in  $durationMs ms")
        if (threadDump) {
            val threads = Thread.getAllStackTraces().keys
            logger.info("🧵 Active threads (total):  ${threads.count { it.isAlive }}")
            logger.info("🧵 Non-daemon threads:  ${threads.count { it.isAlive && !it.isDaemon }}")
        }
    }
}

// 用法
suspend fun main() {
    val builder = CoroutineUserProfileBuilder(MockUserService())
    val runner = BenchmarkRunner(builder, "CoroutineUserProfileBuilder")
    listOf(
        (1..1_000),
        (1..10_000),
        (1..100_000),
        (1..1_000_000),
    ).forEach { ids -> runner.runBenchmark(ids.toList()) }
}

Java 基准测试代码

public class BenchmarkRunner {
    private final UserProfileBuilder builder;
    private final String name;
    private final boolean threadDump;
    private final ExecutorService executor;
    private static final Logger logger = LoggerFactory.getLogger(BenchmarkRunner.class);

    public BenchmarkRunner(UserProfileBuilder builder, String name, boolean threadDump, ExecutorService executor) {
        this.builder = builder;
        this.name = name;
        this.threadDump = threadDump;
        this.executor = executor;
    }

    public void runBenchmark(List<Integer> userIds) {
        logger.info("🚀 Running benchmark: {} on {} Concurrent Users", name, userIds.size());
        long start = System.nanoTime();
        List<Future<UserProfile>> futures = userIds.stream()
                .map(id -> executor.submit(() -> {
                    try {
                        return builder.buildUserProfile(id);
                    } catch (Exception ex) {
                        logger.warn("❌ User {} failed: {}", id, ex.getMessage());
                        return null;
                    }
                }))
                .toList();

        List<UserProfile> results = futures.stream()
                .map(future -> {
                    try {
                        return future.get();
                    } catch (Exception e) {
                        return null;
                    }
                })
                .filter(Objects::nonNull)
                .toList();

        long durationMs = (System.nanoTime() - start) / 1_000_000;
        logger.info("✅ Completed {} profiles in {} ms", results.size(), durationMs);

        if (threadDump) {
            var threads = Thread.getAllStackTraces().keySet();
            logger.info("🧵 Active threads (total): {}", threads.stream().filter(Thread::isAlive).count());
            logger.info("🧵 Non-daemon threads: {}", threads.stream().filter(t -> t.isAlive() && !t.isDaemon()).count());
        }
    }
}

// 用法
public static void main(String[] args) {
    var builder = new UserProfileBuilderImpl(new UserServiceImpl());
    var executor = Executors.newVirtualThreadPerTaskExecutor();
    var runner = new BenchmarkRunner(builder, "Virtual Thread Profile Builder", true, executor);
    Stream.of(
                    IntStream.rangeClosed(1, 1_000).boxed(),
                    IntStream.rangeClosed(1, 10_000).boxed(),
                    IntStream.rangeClosed(1, 100_000).boxed(),
                    IntStream.rangeClosed(1, 1_000_000).boxed()
            ).map(Stream::toList)
            .forEach(runner::runBenchmark);
}

基准测试结果

以下是详细的性能对比数据:

✅ Kotlin 协程

请求数 耗时 (ms) 线程数 (活跃 / 非守护)
1,000 1,062 16 / 1
10,000 1,124 16 / 1
100,000 1,359 16 / 1
1,000,000 6,652 16 / 1

🧵 Java 虚拟线程 (ExecutorService)

请求数 耗时 (ms) 线程数 (活跃 / 非守护)
1,000 1,046 15 / 1
10,000 1,115 15 / 1
100,000 1,351 15 / 1
1,000,000 8,501 15 / 1

🧵 Java 虚拟线程 (StructuredTaskScope)

请求数 耗时 (ms) 线程数 (活跃 / 非守护)
1,000 1,045 15 / 1
10,000 1,109 15 / 1
100,000 1,369 15 / 1
1,000,000 15,110 15 / 1

🧠 注意:由于任务作用域管理和结构化取消机制,StructuredTaskScope 在百万级请求下引入了一些额外开销。然而,它对于需要分层任务生命周期的场景(如API请求树)更为健壮。

关于内存使用的说明
本文主要聚焦于性能和线程可扩展性,但内存使用情况同样至关重要,尤其是在高并发级别下。测量两者在堆分配GC行为以及单请求内存开销上的差异,将是一个有价值的后续研究方向。

💡 超越并发:Kotlin 协程的异步优势
虽然本文聚焦于并发请求处理,但值得注意的是,Kotlin 协程 在异步和响应式工作流中表现更为出色。像 FlowChannel 这样的功能让构建响应式流、处理背压和实现管道式数据处理变得轻而易举。这些特性使其在构建实时应用、UI交互或流式API时更具吸引力。

🧪 基准测试环境

  • MacBook Air M1 (16核)
  • Java 21
  • Kotlin 2.0

结论

无论是 Kotlin 协程还是 Java 虚拟线程,在应对高并发 I/O密集型 场景时都展现出了令人印象深刻的扩展能力,远超许多人的预期。

然而,在极端负载(百万级请求)下,Kotlin 协程的表现要略微优于 Java 虚拟线程。说实话,这个结果有些让我意外,我原本以为 Java 原生的虚拟线程会占据上风。

技术的选择往往取决于具体的项目背景、团队技术栈和性能要求。无论选择哪种,都标志着我们在编写高效、可扩展的并发程序方面拥有了更强大的工具。如果你在 云栈社区 进行过类似的测试或有不同的见解,欢迎分享你的经验和数据。

原文链接:https://java-jedi.medium.com/i-built-the-same-service-with-kotlin-coroutines-and-java-virtual-threads-heres-what-i-found-188d8373a440?day=1




上一篇:firewalld防火墙配置指南:Linux服务器安全基础
下一篇:图解 Rust 生命周期:以图解与实例澄清标注的意义
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-2-26 16:41 , Processed in 0.376739 second(s), 41 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表