
随着 Java Project Loom(即虚拟线程)的正式亮相,开发者社区里掀起了一场热烈的讨论:我们是该继续信赖久经考验的 Kotlin 协程,还是拥抱Java语言本身进化的成果?
为了探究这个问题,我决定用这两种技术分别构建一个相同的服务,并在模拟高负载的场景下进行严格的性能基准测试。结果有些出乎我的意料——和我最初的猜想并不一致。
任务目标:构建一个 UserProfileBuilder
我们计划创建一个服务,用于通过以下步骤构建完整的用户资料:
- 根据
id 获取用户基本信息。
- 获取用户信息后,并发获取其相关数据:
- 将以上所有数据聚合成一个完整的
UserProfile 对象。
我们将基于以下定义的接口,分别利用 Kotlin 协程 和 Java 虚拟线程 来实现构建逻辑。
Kotlin 接口定义
interface UserService {
suspend fun fetchUser(userId: Int): User
suspend fun fetchOrders(userId: Int): List<Order>
suspend fun fetchPreferences(userId: Int): Preferences
suspend fun fetchNotifications(userId: Int): List<Notification>
}
interface UserProfileBuilder {
suspend fun buildUserProfile(id: Int): UserProfile
}
Java 接口定义
interface UserService {
User fetchUser(int id);
List<Order> fetchOrders(int userId);
Preferences fetchPreferences(int userId);
List<Notification> fetchNotifications(int userId);
}
interface UserProfileBuilder {
UserProfile buildUserProfile(int id);
}
领域模型
两种实现共享相似的领域模型,只是语法上有所不同。
Kotlin 数据类
data class UserProfile(
val user: User,
val orders: List<Order>,
val preferences: Preferences,
val notifications: List<Notification>,
)
data class User(val id: Int, val name: String)
data class Order(val id: String, val item: String)
data class Preferences(val darkMode: Boolean, val language: String)
data class Notification(val id: String, val message: String)
Java Record 类
public record UserProfile(User user, List<Order> orders, Preferences preferences, List<Notification> notifications) {
}
public record User(int id, String name) {
}
public record Order(String id, String item) {
}
public record Preferences(Boolean darkMode, String language) {
}
public record Notification(String id, String message) {
}
服务实现
Kotlin 协程实现
在 Kotlin 中,模拟 I/O 操作(如 API 或数据库调用)的函数使用 suspend 关键字标记。这使得它们能够挂起而不阻塞底层线程,非常适合并发操作。
为了并行运行任务,我们使用 async 启动协程,并通过 await 非阻塞地等待结果,从而编写出高效、可扩展的代码。
class MockUserService : UserService {
override suspend fun fetchUser(userId: Int) = delayAndReturn(500) { User(userId, "User- $userId") }
override suspend fun fetchOrders(userId: Int) =
delayAndReturn(500) { listOf(Order("O1", "Item A"), Order("O2", "Item B")) }
override suspend fun fetchPreferences(userId: Int) = delayAndReturn(500) { Preferences(true, "en-US") }
override suspend fun fetchNotifications(userId: Int) =
delayAndReturn(500) { listOf(Notification("N1", "Welcome"), Notification("N2", "Discount")) }
private suspend fun <T> delayAndReturn(ms: Long, block: () -> T): T {
delay(ms)
return block()
}
}
class CoroutineUserProfileBuilder(private val userService: UserService) : UserProfileBuilder {
override suspend fun buildUserProfile(id: Int): UserProfile = coroutineScope {
val user = userService.fetchUser(id)
val orders = async { userService.fetchOrders(user.id) }
val prefs = async { userService.fetchPreferences(user.id) }
val notifications = async { userService.fetchNotifications(user.id) }
UserProfile(user, orders.await(), prefs.await(), notifications.await())
}
}
Java 虚拟线程实现
在 Java 21+ 中,我们可以使用 虚拟线程 来实现高并发,而无需将代码重写为非阻塞风格。
public class UserServiceImpl implements UserService {
@Override
public User fetchUser(int id) {
delay(500);
return new User(id, "User-%d".formatted(id));
}
@Override
public List<Order> fetchOrders(int id) {
delay(500);
return List.of(new Order("O1", "Item A"), new Order("O2", "Item B"));
}
@Override
public Preferences fetchPreferences(int id) {
delay(500);
return new Preferences(true, "en-US");
}
@Override
public List<Notification> fetchNotifications(int id) {
delay(500);
return List.of(new Notification("N1", "Welcome"), new Notification("N2", "Discount"));
}
private void delay(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
使用虚拟线程构建器有两种实现方式:
使用 ExecutorService (稳定版)
class UserProfileBuilderImpl implements UserProfileBuilder {
private final UserService userService;
private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
UserProfileBuilderImpl(UserService service) {
this.userService = service;
}
@Override
public UserProfile buildUserProfile(int id) throws Exception {
var user = userService.fetchUser(id);
var orders = executor.submit(() -> userService.fetchOrders(user.id()));
var prefs = executor.submit(() -> userService.fetchPreferences(user.id()));
var notifs = executor.submit(() -> userService.fetchNotifications(user.id()));
return new UserProfile(user, orders.get(), prefs.get(), notifs.get());
}
}
使用 StructuredTaskScope (预览版)
@Override
public UserProfile buildUserProfile(int id) {
var user = userService.fetchUser(id);
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var orders = scope.fork(() -> userService.fetchOrders(user.id()));
var prefs = scope.fork(() -> userService.fetchPreferences(user.id()));
var notifs = scope.fork(() -> userService.fetchNotifications(user.id()));
scope.join(); // 等待所有任务
return new UserProfile(user, orders.get(), prefs.get(), notifs.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
基准测试设置与实现
基准测试模拟了规模递增的并发请求:1,000、10,000、100,000 乃至 1,000,000 个。我们关注两个核心指标:
- 完成所有用户资料构建所需的总体时间。
- 系统活跃的线程数量(包括总活跃线程数和非守护线程数)。
我们分别为 Kotlin 协程和 Java 虚拟线程编写了测试运行器,以模拟并发请求并测量性能。
Kotlin 基准测试代码
class BenchmarkRunner(
private val builder: UserProfileBuilder,
private val name: String,
private val threadDump: Boolean = true
) {
private val logger: Logger = LoggerFactory.getLogger(BenchmarkRunner::class.java)
suspend fun runBenchmark(userIds: List<Int>) = coroutineScope {
logger.info("🚀 Running benchmark: $name on ${userIds.size} Concurrent Users")
val start = System.nanoTime()
val userProfiles = userIds.map { id -> async {
try {
builder.buildUserProfile(id)
} catch (ex: Exception) {
logger.warn("❌ User $id failed: ${ex.message}")
null
}
}
}.awaitAll().filterNotNull()
val durationMs = (System.nanoTime() - start) / 1_000_000
logger.info("✅ Completed ${userProfiles.count()} profiles in $durationMs ms")
if (threadDump) {
val threads = Thread.getAllStackTraces().keys
logger.info("🧵 Active threads (total): ${threads.count { it.isAlive }}")
logger.info("🧵 Non-daemon threads: ${threads.count { it.isAlive && !it.isDaemon }}")
}
}
}
// 用法
suspend fun main() {
val builder = CoroutineUserProfileBuilder(MockUserService())
val runner = BenchmarkRunner(builder, "CoroutineUserProfileBuilder")
listOf(
(1..1_000),
(1..10_000),
(1..100_000),
(1..1_000_000),
).forEach { ids -> runner.runBenchmark(ids.toList()) }
}
Java 基准测试代码
public class BenchmarkRunner {
private final UserProfileBuilder builder;
private final String name;
private final boolean threadDump;
private final ExecutorService executor;
private static final Logger logger = LoggerFactory.getLogger(BenchmarkRunner.class);
public BenchmarkRunner(UserProfileBuilder builder, String name, boolean threadDump, ExecutorService executor) {
this.builder = builder;
this.name = name;
this.threadDump = threadDump;
this.executor = executor;
}
public void runBenchmark(List<Integer> userIds) {
logger.info("🚀 Running benchmark: {} on {} Concurrent Users", name, userIds.size());
long start = System.nanoTime();
List<Future<UserProfile>> futures = userIds.stream()
.map(id -> executor.submit(() -> {
try {
return builder.buildUserProfile(id);
} catch (Exception ex) {
logger.warn("❌ User {} failed: {}", id, ex.getMessage());
return null;
}
}))
.toList();
List<UserProfile> results = futures.stream()
.map(future -> {
try {
return future.get();
} catch (Exception e) {
return null;
}
})
.filter(Objects::nonNull)
.toList();
long durationMs = (System.nanoTime() - start) / 1_000_000;
logger.info("✅ Completed {} profiles in {} ms", results.size(), durationMs);
if (threadDump) {
var threads = Thread.getAllStackTraces().keySet();
logger.info("🧵 Active threads (total): {}", threads.stream().filter(Thread::isAlive).count());
logger.info("🧵 Non-daemon threads: {}", threads.stream().filter(t -> t.isAlive() && !t.isDaemon()).count());
}
}
}
// 用法
public static void main(String[] args) {
var builder = new UserProfileBuilderImpl(new UserServiceImpl());
var executor = Executors.newVirtualThreadPerTaskExecutor();
var runner = new BenchmarkRunner(builder, "Virtual Thread Profile Builder", true, executor);
Stream.of(
IntStream.rangeClosed(1, 1_000).boxed(),
IntStream.rangeClosed(1, 10_000).boxed(),
IntStream.rangeClosed(1, 100_000).boxed(),
IntStream.rangeClosed(1, 1_000_000).boxed()
).map(Stream::toList)
.forEach(runner::runBenchmark);
}
基准测试结果
以下是详细的性能对比数据:
✅ Kotlin 协程
| 请求数 |
耗时 (ms) |
线程数 (活跃 / 非守护) |
| 1,000 |
1,062 |
16 / 1 |
| 10,000 |
1,124 |
16 / 1 |
| 100,000 |
1,359 |
16 / 1 |
| 1,000,000 |
6,652 |
16 / 1 |
🧵 Java 虚拟线程 (ExecutorService)
| 请求数 |
耗时 (ms) |
线程数 (活跃 / 非守护) |
| 1,000 |
1,046 |
15 / 1 |
| 10,000 |
1,115 |
15 / 1 |
| 100,000 |
1,351 |
15 / 1 |
| 1,000,000 |
8,501 |
15 / 1 |
🧵 Java 虚拟线程 (StructuredTaskScope)
| 请求数 |
耗时 (ms) |
线程数 (活跃 / 非守护) |
| 1,000 |
1,045 |
15 / 1 |
| 10,000 |
1,109 |
15 / 1 |
| 100,000 |
1,369 |
15 / 1 |
| 1,000,000 |
15,110 |
15 / 1 |
🧠 注意:由于任务作用域管理和结构化取消机制,StructuredTaskScope 在百万级请求下引入了一些额外开销。然而,它对于需要分层任务生命周期的场景(如API请求树)更为健壮。
关于内存使用的说明
本文主要聚焦于性能和线程可扩展性,但内存使用情况同样至关重要,尤其是在高并发级别下。测量两者在堆分配、GC行为以及单请求内存开销上的差异,将是一个有价值的后续研究方向。
💡 超越并发:Kotlin 协程的异步优势
虽然本文聚焦于并发请求处理,但值得注意的是,Kotlin 协程 在异步和响应式工作流中表现更为出色。像 Flow 和 Channel 这样的功能让构建响应式流、处理背压和实现管道式数据处理变得轻而易举。这些特性使其在构建实时应用、UI交互或流式API时更具吸引力。
🧪 基准测试环境
- MacBook Air M1 (16核)
- Java 21
- Kotlin 2.0
结论
无论是 Kotlin 协程还是 Java 虚拟线程,在应对高并发 I/O密集型 场景时都展现出了令人印象深刻的扩展能力,远超许多人的预期。
然而,在极端负载(百万级请求)下,Kotlin 协程的表现要略微优于 Java 虚拟线程。说实话,这个结果有些让我意外,我原本以为 Java 原生的虚拟线程会占据上风。
技术的选择往往取决于具体的项目背景、团队技术栈和性能要求。无论选择哪种,都标志着我们在编写高效、可扩展的并发程序方面拥有了更强大的工具。如果你在 云栈社区 进行过类似的测试或有不同的见解,欢迎分享你的经验和数据。
原文链接:https://java-jedi.medium.com/i-built-the-same-service-with-kotlin-coroutines-and-java-virtual-threads-heres-what-i-found-188d8373a440?day=1