找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

4719

积分

0

好友

645

主题
发表于 1 小时前 | 查看: 2| 回复: 0

你是否遇到过这样的场景?线上服务需要紧急调整某个开关,比如临时关闭某个非核心功能以应对流量高峰。传统的做法是修改配置文件,然后一台台服务器去重启。这个过程不仅耗时,更可能在重启期间引发服务中断,造成“配置改一次,服务崩一天”的窘境。

分布式动态配置中心就是为了解决这类问题而生的。它允许你在一个中心化的地方修改配置,所有订阅了该配置的应用节点都能实时、自动地更新,整个过程无需重启服务。今天,我们就来聊聊如何利用 ZooKeeper 和 Spring Boot,快速搭建一个轻量级但功能强大的动态配置中心。

动态配置中心是什么?一个生动的比喻

想象一下,你管理着10家连锁超市。每家超市门口都挂着一个“营业时间”的牌子。现在,总部决定将营业时间统一改为“24小时营业”。

  • 传统静态配置方式:你需要派人跑遍10家店,挨个更换牌子。费时费力,还可能漏掉某一家。
  • 动态配置中心方式:你只需要在总部的电脑系统里,把“营业时间”这个配置项的值改成“24小时”。一瞬间,10家超市门口的电子显示屏(或智能标牌)自动更新为新的营业时间。

映射到我们的分布式系统:10家超市就是10台应用服务器,“营业时间”就是我们需要动态调整的配置项(例如降级开关 degradeSwitch)。动态配置中心就是那个“总部系统”,而 ZooKeeper 则充当了可靠、高效的通知与存储中间件。

整体架构一览

整个方案的核心架构如下图所示,清晰地展示了从配置存储、Spring应用初始化到业务Bean使用的数据流:

基于ZooKeeper与Spring Boot的分布式动态配置中心架构图

其核心流程可以概括为以下几步:

  1. 配置存储:使用 ZooKeeper 持久化存储配置项,并利用其 Watch 机制监听节点变化。
  2. 注解驱动:自定义 @DCCValue 注解,标记哪些字段需要从动态配置中心获取值。
  3. Bean后置处理:实现 Spring 的 BeanPostProcessor 接口。在 Bean 初始化后,扫描所有带 @DCCValue 注解的字段,从 ZooKeeper 读取初始值并注入。
  4. 动态监听:使用 Curator 框架的 CuratorCache 监听 ZooKeeper 节点。当配置被修改(NODE_CHANGED 事件),通过反射机制动态更新对应 Bean 的字段值。
  5. 管理接口:提供一个 Controller 接口,用于修改 ZooKeeper 中的配置值,从而触发整个集群的自动同步。

接下来,我们深入代码,看看每一部分是如何实现的。

核心代码实现

1. 自定义注解

首先,我们定义一个注解,用来标记需要动态配置的字段。

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
@Documented
public @interface DCCValue {

    String value() default "";

}
  • value 属性的格式为 key:defaultValue,例如 degradeSwitch:open
  • 这个注解用在类的字段上,表示该字段的值应从 ZooKeeper 动态获取。

2. ZooKeeper 客户端配置

我们需要从 application.yml 读取 ZooKeeper 的连接信息。

@Data
@ConfigurationProperties(prefix = "zookeeper.sdk.config", ignoreInvalidFields = true)
public class ZookeeperClientConfigProperties {
    private String connectString;        // Zookeeper地址
    private int baseSleepTimeMs;         // 重试基础等待时间
    private int maxRetries;              // 最大重试次数
    private int sessionTimeoutMs;        // 会话超时时间
    private int connectionTimeoutMs;     // 连接超时时间
}

然后,使用这些属性来创建并配置 ZooKeeper 客户端。这里我们使用 Netflix 开源的 Curator 框架,它简化了 ZooKeeper 客户端的操作。

@Configuration
@EnableConfigurationProperties(ZookeeperClientConfigProperties.class)
public class ZooKeeperClientConfig {

    @Bean(name = "zookeeperClient")
    public CuratorFramework createWithOptions(ZookeeperClientConfigProperties properties) {
        // 重试策略:指数退避
        ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(
            properties.getBaseSleepTimeMs(),
            properties.getMaxRetries()
        );

        // 创建客户端
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(properties.getConnectString())
                .retryPolicy(backoffRetry)
                .sessionTimeoutMs(properties.getSessionTimeoutMs())
                .connectionTimeoutMs(properties.getConnectionTimeoutMs())
                .build();

        client.start();  // 启动客户端
        return client;
    }
}

3. 核心工厂类(实现动态注入与监听)

这是整个动态配置中心的“心脏”,它实现了 BeanPostProcessor 接口,负责在 Bean 初始化时注入配置值,并监听 ZooKeeper 的变更。

@Slf4j
@Configuration
public class DCCValueBeanFactory implements BeanPostProcessor {

    private static final String BASE_CONFIG_PATH = "/big-market-dcc";
    private static final String BASE_CONFIG_PATH_CONFIG = BASE_CONFIG_PATH + "/config";

    private final CuratorFramework client;

    // 记录哪个配置对应哪个Bean(路径->Bean对象)。配置变化时,知道要更新哪个Bean的哪个字段
    private final Map<String, Object> dccObjGroup = new HashMap<>();

    public DCCValueBeanFactory(CuratorFramework client) throws Exception {
        this.client = client;

        // 第1步:检查并创建基础节点
        if (null == client.checkExists().forPath(BASE_CONFIG_PATH_CONFIG)) {
            client.create().creatingParentsIfNeeded().forPath(BASE_CONFIG_PATH_CONFIG);
            log.info("DCC 节点监听 base node {} not absent create new done!", BASE_CONFIG_PATH_CONFIG);
        }

        // 第2步:创建缓存监听器,监听 /big-market-dcc/config 下的所有节点,有变化时会触发监听器回调
        CuratorCache curatorCache = CuratorCache.build(client, BASE_CONFIG_PATH_CONFIG);
        curatorCache.start();

        // 第3步:添加监听器
        curatorCache.listenable().addListener((type, oldData, data) -> {
            switch (type) {
                // 只处理 NODE_CHANGED(节点数据变化)
                case NODE_CHANGED:
                    // 获取变化的节点路径
                    String dccValuePath = data.getPath();
                    // 从缓存中找到对应的Bean
                    Object objBean = dccObjGroup.get(dccValuePath);
                    if (null == objBean) return;
                    try {
                        // 1. getDeclaredField 方法用于获取指定类中声明的所有字段,包括私有字段、受保护字段和公共字段。
                        // 2. getField 方法用于获取指定类中的公共字段,即只能获取到公共访问修饰符(public)的字段。
                        Field field = objBean.getClass().getDeclaredField(dccValuePath.substring(dccValuePath.lastIndexOf("/") + 1));
                        field.setAccessible(true);
                        // 更新字段值
                        field.set(objBean, new String(data.getData()));
                        field.setAccessible(false);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                    break;
                default:
                    break;
            }
        });
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // 获取Bean的所有字段
        Class<?> beanClass = bean.getClass();
        Field[] fields = beanClass.getDeclaredFields();
        // 遍历所有字段,找@DCCValue注解
        for (Field field : fields) {
            if (!field.isAnnotationPresent(DCCValue.class)) {
                continue;
            }

            DCCValue dccValue = field.getAnnotation(DCCValue.class);
            // 解析@DCCValue注解的值
            String value = dccValue.value();
            if (StringUtils.isBlank(value)) {
                throw new RuntimeException(field.getName() + " @DCCValue is not config value config case 「isSwitch/isSwitch:1」");
            }

            String[] splits = value.split(":");
            String key = splits[0];
            String defaultValue = splits.length == 2 ? splits[1] : null;

            try {
                // 判断当前节点是否存在,不存在则创建出 Zookeeper 节点
                String keyPath = BASE_CONFIG_PATH_CONFIG.concat("/").concat(key);
                if (null == client.checkExists().forPath(keyPath)) {
                    client.create().creatingParentsIfNeeded().forPath(keyPath);
                    // 使用默认值设置到字段
                    if (StringUtils.isNotBlank(defaultValue)) {
                        field.setAccessible(true);
                        field.set(bean, defaultValue);
                        field.setAccessible(false);
                    }
                    log.info("DCC 节点监听 创建节点 {}", keyPath);
                } else { // 节点存在,则读取Zookeeper的值
                    String configValue = new String(client.getData().forPath(keyPath));
                    if (StringUtils.isNotBlank(configValue)) {
                        field.setAccessible(true);
                        field.set(bean, configValue); // 设置Zookeeper中的值
                        field.setAccessible(false);
                        log.info("DCC 节点监听 设置配置 {} {} {}", keyPath, field.getName(), configValue);
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            // 记录到缓存
            dccObjGroup.put(BASE_CONFIG_PATH_CONFIG.concat("/").concat(key), bean);
        }
        return bean;
    }

}

4. 配置管理接口

最后,我们需要一个接口来动态修改 ZooKeeper 中的配置值,从而触发整个监听更新链条。

@Slf4j
@RestController()
@CrossOrigin("${app.config.cross-origin}")
@RequestMapping("/api/${app.config.api-version}/raffle/dcc/")
public class DCCController implements IDCCService {

    @Resource
    private CuratorFramework client;

    private static final String BASE_CONFIG_PATH = "/big-market-dcc";
    private static final String BASE_CONFIG_PATH_CONFIG = BASE_CONFIG_PATH + "/config";

    /**
     * 更新配置
     * <p>
     * curl --request GET --url 'http://localhost:8091/api/v1/raffle/dcc/update_config?key=degradeSwitch&value=close'
     */
    @RequestMapping(value = "update_config", method = RequestMethod.GET)
    @Override
    public Response<Boolean> updateConfig(@RequestParam String key, @RequestParam String value) {
        try {
            log.info("DCC 动态配置值变更开始 key:{} value:{}", key, value);
            // 拼接Zookeeper的路径
            String keyPath = BASE_CONFIG_PATH_CONFIG.concat("/").concat(key);
            // 节点不存在,创建
            if (null == client.checkExists().forPath(keyPath)) {
                client.create().creatingParentsIfNeeded().forPath(keyPath);
                log.info("DCC 节点监听 base node {} not absent create new done!", keyPath);
            }
            // 设置数据
            Stat stat = client.setData().forPath(keyPath, value.getBytes(StandardCharsets.UTF_8));
            log.info("DCC 动态配置值变更完成 key:{} value:{} time:{}", key, value, stat.getCtime());
            return Response.<Boolean>builder()
                    .code(ResponseCode.SUCCESS.getCode())
                    .info(ResponseCode.SUCCESS.getInfo())
                    .build();
        } catch (Exception e) {
            log.error("DCC 动态配置值变更失败 key:{} value:{}", key, value, e);
            return Response.<Boolean>builder()
                    .code(ResponseCode.UN_ERROR.getCode())
                    .info(ResponseCode.UN_ERROR.getInfo())
                    .build();
        }
    }

}

使用示例:动态降级开关

现在,我们可以在业务代码中轻松使用这个动态配置中心了。以下是一个抽奖活动控制器的例子:

@RestController
public class RaffleActivityController {

    @Resource
    private IActivityArmory activityArmory;

    // dcc 统一配置中心动态配置降级开关
    @DCCValue("degradeSwitch:open")
    private String degradeSwitch;

    // 在业务方法中使用
    public void someMethod() {
        if ("close".equals(degradeSwitch)) {
            // 降级逻辑
            return;
        }
        // 正常逻辑
    }
}

运作流程

  1. 应用启动时,DCCValueBeanFactory 会扫描到 RaffleActivityController 中的 degradeSwitch 字段。
  2. 它会检查 ZooKeeper/big-market-dcc/config/degradeSwitch 节点。如果节点不存在,则创建并用注解的默认值 “open” 初始化该字段;如果节点已存在,则读取其中的值来初始化字段。
  3. 当我们需要关闭抽奖功能时,只需调用 DCCControllerupdate_config 接口,将 degradeSwitch 的值更新为 “close”
  4. ZooKeeper 节点的数据变化会被 CuratorCache 监听到,DCCValueBeanFactory 中的监听器会立即触发,通过反射将 RaffleActivityController 实例中的 degradeSwitch 字段值更新为 “close”
  5. 此后,所有新的业务请求进入 someMethod() 时,都会走降级逻辑。整个过程,服务没有重启,配置实时生效。

总结

通过结合 ZooKeeper 的 Watch 机制与 Spring Boot 的扩展点(BeanPostProcessor),我们实现了一个轻量但实用的分布式动态配置中心。它特别适用于需要频繁调整的开关型配置,如熔断降级、黑白名单、功能开关等。

这个方案的优势在于:

  • 实时性:配置变更秒级同步到所有应用节点。
  • 无侵入:业务代码只需添加一个注解,对原有逻辑影响极小。
  • 高可用:依赖于 ZooKeeper 集群的高可用性。
  • 解耦:配置管理与业务应用分离。

当然,在生产环境中,你可以考虑在此基础上增加配置加密、权限控制、历史版本回溯等更高级的功能。希望这个实践能为你构建更灵活、更可靠的分布式系统提供一种思路。如果你对 ZooKeeper 在服务发现、分布式锁等其他场景的应用也感兴趣,欢迎在 云栈社区数据库/中间件/技术栈 板块与我们深入探讨。




上一篇:NextDocs AI文档生成工具流量暴增1541%的产品增长分析
下一篇:Redis实战:分布式锁优化与发布订阅在DDD订单场景中的应用
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-4-11 04:37 , Processed in 0.691529 second(s), 41 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表