找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

1709

积分

1

好友

242

主题
发表于 昨天 22:58 | 查看: 2| 回复: 0

虚拟线程简介

JDK21 正式引入了虚拟线程(Virtual Threads),这是由 JVM 管理的轻量级线程。它与传统的平台线程(Platform Threads)有本质区别:

  • 平台线程:由 JDK 实现,直接与操作系统内核线程 1:1 绑定。其创建、切换成本高(涉及内核态/用户态切换),且受操作系统线程数限制。
  • 虚拟线程:由 JVM 调度,多个虚拟线程可以挂载在同一个平台线程上执行(M:N 线程模型)。线程切换由 JVM 负责,无需操作系统参与,因此创建和切换成本极低,是一种“廉价”的并发单元。

创建和使用虚拟线程非常简单:

// 方式一:直接创建并启动
Thread.startVirtualThread(() -> {
    System.out.println("虚拟线程运行: " + Thread.currentThread());
});

// 方式二:创建后手动启动
Thread virtualThread = Thread.ofVirtual().unstarted(() -> {
     System.out.println("虚拟线程运行1:" + Thread.currentThread());
});
virtualThread.start();

得益于其轻量级特性,我们可以在业务中大胆使用虚拟线程来处理异步任务,而无需过多顾虑传统线程池的资源消耗问题。一个典型的应用场景是用户注册后的异步通知,例如发送短信或邮件。本文将介绍如何基于虚拟线程构建一个轻量级的事件总线(EventBus),实现发布者与订阅者的解耦。

核心代码实现

1. 定义订阅注解

首先,我们定义一个 @Subscribe 注解,用于标记事件订阅者(消费者)方法。注解中包含主题名称和可选的执行超时时间。

/**
 * 事件订阅注解
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface Subscribe {
    /**
     * 订阅的topic名称
     *
     * @return topic名称
     */
    String[] topicName() default "";
    /**
     * 单个订阅者执行超时时间(秒)
     * 0 表示不限制
     */
    long timeout() default 0;
}

2. 自动注册订阅者

SpringBoot 应用启动时,我们需要自动扫描所有被 @Subscribe 注解的方法,并将它们注册到事件总线上。这可以通过实现 BeanPostProcessor 接口来完成,利用 Java 的反射机制动态调用目标方法。

/**
 * 事件订阅注解解析器
 */
@Component
public class SpringMessageBroker implements BeanPostProcessor {
    private final Event brokerEvent;
    @Lazy
    public SpringMessageBroker(BrokerEvent brokerEvent) {
        this.brokerEvent = brokerEvent;
    }
    /**
     * 扫描指定 bean 中的带有 @Subscribe 注解的方法
     */
    @Override
    public Object postProcessAfterInitialization(@Nullable Object bean, @Nullable String beanName) throws BeansException {
        //获取 bean 实际的类类型
        Class<?> type = ClassUtils.getUserClass(bean);
        ReflectionUtils.doWithMethods(type, method -> {
            //获取 @Subscribe 注解,判断是否存在
            Subscribe subscribe =
                    AnnotatedElementUtils.findMergedAnnotation(method, Subscribe.class);
            if (subscribe == null) {
                return;
            }
            //注册的Topic名称
            String[] topicName = subscribe.topicName();
            //超时时间
            long timeout = subscribe.timeout();
            //注册
            brokerEvent.register(new Topic(topicName, timeout, consumer -> {
                //通过反射调用bean method方法,并传入consumer
                ReflectionUtils.invokeMethod(method, bean, consumer);
            }));
        });
        return bean;
    }
}

3. 事件总线核心实现

事件总线接口定义了注册、发布等核心方法。

/**
 * 事件推拉模式接口
 */
public interface Event {
    void register(Topic topic);
    void publish(TopicMessage topicMessage);
    void publishWait(TopicMessage topicMessage);
}

核心实现类 BrokerEvent 管理所有订阅者,并利用虚拟线程执行任务。它提供了两种发布模式:异步的 publish 和同步等待的 publishWait

/**
 * 事件发布实现类
 */
@Slf4j
@Component
public class BrokerEvent implements Event {
    /**
     * 存储所有主题对应的订阅者列表
     */
    private final Map<String, List<Topic>> consumers = new ConcurrentHashMap<>();
    @Override
    public void register(Topic topic) {
        // 添加注册的主题事件
        if (topic == null || topic.getName().length == 0) {
            log.error("subscribe 订阅主题不能为空");
            return;
        }
        // 添加该主题的所有消费者
        for (String name : topic.getName()) {
            consumers.computeIfAbsent(name, k -> new ArrayList<>()).add(topic);
        }
    }
    @Override
    public void publish(TopicMessage message) {
        // 参数校验...
        try {
            for (String topicName : message.getTopicName()) {
                // 获取该主题的所有消费者
                List<Topic> topicConsumers = consumers.get(topicName);
                if (CollectionUtils.isEmpty(topicConsumers)) {
                    return;
                }
                // 为每个消费者启动一个虚拟线程异步执行
                topicConsumers.forEach(consumer -> {
                    Thread.startVirtualThread(() -> {
                        consumer.getConsumer().accept(message.getPayload());
                    });
                });
            }
        } catch (Exception e) {
            log.error("publish 发布消息异常:{}", e.getMessage(), e);
        }
    }
    @Override
    public void publishWait(TopicMessage topicMessage) {
        // 创建一个结构化任务作用域,确保所有任务都执行完成
        try (var scope = new StructuredTaskScope<Void>()) {
            for (String topicName : topicMessage.getTopicName()) {
                List<Topic> topicConsumers = consumers.get(topicName);
                if (CollectionUtils.isEmpty(topicConsumers)) {
                    continue;
                }
                // 使用虚拟线程为每个消费者启动独立任务
                for (Topic consumer : topicConsumers) {
                    scope.fork(() -> {
                        invokeWithTimeout(consumer, topicMessage);
                        return null;
                    });
                }
            }
            // 阻塞等待所有任务结束
            scope.join();
        } catch (Exception e) {
            log.error("publishWait 发布消息异常:{}", e.getMessage(), e);
        }
    }
    /**
     * 带有超时控制的订阅者执行逻辑
     */
    private void invokeWithTimeout(Topic consumer, TopicMessage topicMessage) {
        long timeout = consumer.getTimeoutSeconds();
        if (timeout <= 0) {
            safeInvoke(consumer, topicMessage);
            return;
        }
        Instant deadline = Instant.now().plusSeconds(timeout);
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            scope.fork(() -> {
                safeInvoke(consumer, topicMessage);
                return null;
            });
            try {
                scope.joinUntil(deadline);
            } catch (TimeoutException e) {
                log.warn("订阅者 {} 执行超时({}s)", Arrays.toString(consumer.getName()), timeout);
                scope.shutdown();
                return;
            }
            scope.throwIfFailed();
        } catch (Exception e) {
            log.error("订阅者 [{}] 执行异常", Arrays.toString(consumer.getName()), e);
        }
    }
    private void safeInvoke(Topic topic, TopicMessage msg) {
        try {
            topic.getConsumer().accept(msg.getPayload());
        } catch (Throwable t) {
            log.error("订阅者 [{}] 处理失败", Arrays.toString(topic.getName()), t);
        }
    }
}

4. 业务层使用示例

定义服务类,其中的方法使用 @Subscribe 注解声明为特定主题的订阅者。

@Service
public class TestServiceImpl implements TestService {
    @Override
    @Subscribe(topicName = {"sendEmail"}, timeout = 3)
    public void sendEmail(UserInfo userInfo) {
        System.out.println("通过Service服务发送邮件服务响应:" + userInfo);
    }
    @Override
    @Subscribe(topicName = "sendSms")
    public void sendSms(UserInfo userInfo) {
        System.out.println("通过Service服务发送短信服务响应:" + userInfo);
    }
}

在控制器中,业务逻辑执行完毕后,通过事件总线发布事件,触发异步通知。

@RestController
@RequestMapping("/test")
public class TestController {
    @Resource
    private BrokerEvent brokerEvent;
    @PostMapping("/userRegister")
    public String test(@RequestBody UserInfo userInfo) {
        // 保存用户信息
        System.out.println("保存用户信息:" + userInfo);
        // 发布事件,异步执行发送邮件和短信
        brokerEvent.publish(new TopicMessage(Arrays.asList("sendEmail", "sendSms"),
                userInfo));
        return "用户注册成功!";
    }
}

执行效果:请求接口后,发送邮件和短信的任务通过虚拟线程异步执行,不影响主流程的响应。

事件总线执行动图

高级用法与配置

一对多与多对一订阅

本事件总线支持灵活的订阅关系:

  1. 多个方法订阅同一主题:不同的业务方法可以使用相同的 topicName(如 sendInfo),发布该主题时所有方法都会被执行。
  2. 单个方法订阅多个主题:一个方法可以关联多个主题名。发布其中任何一个主题,该方法都会被执行。如果一次发布多个关联主题,该方法会被执行多次。
// 示例:一个方法订阅多个主题
@Override
@Subscribe(topicName = {"sendSms","sendSms2","sendSms3"})
public void sendSms(UserInfo userInfo) {
    System.out.println("通过Service服务发送短信服务响应:" + userInfo);
}

超时控制详解

@Subscribe 注解中的 timeout 参数主要用于 publishWait 方法。它限制的是单个订阅者方法的执行时间,而非所有任务的总时间。

当使用 brokerEvent.publishWait(...) 发布事件时,如果某个订阅者方法(例如一个包含远程调用的复杂业务)的执行时间超过预设的 timeoutJVM 会尝试中断该虚拟线程。这意味着超时点之后的代码可能不会被执行。例如:

public void sendEmail(User user) {
    saveLog(); // 执行
    callRemoteService(); // 可能在此处阻塞并超时
    updateDatabase(); // 超时后将不会执行
}

因此,在业务中应用超时控制时,需要仔细考虑其影响,并进行充分测试。

总结

本文演示了如何利用 JDK21 的虚拟线程特性,构建一个与 SpringBoot 集成的轻量级事件总线。该方案实现了发布/订阅模式,能有效解耦业务逻辑,并凭借虚拟线程的轻量级优势,轻松处理高并发下的异步任务。开发者可以根据实际业务需求,灵活选用异步发布 (publish) 或同步等待发布 (publishWait) 模式。

项目源码地址:https://gitee.com/QinXianZhong/eventbus.git




上一篇:一名软件工程师的困惑与幼稚
下一篇:Kafka消费积压排查实战:高并发场景下的线上故障定位与优化
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-24 17:08 , Processed in 0.227549 second(s), 39 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表