虚拟线程简介
JDK21 正式引入了虚拟线程(Virtual Threads),这是由 JVM 管理的轻量级线程。它与传统的平台线程(Platform Threads)有本质区别:
- 平台线程:由 JDK 实现,直接与操作系统内核线程 1:1 绑定。其创建、切换成本高(涉及内核态/用户态切换),且受操作系统线程数限制。
- 虚拟线程:由 JVM 调度,多个虚拟线程可以挂载在同一个平台线程上执行(M:N 线程模型)。线程切换由 JVM 负责,无需操作系统参与,因此创建和切换成本极低,是一种“廉价”的并发单元。
创建和使用虚拟线程非常简单:
// 方式一:直接创建并启动
Thread.startVirtualThread(() -> {
System.out.println("虚拟线程运行: " + Thread.currentThread());
});
// 方式二:创建后手动启动
Thread virtualThread = Thread.ofVirtual().unstarted(() -> {
System.out.println("虚拟线程运行1:" + Thread.currentThread());
});
virtualThread.start();
得益于其轻量级特性,我们可以在业务中大胆使用虚拟线程来处理异步任务,而无需过多顾虑传统线程池的资源消耗问题。一个典型的应用场景是用户注册后的异步通知,例如发送短信或邮件。本文将介绍如何基于虚拟线程构建一个轻量级的事件总线(EventBus),实现发布者与订阅者的解耦。
核心代码实现
1. 定义订阅注解
首先,我们定义一个 @Subscribe 注解,用于标记事件订阅者(消费者)方法。注解中包含主题名称和可选的执行超时时间。
/**
* 事件订阅注解
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface Subscribe {
/**
* 订阅的topic名称
*
* @return topic名称
*/
String[] topicName() default "";
/**
* 单个订阅者执行超时时间(秒)
* 0 表示不限制
*/
long timeout() default 0;
}
2. 自动注册订阅者
在 SpringBoot 应用启动时,我们需要自动扫描所有被 @Subscribe 注解的方法,并将它们注册到事件总线上。这可以通过实现 BeanPostProcessor 接口来完成,利用 Java 的反射机制动态调用目标方法。
/**
* 事件订阅注解解析器
*/
@Component
public class SpringMessageBroker implements BeanPostProcessor {
private final Event brokerEvent;
@Lazy
public SpringMessageBroker(BrokerEvent brokerEvent) {
this.brokerEvent = brokerEvent;
}
/**
* 扫描指定 bean 中的带有 @Subscribe 注解的方法
*/
@Override
public Object postProcessAfterInitialization(@Nullable Object bean, @Nullable String beanName) throws BeansException {
//获取 bean 实际的类类型
Class<?> type = ClassUtils.getUserClass(bean);
ReflectionUtils.doWithMethods(type, method -> {
//获取 @Subscribe 注解,判断是否存在
Subscribe subscribe =
AnnotatedElementUtils.findMergedAnnotation(method, Subscribe.class);
if (subscribe == null) {
return;
}
//注册的Topic名称
String[] topicName = subscribe.topicName();
//超时时间
long timeout = subscribe.timeout();
//注册
brokerEvent.register(new Topic(topicName, timeout, consumer -> {
//通过反射调用bean method方法,并传入consumer
ReflectionUtils.invokeMethod(method, bean, consumer);
}));
});
return bean;
}
}
3. 事件总线核心实现
事件总线接口定义了注册、发布等核心方法。
/**
* 事件推拉模式接口
*/
public interface Event {
void register(Topic topic);
void publish(TopicMessage topicMessage);
void publishWait(TopicMessage topicMessage);
}
核心实现类 BrokerEvent 管理所有订阅者,并利用虚拟线程执行任务。它提供了两种发布模式:异步的 publish 和同步等待的 publishWait。
/**
* 事件发布实现类
*/
@Slf4j
@Component
public class BrokerEvent implements Event {
/**
* 存储所有主题对应的订阅者列表
*/
private final Map<String, List<Topic>> consumers = new ConcurrentHashMap<>();
@Override
public void register(Topic topic) {
// 添加注册的主题事件
if (topic == null || topic.getName().length == 0) {
log.error("subscribe 订阅主题不能为空");
return;
}
// 添加该主题的所有消费者
for (String name : topic.getName()) {
consumers.computeIfAbsent(name, k -> new ArrayList<>()).add(topic);
}
}
@Override
public void publish(TopicMessage message) {
// 参数校验...
try {
for (String topicName : message.getTopicName()) {
// 获取该主题的所有消费者
List<Topic> topicConsumers = consumers.get(topicName);
if (CollectionUtils.isEmpty(topicConsumers)) {
return;
}
// 为每个消费者启动一个虚拟线程异步执行
topicConsumers.forEach(consumer -> {
Thread.startVirtualThread(() -> {
consumer.getConsumer().accept(message.getPayload());
});
});
}
} catch (Exception e) {
log.error("publish 发布消息异常:{}", e.getMessage(), e);
}
}
@Override
public void publishWait(TopicMessage topicMessage) {
// 创建一个结构化任务作用域,确保所有任务都执行完成
try (var scope = new StructuredTaskScope<Void>()) {
for (String topicName : topicMessage.getTopicName()) {
List<Topic> topicConsumers = consumers.get(topicName);
if (CollectionUtils.isEmpty(topicConsumers)) {
continue;
}
// 使用虚拟线程为每个消费者启动独立任务
for (Topic consumer : topicConsumers) {
scope.fork(() -> {
invokeWithTimeout(consumer, topicMessage);
return null;
});
}
}
// 阻塞等待所有任务结束
scope.join();
} catch (Exception e) {
log.error("publishWait 发布消息异常:{}", e.getMessage(), e);
}
}
/**
* 带有超时控制的订阅者执行逻辑
*/
private void invokeWithTimeout(Topic consumer, TopicMessage topicMessage) {
long timeout = consumer.getTimeoutSeconds();
if (timeout <= 0) {
safeInvoke(consumer, topicMessage);
return;
}
Instant deadline = Instant.now().plusSeconds(timeout);
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
scope.fork(() -> {
safeInvoke(consumer, topicMessage);
return null;
});
try {
scope.joinUntil(deadline);
} catch (TimeoutException e) {
log.warn("订阅者 {} 执行超时({}s)", Arrays.toString(consumer.getName()), timeout);
scope.shutdown();
return;
}
scope.throwIfFailed();
} catch (Exception e) {
log.error("订阅者 [{}] 执行异常", Arrays.toString(consumer.getName()), e);
}
}
private void safeInvoke(Topic topic, TopicMessage msg) {
try {
topic.getConsumer().accept(msg.getPayload());
} catch (Throwable t) {
log.error("订阅者 [{}] 处理失败", Arrays.toString(topic.getName()), t);
}
}
}
4. 业务层使用示例
定义服务类,其中的方法使用 @Subscribe 注解声明为特定主题的订阅者。
@Service
public class TestServiceImpl implements TestService {
@Override
@Subscribe(topicName = {"sendEmail"}, timeout = 3)
public void sendEmail(UserInfo userInfo) {
System.out.println("通过Service服务发送邮件服务响应:" + userInfo);
}
@Override
@Subscribe(topicName = "sendSms")
public void sendSms(UserInfo userInfo) {
System.out.println("通过Service服务发送短信服务响应:" + userInfo);
}
}
在控制器中,业务逻辑执行完毕后,通过事件总线发布事件,触发异步通知。
@RestController
@RequestMapping("/test")
public class TestController {
@Resource
private BrokerEvent brokerEvent;
@PostMapping("/userRegister")
public String test(@RequestBody UserInfo userInfo) {
// 保存用户信息
System.out.println("保存用户信息:" + userInfo);
// 发布事件,异步执行发送邮件和短信
brokerEvent.publish(new TopicMessage(Arrays.asList("sendEmail", "sendSms"),
userInfo));
return "用户注册成功!";
}
}
执行效果:请求接口后,发送邮件和短信的任务通过虚拟线程异步执行,不影响主流程的响应。

高级用法与配置
一对多与多对一订阅
本事件总线支持灵活的订阅关系:
- 多个方法订阅同一主题:不同的业务方法可以使用相同的
topicName(如 sendInfo),发布该主题时所有方法都会被执行。
- 单个方法订阅多个主题:一个方法可以关联多个主题名。发布其中任何一个主题,该方法都会被执行。如果一次发布多个关联主题,该方法会被执行多次。
// 示例:一个方法订阅多个主题
@Override
@Subscribe(topicName = {"sendSms","sendSms2","sendSms3"})
public void sendSms(UserInfo userInfo) {
System.out.println("通过Service服务发送短信服务响应:" + userInfo);
}
超时控制详解
@Subscribe 注解中的 timeout 参数主要用于 publishWait 方法。它限制的是单个订阅者方法的执行时间,而非所有任务的总时间。
当使用 brokerEvent.publishWait(...) 发布事件时,如果某个订阅者方法(例如一个包含远程调用的复杂业务)的执行时间超过预设的 timeout,JVM 会尝试中断该虚拟线程。这意味着超时点之后的代码可能不会被执行。例如:
public void sendEmail(User user) {
saveLog(); // 执行
callRemoteService(); // 可能在此处阻塞并超时
updateDatabase(); // 超时后将不会执行
}
因此,在业务中应用超时控制时,需要仔细考虑其影响,并进行充分测试。
总结
本文演示了如何利用 JDK21 的虚拟线程特性,构建一个与 SpringBoot 集成的轻量级事件总线。该方案实现了发布/订阅模式,能有效解耦业务逻辑,并凭借虚拟线程的轻量级优势,轻松处理高并发下的异步任务。开发者可以根据实际业务需求,灵活选用异步发布 (publish) 或同步等待发布 (publishWait) 模式。
项目源码地址:https://gitee.com/QinXianZhong/eventbus.git