MySQL Read-Write Splitting in Practice: Doubling Performance with ProxySQL
1. Overview
1.1 Background
The database is often the performance bottleneck of an entire system architecture. In a financial trading system I was responsible for, a single MySQL server gradually buckled as business volume grew: peak CPU usage exceeded 90%, slow queries spiked, and tickets complaining about slow responses kept piling up.
Analysis showed a read/write ratio of roughly 8:2, with the bulk of the query load concentrated on the primary, which also had to handle all writes. This is the classic read-heavy scenario, and the most effective optimization for it is read-write splitting.
The core idea of read-write splitting is to route write operations (INSERT, UPDATE, DELETE) to the primary and read operations (SELECT) to the replicas. Read load can then be spread across multiple replicas while the primary focuses on writes, substantially raising overall throughput.
After implementing read-write splitting, our system throughput improved by roughly 2.5x, primary CPU usage dropped below 30%, and slow queries fell by 80%, comfortably meeting the goal of doubling database performance.
1.2 Key Characteristics
A read-write splitting architecture has the following core characteristics:
Load balancing
- Read requests are spread across multiple replicas, relieving pressure on the primary
- Traffic can be distributed by weight, so load ratios are easy to adjust
- Replicas with different hardware can be given different weights
Linear scaling
- Read capacity scales out by adding replicas
- In practice, read throughput grows nearly linearly with replica count, until replication fan-out or the network becomes the bottleneck
- New replicas can be added without application changes
High availability
- Failed replicas are removed automatically without affecting service
- On primary failure, a replica can be promoted quickly
- Multiple failure-detection and failover mechanisms are supported
Data consistency
- Data is synchronized via MySQL primary-replica replication
- Semi-synchronous replication can be enabled to reduce the risk of data loss
- Reads can be forced to the primary when strict consistency is required
1.3 When to Use It

| Scenario | Read share | Suitability | Notes |
|---|---|---|---|
| Content display | > 95% reads | Excellent fit | News, blogs, e-commerce product pages |
| Reporting / analytics | > 90% reads | Excellent fit | Data analysis, report queries |
| Social interaction | ~80% reads | Good fit | Feeds, comment lists |
| Transactional systems | ~60% reads | Reasonable fit | Order lookups, account balances |
| Real-time bidding | ~50% reads | Evaluate carefully | Replication lag must be accounted for |
| Strong consistency | - | Not suitable | Core financial transactions |
Scenarios that do not fit:
- Workloads demanding absolute consistency (e.g., reading an account balance during a bank transfer)
- Write-heavy workloads (e.g., logging systems)
- Transactions whose reads and writes are tightly coupled
1.4 Environment Requirements

| Component | Version | Notes |
|---|---|---|
| MySQL | 8.0.35+ / 8.4 LTS | This article uses 8.0.35 |
| OS | Rocky Linux 9 / Ubuntu 24.04 | Rocky Linux 9 recommended |
| ProxySQL | 2.6.0+ | The middleware used in this article |
| MyCat | 2.0+ | Alternative middleware |
| Memory | 8 GB+ | 4 GB+ on proxy nodes, 8 GB+ on database nodes |
| Disk | SSD | SSDs are required on database nodes |
Example server plan:

| Role | IP address | Spec | Description |
|---|---|---|---|
| Master | 192.168.1.11 | 8C/16G | MySQL primary |
| Slave1 | 192.168.1.12 | 8C/16G | MySQL replica 1 |
| Slave2 | 192.168.1.13 | 8C/16G | MySQL replica 2 |
| Proxy1 | 192.168.1.21 | 4C/8G | ProxySQL primary |
| Proxy2 | 192.168.1.22 | 4C/8G | ProxySQL standby |
2. Step-by-Step Implementation
2.1 Preparation
2.1.1 Setting Up MySQL Replication
Read-write splitting is built on MySQL replication, so first set up one primary with two replicas:
#!/bin/bash
# MySQL 8.0 installation script (run on every node)
# Install MySQL 8.0
dnf install -y mysql-server mysql
# Start MySQL
systemctl start mysqld
systemctl enable mysqld
# Set the root password
mysql -e "ALTER USER 'root'@'localhost' IDENTIFIED BY 'RootP@ss2024!';"
Primary configuration (192.168.1.11):
# Edit the MySQL configuration file
cat > /etc/my.cnf.d/master.cnf << 'EOF'
[mysqld]
# Basic settings
server-id = 1
port = 3306
datadir = /var/lib/mysql
socket = /var/lib/mysql/mysql.sock
log-error = /var/log/mysql/mysqld.log
pid-file = /var/run/mysqld/mysqld.pid
# Character set
character-set-server = utf8mb4
collation-server = utf8mb4_unicode_ci
# Binary log (required for replication)
log-bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
max_binlog_size = 1G
binlog_expire_logs_seconds = 604800
sync_binlog = 1
# GTID settings (GTID mode is recommended)
gtid_mode = ON
enforce_gtid_consistency = ON
binlog_gtid_simple_recovery = ON
# Performance settings
innodb_buffer_pool_size = 8G
# Deprecated since 8.0.30 in favor of innodb_redo_log_capacity, but still accepted
innodb_log_file_size = 1G
innodb_flush_log_at_trx_commit = 1
innodb_flush_method = O_DIRECT
# Connection settings
max_connections = 2000
max_connect_errors = 100000
# Semi-synchronous replication (optional but recommended)
plugin-load-add = semisync_source.so
rpl_semi_sync_source_enabled = 1
rpl_semi_sync_source_timeout = 1000
EOF
# Restart MySQL
systemctl restart mysqld
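After the restart it is worth confirming that the semi-sync plugin actually loaded; a minimal check, run on the primary:
-- Verify the semi-sync source plugin is loaded and enabled
SELECT PLUGIN_NAME, PLUGIN_STATUS
FROM INFORMATION_SCHEMA.PLUGINS
WHERE PLUGIN_NAME LIKE 'rpl_semi_sync%';
SHOW GLOBAL VARIABLES LIKE 'rpl_semi_sync_source%';
-- Expect rpl_semi_sync_source_enabled = ON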
Replica configuration (192.168.1.12 and 192.168.1.13):
# Replica 1 configuration (192.168.1.12)
cat > /etc/my.cnf.d/slave.cnf << 'EOF'
[mysqld]
# Basic settings
server-id = 2
port = 3306
datadir = /var/lib/mysql
socket = /var/lib/mysql/mysql.sock
log-error = /var/log/mysql/mysqld.log
pid-file = /var/run/mysqld/mysqld.pid
# Character set
character-set-server = utf8mb4
collation-server = utf8mb4_unicode_ci
# Binary log (enabled on replicas too, for cascading replication or failover)
log-bin = mysql-bin
binlog_format = ROW
relay_log = relay-bin
log_replica_updates = ON
# GTID settings
gtid_mode = ON
enforce_gtid_consistency = ON
# Replicas are read-only
read_only = ON
super_read_only = ON
# Performance settings
innodb_buffer_pool_size = 8G
innodb_log_file_size = 1G
# Parallel replication (reduces replication lag)
replica_parallel_type = LOGICAL_CLOCK
replica_parallel_workers = 8
replica_preserve_commit_order = ON
# Semi-synchronous replication
plugin-load-add = semisync_replica.so
rpl_semi_sync_replica_enabled = 1
EOF
# Replica 2 (192.168.1.13): change server-id to 3;
# everything else is identical
2.1.2 Configuring Replication
Create the replication user on the primary:
-- Log in to the primary
mysql -uroot -p'RootP@ss2024!'
-- Create the replication user
CREATE USER 'repl'@'192.168.1.%' IDENTIFIED BY 'ReplP@ss2024!';
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'repl'@'192.168.1.%';
FLUSH PRIVILEGES;
-- Check the primary's status
SHOW MASTER STATUS\G
-- Note the File and Position values; with GTID mode they are not needed
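If you run without GTIDs, the File and Position noted above are what anchor the replica. A sketch of the position-based variant (the log file name and offset below are placeholders; substitute your own SHOW MASTER STATUS output):
-- Position-based setup (only when gtid_mode is OFF)
CHANGE REPLICATION SOURCE TO
SOURCE_HOST='192.168.1.11',
SOURCE_PORT=3306,
SOURCE_USER='repl',
SOURCE_PASSWORD='ReplP@ss2024!',
SOURCE_LOG_FILE='mysql-bin.000001',
SOURCE_LOG_POS=157;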
Configure replication on each replica:
-- Log in to the replica
mysql -uroot -p'RootP@ss2024!'
-- Point the replica at the source (GTID mode)
CHANGE REPLICATION SOURCE TO
SOURCE_HOST='192.168.1.11',
SOURCE_PORT=3306,
SOURCE_USER='repl',
SOURCE_PASSWORD='ReplP@ss2024!',
SOURCE_AUTO_POSITION=1,
GET_SOURCE_PUBLIC_KEY=1;
-- Start replication
START REPLICA;
-- Check replication status
SHOW REPLICA STATUS\G
-- Key fields to check:
-- Replica_IO_Running: Yes
-- Replica_SQL_Running: Yes
-- Seconds_Behind_Source: 0
2.1.3 Verifying Replication
-- Create test data on the primary
CREATE DATABASE test_rw;
USE test_rw;
CREATE TABLE test (id INT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(100));
INSERT INTO test (name) VALUES ('written on primary');
-- Query on a replica to verify
SELECT * FROM test_rw.test;
-- The 'written on primary' row should be visible
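Beyond spot-checking rows, you can confirm the replica has applied everything the primary executed by comparing GTID sets; a minimal sketch:
-- On the primary
SELECT @@GLOBAL.gtid_executed;
-- On the replica: the result should equal (or contain) the primary's set
SELECT @@GLOBAL.gtid_executed;
-- Or block on the replica until a given set is applied (returns 0 on success, 1 on timeout)
SELECT WAIT_FOR_EXECUTED_GTID_SET('<gtid_set_from_primary>', 5);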
2.2 Core Configuration
This article uses ProxySQL as the read-write splitting middleware. It is one of the most widely used MySQL proxies, offering high performance, flexible configuration, and rich monitoring.
2.2.1 Installing ProxySQL
# Install ProxySQL on the proxy nodes (192.168.1.21, 192.168.1.22)
# Add the ProxySQL repository
cat > /etc/yum.repos.d/proxysql.repo << 'EOF'
[proxysql_repo]
name=ProxySQL YUM repository
baseurl=https://repo.proxysql.com/ProxySQL/proxysql-2.6.x/centos/9
gpgcheck=1
gpgkey=https://repo.proxysql.com/ProxySQL/proxysql-2.6.x/repo_pub_key
EOF
# Install
dnf install -y proxysql
# Start
systemctl start proxysql
systemctl enable proxysql
# Verify
proxysql --version
# ProxySQL version 2.6.0-...
2.2.2 Configuring ProxySQL
ProxySQL is configured through its admin interface on port 6032; applications connect on port 6033:
# Connect to the ProxySQL admin interface
mysql -uadmin -padmin -h127.0.0.1 -P6032 --prompt 'ProxySQL> '
Configure the backend MySQL servers:
-- Add MySQL servers
-- hostgroup_id: 10 = writer group (primary), 20 = reader group (replicas)
-- Add the primary to the writer group
INSERT INTO mysql_servers (hostgroup_id, hostname, port, weight, comment)
VALUES (10, '192.168.1.11', 3306, 1, 'Master');
-- Add the replicas to the reader group
INSERT INTO mysql_servers (hostgroup_id, hostname, port, weight, comment)
VALUES (20, '192.168.1.12', 3306, 100, 'Slave1');
INSERT INTO mysql_servers (hostgroup_id, hostname, port, weight, comment)
VALUES (20, '192.168.1.13', 3306, 100, 'Slave2');
-- Review the configuration
SELECT * FROM mysql_servers;
-- Load the configuration into the runtime
LOAD MYSQL SERVERS TO RUNTIME;
-- Persist the configuration to disk
SAVE MYSQL SERVERS TO DISK;
Configure MySQL users:
-- Set the ProxySQL monitoring credentials (the user itself must be created in MySQL)
UPDATE global_variables SET variable_value='monitor' WHERE variable_name='mysql-monitor_username';
UPDATE global_variables SET variable_value='MonitorP@ss2024!' WHERE variable_name='mysql-monitor_password';
-- Add the application user
INSERT INTO mysql_users (username, password, default_hostgroup, transaction_persistent)
VALUES ('app_user', 'AppP@ss2024!', 10, 1);
-- transaction_persistent=1 keeps all queries within a transaction on the same hostgroup
LOAD MYSQL USERS TO RUNTIME;
SAVE MYSQL USERS TO DISK;
LOAD MYSQL VARIABLES TO RUNTIME;
SAVE MYSQL VARIABLES TO DISK;
Create the corresponding users on the MySQL primary (they replicate to the replicas automatically):
-- Log in to the MySQL primary
mysql -uroot -p'RootP@ss2024!'
-- Create the monitoring user
CREATE USER 'monitor'@'192.168.1.%' IDENTIFIED BY 'MonitorP@ss2024!';
GRANT REPLICATION CLIENT, PROCESS ON *.* TO 'monitor'@'192.168.1.%';
-- Create the application user
CREATE USER 'app_user'@'192.168.1.%' IDENTIFIED BY 'AppP@ss2024!';
GRANT SELECT, INSERT, UPDATE, DELETE ON *.* TO 'app_user'@'192.168.1.%';
FLUSH PRIVILEGES;
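Before moving on, it is worth confirming the monitor user actually works; ProxySQL records every health check in its monitor schema. A quick sketch, run in the admin interface:
-- Recent connect and ping checks; the error columns should be NULL
SELECT hostname, port, time_start_us, connect_error
FROM monitor.mysql_server_connect_log
ORDER BY time_start_us DESC LIMIT 5;
SELECT hostname, port, time_start_us, ping_error
FROM monitor.mysql_server_ping_log
ORDER BY time_start_us DESC LIMIT 5;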
2.2.3 Configuring Read-Write Splitting Rules
-- Connect to the ProxySQL admin interface
mysql -uadmin -padmin -h127.0.0.1 -P6032 --prompt 'ProxySQL> '
-- Define the replication hostgroup pair
INSERT INTO mysql_replication_hostgroups (writer_hostgroup, reader_hostgroup, comment)
VALUES (10, 20, 'RW Split Cluster');
LOAD MYSQL SERVERS TO RUNTIME;
SAVE MYSQL SERVERS TO DISK;
-- Configure query routing rules (lower rule_id = higher priority)
-- Rule 1: SELECT statements go to the reader group (replicas)
INSERT INTO mysql_query_rules (rule_id, active, match_pattern, destination_hostgroup, apply)
VALUES (100, 1, '^SELECT.*', 20, 1);
-- Rule 2: SELECT ... FOR UPDATE goes to the writer group (primary)
INSERT INTO mysql_query_rules (rule_id, active, match_pattern, destination_hostgroup, apply)
VALUES (90, 1, '^SELECT.*FOR UPDATE', 10, 1);
-- Rule 3: queries needing strict consistency go to the primary (marked with a SQL comment)
INSERT INTO mysql_query_rules (rule_id, active, match_pattern, destination_hostgroup, apply)
VALUES (80, 1, '/\\*master\\*/', 10, 1);
-- Rule 4: queries inside explicit transactions go to the primary
INSERT INTO mysql_query_rules (rule_id, active, match_pattern, destination_hostgroup, apply)
VALUES (70, 1, '^BEGIN', 10, 1);
INSERT INTO mysql_query_rules (rule_id, active, match_pattern, destination_hostgroup, apply)
VALUES (71, 1, '^START TRANSACTION', 10, 1);
-- Rule 5: write operations go to the primary
INSERT INTO mysql_query_rules (rule_id, active, match_pattern, destination_hostgroup, apply)
VALUES (200, 1, '^INSERT', 10, 1);
INSERT INTO mysql_query_rules (rule_id, active, match_pattern, destination_hostgroup, apply)
VALUES (201, 1, '^UPDATE', 10, 1);
INSERT INTO mysql_query_rules (rule_id, active, match_pattern, destination_hostgroup, apply)
VALUES (202, 1, '^DELETE', 10, 1);
INSERT INTO mysql_query_rules (rule_id, active, match_pattern, destination_hostgroup, apply)
VALUES (203, 1, '^REPLACE', 10, 1);
-- Review the rules
SELECT rule_id, active, match_pattern, destination_hostgroup FROM mysql_query_rules ORDER BY rule_id;
-- Load the rules
LOAD MYSQL QUERY RULES TO RUNTIME;
SAVE MYSQL QUERY RULES TO DISK;
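To see the hint rule in action from the application side, prefix a statement with the /*master*/ comment. A minimal sketch (note that some MySQL clients strip comments before sending; with older mysql CLIs, run with --comments / -c so the hint actually reaches ProxySQL):
-- Goes to hostgroup 20 (reader) via rule 100
SELECT name FROM test_rw.test WHERE id = 1;
-- Goes to hostgroup 10 (writer) via rule 80, which wins because rule_id 80 < 100
SELECT /*master*/ name FROM test_rw.test WHERE id = 1;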
2.2.4 Advanced Configuration
-- Connection pool settings
UPDATE global_variables SET variable_value=2000 WHERE variable_name='mysql-max_connections';
UPDATE global_variables SET variable_value=100 WHERE variable_name='mysql-default_max_latency_ms';
-- Health-check settings
UPDATE global_variables SET variable_value=2000 WHERE variable_name='mysql-monitor_connect_interval';
UPDATE global_variables SET variable_value=1000 WHERE variable_name='mysql-monitor_ping_interval';
UPDATE global_variables SET variable_value=2000 WHERE variable_name='mysql-monitor_read_only_interval';
UPDATE global_variables SET variable_value=1000 WHERE variable_name='mysql-monitor_replication_lag_interval';
UPDATE global_variables SET variable_value=30 WHERE variable_name='mysql-monitor_replication_lag_timeout';
-- Lag value to assume when the lag check returns NULL (e.g., replication is broken)
UPDATE global_variables SET variable_value=10 WHERE variable_name='mysql-monitor_slave_lag_when_null';
-- Load the settings
LOAD MYSQL VARIABLES TO RUNTIME;
SAVE MYSQL VARIABLES TO DISK;
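Note that the variables above only control how lag is measured; the threshold that actually shuns a lagging replica is max_replication_lag on each mysql_servers row (0 means no limit). A sketch that caps replica lag at 30 seconds:
-- Replicas whose measured lag exceeds 30s are shunned until they catch up
UPDATE mysql_servers SET max_replication_lag=30 WHERE hostgroup_id=20;
LOAD MYSQL SERVERS TO RUNTIME;
SAVE MYSQL SERVERS TO DISK;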
2.3 Startup and Verification
2.3.1 Verifying Backend Server Status
-- Connect to the ProxySQL admin interface
mysql -uadmin -padmin -h127.0.0.1 -P6032
-- Check the configured backend servers
SELECT hostgroup_id, hostname, port, status, weight, max_replication_lag
FROM mysql_servers;
-- Check the live runtime state
SELECT * FROM runtime_mysql_servers;
-- Check connection pool status
SELECT hostgroup, srv_host, srv_port, status, ConnUsed, ConnFree, ConnOK, ConnERR
FROM stats_mysql_connection_pool;
-- Check replication lag monitoring
SELECT hostname, port, time_start_us, replication_lag
FROM monitor.mysql_server_replication_lag_log
ORDER BY time_start_us DESC LIMIT 10;
2.3.2 Verifying Read-Write Splitting
# Connect to MySQL through ProxySQL (port 6033)
mysql -uapp_user -p'AppP@ss2024!' -h192.168.1.21 -P6033
# Test a write (should be routed to the primary)
USE test_rw;
INSERT INTO test (name) VALUES ('written via ProxySQL');
# Test a read (should be routed to a replica)
SELECT * FROM test;
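A convenient way to confirm where a statement actually landed is to ask the backend for its server_id, since each node was given a distinct one (1, 2, 3) earlier. A minimal sketch, run through port 6033 (SELECT @@hostname works equally well):
-- Should return 2 or 3 (a replica): plain SELECTs match rule 100
SELECT @@server_id;
-- Should return 1 (the primary): the /*master*/ hint matches rule 80 first
SELECT /*master*/ @@server_id;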
Check routing statistics:
-- Connect to the ProxySQL admin interface
mysql -uadmin -padmin -h127.0.0.1 -P6032
-- Query routing statistics
SELECT hostgroup, digest_text, count_star, sum_time
FROM stats_mysql_query_digest
ORDER BY count_star DESC LIMIT 20;
-- Statistics aggregated by hostgroup
SELECT hostgroup, SUM(count_star) as total_queries
FROM stats_mysql_query_digest
GROUP BY hostgroup;
-- Expected result:
-- hostgroup 10 (writer): INSERT/UPDATE/DELETE and other writes
-- hostgroup 20 (reader): SELECT and other reads
2.3.3 Testing Failover
# Simulate a failure of replica 1
systemctl stop mysqld  # run on 192.168.1.12
# Watch ProxySQL remove it automatically (the live state is in runtime_mysql_servers)
mysql -uadmin -padmin -h127.0.0.1 -P6032 -e "SELECT hostgroup_id, hostname, status FROM runtime_mysql_servers;"
# The status of 192.168.1.12 should change to SHUNNED
# Bring replica 1 back
systemctl start mysqld  # run on 192.168.1.12
# After a short while, ProxySQL adds it back to the reader group automatically
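Because mysql_replication_hostgroups was configured, ProxySQL assigns writer or reader membership from each backend's read_only flag. That allows a manual primary switchover to be sketched as follows (after confirming the chosen replica has fully caught up; promotion tooling such as Orchestrator automates these steps):
-- On the replica being promoted: making it writable causes the monitor
-- to see read_only=0 and move it into writer hostgroup 10
SET GLOBAL super_read_only = OFF;
SET GLOBAL read_only = OFF;
-- On the old primary (if still alive): demote it to a reader
-- (super_read_only=ON implies read_only=ON)
SET GLOBAL super_read_only = ON;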
3. Example Code and Configuration
3.1 Complete Configuration Examples
3.1.1 Full ProxySQL Configuration File
# /etc/proxysql.cnf
# Complete ProxySQL configuration file
# Note: this file is only read on first startup (when proxysql.db does not yet
# exist, or when ProxySQL is started with --initial); afterwards the SQLite
# database is authoritative
datadir="/var/lib/proxysql"
errorlog="/var/lib/proxysql/proxysql.log"
admin_variables=
{
admin_credentials="admin:admin;cluster_admin:ClusterP@ss!"
mysql_ifaces="0.0.0.0:6032"
cluster_username="cluster_admin"
cluster_password="ClusterP@ss!"
web_enabled=true
web_port=6080
}
mysql_variables=
{
threads=8
max_connections=4096
default_query_delay=0
default_query_timeout=36000000
have_compress=true
poll_timeout=2000
interfaces="0.0.0.0:6033"
default_schema="information_schema"
stacksize=1048576
server_version="8.0.35"
connect_timeout_server=3000
monitor_username="monitor"
monitor_password="MonitorP@ss2024!"
monitor_history=600000
monitor_connect_interval=60000
monitor_ping_interval=10000
monitor_read_only_interval=1500
monitor_read_only_timeout=500
ping_interval_server_msec=120000
ping_timeout_server=500
commands_stats=true
sessions_sort=true
connect_retries_on_failure=10
monitor_replication_lag_interval=2000
monitor_replication_lag_timeout=1000
}
mysql_replication_hostgroups =
(
{
writer_hostgroup=10
reader_hostgroup=20
comment="RW Split Production Cluster"
}
)
mysql_servers =
(
{
address="192.168.1.11"
port=3306
hostgroup=10
weight=1
max_connections=500
comment="Master"
},
{
address="192.168.1.12"
port=3306
hostgroup=20
weight=100
max_connections=500
max_replication_lag=30
comment="Slave1"
},
{
address="192.168.1.13"
port=3306
hostgroup=20
weight=100
max_connections=500
max_replication_lag=30
comment="Slave2"
}
)
mysql_users =
(
{
username="app_user"
password="AppP@ss2024!"
default_hostgroup=10
transaction_persistent=1
active=1
},
{
username="readonly_user"
password="ReadOnlyP@ss2024!"
default_hostgroup=20
transaction_persistent=0
active=1
}
)
mysql_query_rules =
(
{
rule_id=70
active=1
match_pattern="^BEGIN"
destination_hostgroup=10
apply=1
},
{
rule_id=71
active=1
match_pattern="^START TRANSACTION"
destination_hostgroup=10
apply=1
},
{
rule_id=80
active=1
match_pattern="/\\*master\\*/"
destination_hostgroup=10
apply=1
},
{
rule_id=90
active=1
match_pattern="^SELECT.*FOR UPDATE"
destination_hostgroup=10
apply=1
},
{
rule_id=100
active=1
match_pattern="^SELECT"
destination_hostgroup=20
apply=1
},
{
rule_id=200
active=1
match_pattern="^INSERT"
destination_hostgroup=10
apply=1
},
{
rule_id=201
active=1
match_pattern="^UPDATE"
destination_hostgroup=10
apply=1
},
{
rule_id=202
active=1
match_pattern="^DELETE"
destination_hostgroup=10
apply=1
}
)
scheduler =
(
)
mysql_galera_hostgroups =
(
)
3.1.2 Application-Level Read-Write Splitting (No Middleware)
If you prefer not to run middleware, read-write splitting can also be implemented in the application layer. Example implementations in several languages follow:
Spring Boot configuration:
# application.yml
# Note: @ConfigurationProperties binds flat properties onto HikariDataSource,
# so the pool settings live at the same level as jdbc-url (a nested hikari:
# block would not bind), and each replica gets its own named key.
spring:
  datasource:
    master:
      jdbc-url: jdbc:mysql://192.168.1.11:3306/app_db?useSSL=false&serverTimezone=Asia/Shanghai
      username: app_user
      password: AppP@ss2024!
      driver-class-name: com.mysql.cj.jdbc.Driver
      pool-name: master-pool
      maximum-pool-size: 50
      minimum-idle: 10
      connection-timeout: 5000
    slave1:
      jdbc-url: jdbc:mysql://192.168.1.12:3306/app_db?useSSL=false&serverTimezone=Asia/Shanghai
      username: app_user
      password: AppP@ss2024!
      driver-class-name: com.mysql.cj.jdbc.Driver
      pool-name: slave1-pool
      maximum-pool-size: 100
      minimum-idle: 20
    slave2:
      jdbc-url: jdbc:mysql://192.168.1.13:3306/app_db?useSSL=false&serverTimezone=Asia/Shanghai
      username: app_user
      password: AppP@ss2024!
      driver-class-name: com.mysql.cj.jdbc.Driver
      pool-name: slave2-pool
      maximum-pool-size: 100
      minimum-idle: 20
Dynamic data source implementation:
// DataSourceType.java - data source type enum
package com.example.datasource;

public enum DataSourceType {
    MASTER,
    SLAVE
}

// DynamicDataSourceContextHolder.java - thread-local holder for the current data source
package com.example.datasource;

public class DynamicDataSourceContextHolder {
    private static final ThreadLocal<DataSourceType> contextHolder = new ThreadLocal<>();

    public static void setDataSourceType(DataSourceType dataSourceType) {
        contextHolder.set(dataSourceType);
    }

    public static DataSourceType getDataSourceType() {
        // Fall back to MASTER when nothing has been set on this thread
        return contextHolder.get() == null ? DataSourceType.MASTER : contextHolder.get();
    }

    public static void clearDataSourceType() {
        contextHolder.remove();
    }
}

// DynamicDataSource.java - routing data source
package com.example.datasource;

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

public class DynamicDataSource extends AbstractRoutingDataSource {
    @Override
    protected Object determineCurrentLookupKey() {
        return DynamicDataSourceContextHolder.getDataSourceType();
    }
}
// DataSourceConfig.java - data source configuration
package com.example.config;

import com.example.datasource.DataSourceType;
import com.example.datasource.DynamicDataSource;
import com.example.datasource.LoadBalanceDataSource;
import com.zaxxer.hikari.HikariDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class DataSourceConfig {

    @Bean
    @ConfigurationProperties("spring.datasource.master")
    public DataSource masterDataSource() {
        return new HikariDataSource();
    }

    @Bean
    @ConfigurationProperties("spring.datasource.slave1")
    public DataSource slave1DataSource() {
        return new HikariDataSource();
    }

    @Bean
    @ConfigurationProperties("spring.datasource.slave2")
    public DataSource slave2DataSource() {
        return new HikariDataSource();
    }

    @Bean
    @Primary
    public DataSource dynamicDataSource() {
        DynamicDataSource dynamicDataSource = new DynamicDataSource();
        // Default to the primary
        dynamicDataSource.setDefaultTargetDataSource(masterDataSource());
        // Register the target data sources
        Map<Object, Object> dataSourceMap = new HashMap<>();
        dataSourceMap.put(DataSourceType.MASTER, masterDataSource());
        // Replicas sit behind a load-balancing data source
        dataSourceMap.put(DataSourceType.SLAVE, createSlaveLoadBalanceDataSource());
        dynamicDataSource.setTargetDataSources(dataSourceMap);
        return dynamicDataSource;
    }

    // Build the load-balancing data source over the replicas
    private DataSource createSlaveLoadBalanceDataSource() {
        // Simple round-robin; production systems may want a smarter policy
        return new LoadBalanceDataSource(slave1DataSource(), slave2DataSource());
    }
}
// LoadBalanceDataSource.java - load-balancing data source
package com.example.datasource;

import javax.sql.DataSource;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

public class LoadBalanceDataSource implements DataSource {

    private final DataSource[] dataSources;
    private final AtomicInteger counter = new AtomicInteger(0);

    public LoadBalanceDataSource(DataSource... dataSources) {
        this.dataSources = dataSources;
    }

    @Override
    public Connection getConnection() throws SQLException {
        // Round-robin over the underlying data sources
        int index = Math.abs(counter.getAndIncrement() % dataSources.length);
        return dataSources[index].getConnection();
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        int index = Math.abs(counter.getAndIncrement() % dataSources.length);
        return dataSources[index].getConnection(username, password);
    }

    // The remaining methods delegate to the first data source
    @Override
    public PrintWriter getLogWriter() throws SQLException {
        return dataSources[0].getLogWriter();
    }

    @Override
    public void setLogWriter(PrintWriter out) throws SQLException {
        dataSources[0].setLogWriter(out);
    }

    @Override
    public void setLoginTimeout(int seconds) throws SQLException {
        dataSources[0].setLoginTimeout(seconds);
    }

    @Override
    public int getLoginTimeout() throws SQLException {
        return dataSources[0].getLoginTimeout();
    }

    @Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        return dataSources[0].getParentLogger();
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        return dataSources[0].unwrap(iface);
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return dataSources[0].isWrapperFor(iface);
    }
}
// ReadOnly.java - marker annotation for read-only methods
package com.example.annotation;

import java.lang.annotation.*;

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ReadOnly {
}

// DataSourceAspect.java - aspect that switches the data source
package com.example.aspect;

import com.example.annotation.ReadOnly;
import com.example.datasource.DataSourceType;
import com.example.datasource.DynamicDataSourceContextHolder;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

@Aspect
@Order(-1) // Must run before the transaction aspect
@Component
public class DataSourceAspect {

    @Around("@annotation(com.example.annotation.ReadOnly) || @within(com.example.annotation.ReadOnly)")
    public Object around(ProceedingJoinPoint point) throws Throwable {
        MethodSignature signature = (MethodSignature) point.getSignature();
        // Look for @ReadOnly on the method, then fall back to the class
        ReadOnly readOnly = signature.getMethod().getAnnotation(ReadOnly.class);
        if (readOnly == null) {
            readOnly = point.getTarget().getClass().getAnnotation(ReadOnly.class);
        }
        if (readOnly != null) {
            DynamicDataSourceContextHolder.setDataSourceType(DataSourceType.SLAVE);
        }
        try {
            return point.proceed();
        } finally {
            DynamicDataSourceContextHolder.clearDataSourceType();
        }
    }
}
// UserService.java - usage example
package com.example.service;

import com.example.annotation.ReadOnly;
import com.example.entity.User;
import com.example.mapper.UserMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;

@Service
public class UserService {

    @Autowired
    private UserMapper userMapper;

    // Read: served by a replica
    @ReadOnly
    public User findById(Long id) {
        return userMapper.findById(id);
    }

    // List query: served by a replica
    @ReadOnly
    public List<User> findAll() {
        return userMapper.findAll();
    }

    // Write: served by the primary
    @Transactional
    public void create(User user) {
        userMapper.insert(user);
    }

    // Read-after-write: force the primary (no @ReadOnly)
    @Transactional
    public User createAndGet(User user) {
        userMapper.insert(user);
        // Reading right after the write must hit the primary to avoid replication lag
        return userMapper.findById(user.getId());
    }
}
3.2 Real-World Examples
3.2.1 Read-Write Splitting in an E-Commerce Service
// OrderService.java - read-write splitting in an order service
package com.ecommerce.service;

import com.example.annotation.ReadOnly;
import com.example.datasource.DataSourceType;
import com.example.datasource.DynamicDataSourceContextHolder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;

@Service
public class OrderService {

    // Mappers and entity types assumed to exist elsewhere in the project
    @Autowired private OrderMapper orderMapper;
    @Autowired private OrderItemMapper orderItemMapper;
    @Autowired private InventoryMapper inventoryMapper;
    @Autowired private ProductMapper productMapper;
    @Autowired private PaymentMapper paymentMapper;

    /**
     * Create an order - write, goes to the primary
     */
    @Transactional
    public Order createOrder(OrderRequest request) {
        // Decrease stock
        inventoryMapper.decreaseStock(request.getProductId(), request.getQuantity());
        // Create the order
        Order order = new Order();
        order.setUserId(request.getUserId());
        order.setProductId(request.getProductId());
        order.setAmount(request.getAmount());
        order.setStatus(OrderStatus.CREATED);
        orderMapper.insert(order);
        // Insert the order items
        orderItemMapper.batchInsert(order.getId(), request.getItems());
        return order;
    }

    /**
     * Fetch order details - read, goes to a replica
     */
    @ReadOnly
    public OrderDetail getOrderDetail(Long orderId) {
        Order order = orderMapper.findById(orderId);
        List<OrderItem> items = orderItemMapper.findByOrderId(orderId);
        Product product = productMapper.findById(order.getProductId());
        return OrderDetail.builder()
                .order(order)
                .items(items)
                .product(product)
                .build();
    }

    /**
     * Read the order right after payment - read-after-write, forced to the primary
     * so replication lag cannot show the user stale state
     */
    @Transactional
    public Order payAndGet(Long orderId, PaymentInfo payment) {
        // Update order status
        orderMapper.updateStatus(orderId, OrderStatus.PAID);
        // Record the payment
        paymentMapper.insert(payment);
        // Read back from the primary (no @ReadOnly, so the primary is used)
        return orderMapper.findById(orderId);
    }

    /**
     * List orders - read, goes to a replica
     * (with pagination and filters)
     */
    @ReadOnly
    public PageResult<Order> listOrders(OrderQuery query) {
        int total = orderMapper.count(query);
        List<Order> orders = orderMapper.findByQuery(query);
        return PageResult.of(orders, total, query.getPageSize());
    }

    /**
     * Real-time stock lookup - stock is sensitive data that needs strict
     * consistency, so it is always read from the primary
     */
    public Integer getRealTimeStock(Long productId) {
        // Force the primary for this read
        DynamicDataSourceContextHolder.setDataSourceType(DataSourceType.MASTER);
        try {
            return inventoryMapper.getStock(productId);
        } finally {
            DynamicDataSourceContextHolder.clearDataSourceType();
        }
    }
}
3.2.2 Coping with Replication Lag
// ReplicationLagAwareService.java - replication-lag-aware service
package com.example.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
public class ReplicationLagAwareService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String WRITE_FLAG_PREFIX = "rw:write_flag:";
    private static final int REPLICATION_LAG_SECONDS = 3; // estimated replication lag

    /**
     * Flag a write so that subsequent reads go to the primary
     */
    public void markWrite(String userId, String dataType) {
        String key = WRITE_FLAG_PREFIX + userId + ":" + dataType;
        redisTemplate.opsForValue().set(key, "1", REPLICATION_LAG_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Check whether reads should go to the primary
     */
    public boolean shouldReadFromMaster(String userId, String dataType) {
        String key = WRITE_FLAG_PREFIX + userId + ":" + dataType;
        return Boolean.TRUE.equals(redisTemplate.hasKey(key));
    }

    /**
     * Lag-aware read
     */
    public <T> T readWithLagAwareness(String userId, String dataType,
                                      java.util.function.Supplier<T> masterRead,
                                      java.util.function.Supplier<T> slaveRead) {
        if (shouldReadFromMaster(userId, dataType)) {
            // Recent write: read from the primary
            return masterRead.get();
        } else {
            // Read from a replica
            return slaveRead.get();
        }
    }
}
// Usage example
@Service
public class UserProfileService {

    @Autowired
    private ReplicationLagAwareService lagAwareService;

    @Autowired
    private UserMapper userMapper;

    // Update a user profile
    @Transactional
    public void updateProfile(Long userId, UserProfile profile) {
        userMapper.updateProfile(userId, profile);
        // Flag the write
        lagAwareService.markWrite(userId.toString(), "profile");
    }

    // Fetch a user profile
    public UserProfile getProfile(Long userId) {
        return lagAwareService.readWithLagAwareness(
            userId.toString(),
            "profile",
            () -> {
                // Read from the primary
                DynamicDataSourceContextHolder.setDataSourceType(DataSourceType.MASTER);
                try {
                    return userMapper.findProfileById(userId);
                } finally {
                    DynamicDataSourceContextHolder.clearDataSourceType();
                }
            },
            () -> {
                // Read from a replica
                DynamicDataSourceContextHolder.setDataSourceType(DataSourceType.SLAVE);
                try {
                    return userMapper.findProfileById(userId);
                } finally {
                    DynamicDataSourceContextHolder.clearDataSourceType();
                }
            }
        );
    }
}
3.2.3 Python读写分离实现
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MySQL读写分离Python实现
依赖: pip install pymysql DBUtils
"""
import pymysql
from DBUtils.PooledDB import PooledDB
from functools import wraps
import threading
import random
from contextlib import contextmanager
class MySQLPool:
"""MySQL连接池封装"""
def __init__(self, host, port, user, password, database, max_connections=50):
self.pool = PooledDB(
creator=pymysql,
maxconnections=max_connections,
mincached=5,
maxcached=20,
blocking=True,
host=host,
port=port,
user=user,
password=password,
database=database,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor,
autocommit=True
)
def get_connection(self):
return self.pool.connection()
class ReadWriteSplitDatabase:
    """Read-write splitting database client."""

    # Thread-local state, used to force the primary inside transactions
    _local = threading.local()

    def __init__(self, master_config, slave_configs):
        """
        Initialize the read-write splitting client.
        Args:
            master_config: connection settings for the primary
            slave_configs: list of connection settings for the replicas
        """
        # Connection pool for the primary
        self.master_pool = MySQLPool(**master_config)
        # Connection pools for the replicas
        self.slave_pools = [MySQLPool(**config) for config in slave_configs]
        self._slave_index = 0

    def _get_slave_pool(self):
        """Pick a replica pool round-robin.
        (The bare counter is not thread-safe; guard it with a lock under concurrency.)"""
        if not self.slave_pools:
            return self.master_pool
        pool = self.slave_pools[self._slave_index % len(self.slave_pools)]
        self._slave_index += 1
        return pool

    def _in_transaction(self):
        """Whether the current thread is inside a transaction."""
        return getattr(self._local, 'in_transaction', False)

    def _force_master(self):
        """Whether reads are currently being forced to the primary."""
        return getattr(self._local, 'force_master', False)

    @contextmanager
    def transaction(self):
        """Transaction context manager (always runs on the primary)."""
        self._local.in_transaction = True
        conn = self.master_pool.get_connection()
        conn.autocommit(False)
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            self._local.in_transaction = False
            conn.close()

    @contextmanager
    def force_master_context(self):
        """Context that forces reads to the primary."""
        self._local.force_master = True
        try:
            yield
        finally:
            self._local.force_master = False

    def execute_write(self, sql, params=None):
        """Execute a write (on the primary)."""
        conn = self.master_pool.get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                return cursor.lastrowid
        finally:
            conn.close()

    def execute_read(self, sql, params=None):
        """Execute a read (on a replica; on the primary inside transactions or when forced)."""
        if self._in_transaction() or self._force_master():
            pool = self.master_pool
        else:
            pool = self._get_slave_pool()
        conn = pool.get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                return cursor.fetchall()
        finally:
            conn.close()

    def execute_read_one(self, sql, params=None):
        """Read a single row."""
        result = self.execute_read(sql, params)
        return result[0] if result else None

    def batch_insert(self, table, columns, values_list):
        """Batch insert (on the primary)."""
        if not values_list:
            return 0
        placeholders = ', '.join(['%s'] * len(columns))
        columns_str = ', '.join(columns)
        sql = f"INSERT INTO {table} ({columns_str}) VALUES ({placeholders})"
        conn = self.master_pool.get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.executemany(sql, values_list)
                return cursor.rowcount
        finally:
            conn.close()
def read_only(func):
    """Read-only decorator: marks a method as replica-safe.
    (A marker only; actual routing happens inside execute_read.)"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    wrapper._read_only = True
    return wrapper


def write_then_read_master(func):
    """Write-then-read decorator: forces reads to the primary for the whole call."""
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        with self.db.force_master_context():
            return func(self, *args, **kwargs)
    return wrapper
# Usage example
class OrderRepository:
    """Order data access layer."""

    def __init__(self, db: ReadWriteSplitDatabase):
        self.db = db

    @read_only
    def find_by_id(self, order_id):
        """Fetch one order (replica)."""
        sql = "SELECT * FROM orders WHERE id = %s"
        return self.db.execute_read_one(sql, (order_id,))

    @read_only
    def find_by_user(self, user_id, page=1, page_size=20):
        """List a user's orders (replica)."""
        offset = (page - 1) * page_size
        sql = """
            SELECT * FROM orders
            WHERE user_id = %s
            ORDER BY created_at DESC
            LIMIT %s OFFSET %s
        """
        return self.db.execute_read(sql, (user_id, page_size, offset))

    def create_order(self, order_data):
        """Create an order (primary)."""
        sql = """
            INSERT INTO orders (user_id, product_id, amount, status, created_at)
            VALUES (%(user_id)s, %(product_id)s, %(amount)s, %(status)s, NOW())
        """
        return self.db.execute_write(sql, order_data)

    @write_then_read_master
    def create_and_get(self, order_data):
        """Create an order and return it (read forced to the primary)."""
        order_id = self.create_order(order_data)
        return self.find_by_id(order_id)

    def update_status(self, order_id, status):
        """Update order status (primary)."""
        sql = "UPDATE orders SET status = %s WHERE id = %s"
        return self.db.execute_write(sql, (status, order_id))
# Initialization
if __name__ == '__main__':
    # Configuration
    master_config = {
        'host': '192.168.1.11',
        'port': 3306,
        'user': 'app_user',
        'password': 'AppP@ss2024!',
        'database': 'ecommerce'
    }
    slave_configs = [
        {
            'host': '192.168.1.12',
            'port': 3306,
            'user': 'app_user',
            'password': 'AppP@ss2024!',
            'database': 'ecommerce'
        },
        {
            'host': '192.168.1.13',
            'port': 3306,
            'user': 'app_user',
            'password': 'AppP@ss2024!',
            'database': 'ecommerce'
        }
    ]
    # Create the database client
    db = ReadWriteSplitDatabase(master_config, slave_configs)
    order_repo = OrderRepository(db)
    # Use a transaction
    with db.transaction() as conn:
        with conn.cursor() as cursor:
            cursor.execute("INSERT INTO orders (user_id, amount) VALUES (%s, %s)", (1, 100))
            cursor.execute("UPDATE inventory SET stock = stock - 1 WHERE product_id = %s", (1001,))
        # Reads inside the transaction use the same primary connection
        with conn.cursor() as cursor:
            cursor.execute("SELECT * FROM orders WHERE user_id = %s", (1,))
            orders = cursor.fetchall()
4. Best Practices and Caveats
4.1 Best Practices
4.1.1 Performance Tuning
1. Analyze the read/write mix
-- Analyze the read/write ratio inside ProxySQL
SELECT
CASE
WHEN digest_text LIKE 'SELECT%' THEN 'READ'
ELSE 'WRITE'
END as operation_type,
SUM(count_star) as total_queries,
ROUND(SUM(count_star) * 100.0 / (SELECT SUM(count_star) FROM stats_mysql_query_digest), 2) as percentage
FROM stats_mysql_query_digest
GROUP BY operation_type;
-- Aggregate by hostgroup
SELECT
hostgroup,
SUM(count_star) as total_queries,
SUM(sum_time) as total_time_us,
ROUND(SUM(sum_time) / SUM(count_star), 2) as avg_time_us
FROM stats_mysql_query_digest
GROUP BY hostgroup;
2. Tune the connection pool
-- ProxySQL connection pool settings
UPDATE global_variables SET variable_value=2000 WHERE variable_name='mysql-max_connections';
UPDATE global_variables SET variable_value=1 WHERE variable_name='mysql-multiplexing';
UPDATE global_variables SET variable_value=10000 WHERE variable_name='mysql-connection_max_age_ms';
-- Percentage of connections to keep free; valid range is 0-100
UPDATE global_variables SET variable_value=10 WHERE variable_name='mysql-free_connections_pct';
LOAD MYSQL VARIABLES TO RUNTIME;
SAVE MYSQL VARIABLES TO DISK;
3. Query caching (ProxySQL Query Cache)
-- Enable query caching (suited to read-heavy workloads)
INSERT INTO mysql_query_rules (rule_id, active, match_pattern, cache_ttl, apply)
VALUES (500, 1, '^SELECT.*FROM products.*', 60000, 1);
-- Cache product listings for 60 seconds (cache_ttl is in ms)
INSERT INTO mysql_query_rules (rule_id, active, match_pattern, cache_ttl, apply)
VALUES (501, 1, '^SELECT.*FROM categories.*', 300000, 1);
-- Cache category listings for 5 minutes
LOAD MYSQL QUERY RULES TO RUNTIME;
SAVE MYSQL QUERY RULES TO DISK;
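Whether the cache is actually being hit can be read from the global stats; a minimal sketch:
-- GET = lookups, GET_OK = hits, SET = entries stored
SELECT Variable_Name, Variable_Value
FROM stats_mysql_global
WHERE Variable_Name LIKE 'Query_Cache%';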
4.1.2 Security Hardening
1. Least-privilege users
-- Create users with different privileges on the primary
-- Read-only user (for replica queries)
CREATE USER 'readonly_app'@'192.168.1.%' IDENTIFIED BY 'ReadOnlyP@ss!';
GRANT SELECT ON app_db.* TO 'readonly_app'@'192.168.1.%';
-- Read-write user (for the primary)
CREATE USER 'readwrite_app'@'192.168.1.%' IDENTIFIED BY 'ReadWriteP@ss!';
GRANT SELECT, INSERT, UPDATE, DELETE ON app_db.* TO 'readwrite_app'@'192.168.1.%';
-- Keep dangerous operations off-limits:
-- do not grant DROP, TRUNCATE, ALTER, and similar privileges
FLUSH PRIVILEGES;
2. ProxySQL access control
-- Change the ProxySQL admin password
UPDATE global_variables SET variable_value='admin:NewAdminP@ss2024!' WHERE variable_name='admin-admin_credentials';
-- Restrict which addresses may reach the admin interface
UPDATE global_variables SET variable_value='127.0.0.1:6032;192.168.1.0/24:6032' WHERE variable_name='admin-mysql_ifaces';
LOAD ADMIN VARIABLES TO RUNTIME;
SAVE ADMIN VARIABLES TO DISK;
3. SQL injection defenses
-- Configure a SQL blacklist (crude patterns: they can also block legitimate
-- queries, so tune them for your workload)
INSERT INTO mysql_query_rules (rule_id, active, match_pattern, error_msg, apply)
VALUES (1, 1, '.*UNION.*SELECT.*', 'Query not allowed', 1);
INSERT INTO mysql_query_rules (rule_id, active, match_pattern, error_msg, apply)
VALUES (2, 1, '.*INTO OUTFILE.*', 'Query not allowed', 1);
INSERT INTO mysql_query_rules (rule_id, active, match_pattern, error_msg, apply)
VALUES (3, 1, '.*LOAD_FILE.*', 'Query not allowed', 1);
LOAD MYSQL QUERY RULES TO RUNTIME;
SAVE MYSQL QUERY RULES TO DISK;
4.1.3 High Availability
1. Two-node ProxySQL with Keepalived
# Use Keepalived for ProxySQL high availability
# Install Keepalived on both ProxySQL nodes
dnf install -y keepalived
# Primary node configuration: /etc/keepalived/keepalived.conf
# (on the standby, set state BACKUP and a lower priority, e.g. 90; the rest is identical)
cat > /etc/keepalived/keepalived.conf << 'EOF'
global_defs {
    router_id PROXYSQL_MASTER
}
vrrp_script check_proxysql {
    script "/usr/local/bin/check_proxysql.sh"
    interval 2
    weight -20
}
vrrp_instance VI_PROXYSQL {
    state MASTER
    interface eth0
    virtual_router_id 60
    priority 100
    advert_int 1
    authentication {
        auth_type PASS
        auth_pass proxysql123
    }
    virtual_ipaddress {
        192.168.1.20/24
    }
    track_script {
        check_proxysql
    }
}
EOF
# Health check script
cat > /usr/local/bin/check_proxysql.sh << 'EOF'
#!/bin/bash
# Check that ProxySQL responds on the admin interface
mysql -uadmin -padmin -h127.0.0.1 -P6032 -e "SELECT 1" > /dev/null 2>&1
if [ $? -eq 0 ]; then
    exit 0
else
    exit 1
fi
EOF
chmod +x /usr/local/bin/check_proxysql.sh
# Start Keepalived
systemctl start keepalived
systemctl enable keepalived
2. ProxySQL Cluster Configuration
-- Configure ProxySQL cluster synchronization
INSERT INTO proxysql_servers (hostname, port, weight, comment)
VALUES ('192.168.1.21', 6032, 1, 'ProxySQL1');
INSERT INTO proxysql_servers (hostname, port, weight, comment)
VALUES ('192.168.1.22', 6032, 1, 'ProxySQL2');
LOAD PROXYSQL SERVERS TO RUNTIME;
SAVE PROXYSQL SERVERS TO DISK;
-- Set the cluster synchronization credentials
UPDATE global_variables SET variable_value='cluster_admin' WHERE variable_name='admin-cluster_username';
UPDATE global_variables SET variable_value='ClusterP@ss!' WHERE variable_name='admin-cluster_password';
LOAD ADMIN VARIABLES TO RUNTIME;
SAVE ADMIN VARIABLES TO DISK;
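To check that the two proxies really converge, ProxySQL exposes per-peer configuration checksums; a quick sketch, run on either node:
-- Matching checksums per module mean the peers are in sync
SELECT hostname, port, name, version, epoch, checksum
FROM stats_proxysql_servers_checksums
ORDER BY hostname, name;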
4.2 Caveats
4.2.1 Configuration
1. Handling replication lag
# Threshold settings for replication lag monitoring
proxysql:
  # Replicas lagging beyond this are shunned automatically (seconds)
  max_replication_lag: 30
  # Lag check interval (ms)
  monitor_replication_lag_interval: 2000
mysql:
  # Parallel replication (reduces lag)
  replica_parallel_type: LOGICAL_CLOCK
  replica_parallel_workers: 8
2. Transaction consistency
-- ProxySQL transaction handling
-- transaction_persistent=1 keeps every query of a transaction on the same server
INSERT INTO mysql_users (username, password, default_hostgroup, transaction_persistent)
VALUES ('app_user', 'password', 10, 1);
-- Enable transaction tracking
UPDATE global_variables SET variable_value=1 WHERE variable_name='mysql-forward_autocommit';
UPDATE global_variables SET variable_value=1 WHERE variable_name='mysql-autocommit_false_is_transaction';
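The pinning behavior is easy to verify from a client session; a sketch (through port 6033, reusing the server_id trick from section 2.3.2):
-- Inside an explicit transaction every statement, reads included, stays on the primary
BEGIN;
SELECT @@server_id;   -- returns the primary's server_id (1)
COMMIT;
SELECT @@server_id;   -- outside the transaction: routed to a replica again (2 or 3)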
4.2.2 Common Errors

| Error type | Message | Likely cause | Fix |
|---|---|---|---|
| Connection failure | Can't connect to MySQL server | Backend MySQL unavailable | Check the MySQL service and network |
| Authentication failure | Access denied for user | Wrong password or missing user | Check the ProxySQL and MySQL user configuration |
| Replication lag | Replication lag too high | Replica lag exceeds the threshold | Check replica performance; tune replication |
| Routing error | Query routed to wrong hostgroup | Misconfigured query rules | Check query_rules ordering |
| Connection exhaustion | Too many connections | Connection pool too small | Raise max_connections |
| Read-write inconsistency | Data inconsistency | Caused by replication lag | Force reads from the primary |
| Transaction failure | Transaction rolled back | Transaction spans hostgroups | Keep each transaction on one hostgroup |
| Timeout | Query timeout | Slow query or network issue | Optimize the query or raise the timeout |
4.2.3 Compatibility Notes
1. Application compatibility
// SQL compatibility pitfalls to watch for
// 1. LAST_INSERT_ID() under read-write splitting:
//    it must be read immediately after the write, on the same connection
long id = jdbcTemplate.queryForObject("SELECT LAST_INSERT_ID()", Long.class);
// 2. Fetching auto-increment keys: prefer KeyHolder
KeyHolder keyHolder = new GeneratedKeyHolder();
jdbcTemplate.update(connection -> {
    PreparedStatement ps = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
    ps.setString(1, name);
    return ps;
}, keyHolder);
Long generatedId = keyHolder.getKey().longValue();
// 3. Temporary tables exist only on the connection that created them and can
//    get lost behind a connection-multiplexing proxy.
//    Workaround: use permanent tables, or pin the session to the primary
//    (ProxySQL disables multiplexing for a session once it sees CREATE TEMPORARY TABLE).
2. Middleware compatibility

| Middleware | Version | MySQL 8.0 support | Characteristics |
|---|---|---|---|
| ProxySQL | 2.6+ | Full | Feature-rich, high performance |
| MyCat | 2.0+ | Full | Strong sharding capabilities |
| MaxScale | 6.0+ | Full | From MariaDB, easy to use |
| MySQL Router | 8.0+ | Full | Official MySQL tool, simple |
5. Troubleshooting and Monitoring
5.1 Troubleshooting
5.1.1 Log Analysis
# ProxySQL log
tail -f /var/lib/proxysql/proxysql.log
# Key log patterns
grep -E "Error|Warning|SHUNNED|OFFLINE" /var/lib/proxysql/proxysql.log
# MySQL slow query log
tail -f /var/log/mysql/slow-query.log
# Replication errors
grep -E "Error|Warning" /var/log/mysql/mysqld.log
5.1.2 Diagnosing Common Problems
Problem 1: replication stopped on a replica
-- Run on the replica
SHOW REPLICA STATUS\G
-- Key fields:
-- Replica_IO_Running: should be Yes
-- Replica_SQL_Running: should be Yes
-- Last_Error: shows the error message
-- Common fixes:
-- 1. Skip the error (use with caution; only works with
--    binlog-position replication, not with GTID mode)
STOP REPLICA;
SET GLOBAL SQL_SLAVE_SKIP_COUNTER = 1;
START REPLICA;
-- 2. Skip via GTID (recommended with GTID mode)
STOP REPLICA;
SET GTID_NEXT='<GTID of the failing transaction>';
BEGIN; COMMIT;
SET GTID_NEXT='AUTOMATIC';
START REPLICA;
-- 3. Re-seed the replica with a full sync
STOP REPLICA;
RESET REPLICA ALL;
-- then reconfigure replication...
Problem 2: abnormal backend status in ProxySQL
-- Connect to the ProxySQL admin interface
mysql -uadmin -padmin -h127.0.0.1 -P6032
-- Check server status (runtime_mysql_servers reflects the live state)
SELECT hostgroup_id, hostname, port, status, max_replication_lag
FROM runtime_mysql_servers;
-- Status values:
-- ONLINE: healthy
-- SHUNNED: temporarily removed (recovers automatically)
-- OFFLINE_SOFT: soft offline (no new connections)
-- OFFLINE_HARD: hard offline (completely unavailable)
-- Check the monitor logs
SELECT * FROM monitor.mysql_server_connect_log ORDER BY time_start_us DESC LIMIT 10;
SELECT * FROM monitor.mysql_server_ping_log ORDER BY time_start_us DESC LIMIT 10;
-- Bring a node back manually
UPDATE mysql_servers SET status='ONLINE' WHERE hostname='192.168.1.12';
LOAD MYSQL SERVERS TO RUNTIME;
Problem 3: queries routed to the wrong hostgroup
-- See how often each rule matches
-- (stats_mysql_query_rules itself only exposes rule_id and hits, so join for context)
SELECT s.rule_id, s.hits, r.match_pattern, r.destination_hostgroup
FROM stats_mysql_query_rules s
JOIN mysql_query_rules r ON r.rule_id = s.rule_id
ORDER BY s.rule_id;
-- Analyze routing after the fact via query digests
SELECT hostgroup, digest_text, count_star
FROM stats_mysql_query_digest
ORDER BY count_star DESC
LIMIT 20;
-- Check rule priority (lower rule_id = evaluated first)
SELECT rule_id, active, match_pattern, destination_hostgroup
FROM mysql_query_rules
ORDER BY rule_id;
5.1.3 Debugging Tips
# Enable more verbose ProxySQL diagnostics
mysql -uadmin -padmin -h127.0.0.1 -P6032 << 'EOF'
SET mysql-verbose_query_error=1;
SET mysql-log_mysql_warnings_enabled=1;
LOAD MYSQL VARIABLES TO RUNTIME;
EOF
# Watch traffic in real time
watch -n 1 'mysql -uadmin -padmin -h127.0.0.1 -P6032 -e "SELECT hostgroup, count_star, sum_time FROM stats_mysql_query_digest ORDER BY count_star DESC LIMIT 10;"'
# Packet capture (backend traffic on 3306; use port 6033 for client-to-proxy traffic)
tcpdump -i eth0 port 3306 -w /tmp/mysql.pcap
5.2 Performance Monitoring
5.2.1 Key Metrics
-- ProxySQL performance counters
SELECT
Variable_Name,
Variable_Value
FROM stats_mysql_global
WHERE Variable_Name IN (
'Active_Transactions',
'Client_Connections_connected',
'Client_Connections_created',
'Questions',
'Slow_queries',
'Com_autocommit',
'Com_autocommit_filtered'
);
-- Backend connection status
SELECT
hostgroup,
srv_host,
srv_port,
status,
ConnUsed,
ConnFree,
ConnOK,
ConnERR,
MaxConnUsed,
Queries
FROM stats_mysql_connection_pool
ORDER BY hostgroup, srv_host;
-- Latency statistics (the pool table keys servers by hostgroup/srv_host/srv_port)
SELECT
hostgroup,
srv_host,
srv_port,
Latency_us
FROM stats_mysql_connection_pool
ORDER BY hostgroup, srv_host;
5.2.2 Monitoring Metrics

| Category | Metric | Meaning | Alert threshold |
|---|---|---|---|
| Connections | Client_Connections_connected | Current client connections | > 80% of max_connections |
| Connections | ConnFree | Idle backend connections | < 10 |
| Performance | Questions | Total queries | Set from your baseline |
| Performance | Slow_queries | Slow query count | > 0 and growing |
| Latency | Latency_us | Backend latency (µs) | > 100000 |
| Replication | replication_lag | Replication lag (s) | > 10 |
| Errors | ConnERR | Connection errors | > 0 |
| Routing | Query rule hits | Rule match counts | Watch that the distribution stays sane |
5.2.3 告警配置
# Prometheus告警规则
groups:
- name: proxysql-alerts
rules:
- alert: ProxySQLHighConnections
expr: proxysql_client_connections_connected / proxysql_mysql_max_connections > 0.8
for: 5m
labels:
severity: warning
annotations:
summary: "ProxySQL连接数过高"
description: "{{ $labels.instance }} 连接使用率达到 {{ $value | humanizePercentage }}"
- alert: ProxySQLBackendDown
expr: proxysql_backend_status != 1
for: 1m
labels:
severity: critical
annotations:
summary: "ProxySQL后端节点异常"
description: "{{ $labels.instance }} 后端 {{ $labels.endpoint }} 状态异常"
- alert: MySQLReplicationLag
expr: proxysql_mysql_replication_lag > 30
for: 2m
labels:
severity: warning
annotations:
summary: "MySQL复制延迟过高"
description: "{{ $labels.instance }} 复制延迟达到 {{ $value }}秒"
- alert: ProxySQLSlowQueries
expr: rate(proxysql_slow_queries[5m]) > 1
for: 5m
labels:
severity: warning
annotations:
summary: "ProxySQL慢查询增加"
description: "{{ $labels.instance }} 慢查询率为 {{ $value }}/秒"
5.2.4 Grafana Dashboard
{
  "dashboard": {
    "title": "MySQL Read-Write Splitting",
    "panels": [
      {
        "title": "Read/write request distribution",
        "type": "piechart",
        "targets": [
          {
            "expr": "sum by (hostgroup) (rate(proxysql_queries_total[5m]))",
            "legendFormat": "Hostgroup {{ hostgroup }}"
          }
        ]
      },
      {
        "title": "QPS trend",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(proxysql_questions[1m])",
            "legendFormat": "Total QPS"
          }
        ]
      },
      {
        "title": "Backend connection status",
        "type": "table",
        "targets": [
          {
            "expr": "proxysql_connection_pool_conn_used",
            "legendFormat": "{{ hostgroup }}-{{ endpoint }}"
          }
        ]
      },
      {
        "title": "Replication lag",
        "type": "graph",
        "targets": [
          {
            "expr": "mysql_slave_status_seconds_behind_master",
            "legendFormat": "{{ instance }}"
          }
        ]
      }
    ]
  }
}
5.3 Backup and Restore
5.3.1 Backing Up the ProxySQL Configuration
#!/bin/bash
# ProxySQL configuration backup script
BACKUP_DIR="/data/backup/proxysql"
DATE=$(date +%Y%m%d_%H%M%S)
ADMIN_USER="admin"
ADMIN_PASS="admin"
mkdir -p $BACKUP_DIR
# Flush the configuration to disk
mysql -u$ADMIN_USER -p$ADMIN_PASS -h127.0.0.1 -P6032 << 'EOF'
SAVE MYSQL USERS TO DISK;
SAVE MYSQL SERVERS TO DISK;
SAVE MYSQL QUERY RULES TO DISK;
SAVE MYSQL VARIABLES TO DISK;
SAVE ADMIN VARIABLES TO DISK;
SAVE PROXYSQL SERVERS TO DISK;
EOF
# Back up the SQLite database
cp /var/lib/proxysql/proxysql.db $BACKUP_DIR/proxysql_$DATE.db
# Export as SQL
sqlite3 /var/lib/proxysql/proxysql.db ".dump" > $BACKUP_DIR/proxysql_$DATE.sql
# Compress the backup
gzip $BACKUP_DIR/proxysql_$DATE.sql
# Remove backups older than 7 days
find $BACKUP_DIR -name "proxysql_*.gz" -mtime +7 -delete
find $BACKUP_DIR -name "proxysql_*.db" -mtime +7 -delete
echo "ProxySQL configuration backup finished: $BACKUP_DIR/proxysql_$DATE"
5.3.2 恢复流程
#!/bin/bash
# ProxySQL配置恢复脚本
BACKUP_FILE=$1
if [ -z "$BACKUP_FILE" ]; then
echo "用法: $0 <备份文件路径>"
exit 1
fi
# 停止ProxySQL
systemctl stop proxysql
# 如果是.db文件直接替换
if [[ "$BACKUP_FILE" == *.db ]]; then
cp $BACKUP_FILE /var/lib/proxysql/proxysql.db
chown proxysql:proxysql /var/lib/proxysql/proxysql.db
fi
# 如果是.sql.gz文件
if [[ "$BACKUP_FILE" == *.sql.gz ]]; then
# 清空现有数据
rm -f /var/lib/proxysql/proxysql.db
# 解压并导入
gunzip -c $BACKUP_FILE | sqlite3 /var/lib/proxysql/proxysql.db
chown proxysql:proxysql /var/lib/proxysql/proxysql.db
fi
# 启动ProxySQL
systemctl start proxysql
# 验证
sleep 3
mysql -uadmin -padmin -h127.0.0.1 -P6032 -e "SELECT * FROM mysql_servers;"
echo "ProxySQL配置恢复完成"
6. Summary
6.1 Key Takeaways
A successful MySQL read-write splitting rollout hinges on the following points:
1. Architecture
- Build on MySQL primary-replica replication as the foundation
- Use middleware such as ProxySQL for transparent read-write splitting
- Plan the number and sizing of primary and replica nodes sensibly
2. Routing strategy
- Route SELECT statements to replicas
- Route writes (INSERT/UPDATE/DELETE) to the primary
- Route all queries inside a transaction to the primary
3. Consistency guarantees
- Use GTID mode to simplify replication management
- Enable semi-synchronous replication to reduce the risk of data loss
- Force reads from the primary in read-after-write scenarios
4. High availability
- Two ProxySQL nodes plus Keepalived for a highly available proxy layer
- Automatically shun replicas that exceed the replication lag threshold
- Automatic failure detection and node recovery
5. Performance
- Size connection pools appropriately
- Use the ProxySQL query cache
- Monitor and tune the read/write mix
6.2 Further Study
| Direction | Topic | Suggested resources |
|---|---|---|
| Sharding | Horizontal partitioning to scale writes | ShardingSphere docs |
| MGR clusters | MySQL Group Replication | MySQL official docs |
| Automated operations | Automatic failover with Orchestrator | Project docs on GitHub |
| Distributed transactions | XA / TCC / SAGA | Seata framework |
| Middleware internals | ProxySQL / MyCat source | Source code reading |
Appendix
A. Command Quick Reference
| Command | Description | Example |
|---|---|---|
| SHOW REPLICA STATUS | Check replica status | SHOW REPLICA STATUS\G |
| START REPLICA | Start replication | START REPLICA; |
| STOP REPLICA | Stop replication | STOP REPLICA; |
| CHANGE REPLICATION SOURCE TO | Configure the replication source | See section 2.1.2 |
| LOAD ... TO RUNTIME | Activate ProxySQL configuration | LOAD MYSQL SERVERS TO RUNTIME; |
| SAVE ... TO DISK | Persist ProxySQL configuration | SAVE MYSQL SERVERS TO DISK; |
B. Configuration Parameters

| Parameter | Component | Default | Description |
|---|---|---|---|
| gtid_mode | MySQL | OFF | GTID mode; ON recommended |
| replica_parallel_workers | MySQL | 0 (4 as of 8.0.27) | Parallel replication worker threads |
| mysql-monitor_replication_lag_interval | ProxySQL | 10000 | Replication lag check interval (ms) |
| transaction_persistent | ProxySQL | 0 | Whether a transaction is pinned to one hostgroup |
| max_replication_lag | ProxySQL | 0 | Max replication lag (s); 0 = unlimited |
C. Glossary

| Term | English | Meaning |
|---|---|---|
| Read-write splitting | Read-Write Splitting | Routing reads and writes to different nodes |
| Replication lag | Replication Lag | How far a replica's data trails the primary |
| Primary | Master/Primary | The database node that handles writes |
| Replica | Slave/Replica | A node that replicates the primary and serves reads |
| GTID | Global Transaction Identifier | Globally unique transaction ID |
| Semi-synchronous replication | Semi-Synchronous Replication | The primary waits for at least one replica to acknowledge receipt |
| Query routing | Query Routing | Dispatching queries to nodes according to rules |
| Connection pool | Connection Pool | Reusing database connections to avoid setup cost |
| Failover | Failover | Switching to a standby node when the primary fails |
| hostgroup | hostgroup | ProxySQL's grouping concept for backend servers |