找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

1679

积分

0

好友

215

主题
发表于 4 天前 | 查看: 14| 回复: 0

MySQL读写分离实战:基于ProxySQL实现性能翻倍

一、概述

1.1 背景介绍

数据库往往是整个系统架构中的性能瓶颈。在我负责的某金融交易系统中,随着业务量的增长,单台MySQL服务器逐渐不堪重负:高峰期CPU使用率超过90%,慢查询数量激增,用户投诉响应慢的工单越来越多。

经过分析发现,系统的读写比例约为8:2,大量的查询请求集中在主库上,而主库还要同时处理写入操作。这是典型的读多写少场景,最有效的优化方案就是读写分离

读写分离的核心思想是:将数据库的写操作(INSERT、UPDATE、DELETE)路由到主库,将读操作(SELECT)路由到从库。通过这种方式,可以将读负载分散到多个从库,主库专注于处理写操作,从而大幅提升系统整体的吞吐能力。

实施读写分离后,我们的系统性能提升了约2.5倍,主库CPU使用率降到30%以下,慢查询减少了80%,真正实现了数据库性能翻倍的目标。

1.2 技术特点

读写分离架构具有以下核心特点:

负载均衡

  • 读请求分散到多个从库,减轻主库压力
  • 支持按权重分配流量,灵活调整负载比例
  • 可根据从库性能差异进行差异化配置

线性扩展

  • 通过增加从库数量横向扩展读能力
  • 理论上读性能可以无限扩展
  • 新增从库对业务无影响

高可用保障

  • 从库故障自动摘除,不影响服务
  • 主库故障可快速切换从库为主库
  • 支持多种故障检测和切换机制

数据一致性

  • 基于MySQL主从复制实现数据同步
  • 支持半同步复制降低数据丢失风险
  • 可配置强制读主库保证数据一致性

1.3 适用场景

场景类型 读写比例 是否适合 说明
内容展示类 读>95% 非常适合 新闻、博客、电商商品详情
报表分析类 读>90% 非常适合 数据分析、报表查询
社交互动类 读80% 适合 社交动态、评论列表
交易系统类 读60% 较适合 订单查询、账户余额
实时竞价类 读50% 需评估 需要考虑延迟问题
强一致性场景 - 不适合 金融核心交易

不适用场景:

  • 对数据一致性要求极高的场景(如银行转账的余额读取)
  • 写多读少的场景(如日志系统)
  • 读写请求高度耦合的事务操作

1.4 环境要求

组件 版本要求 说明
MySQL 8.0.35+ / 8.4 LTS 本文基于8.0.35版本
操作系统 Rocky Linux 9 / Ubuntu 24.04 推荐Rocky Linux 9
ProxySQL 2.6.0+ 本文使用的中间件方案
MyCat 2.0+ 备选中间件方案
内存 8GB+ 中间件节点4GB+,数据库节点8GB+
磁盘 SSD 数据库节点必须使用SSD

服务器规划示例:

角色 IP地址 配置 说明
Master 192.168.1.11 8C16G MySQL主库
Slave1 192.168.1.12 8C16G MySQL从库1
Slave2 192.168.1.13 8C16G MySQL从库2
Proxy1 192.168.1.21 4C8G ProxySQL主
Proxy2 192.168.1.22 4C8G ProxySQL备

二、详细步骤

2.1 准备工作

2.1.1 MySQL主从复制搭建

读写分离的基础是MySQL主从复制,先搭建一主两从的复制架构:

#!/bin/bash
# MySQL 8.0 安装脚本(所有节点执行)

# 安装MySQL 8.0
dnf install -y mysql-server mysql

# 启动MySQL
systemctl start mysqld
systemctl enable mysqld

# 初始化root密码
mysql -e "ALTER USER 'root'@'localhost' IDENTIFIED BY 'RootP@ss2024!';"

主库配置(192.168.1.11):

# 编辑MySQL配置文件
cat > /etc/my.cnf.d/master.cnf << 'EOF'
[mysqld]
# 基础配置
server-id = 1
port = 3306
datadir = /var/lib/mysql
socket = /var/lib/mysql/mysql.sock
log-error = /var/log/mysql/mysqld.log
pid-file = /var/run/mysqld/mysqld.pid

# 字符集
character-set-server = utf8mb4
collation-server = utf8mb4_unicode_ci

# 二进制日志(主从复制必须)
log-bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
max_binlog_size = 1G
binlog_expire_logs_seconds = 604800
sync_binlog = 1

# GTID配置(推荐使用GTID模式)
gtid_mode = ON
enforce_gtid_consistency = ON
binlog_gtid_simple_recovery = ON

# 性能配置
innodb_buffer_pool_size = 8G
innodb_log_file_size = 1G
innodb_flush_log_at_trx_commit = 1
innodb_flush_method = O_DIRECT

# 连接配置
max_connections = 2000
max_connect_errors = 100000

# 半同步复制(可选但推荐)
plugin-load-add = semisync_source.so
rpl_semi_sync_source_enabled = 1
rpl_semi_sync_source_timeout = 1000
EOF

# 重启MySQL
systemctl restart mysqld

从库配置(192.168.1.12和192.168.1.13):

# 从库1配置 (192.168.1.12)
cat > /etc/my.cnf.d/slave.cnf << 'EOF'
[mysqld]
# 基础配置
server-id = 2
port = 3306
datadir = /var/lib/mysql
socket = /var/lib/mysql/mysql.sock
log-error = /var/log/mysql/mysqld.log
pid-file = /var/run/mysqld/mysqld.pid

# 字符集
character-set-server = utf8mb4
collation-server = utf8mb4_unicode_ci

# 二进制日志(从库也开启,方便级联复制或故障切换)
log-bin = mysql-bin
binlog_format = ROW
relay_log = relay-bin
log_replica_updates = ON

# GTID配置
gtid_mode = ON
enforce_gtid_consistency = ON

# 从库只读
read_only = ON
super_read_only = ON

# 性能配置
innodb_buffer_pool_size = 8G
innodb_log_file_size = 1G

# 并行复制(提高复制速度)
replica_parallel_type = LOGICAL_CLOCK
replica_parallel_workers = 8
replica_preserve_commit_order = ON

# 半同步复制
plugin-load-add = semisync_replica.so
rpl_semi_sync_replica_enabled = 1
EOF

# 从库2配置 (192.168.1.13) - 修改server-id为3
# 其他配置相同
2.1.2 配置主从复制

在主库创建复制用户:

-- 登录主库
mysql -uroot -p'RootP@ss2024!'

-- 创建复制用户
CREATE USER 'repl'@'192.168.1.%' IDENTIFIED BY 'ReplP@ss2024!';
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'repl'@'192.168.1.%';
FLUSH PRIVILEGES;

-- 查看主库状态
SHOW MASTER STATUS\G
-- 记录File和Position,或者使用GTID模式则不需要

在从库配置复制:

-- 登录从库
mysql -uroot -p'RootP@ss2024!'

-- 配置复制源(GTID模式)
CHANGE REPLICATION SOURCE TO
    SOURCE_HOST='192.168.1.11',
    SOURCE_PORT=3306,
    SOURCE_USER='repl',
    SOURCE_PASSWORD='ReplP@ss2024!',
    SOURCE_AUTO_POSITION=1,
    GET_SOURCE_PUBLIC_KEY=1;

-- 启动复制
START REPLICA;

-- 查看复制状态
SHOW REPLICA STATUS\G

-- 关键检查项:
-- Replica_IO_Running: Yes
-- Replica_SQL_Running: Yes
-- Seconds_Behind_Source: 0
2.1.3 验证主从复制
-- 在主库创建测试数据
CREATE DATABASE test_rw;
USE test_rw;
CREATE TABLE test (id INT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(100));
INSERT INTO test (name) VALUES ('主库写入测试');

-- 在从库查询验证
SELECT * FROM test_rw.test;
-- 应该能看到"主库写入测试"的数据

2.2 核心配置

本文使用ProxySQL作为读写分离中间件,它是目前最主流的MySQL代理工具,具有高性能、灵活配置、丰富监控等优点。

2.2.1 安装ProxySQL
# 在Proxy节点安装ProxySQL (192.168.1.21, 192.168.1.22)

# 添加ProxySQL仓库
cat > /etc/yum.repos.d/proxysql.repo << 'EOF'
[proxysql_repo]
name=ProxySQL YUM repository
baseurl=https://repo.proxysql.com/ProxySQL/proxysql-2.6.x/centos/9
gpgcheck=1
gpgkey=https://repo.proxysql.com/ProxySQL/proxysql-2.6.x/repo_pub_key
EOF

# 安装
dnf install -y proxysql

# 启动
systemctl start proxysql
systemctl enable proxysql

# 验证
proxysql --version
# ProxySQL version 2.6.0-...
2.2.2 配置ProxySQL

ProxySQL使用管理端口6032进行配置,应用连接端口为6033:

# 连接ProxySQL管理接口
mysql -uadmin -padmin -h127.0.0.1 -P6032 --prompt 'ProxySQL> '

配置后端MySQL服务器:

-- 添加MySQL服务器
-- hostgroup_id: 10=写组(主库),20=读组(从库)

-- 添加主库到写组
INSERT INTO mysql_servers (hostgroup_id, hostname, port, weight, comment)
VALUES (10, '192.168.1.11', 3306, 1, 'Master');

-- 添加从库到读组
INSERT INTO mysql_servers (hostgroup_id, hostname, port, weight, comment)
VALUES (20, '192.168.1.12', 3306, 100, 'Slave1');

INSERT INTO mysql_servers (hostgroup_id, hostname, port, weight, comment)
VALUES (20, '192.168.1.13', 3306, 100, 'Slave2');

-- 查看配置
SELECT * FROM mysql_servers;

-- 加载配置到运行时
LOAD MYSQL SERVERS TO RUNTIME;

-- 保存配置到磁盘
SAVE MYSQL SERVERS TO DISK;

配置MySQL用户:

-- 添加ProxySQL监控用户(需要在MySQL中创建)
UPDATE global_variables SET variable_value='monitor' WHERE variable_name='mysql-monitor_username';
UPDATE global_variables SET variable_value='MonitorP@ss2024!' WHERE variable_name='mysql-monitor_password';

-- 添加应用访问用户
INSERT INTO mysql_users (username, password, default_hostgroup, transaction_persistent)
VALUES ('app_user', 'AppP@ss2024!', 10, 1);

-- transaction_persistent=1 表示事务期间所有查询都路由到同一组

LOAD MYSQL USERS TO RUNTIME;
SAVE MYSQL USERS TO DISK;
LOAD MYSQL VARIABLES TO RUNTIME;
SAVE MYSQL VARIABLES TO DISK;

在MySQL主库创建相应用户:

-- 登录MySQL主库
mysql -uroot -p'RootP@ss2024!'

-- 创建监控用户
CREATE USER 'monitor'@'192.168.1.%' IDENTIFIED BY 'MonitorP@ss2024!';
GRANT REPLICATION CLIENT, PROCESS ON *.* TO 'monitor'@'192.168.1.%';

-- 创建应用用户
CREATE USER 'app_user'@'192.168.1.%' IDENTIFIED BY 'AppP@ss2024!';
GRANT SELECT, INSERT, UPDATE, DELETE ON *.* TO 'app_user'@'192.168.1.%';

FLUSH PRIVILEGES;
2.2.3 配置读写分离规则
-- 连接ProxySQL管理接口
mysql -uadmin -padmin -h127.0.0.1 -P6032 --prompt 'ProxySQL> '

-- 配置读写分离规则组
INSERT INTO mysql_replication_hostgroups (writer_hostgroup, reader_hostgroup, comment)
VALUES (10, 20, 'RW Split Cluster');

LOAD MYSQL SERVERS TO RUNTIME;
SAVE MYSQL SERVERS TO DISK;

-- 配置查询路由规则
-- 规则1:SELECT语句路由到读组(从库)
INSERT INTO mysql_query_rules (rule_id, active, match_pattern, destination_hostgroup, apply)
VALUES (100, 1, '^SELECT.*', 20, 1);

-- 规则2:SELECT ... FOR UPDATE 路由到写组(主库)
INSERT INTO mysql_query_rules (rule_id, active, match_pattern, destination_hostgroup, apply)
VALUES (90, 1, '^SELECT.*FOR UPDATE', 10, 1);

-- 规则3:需要强一致性的查询路由到主库(通过SQL注释标记)
INSERT INTO mysql_query_rules (rule_id, active, match_pattern, destination_hostgroup, apply)
VALUES (80, 1, '/\\*master\\*/', 10, 1);

-- 规则4:事务中的查询路由到主库
INSERT INTO mysql_query_rules (rule_id, active, match_pattern, destination_hostgroup, apply)
VALUES (70, 1, '^BEGIN', 10, 1);

INSERT INTO mysql_query_rules (rule_id, active, match_pattern, destination_hostgroup, apply)
VALUES (71, 1, '^START TRANSACTION', 10, 1);

-- 规则5:写操作路由到主库
INSERT INTO mysql_query_rules (rule_id, active, match_pattern, destination_hostgroup, apply)
VALUES (200, 1, '^INSERT', 10, 1);

INSERT INTO mysql_query_rules (rule_id, active, match_pattern, destination_hostgroup, apply)
VALUES (201, 1, '^UPDATE', 10, 1);

INSERT INTO mysql_query_rules (rule_id, active, match_pattern, destination_hostgroup, apply)
VALUES (202, 1, '^DELETE', 10, 1);

INSERT INTO mysql_query_rules (rule_id, active, match_pattern, destination_hostgroup, apply)
VALUES (203, 1, '^REPLACE', 10, 1);

-- 查看规则
SELECT rule_id, active, match_pattern, destination_hostgroup FROM mysql_query_rules ORDER BY rule_id;

-- 加载规则
LOAD MYSQL QUERY RULES TO RUNTIME;
SAVE MYSQL QUERY RULES TO DISK;
2.2.4 高级配置
-- 配置连接池
UPDATE global_variables SET variable_value=2000 WHERE variable_name='mysql-max_connections';
UPDATE global_variables SET variable_value=100 WHERE variable_name='mysql-default_max_latency_ms';

-- 配置健康检查
UPDATE global_variables SET variable_value=2000 WHERE variable_name='mysql-monitor_connect_interval';
UPDATE global_variables SET variable_value=1000 WHERE variable_name='mysql-monitor_ping_interval';
UPDATE global_variables SET variable_value=2000 WHERE variable_name='mysql-monitor_read_only_interval';
UPDATE global_variables SET variable_value=1000 WHERE variable_name='mysql-monitor_replication_lag_interval';
UPDATE global_variables SET variable_value=30 WHERE variable_name='mysql-monitor_replication_lag_timeout';

-- 配置复制延迟阈值(超过此延迟的从库自动摘除)
UPDATE global_variables SET variable_value=10 WHERE variable_name='mysql-monitor_slave_lag_when_null';

-- 加载配置
LOAD MYSQL VARIABLES TO RUNTIME;
SAVE MYSQL VARIABLES TO DISK;

2.3 启动和验证

2.3.1 验证后端服务器状态
-- 连接ProxySQL管理接口
mysql -uadmin -padmin -h127.0.0.1 -P6032

-- 查看后端服务器状态
SELECT hostgroup_id, hostname, port, status, weight, max_replication_lag
FROM mysql_servers;

-- 查看服务器运行时状态
SELECT * FROM runtime_mysql_servers;

-- 查看连接池状态
SELECT hostgroup, srv_host, srv_port, status, ConnUsed, ConnFree, ConnOK, ConnERR
FROM stats_mysql_connection_pool;

-- 查看复制延迟监控
SELECT hostname, port, time_start_us, replication_lag
FROM mysql_server_replication_lag_log
ORDER BY time_start_us DESC LIMIT 10;
2.3.2 验证读写分离
# 通过ProxySQL连接MySQL(使用6033端口)
mysql -uapp_user -p'AppP@ss2024!' -h192.168.1.21 -P6033

# 测试写操作(应该路由到主库)
USE test_rw;
INSERT INTO test (name) VALUES ('通过ProxySQL写入');

# 测试读操作(应该路由到从库)
SELECT * FROM test;

查看路由统计:

-- 连接ProxySQL管理接口
mysql -uadmin -padmin -h127.0.0.1 -P6032

-- 查看查询路由统计
SELECT hostgroup, digest_text, count_star, sum_time
FROM stats_mysql_query_digest
ORDER BY count_star DESC LIMIT 20;

-- 查看按hostgroup的统计
SELECT hostgroup, SUM(count_star) as total_queries
FROM stats_mysql_query_digest
GROUP BY hostgroup;

-- 预期结果:
-- hostgroup 10 (写组): INSERT/UPDATE/DELETE等写操作
-- hostgroup 20 (读组): SELECT等读操作
2.3.3 测试故障转移
# 模拟从库1故障
systemctl stop mysqld  # 在192.168.1.12执行

# 查看ProxySQL自动摘除
mysql -uadmin -padmin -h127.0.0.1 -P6032 -e "SELECT hostgroup_id, hostname, status FROM mysql_servers;"

# 应该看到192.168.1.12的status变为SHUNNED

# 恢复从库1
systemctl start mysqld  # 在192.168.1.12执行

# 稍等片刻,ProxySQL会自动将其加回读组

三、示例代码和配置

3.1 完整配置示例

3.1.1 ProxySQL完整配置文件
# /etc/proxysql.cnf
# ProxySQL 完整配置文件

datadir="/var/lib/proxysql"
errorlog="/var/lib/proxysql/proxysql.log"

admin_variables=
{
    admin_credentials="admin:admin;cluster_admin:ClusterP@ss!"
    mysql_ifaces="0.0.0.0:6032"
    cluster_username="cluster_admin"
    cluster_password="ClusterP@ss!"
    web_enabled=true
    web_port=6080
}

mysql_variables=
{
    threads=8
    max_connections=4096
    default_query_delay=0
    default_query_timeout=36000000
    have_compress=true
    poll_timeout=2000
    interfaces="0.0.0.0:6033"
    default_schema="information_schema"
    stacksize=1048576
    server_version="8.0.35"
    connect_timeout_server=3000
    monitor_username="monitor"
    monitor_password="MonitorP@ss2024!"
    monitor_history=600000
    monitor_connect_interval=60000
    monitor_ping_interval=10000
    monitor_read_only_interval=1500
    monitor_read_only_timeout=500
    ping_interval_server_msec=120000
    ping_timeout_server=500
    commands_stats=true
    sessions_sort=true
    connect_retries_on_failure=10
    monitor_replication_lag_interval=2000
    monitor_replication_lag_timeout=1000
}

mysql_replication_hostgroups =
(
    {
        writer_hostgroup=10
        reader_hostgroup=20
        comment="RW Split Production Cluster"
    }
)

mysql_servers =
(
    {
        address="192.168.1.11"
        port=3306
        hostgroup=10
        weight=1
        max_connections=500
        comment="Master"
    },
    {
        address="192.168.1.12"
        port=3306
        hostgroup=20
        weight=100
        max_connections=500
        max_replication_lag=30
        comment="Slave1"
    },
    {
        address="192.168.1.13"
        port=3306
        hostgroup=20
        weight=100
        max_connections=500
        max_replication_lag=30
        comment="Slave2"
    }
)

mysql_users =
(
    {
        username="app_user"
        password="AppP@ss2024!"
        default_hostgroup=10
        transaction_persistent=1
        active=1
    },
    {
        username="readonly_user"
        password="ReadOnlyP@ss2024!"
        default_hostgroup=20
        transaction_persistent=0
        active=1
    }
)

mysql_query_rules =
(
    {
        rule_id=70
        active=1
        match_pattern="^BEGIN"
        destination_hostgroup=10
        apply=1
    },
    {
        rule_id=71
        active=1
        match_pattern="^START TRANSACTION"
        destination_hostgroup=10
        apply=1
    },
    {
        rule_id=80
        active=1
        match_pattern="/\\*master\\*/"
        destination_hostgroup=10
        apply=1
    },
    {
        rule_id=90
        active=1
        match_pattern="^SELECT.*FOR UPDATE"
        destination_hostgroup=10
        apply=1
    },
    {
        rule_id=100
        active=1
        match_pattern="^SELECT"
        destination_hostgroup=20
        apply=1
    },
    {
        rule_id=200
        active=1
        match_pattern="^INSERT"
        destination_hostgroup=10
        apply=1
    },
    {
        rule_id=201
        active=1
        match_pattern="^UPDATE"
        destination_hostgroup=10
        apply=1
    },
    {
        rule_id=202
        active=1
        match_pattern="^DELETE"
        destination_hostgroup=10
        apply=1
    }
)

scheduler =
(
)

mysql_galera_hostgroups =
(
)
3.1.2 应用层读写分离(无中间件方案)

如果不想使用中间件,也可以在应用层实现读写分离。以下是各语言的实现示例:

Spring Boot配置

# application.yml
spring:
  datasource:
    master:
      jdbc-url: jdbc:mysql://192.168.1.11:3306/app_db?useSSL=false&serverTimezone=Asia/Shanghai
      username: app_user
      password: AppP@ss2024!
      driver-class-name: com.mysql.cj.jdbc.Driver
      hikari:
        pool-name: master-pool
        maximum-pool-size: 50
        minimum-idle: 10
        connection-timeout: 5000

    slaves:
    - jdbc-url: jdbc:mysql://192.168.1.12:3306/app_db?useSSL=false&serverTimezone=Asia/Shanghai
      username: app_user
      password: AppP@ss2024!
      driver-class-name: com.mysql.cj.jdbc.Driver
      hikari:
        pool-name: slave1-pool
        maximum-pool-size: 100
        minimum-idle: 20

    - jdbc-url: jdbc:mysql://192.168.1.13:3306/app_db?useSSL=false&serverTimezone=Asia/Shanghai
      username: app_user
      password: AppP@ss2024!
      driver-class-name: com.mysql.cj.jdbc.Driver
      hikari:
        pool-name: slave2-pool
        maximum-pool-size: 100
        minimum-idle: 20

动态数据源实现

// DataSourceType.java - 数据源类型枚举
package com.example.datasource;

public enum DataSourceType {
    MASTER,
    SLAVE
}

// DynamicDataSourceContextHolder.java - 线程本地变量持有当前数据源
package com.example.datasource;

public class DynamicDataSourceContextHolder {

    private static final ThreadLocal<DataSourceType> contextHolder = new ThreadLocal<>();

    public static void setDataSourceType(DataSourceType dataSourceType) {
        contextHolder.set(dataSourceType);
    }

    public static DataSourceType getDataSourceType() {
        return contextHolder.get() == null ? DataSourceType.MASTER : contextHolder.get();
    }

    public static void clearDataSourceType() {
        contextHolder.remove();
    }
}

// DynamicDataSource.java - 动态数据源
package com.example.datasource;

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

public class DynamicDataSource extends AbstractRoutingDataSource {

    @Override
    protected Object determineCurrentLookupKey() {
        return DynamicDataSourceContextHolder.getDataSourceType();
    }
}

// DataSourceConfig.java - 数据源配置类
package com.example.config;

import com.example.datasource.DataSourceType;
import com.example.datasource.DynamicDataSource;
import com.zaxxer.hikari.HikariDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class DataSourceConfig {

    @Bean
    @ConfigurationProperties("spring.datasource.master")
    public DataSource masterDataSource() {
        return new HikariDataSource();
    }

    @Bean
    @ConfigurationProperties("spring.datasource.slaves[0]")
    public DataSource slave1DataSource() {
        return new HikariDataSource();
    }

    @Bean
    @ConfigurationProperties("spring.datasource.slaves[1]")
    public DataSource slave2DataSource() {
        return new HikariDataSource();
    }

    @Bean
    @Primary
    public DataSource dynamicDataSource() {
        DynamicDataSource dynamicDataSource = new DynamicDataSource();

        // 设置默认数据源为主库
        dynamicDataSource.setDefaultTargetDataSource(masterDataSource());

        // 配置多数据源
        Map<Object, Object> dataSourceMap = new HashMap<>();
        dataSourceMap.put(DataSourceType.MASTER, masterDataSource());
        // 从库使用负载均衡数据源
        dataSourceMap.put(DataSourceType.SLAVE, createSlaveLoadBalanceDataSource());

        dynamicDataSource.setTargetDataSources(dataSourceMap);
        return dynamicDataSource;
    }

    // 创建从库负载均衡数据源
    private DataSource createSlaveLoadBalanceDataSource() {
        // 简单的轮询实现,生产环境可使用更复杂的策略
        return new LoadBalanceDataSource(slave1DataSource(), slave2DataSource());
    }
}

// LoadBalanceDataSource.java - 负载均衡数据源
package com.example.datasource;

import javax.sql.DataSource;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

public class LoadBalanceDataSource implements DataSource {

    private final DataSource[] dataSources;
    private final AtomicInteger counter = new AtomicInteger(0);

    public LoadBalanceDataSource(DataSource... dataSources) {
        this.dataSources = dataSources;
    }

    @Override
    public Connection getConnection() throws SQLException {
        // 轮询选择数据源
        int index = Math.abs(counter.getAndIncrement() % dataSources.length);
        return dataSources[index].getConnection();
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        int index = Math.abs(counter.getAndIncrement() % dataSources.length);
        return dataSources[index].getConnection(username, password);
    }

    // 其他方法委托给第一个数据源
    @Override
    public PrintWriter getLogWriter() throws SQLException {
        return dataSources[0].getLogWriter();
    }

    @Override
    public void setLogWriter(PrintWriter out) throws SQLException {
        dataSources[0].setLogWriter(out);
    }

    @Override
    public void setLoginTimeout(int seconds) throws SQLException {
        dataSources[0].setLoginTimeout(seconds);
    }

    @Override
    public int getLoginTimeout() throws SQLException {
        return dataSources[0].getLoginTimeout();
    }

    @Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        return dataSources[0].getParentLogger();
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        return dataSources[0].unwrap(iface);
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return dataSources[0].isWrapperFor(iface);
    }
}

// ReadOnly.java - 只读注解
package com.example.annotation;

import java.lang.annotation.*;

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ReadOnly {
}

// DataSourceAspect.java - 数据源切换切面
package com.example.aspect;

import com.example.annotation.ReadOnly;
import com.example.datasource.DataSourceType;
import com.example.datasource.DynamicDataSourceContextHolder;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

@Aspect
@Order(-1)  // 确保在事务切面之前执行
@Component
public class DataSourceAspect {

    @Around("@annotation(com.example.annotation.ReadOnly) || @within(com.example.annotation.ReadOnly)")
    public Object around(ProceedingJoinPoint point) throws Throwable {
        MethodSignature signature = (MethodSignature) point.getSignature();

        // 检查方法或类上是否有@ReadOnly注解
        ReadOnly readOnly = signature.getMethod().getAnnotation(ReadOnly.class);
        if (readOnly == null) {
            readOnly = point.getTarget().getClass().getAnnotation(ReadOnly.class);
        }

        if (readOnly != null) {
            DynamicDataSourceContextHolder.setDataSourceType(DataSourceType.SLAVE);
        }

        try {
            return point.proceed();
        } finally {
            DynamicDataSourceContextHolder.clearDataSourceType();
        }
    }
}

// UserService.java - 使用示例
package com.example.service;

import com.example.annotation.ReadOnly;
import com.example.entity.User;
import com.example.mapper.UserMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;

@Service
public class UserService {

    @Autowired
    private UserMapper userMapper;

    // 读操作使用从库
    @ReadOnly
    public User findById(Long id) {
        return userMapper.findById(id);
    }

    // 列表查询使用从库
    @ReadOnly
    public List<User> findAll() {
        return userMapper.findAll();
    }

    // 写操作使用主库
    @Transactional
    public void create(User user) {
        userMapper.insert(user);
    }

    // 写后读场景:强制使用主库(不加@ReadOnly)
    @Transactional
    public User createAndGet(User user) {
        userMapper.insert(user);
        // 写后立即读取,必须从主库读,避免复制延迟问题
        return userMapper.findById(user.getId());
    }
}

3.2 实际应用案例

3.2.1 电商场景读写分离
// OrderService.java - 订单服务读写分离示例
package com.ecommerce.service;

import com.example.annotation.ReadOnly;
import com.example.datasource.DataSourceType;
import com.example.datasource.DynamicDataSourceContextHolder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class OrderService {

    /**
     * 创建订单 - 写操作,使用主库
     */
    @Transactional
    public Order createOrder(OrderRequest request) {
        // 扣减库存
        inventoryMapper.decreaseStock(request.getProductId(), request.getQuantity());

        // 创建订单
        Order order = new Order();
        order.setUserId(request.getUserId());
        order.setProductId(request.getProductId());
        order.setAmount(request.getAmount());
        order.setStatus(OrderStatus.CREATED);
        orderMapper.insert(order);

        // 写入订单项
        orderItemMapper.batchInsert(order.getId(), request.getItems());

        return order;
    }

    /**
     * 查询订单详情 - 读操作,使用从库
     */
    @ReadOnly
    public OrderDetail getOrderDetail(Long orderId) {
        Order order = orderMapper.findById(orderId);
        List<OrderItem> items = orderItemMapper.findByOrderId(orderId);
        Product product = productMapper.findById(order.getProductId());

        return OrderDetail.builder()
                .order(order)
                .items(items)
                .product(product)
                .build();
    }

    /**
     * 支付后查询订单 - 写后读场景,强制使用主库
     * 避免复制延迟导致用户看不到最新状态
     */
    @Transactional
    public Order payAndGet(Long orderId, PaymentInfo payment) {
        // 更新订单状态
        orderMapper.updateStatus(orderId, OrderStatus.PAID);

        // 记录支付信息
        paymentMapper.insert(payment);

        // 强制从主库读取(不使用@ReadOnly,使用主库)
        return orderMapper.findById(orderId);
    }

    /**
     * 订单列表查询 - 读操作,使用从库
     * 带分页和筛选条件
     */
    @ReadOnly
    public PageResult<Order> listOrders(OrderQuery query) {
        int total = orderMapper.count(query);
        List<Order> orders = orderMapper.findByQuery(query);

        return PageResult.of(orders, total, query.getPageSize());
    }

    /**
     * 实时库存查询 - 需要强一致性,使用主库
     * 库存是敏感数据,必须从主库读取
     */
    public Integer getRealTimeStock(Long productId) {
        // 强制切换到主库
        DynamicDataSourceContextHolder.setDataSourceType(DataSourceType.MASTER);
        try {
            return inventoryMapper.getStock(productId);
        } finally {
            DynamicDataSourceContextHolder.clearDataSourceType();
        }
    }
}
3.2.2 处理复制延迟
// ReplicationLagAwareService.java - 复制延迟感知服务
package com.example.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
public class ReplicationLagAwareService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String WRITE_FLAG_PREFIX = "rw:write_flag:";
    private static final int REPLICATION_LAG_SECONDS = 3;  // 预估复制延迟

    /**
     * 写操作后标记,确保后续读取使用主库
     */
    public void markWrite(String userId, String dataType) {
        String key = WRITE_FLAG_PREFIX + userId + ":" + dataType;
        redisTemplate.opsForValue().set(key, "1", REPLICATION_LAG_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * 检查是否需要从主库读取
     */
    public boolean shouldReadFromMaster(String userId, String dataType) {
        String key = WRITE_FLAG_PREFIX + userId + ":" + dataType;
        return Boolean.TRUE.equals(redisTemplate.hasKey(key));
    }

    /**
     * 带复制延迟感知的读取
     */
    public <T> T readWithLagAwareness(String userId, String dataType,
                                      java.util.function.Supplier<T> masterRead,
                                      java.util.function.Supplier<T> slaveRead) {
        if (shouldReadFromMaster(userId, dataType)) {
            // 近期有写操作,从主库读取
            return masterRead.get();
        } else {
            // 从从库读取
            return slaveRead.get();
        }
    }
}

// 使用示例
@Service
public class UserProfileService {

    @Autowired
    private ReplicationLagAwareService lagAwareService;

    @Autowired
    private UserMapper userMapper;

    // 更新用户资料
    @Transactional
    public void updateProfile(Long userId, UserProfile profile) {
        userMapper.updateProfile(userId, profile);
        // 标记写操作
        lagAwareService.markWrite(userId.toString(), "profile");
    }

    // 获取用户资料
    public UserProfile getProfile(Long userId) {
        return lagAwareService.readWithLagAwareness(
            userId.toString(),
            "profile",
            () -> {
                // 主库读取
                DynamicDataSourceContextHolder.setDataSourceType(DataSourceType.MASTER);
                try {
                    return userMapper.findProfileById(userId);
                } finally {
                    DynamicDataSourceContextHolder.clearDataSourceType();
                }
            },
            () -> {
                // 从库读取
                DynamicDataSourceContextHolder.setDataSourceType(DataSourceType.SLAVE);
                try {
                    return userMapper.findProfileById(userId);
                } finally {
                    DynamicDataSourceContextHolder.clearDataSourceType();
                }
            }
        );
    }
}
3.2.3 Python读写分离实现
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MySQL读写分离Python实现
依赖: pip install pymysql DBUtils
"""

import pymysql
from DBUtils.PooledDB import PooledDB
from functools import wraps
import threading
import random
from contextlib import contextmanager

class MySQLPool:
    """MySQL连接池封装"""

    def __init__(self, host, port, user, password, database, max_connections=50):
        self.pool = PooledDB(
            creator=pymysql,
            maxconnections=max_connections,
            mincached=5,
            maxcached=20,
            blocking=True,
            host=host,
            port=port,
            user=user,
            password=password,
            database=database,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor,
            autocommit=True
        )

    def get_connection(self):
        return self.pool.connection()

class ReadWriteSplitDatabase:
    """读写分离数据库客户端"""

    # 线程本地存储,用于事务中强制使用主库
    _local = threading.local()

    def __init__(self, master_config, slave_configs):
        """
        初始化读写分离数据库

        Args:
            master_config: 主库配置
            slave_configs: 从库配置列表
        """
        # 创建主库连接池
        self.master_pool = MySQLPool(**master_config)

        # 创建从库连接池列表
        self.slave_pools = [MySQLPool(**config) for config in slave_configs]
        self._slave_index = 0

    def _get_slave_pool(self):
        """轮询获取从库连接池"""
        if not self.slave_pools:
            return self.master_pool

        pool = self.slave_pools[self._slave_index % len(self.slave_pools)]
        self._slave_index += 1
        return pool

    def _in_transaction(self):
        """检查是否在事务中"""
        return getattr(self._local, 'in_transaction', False)

    def _force_master(self):
        """检查是否强制使用主库"""
        return getattr(self._local, 'force_master', False)

    @contextmanager
    def transaction(self):
        """事务上下文管理器"""
        self._local.in_transaction = True
        conn = self.master_pool.get_connection()
        conn.autocommit(False)

        try:
            yield conn
            conn.commit()
        except Exception as e:
            conn.rollback()
            raise e
        finally:
            self._local.in_transaction = False
            conn.close()

    @contextmanager
    def force_master_context(self):
        """强制使用主库的上下文"""
        self._local.force_master = True
        try:
            yield
        finally:
            self._local.force_master = False

    def execute_write(self, sql, params=None):
        """执行写操作(使用主库)"""
        conn = self.master_pool.get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                return cursor.lastrowid
        finally:
            conn.close()

    def execute_read(self, sql, params=None):
        """执行读操作(使用从库,事务中使用主库)"""
        if self._in_transaction() or self._force_master():
            pool = self.master_pool
        else:
            pool = self._get_slave_pool()

        conn = pool.get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                return cursor.fetchall()
        finally:
            conn.close()

    def execute_read_one(self, sql, params=None):
        """读取单条记录"""
        result = self.execute_read(sql, params)
        return result[0] if result else None

    def batch_insert(self, table, columns, values_list):
        """批量插入"""
        if not values_list:
            return 0

        placeholders = ', '.join(['%s'] * len(columns))
        columns_str = ', '.join(columns)
        sql = f"INSERT INTO {table} ({columns_str}) VALUES ({placeholders})"

        conn = self.master_pool.get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.executemany(sql, values_list)
                return cursor.rowcount
        finally:
            conn.close()

def read_only(func):
    """只读装饰器(标识方法使用从库)"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    wrapper._read_only = True
    return wrapper

def write_then_read_master(func):
    """写后读强制主库装饰器"""
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        with self.db.force_master_context():
            return func(self, *args, **kwargs)
    return wrapper

# 使用示例
class OrderRepository:
    """订单数据访问层"""

    def __init__(self, db: ReadWriteSplitDatabase):
        self.db = db

    @read_only
    def find_by_id(self, order_id):
        """查询订单(从库)"""
        sql = "SELECT * FROM orders WHERE id = %s"
        return self.db.execute_read_one(sql, (order_id,))

    @read_only
    def find_by_user(self, user_id, page=1, page_size=20):
        """查询用户订单列表(从库)"""
        offset = (page - 1) * page_size
        sql = """
            SELECT * FROM orders
            WHERE user_id = %s
            ORDER BY created_at DESC
            LIMIT %s OFFSET %s
        """
        return self.db.execute_read(sql, (user_id, page_size, offset))

    def create_order(self, order_data):
        """创建订单(主库)"""
        sql = """
            INSERT INTO orders (user_id, product_id, amount, status, created_at)
            VALUES (%(user_id)s, %(product_id)s, %(amount)s, %(status)s, NOW())
        """
        return self.db.execute_write(sql, order_data)

    @write_then_read_master
    def create_and_get(self, order_data):
        """创建订单并返回(强制主库读取)"""
        order_id = self.create_order(order_data)
        return self.find_by_id(order_id)

    def update_status(self, order_id, status):
        """更新订单状态(主库)"""
        sql = "UPDATE orders SET status = %s WHERE id = %s"
        return self.db.execute_write(sql, (status, order_id))

# 初始化
if __name__ == '__main__':
    # 配置
    master_config = {
        'host': '192.168.1.11',
        'port': 3306,
        'user': 'app_user',
        'password': 'AppP@ss2024!',
        'database': 'ecommerce'
    }

    slave_configs = [
        {
            'host': '192.168.1.12',
            'port': 3306,
            'user': 'app_user',
            'password': 'AppP@ss2024!',
            'database': 'ecommerce'
        },
        {
            'host': '192.168.1.13',
            'port': 3306,
            'user': 'app_user',
            'password': 'AppP@ss2024!',
            'database': 'ecommerce'
        }
    ]

    # 创建数据库客户端
    db = ReadWriteSplitDatabase(master_config, slave_configs)
    order_repo = OrderRepository(db)

    # 使用事务
    with db.transaction() as conn:
        with conn.cursor() as cursor:
            cursor.execute("INSERT INTO orders (user_id, amount) VALUES (%s, %s)", (1, 100))
            cursor.execute("UPDATE inventory SET stock = stock - 1 WHERE product_id = %s", (1001,))
        # 事务中的读取使用主库
        with conn.cursor() as cursor:
            cursor.execute("SELECT * FROM orders WHERE user_id = %s", (1,))
            orders = cursor.fetchall()

四、最佳实践和注意事项

4.1 最佳实践

4.1.1 性能优化

1. 读写比例分析

-- 在ProxySQL中分析读写比例
SELECT
CASE
WHEN digest_text LIKE 'SELECT%' THEN 'READ'
ELSE 'WRITE'
END as operation_type,
SUM(count_star) as total_queries,
ROUND(SUM(count_star) * 100.0 / (SELECT SUM(count_star) FROM stats_mysql_query_digest), 2) as percentage
FROM stats_mysql_query_digest
GROUP BY operation_type;

-- 按hostgroup统计
SELECT 
    hostgroup,
    SUM(count_star) as total_queries,
    SUM(sum_time) as total_time_us,
    ROUND(SUM(sum_time) / SUM(count_star), 2) as avg_time_us
FROM stats_mysql_query_digest
GROUP BY hostgroup;

2. 连接池优化

-- ProxySQL连接池配置
UPDATE global_variables SET variable_value=2000 WHERE variable_name='mysql-max_connections';
UPDATE global_variables SET variable_value=1 WHERE variable_name='mysql-multiplexing';
UPDATE global_variables SET variable_value=10000 WHERE variable_name='mysql-connection_max_age_ms';
UPDATE global_variables SET variable_value=1000 WHERE variable_name='mysql-free_connections_pct';

LOAD MYSQL VARIABLES TO RUNTIME;
SAVE MYSQL VARIABLES TO DISK;

3. 查询缓存(ProxySQL Query Cache)

-- 启用查询缓存(适合读多写少场景)
INSERT INTO mysql_query_rules (rule_id, active, match_pattern, cache_ttl, apply)
VALUES (500, 1, '^SELECT.*FROM products.*', 60000, 1);

-- 缓存商品列表60秒
INSERT INTO mysql_query_rules (rule_id, active, match_pattern, cache_ttl, apply)
VALUES (501, 1, '^SELECT.*FROM categories.*', 300000, 1);

-- 缓存分类列表5分钟

LOAD MYSQL QUERY RULES TO RUNTIME;
SAVE MYSQL QUERY RULES TO DISK;
4.1.2 安全加固

1. 用户权限最小化

-- 主库上创建不同权限的用户

-- 只读用户(给从库查询使用)
CREATE USER 'readonly_app'@'192.168.1.%' IDENTIFIED BY 'ReadOnlyP@ss!';
GRANT SELECT ON app_db.* TO 'readonly_app'@'192.168.1.%';

-- 读写用户(给主库使用)
CREATE USER 'readwrite_app'@'192.168.1.%' IDENTIFIED BY 'ReadWriteP@ss!';
GRANT SELECT, INSERT, UPDATE, DELETE ON app_db.* TO 'readwrite_app'@'192.168.1.%';

-- 禁止危险操作
-- 不授予DROP、TRUNCATE、ALTER等权限

FLUSH PRIVILEGES;

2. ProxySQL访问控制

-- 修改ProxySQL管理密码
UPDATE global_variables SET variable_value='admin:NewAdminP@ss2024!' WHERE variable_name='admin-admin_credentials';

-- 限制管理接口访问IP
UPDATE global_variables SET variable_value='127.0.0.1:6032;192.168.1.0/24:6032' WHERE variable_name='admin-mysql_ifaces';

LOAD ADMIN VARIABLES TO RUNTIME;
SAVE ADMIN VARIABLES TO DISK;

3. SQL注入防护

-- 配置SQL黑名单
INSERT INTO mysql_query_rules (rule_id, active, match_pattern, error_msg, apply)
VALUES (1, 1, '.*UNION.*SELECT.*', 'Query not allowed', 1);

INSERT INTO mysql_query_rules (rule_id, active, match_pattern, error_msg, apply)
VALUES (2, 1, '.*INTO OUTFILE.*', 'Query not allowed', 1);

INSERT INTO mysql_query_rules (rule_id, active, match_pattern, error_msg, apply)
VALUES (3, 1, '.*LOAD_FILE.*', 'Query not allowed', 1);

LOAD MYSQL QUERY RULES TO RUNTIME;
SAVE MYSQL QUERY RULES TO DISK;
4.1.3 高可用配置

1. ProxySQL双节点高可用

# 使用Keepalived实现ProxySQL高可用
# 在两个ProxySQL节点安装Keepalived
dnf install -y keepalived

# 主节点配置 /etc/keepalived/keepalived.conf
cat > /etc/keepalived/keepalived.conf << 'EOF'
global_defs {
    router_id PROXYSQL_MASTER
}

vrrp_script check_proxysql {
    script "/usr/local/bin/check_proxysql.sh"
    interval 2
    weight -20
}

vrrp_instance VI_PROXYSQL {
    state MASTER
    interface eth0
    virtual_router_id 60
    priority 100
    advert_int 1

    authentication {
        auth_type PASS
        auth_pass proxysql123
    }

    virtual_ipaddress {
        192.168.1.20/24
    }

    track_script {
        check_proxysql
    }
}
EOF

# 健康检查脚本
cat > /usr/local/bin/check_proxysql.sh << 'EOF'
#!/bin/bash
# 检查ProxySQL服务是否正常
mysql -uadmin -padmin -h127.0.0.1 -P6032 -e "SELECT 1" > /dev/null 2>&1
if [ $? -eq 0 ]; then
    exit 0
else
    exit 1
fi
EOF
chmod +x /usr/local/bin/check_proxysql.sh

# 启动Keepalived
systemctl start keepalived
systemctl enable keepalived

2. ProxySQL集群配置

-- 配置ProxySQL集群同步
INSERT INTO proxysql_servers (hostname, port, weight, comment)
VALUES ('192.168.1.21', 6032, 1, 'ProxySQL1');

INSERT INTO proxysql_servers (hostname, port, weight, comment)
VALUES ('192.168.1.22', 6032, 1, 'ProxySQL2');

LOAD PROXYSQL SERVERS TO RUNTIME;
SAVE PROXYSQL SERVERS TO DISK;

-- 启用集群同步
UPDATE global_variables SET variable_value='cluster_admin' WHERE variable_name='admin-cluster_username';
UPDATE global_variables SET variable_value='ClusterP@ss!' WHERE variable_name='admin-cluster_password';

LOAD ADMIN VARIABLES TO RUNTIME;
SAVE ADMIN VARIABLES TO DISK;

4.2 注意事项

4.2.1 配置注意

1. 复制延迟处理

# 监控复制延迟的阈值配置
proxysql:
# 从库延迟超过此值自动摘除(秒)
max_replication_lag: 30

# 延迟检测间隔(毫秒)
monitor_replication_lag_interval: 2000

mysql:
# 并行复制配置(减少复制延迟)
replica_parallel_type: LOGICAL_CLOCK
replica_parallel_workers: 8

2. 事务一致性

-- ProxySQL事务处理配置
-- transaction_persistent=1 确保事务中所有查询路由到同一服务器
INSERT INTO mysql_users (username, password, default_hostgroup, transaction_persistent)
VALUES ('app_user', 'password', 10, 1);

-- 启用事务追踪
UPDATE global_variables SET variable_value=1 WHERE variable_name='mysql-forward_autocommit';
UPDATE global_variables SET variable_value=1 WHERE variable_name='mysql-autocommit_false_is_transaction';
4.2.2 常见错误
错误类型 错误信息 原因分析 解决方案
连接失败 Can't connect to MySQL server 后端MySQL不可用 检查MySQL服务状态和网络
认证失败 Access denied for user 密码错误或用户不存在 检查ProxySQL和MySQL用户配置
复制延迟 Replication lag too high 从库复制延迟超过阈值 检查从库性能,优化复制配置
路由错误 Query routed to wrong hostgroup 查询规则配置错误 检查query_rules配置顺序
连接耗尽 Too many connections 连接池配置过小 增加max_connections
读写不一致 Data inconsistency 复制延迟导致 使用强制主库读取机制
事务失败 Transaction rolled back 跨hostgroup事务 确保事务在同一hostgroup
超时 Query timeout 慢查询或网络问题 优化查询或增加超时时间
4.2.3 兼容性注意

1. 应用兼容性

-- 需要注意的SQL兼容性问题

-- 1. LAST_INSERT_ID() 在读写分离后的行为
-- 必须在写入后立即获取,且确保在同一连接
long id = jdbcTemplate.queryForObject("SELECT LAST_INSERT_ID()", Long.class);

-- 2. 自增ID的获取
-- 推荐使用KeyHolder
KeyHolder keyHolder = new GeneratedKeyHolder();
jdbcTemplate.update(connection -> {
    PreparedStatement ps = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
    ps.setString(1, name);
    return ps;
}, keyHolder);
Long id = keyHolder.getKey().longValue();

-- 3. 临时表的使用
-- 临时表只存在于当前连接,读写分离后可能找不到
-- 解决:使用永久表或强制主库

2. 中间件兼容性

中间件 版本 MySQL 8.0支持 特点
ProxySQL 2.6+ 完整支持 功能丰富,性能好
MyCat 2.0+ 完整支持 分库分表能力强
MaxScale 6.0+ 完整支持 MariaDB官方,易用
MySQL Router 8.0+ 完整支持 MySQL官方,简单

五、故障排查和监控

5.1 故障排查

5.1.1 日志分析
# ProxySQL日志
tail -f /var/lib/proxysql/proxysql.log

# 关键日志模式
grep -E "Error|Warning|SHUNNED|OFFLINE" /var/lib/proxysql/proxysql.log

# MySQL慢查询日志
tail -f /var/log/mysql/slow-query.log

# 主从复制错误
grep -E "Error|Warning" /var/log/mysql/mysqld.log
5.1.2 常见问题排查

问题1:从库复制中断

-- 在从库执行
SHOW REPLICA STATUS\G

-- 检查关键字段:
-- Replica_IO_Running: 应为Yes
-- Replica_SQL_Running: 应为Yes
-- Last_Error: 查看错误信息

-- 常见修复方法:
-- 1. 跳过错误(慎用)
STOP REPLICA;
SET GLOBAL SQL_SLAVE_SKIP_COUNTER = 1;
START REPLICA;

-- 2. 使用GTID跳过(推荐)
STOP REPLICA;
SET GTID_NEXT='错误的GTID';
BEGIN; COMMIT;
SET GTID_NEXT='AUTOMATIC';
START REPLICA;

-- 3. 重新全量同步
STOP REPLICA;
RESET REPLICA ALL;
-- 重新配置复制...

问题2:ProxySQL后端服务器状态异常

-- 连接ProxySQL管理接口
mysql -uadmin -padmin -h127.0.0.1 -P6032

-- 查看服务器状态
SELECT hostgroup_id, hostname, port, status, max_replication_lag
FROM mysql_servers;

-- 状态说明:
-- ONLINE: 正常
-- SHUNNED: 临时摘除(会自动恢复)
-- OFFLINE_SOFT: 软下线(不接受新连接)
-- OFFLINE_HARD: 硬下线(完全不可用)

-- 查看监控日志
SELECT * FROM mysql_server_connect_log ORDER BY time_start_us DESC LIMIT 10;
SELECT * FROM mysql_server_ping_log ORDER BY time_start_us DESC LIMIT 10;

-- 手动恢复节点
UPDATE mysql_servers SET status='ONLINE' WHERE hostname='192.168.1.12';
LOAD MYSQL SERVERS TO RUNTIME;

问题3:查询路由错误

-- 查看查询规则匹配情况
SELECT rule_id, match_pattern, hits, destination_hostgroup
FROM stats_mysql_query_rules
ORDER BY rule_id;

-- 测试查询路由(不执行)
-- 使用mysql_query_rules的digest分析
SELECT hostgroup, digest_text, count_star
FROM stats_mysql_query_digest
ORDER BY count_star DESC
LIMIT 20;

-- 检查规则优先级(rule_id越小优先级越高)
SELECT rule_id, active, match_pattern, destination_hostgroup
FROM mysql_query_rules
ORDER BY rule_id;
5.1.3 调试技巧
# 开启ProxySQL调试日志
mysql -uadmin -padmin -h127.0.0.1 -P6032 << 'EOF'
SET mysql-verbose_query_error=1;
SET mysql-log_mysql_warnings_enabled=1;
LOAD MYSQL VARIABLES TO RUNTIME;
EOF

# 实时查看请求
watch -n 1 'mysql -uadmin -padmin -h127.0.0.1 -P6032 -e "SELECT hostgroup, count_star, sum_time FROM stats_mysql_query_digest ORDER BY count_star DESC LIMIT 10;"'

# 抓包分析
tcpdump -i eth0 port 3306 -w /tmp/mysql.pcap

5.2 性能监控

5.2.1 关键指标
-- ProxySQL性能指标
SELECT 
    Variable_Name,
    Variable_Value
FROM stats_mysql_global
WHERE Variable_Name IN (
    'Active_Transactions',
    'Client_Connections_connected',
    'Client_Connections_created',
    'Questions',
    'Slow_queries',
    'Com_autocommit',
    'Com_autocommit_filtered'
);

-- 后端连接状态
SELECT 
    hostgroup,
    srv_host,
    srv_port,
    status,
    ConnUsed,
    ConnFree,
    ConnOK,
    ConnERR,
    MaxConnUsed,
    Queries
FROM stats_mysql_connection_pool
ORDER BY hostgroup, srv_host;

-- 延迟统计
SELECT 
    hostgroup_id,
    hostname,
    port,
    Latency_us
FROM stats_mysql_connection_pool
ORDER BY hostgroup_id, hostname;
5.2.2 监控指标表
指标类别 指标名称 含义 告警阈值
连接 Client_Connections_connected 当前客户端连接数 > max_connections * 80%
连接 ConnFree 空闲后端连接数 < 10
性能 Questions 总查询数 根据基线设定
性能 Slow_queries 慢查询数 > 0 且持续增长
延迟 Latency_us 后端延迟(微秒) > 100000
复制 replication_lag 复制延迟(秒) > 10
错误 ConnERR 连接错误数 > 0
路由 Query Rules hits 规则匹配次数 监控分布是否合理
5.2.3 告警配置
# Prometheus告警规则
groups:
- name: proxysql-alerts
  rules:
  - alert: ProxySQLHighConnections
    expr: proxysql_client_connections_connected / proxysql_mysql_max_connections > 0.8
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "ProxySQL连接数过高"
      description: "{{ $labels.instance }} 连接使用率达到 {{ $value | humanizePercentage }}"

  - alert: ProxySQLBackendDown
    expr: proxysql_backend_status != 1
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "ProxySQL后端节点异常"
      description: "{{ $labels.instance }} 后端 {{ $labels.endpoint }} 状态异常"

  - alert: MySQLReplicationLag
    expr: proxysql_mysql_replication_lag > 30
    for: 2m
    labels:
      severity: warning
    annotations:
      summary: "MySQL复制延迟过高"
      description: "{{ $labels.instance }} 复制延迟达到 {{ $value }}秒"

  - alert: ProxySQLSlowQueries
    expr: rate(proxysql_slow_queries[5m]) > 1
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "ProxySQL慢查询增加"
      description: "{{ $labels.instance }} 慢查询率为 {{ $value }}/秒"
5.2.4 Grafana监控面板
{
  "dashboard": {
    "title": "MySQL读写分离监控",
    "panels": [
      {
        "title": "读写请求分布",
        "type": "piechart",
        "targets": [
          {
            "expr": "sum by (hostgroup) (rate(proxysql_queries_total[5m]))",
            "legendFormat": "Hostgroup {{ hostgroup }}"
          }
        ]
      },
      {
        "title": "QPS趋势",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(proxysql_questions[1m])",
            "legendFormat": "Total QPS"
          }
        ]
      },
      {
        "title": "后端连接状态",
        "type": "table",
        "targets": [
          {
            "expr": "proxysql_connection_pool_conn_used",
            "legendFormat": "{{ hostgroup }}-{{ endpoint }}"
          }
        ]
      },
      {
        "title": "复制延迟",
        "type": "graph",
        "targets": [
          {
            "expr": "mysql_slave_status_seconds_behind_master",
            "legendFormat": "{{ instance }}"
          }
        ]
      }
    ]
  }
}

5.3 备份与恢复

5.3.1 ProxySQL配置备份
#!/bin/bash
# ProxySQL配置备份脚本

BACKUP_DIR="/data/backup/proxysql"
DATE=$(date +%Y%m%d_%H%M%S)
ADMIN_USER="admin"
ADMIN_PASS="admin"

mkdir -p $BACKUP_DIR

# 导出配置到磁盘
mysql -u$ADMIN_USER -p$ADMIN_PASS -h127.0.0.1 -P6032 << 'EOF'
SAVE MYSQL USERS TO DISK;
SAVE MYSQL SERVERS TO DISK;
SAVE MYSQL QUERY RULES TO DISK;
SAVE MYSQL VARIABLES TO DISK;
SAVE ADMIN VARIABLES TO DISK;
SAVE PROXYSQL SERVERS TO DISK;
EOF

# 备份SQLite数据库
cp /var/lib/proxysql/proxysql.db $BACKUP_DIR/proxysql_$DATE.db

# 导出为SQL格式
sqlite3 /var/lib/proxysql/proxysql.db ".dump" > $BACKUP_DIR/proxysql_$DATE.sql

# 压缩备份
gzip $BACKUP_DIR/proxysql_$DATE.sql

# 清理7天前的备份
find $BACKUP_DIR -name "proxysql_*.gz" -mtime +7 -delete
find $BACKUP_DIR -name "proxysql_*.db" -mtime +7 -delete

echo "ProxySQL配置备份完成: $BACKUP_DIR/proxysql_$DATE"
5.3.2 恢复流程
#!/bin/bash
# ProxySQL配置恢复脚本

BACKUP_FILE=$1

if [ -z "$BACKUP_FILE" ]; then
    echo "用法: $0 <备份文件路径>"
    exit 1
fi

# 停止ProxySQL
systemctl stop proxysql

# 如果是.db文件直接替换
if [[ "$BACKUP_FILE" == *.db ]]; then
    cp $BACKUP_FILE /var/lib/proxysql/proxysql.db
    chown proxysql:proxysql /var/lib/proxysql/proxysql.db
fi

# 如果是.sql.gz文件
if [[ "$BACKUP_FILE" == *.sql.gz ]]; then
    # 清空现有数据
    rm -f /var/lib/proxysql/proxysql.db

    # 解压并导入
    gunzip -c $BACKUP_FILE | sqlite3 /var/lib/proxysql/proxysql.db
    chown proxysql:proxysql /var/lib/proxysql/proxysql.db
fi

# 启动ProxySQL
systemctl start proxysql

# 验证
sleep 3
mysql -uadmin -padmin -h127.0.0.1 -P6032 -e "SELECT * FROM mysql_servers;"

echo "ProxySQL配置恢复完成"

六、总结

6.1 技术要点回顾

成功实施MySQL读写分离需要掌握以下关键点:

1. 架构设计

  • 基于MySQL主从复制构建读写分离基础
  • 使用ProxySQL等中间件实现透明读写分离
  • 合理规划主从节点数量和配置

2. 路由策略

  • SELECT语句路由到从库
  • 写操作(INSERT/UPDATE/DELETE)路由到主库
  • 事务中的查询统一路由到主库

3. 一致性保障

  • 使用GTID模式简化复制管理
  • 配置半同步复制降低数据丢失风险
  • 针对写后读场景强制从主库读取

4. 高可用设计

  • ProxySQL双节点 + Keepalived实现代理层高可用
  • 配置从库复制延迟阈值自动摘除
  • 自动故障检测和节点恢复

5. 性能优化

  • 合理配置连接池
  • 使用ProxySQL查询缓存
  • 监控并优化读写比例

6.2 进阶学习方向

方向 内容 推荐资源
分库分表 水平拆分扩展写能力 ShardingSphere文档
MGR集群 MySQL Group Replication MySQL官方文档
自动化运维 Orchestrator自动故障切换 GitHub项目文档
分布式事务 XA/TCC/SAGA Seata框架
中间件原理 ProxySQL/MyCat源码 源码阅读

6.3 参考资料

附录

A. 命令速查表

命令 说明 示例
SHOW REPLICA STATUS 查看从库复制状态 SHOW REPLICA STATUS\G
START REPLICA 启动复制 START REPLICA;
STOP REPLICA 停止复制 STOP REPLICA;
CHANGE REPLICATION SOURCE TO 配置复制源 见配置章节
LOAD ... TO RUNTIME 加载ProxySQL配置 LOAD MYSQL SERVERS TO RUNTIME;
SAVE ... TO DISK 保存ProxySQL配置 SAVE MYSQL SERVERS TO DISK;

B. 配置参数详解

参数 所属组件 默认值 说明
gtid_mode MySQL OFF GTID模式,推荐ON
replica_parallel_workers MySQL 0 并行复制线程数
mysql-monitor_replication_lag_interval ProxySQL 10000 复制延迟检测间隔(ms)
transaction_persistent ProxySQL 0 事务是否路由到同一hostgroup
max_replication_lag ProxySQL 0 最大复制延迟(秒),0不限制

C. 术语表

术语 英文 说明
读写分离 Read-Write Splitting 将读写请求分离到不同节点
复制延迟 Replication Lag 从库数据落后主库的时间
主库 Master/Primary 处理写请求的数据库节点
从库 Slave/Replica 复制主库数据,处理读请求的节点
GTID Global Transaction Identifier 全局事务标识符
半同步复制 Semi-Synchronous Replication 主库等待至少一个从库确认收到数据
查询路由 Query Routing 根据规则将查询分发到不同节点
连接池 Connection Pool 复用数据库连接,减少建连开销
故障转移 Failover 主节点故障时切换到备节点
hostgroup hostgroup ProxySQL中的服务器分组概念



上一篇:CPU短缺全面爆发:英特尔、AMD产能告急,AI与云服务需求激增
下一篇:PHP开发者转型Go后端:六个月内踩遍的并发、内存与工程化深坑
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-1-24 03:06 , Processed in 0.412754 second(s), 40 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表