代理服务是客户端与目标服务器之间的中间服务器,负责转发请求、响应并提供附加能力(如缓存、过滤、隐藏 IP、负载均衡等)。按部署位置分为正向代理(代表客户端)与反向代理(代表服务器)两大类。
现实业务中,我们经常会遇到海量异构图片链接的“收归”管理问题。比如从各大电商平台抓取的商品素材,它们的原始链接域名各异、格式混乱。如果想统一对外提供,就需要搭建一个高性能的代理服务,用我们自己的域名封装这些原始链接。
针对这种需求,常见做法有这几种:
| 方案 | 原理 | 优点 | 缺点 |
| --- | --- | --- | --- |
| 本地代理服务 | 自己起一个服务把新 URL 请求代理到原 URL | 完全可控灵活 | 需要跑一个服务 |
| Nginx 反向代理 | 用 Nginx 配置重写规则 | 高性能稳定 | 需要配 Nginx |
| 在线图片代理 | 用现成的代理服务中转 | 零部署 | 受限于第三方 |
| 下载后本地托管 | 先下到本地,再起本地服务器 | 离线可用 | 图片要存本地 |
对于千万级映射 + 高性能要求,最佳方案是什么?我们推荐:Redis + 高性能代理服务。
为什么是 Redis?因为它在这方面拥有天然优势:
| 指标 | 说明 |
| --- | --- |
| 容量 | 支持亿级 key,内存存储 |
| 速度 | 毫秒级 O(1) 查询,单机 QPS 10万+ |
| 可靠 | 支持持久化(RDB+AOF) |
| 简单 | 简单 KV 查询,无需复杂索引 |
在具体的技术选型上,大概有这几条路可以走:
| 方案 | 性能 | 复杂度 | 推荐场景 |
| --- | --- | --- | --- |
| Go + Redis | ⭐⭐⭐⭐⭐ | 中 | 首选,高并发,部署简单 |
| Nginx + Lua + Redis | ⭐⭐⭐⭐⭐ | 高 | 已有 Nginx 基础设施 |
| Python + Redis | ⭐⭐⭐ | 低 | 快速原型,QPS < 1000 |
| Tornado/FastAPI + Redis | ⭐⭐⭐ | 中 | 熟悉 Python,QPS < 5000 |
可以看到,Go 语言凭借其轻量的 Goroutine 和出色的并发表现,成为自建代理服务的首选。接下来,我会分享三个版本的 Go 代理服务实现,一步步升级打怪,来看看如何优雅地应对千万级数据量的挑战。
版本一:基于内存的基础版
万丈高楼平地起。在引入外部中间件之前,我们先来看看一个纯粹基于内存的图片代理服务是怎么玩的。它虽简单,却五脏俱全,特别适合映射数据量较少、不要求持久化的场景。
package main
import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"
	"os"
	"os/signal"
	"path/filepath"
	"strings"
	"sync"
	"syscall"
	"time"
)
// In-memory mapping store plus request counters, each guarded by its own mutex.
var (
	mappings   = make(map[string]string) // filename -> original URL
	mappingsMu sync.RWMutex              // guards mappings
	reqTotal   int64                     // total proxy requests seen
	reqHit     int64                     // requests whose mapping existed
	reqMiss    int64                     // requests with no mapping
	statsMu    sync.Mutex                // guards the three counters above
)

// addMapping registers (or overwrites) the original URL for a filename.
func addMapping(filename, originalURL string) {
	mappingsMu.Lock()
	mappings[filename] = originalURL
	mappingsMu.Unlock()
}

// getOriginalURL looks up the original URL for a filename; the second
// result reports whether the mapping exists.
func getOriginalURL(filename string) (string, bool) {
	mappingsMu.RLock()
	defer mappingsMu.RUnlock()
	target, found := mappings[filename]
	return target, found
}

// getMappingCount reports how many mappings are currently stored.
func getMappingCount() int {
	mappingsMu.RLock()
	defer mappingsMu.RUnlock()
	return len(mappings)
}

// getAllMappings returns an independent snapshot of the mapping table so
// callers can iterate without holding the lock.
func getAllMappings() map[string]string {
	mappingsMu.RLock()
	defer mappingsMu.RUnlock()
	snapshot := make(map[string]string, len(mappings))
	for name, target := range mappings {
		snapshot[name] = target
	}
	return snapshot
}
// getContentType maps a filename's extension (case-insensitive) to its
// MIME type; unknown extensions fall back to application/octet-stream.
func getContentType(filename string) string {
	mimeByExt := map[string]string{
		".jpg":  "image/jpeg",
		".jpeg": "image/jpeg",
		".png":  "image/png",
		".gif":  "image/gif",
		".webp": "image/webp",
		".svg":  "image/svg+xml",
		".bmp":  "image/bmp",
		".ico":  "image/x-icon",
		".pdf":  "application/pdf",
	}
	if mime, ok := mimeByExt[strings.ToLower(filepath.Ext(filename))]; ok {
		return mime
	}
	return "application/octet-stream"
}
// proxyClient is shared by all /img/ requests so TCP connections are
// reused. The original built a new http.Client on every request, which
// defeated keep-alive and re-dialed the origin each time.
var proxyClient = &http.Client{Timeout: 30 * time.Second}

// proxyImage handles GET /img/{filename}: it resolves the filename to
// its original URL, fetches the upstream image, and relays it (buffered
// in memory so an exact Content-Length can be sent).
func proxyImage(w http.ResponseWriter, r *http.Request) {
	start := time.Now()
	filename := strings.TrimPrefix(r.URL.Path, "/img/")
	if filename == "" {
		http.Error(w, "缺少文件名", http.StatusBadRequest)
		return
	}
	// Undo percent-encoding in the path segment.
	var err error
	filename, err = url.QueryUnescape(filename)
	if err != nil {
		http.Error(w, "无效的文件名", http.StatusBadRequest)
		return
	}
	statsMu.Lock()
	reqTotal++
	statsMu.Unlock()
	originalURL, ok := getOriginalURL(filename)
	if !ok {
		statsMu.Lock()
		reqMiss++
		statsMu.Unlock()
		log.Printf("[404] %s - 映射不存在", filename)
		http.Error(w, "图片不存在", http.StatusNotFound)
		return
	}
	statsMu.Lock()
	reqHit++
	statsMu.Unlock()
	resp, err := proxyClient.Get(originalURL)
	if err != nil {
		log.Printf("[错误] 请求原图失败: %v", err)
		http.Error(w, "无法获取图片", http.StatusBadGateway)
		return
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		log.Printf("[错误] 原图返回状态码: %d", resp.StatusCode)
		http.Error(w, "原图不可用", resp.StatusCode)
		return
	}
	// Buffer the whole payload so Content-Length can be set exactly.
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Printf("[错误] 读取原图失败: %v", err)
		http.Error(w, "读取图片失败", http.StatusBadGateway)
		return
	}
	// Content-Type: prefer the upstream's header when it is specific,
	// otherwise derive from the filename extension.
	contentType := getContentType(filename)
	if ct := resp.Header.Get("Content-Type"); ct != "" && ct != "application/octet-stream" {
		contentType = ct
	}
	w.Header().Set("Content-Type", contentType)
	// Cache/diagnostic headers.
	w.Header().Set("X-Proxied-By", "ImageProxy-Memory")
	w.Header().Set("Cache-Control", "public, max-age=3600")
	w.Header().Set("Content-Length", fmt.Sprintf("%d", len(body)))
	w.Write(body)
	log.Printf("[成功] %s -> %s (耗时: %v, 类型: %s, 大小: %d bytes)",
		filename, originalURL, time.Since(start), contentType, len(body))
}
// addHandler handles POST /api/add?filename=&url= and registers a new
// filename -> URL mapping.
//
// The response is now built with encoding/json so a filename containing
// quotes or backslashes cannot produce invalid JSON (the original
// interpolated it raw into a fmt.Fprintf template).
func addHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "只支持 POST", http.StatusMethodNotAllowed)
		return
	}
	filename := r.URL.Query().Get("filename")
	originalURL := r.URL.Query().Get("url")
	if filename == "" || originalURL == "" {
		http.Error(w, "缺少参数 filename 或 url", http.StatusBadRequest)
		return
	}
	addMapping(filename, originalURL)
	log.Printf("[添加] %s -> %s", filename, originalURL)
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(map[string]interface{}{
		"status":   "ok",
		"filename": filename,
		"total":    getMappingCount(),
	}); err != nil {
		log.Printf("[错误] 序列化响应失败: %v", err)
	}
}
// queryHandler handles GET /api/query?filename= and returns the mapped
// URL as JSON.
//
// encoding/json is used for the body so values containing quotes or
// backslashes cannot break the JSON (the original wrote them unescaped
// via fmt.Fprintf).
func queryHandler(w http.ResponseWriter, r *http.Request) {
	filename := r.URL.Query().Get("filename")
	if filename == "" {
		http.Error(w, "缺少参数 filename", http.StatusBadRequest)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	u, ok := getOriginalURL(filename)
	if !ok {
		w.WriteHeader(http.StatusNotFound)
		json.NewEncoder(w).Encode(map[string]string{"status": "not_found", "filename": filename})
		return
	}
	json.NewEncoder(w).Encode(map[string]string{"status": "ok", "filename": filename, "url": u})
}
// listHandler handles GET /api/list and dumps every mapping as JSON.
//
// The original concatenated raw keys/values into a JSON string by hand;
// any quote or backslash in a filename or URL produced invalid JSON.
// json.NewEncoder escapes everything correctly.
func listHandler(w http.ResponseWriter, r *http.Request) {
	all := getAllMappings()
	w.Header().Set("Content-Type", "application/json")
	payload := struct {
		Total    int               `json:"total"`
		Mappings map[string]string `json:"mappings"`
	}{Total: len(all), Mappings: all}
	if err := json.NewEncoder(w).Encode(payload); err != nil {
		log.Printf("[错误] 序列化映射列表失败: %v", err)
	}
}
// statsHandler reports mapping count and request counters as JSON.
func statsHandler(w http.ResponseWriter, r *http.Request) {
	// Copy the counters under the lock, format outside it.
	statsMu.Lock()
	total, hits, misses := reqTotal, reqHit, reqMiss
	statsMu.Unlock()
	w.Header().Set("Content-Type", "application/json")
	fmt.Fprintf(w, `{"total_mappings":%d,"total_requests":%d,"hit_count":%d,"miss_count":%d}`,
		getMappingCount(), total, hits, misses)
}
// healthHandler answers liveness probes with the current mapping count.
func healthHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	count := getMappingCount()
	fmt.Fprintf(w, `{"status":"healthy","total_mappings":%d}`, count)
}
// loadDemoData seeds the store with a few sample picsum.photos images
// so the service is usable immediately after startup.
func loadDemoData() {
	demo := [][2]string{
		{"demo1.jpg", "https://picsum.photos/id/1018/800/600.jpg"},
		{"demo2.jpg", "https://picsum.photos/id/1015/800/600.jpg"},
		{"demo3.jpg", "https://picsum.photos/id/1019/800/600.jpg"},
	}
	for _, pair := range demo {
		addMapping(pair[0], pair[1])
	}
	log.Printf("已加载 %d 条演示数据", len(demo))
}
// main wires up routes and serves until SIGINT/SIGTERM, then shuts the
// HTTP server down gracefully.
//
// Fixes over the original: the signal handler now calls Server.Shutdown
// (bounded by a 10s timeout) instead of just exiting, so in-flight
// requests finish; the startup banner documents the real /img/{filename}
// route instead of a placeholder.
func main() {
	log.Println("图片代理服务 (内存版) 启动中...")
	loadDemoData()
	http.HandleFunc("/health", healthHandler)
	http.HandleFunc("/img/", proxyImage)
	http.HandleFunc("/api/add", addHandler)
	http.HandleFunc("/api/query", queryHandler)
	http.HandleFunc("/api/list", listHandler)
	http.HandleFunc("/api/stats", statsHandler)
	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
	}
	addr := ":" + port
	log.Printf("服务地址: http://0.0.0.0%s", addr)
	log.Println("接口:")
	log.Println("  GET  /img/{filename}          代理图片")
	log.Println("  POST /api/add?filename=&url=  添加映射")
	log.Println("  GET  /api/query?filename=     查询映射")
	log.Println("  GET  /api/list                所有映射")
	log.Println("  GET  /api/stats               统计信息")
	log.Println("  GET  /health                  健康检查")
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	srv := &http.Server{Addr: addr}
	go func() {
		// ErrServerClosed is the expected result of a graceful Shutdown.
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Fatalf("启动失败: %v", err)
		}
	}()
	<-quit
	// Give in-flight requests up to 10s to complete.
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := srv.Shutdown(shutdownCtx); err != nil {
		log.Printf("关闭 HTTP 服务异常: %v", err)
	}
	log.Println("服务已关闭")
}
版本二:Redis + MySQL 化茧成蝶
当我们的系统规模膨胀到几百万甚至上千万张图片时,内存版服务就有点力不从心了。重启服务就等于数据清空,这谁受得了?此时,就需要一个更可靠的持久化方案。我们可以利用 高性能代理服务 的思路,引入 Redis 作为高速缓存,MySQL 作为持久化的数据源,实现一个生产级的图片代理。
这套方案在架构上多了两个关键点:
- 双向索引:在 Redis 中,我们不仅要支持从 filename 查 url(正向),还要支持从 url 反查 filename(反向),两种查询都是 O(1) 复杂度。
- 数据同步:主动从 MySQL 拉取数据到 Redis 进行预热,并支持增量更新。
请看代码实现:
package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"os/signal"
"path"
"runtime"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
_ "github.com/go-sql-driver/mysql"
"github.com/redis/go-redis/v9"
)
// ==================== 配置 ====================
// Config holds all runtime settings for the proxy service.
type Config struct {
	ServerPort    string // HTTP listen port for the proxy
	RedisAddr     string // Redis address (host:port)
	RedisPassword string // Redis password (empty if none)
	RedisDB       int    // Redis DB index
	MySQLAddr     string // MySQL address (host:port)
	MySQLUser     string // MySQL user
	MySQLPassword string // MySQL password
	MySQLDatabase string // MySQL database name
	MySQLTable    string // MySQL table name
	MySQLIDField  string // ID column name (used to build proxied filenames)
}
func getEnv(key, defaultVal string) string {
if val := os.Getenv(key); val != "" {
return val
}
return defaultVal
}
// cfg is the process-wide configuration. Passwords come from the
// environment; addresses/table names are left for deployment to fill in.
var cfg = Config{
	ServerPort:    "8080",
	RedisAddr:     "localhost:6379",
	RedisPassword: getEnv("REDIS_PASSWORD", ""),
	RedisDB:       7, // this instance's dedicated Redis DB
	MySQLAddr:     "",
	MySQLUser:     "root",
	MySQLPassword: getEnv("MYSQL_PASSWORD", ""),
	MySQLDatabase: "jserve",
	MySQLTable:    "", // filled per deployment
	MySQLIDField:  "", // filled per deployment
}
// ==================== 全局变量 ====================
var (
	redisClient *redis.Client           // shared Redis client, set by initRedis
	mysqlDB     *sql.DB                 // shared MySQL pool, set by initMySQL
	ctx         = context.Background() // base context for Redis calls
)

// httpClient is shared by all upstream fetches so TCP/TLS connections
// are kept alive and reused instead of being re-dialed per request.
var httpClient = &http.Client{
	Timeout: 30 * time.Second,
	Transport: &http.Transport{
		MaxIdleConns:        100,
		MaxIdleConnsPerHost: 10,
		IdleConnTimeout:     90 * time.Second,
	},
}
// ==================== Redis 操作 ====================
// initRedis creates the global Redis client and verifies connectivity
// with a PING before the service starts taking traffic.
func initRedis() error {
	redisClient = redis.NewClient(&redis.Options{
		Addr:     cfg.RedisAddr,
		Password: cfg.RedisPassword,
		DB:       cfg.RedisDB,
	})
	if _, err := redisClient.Ping(ctx).Result(); err != nil {
		return fmt.Errorf("Redis 连接失败: %v", err)
	}
	log.Printf("Redis 连接成功: %s", cfg.RedisAddr)
	return nil
}
// addMappingToRedis stores one filename<->URL pair as two keys in a
// single pipeline round trip: "img:<filename>" -> URL (forward) and
// "rev:<URL>" -> filename (reverse), both without expiry.
func addMappingToRedis(filename, originalURL string) error {
	p := redisClient.Pipeline()
	p.Set(ctx, "img:"+filename, originalURL, 0)
	p.Set(ctx, "rev:"+originalURL, filename, 0)
	_, err := p.Exec(ctx)
	return err
}
// batchAddMappingsToRedis writes many filename<->URL pairs (forward and
// reverse keys, no expiry) in one pipeline; a no-op for an empty map.
func batchAddMappingsToRedis(mappings map[string]string) error {
	if len(mappings) == 0 {
		return nil
	}
	p := redisClient.Pipeline()
	for name, target := range mappings {
		p.Set(ctx, "img:"+name, target, 0)  // forward: filename -> url
		p.Set(ctx, "rev:"+target, name, 0)  // reverse: url -> filename
	}
	_, err := p.Exec(ctx)
	return err
}
// getFilenameByURL resolves a source URL back to its proxied filename
// via the reverse index ("rev:" keys); O(1) in Redis.
func getFilenameByURL(originalURL string) (string, error) {
	filename, err := redisClient.Get(ctx, "rev:"+originalURL).Result()
	switch {
	case err == redis.Nil:
		return "", fmt.Errorf("映射不存在: %s", originalURL)
	case err != nil:
		return "", err
	}
	return filename, nil
}
// getURLByFilename resolves a proxied filename to its source URL via the
// forward index ("img:" keys); O(1) in Redis.
func getURLByFilename(filename string) (string, error) {
	// Named "target" rather than "url" to avoid shadowing the net/url package.
	target, err := redisClient.Get(ctx, "img:"+filename).Result()
	switch {
	case err == redis.Nil:
		return "", fmt.Errorf("映射不存在: %s", filename)
	case err != nil:
		return "", err
	}
	return target, nil
}
// getRedisMappingCount counts forward-index keys ("img:*") by walking
// SCAN cursors in pages of 1000; unlike KEYS, SCAN never blocks Redis.
func getRedisMappingCount() (int64, error) {
	var (
		cursor uint64
		total  int64
	)
	for {
		page, next, err := redisClient.Scan(ctx, cursor, "img:*", 1000).Result()
		if err != nil {
			return total, err
		}
		total += int64(len(page))
		if next == 0 {
			return total, nil
		}
		cursor = next
	}
}
// ==================== MySQL 操作 ====================
// initMySQL opens the global MySQL pool and verifies connectivity.
func initMySQL() error {
	dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
		cfg.MySQLUser, cfg.MySQLPassword, cfg.MySQLAddr, cfg.MySQLDatabase)
	var err error
	mysqlDB, err = sql.Open("mysql", dsn)
	if err != nil {
		return fmt.Errorf("MySQL 连接失败: %v", err)
	}
	// Pool sizing: enough connections for the concurrent sync workers.
	mysqlDB.SetMaxOpenConns(20)
	mysqlDB.SetMaxIdleConns(10)
	mysqlDB.SetConnMaxLifetime(time.Hour)
	// sql.Open is lazy; Ping forces a real round trip.
	if err = mysqlDB.Ping(); err != nil {
		return fmt.Errorf("MySQL Ping 失败: %v", err)
	}
	log.Printf("MySQL 连接成功: %s/%s", cfg.MySQLAddr, cfg.MySQLDatabase)
	return nil
}
// ==================== 同步配置 ====================
// Batch-size and locking knobs for the MySQL -> Redis sync.
const (
	pageSize      = 2000 // product_ids handed to one worker goroutine
	redisBatch    = 5000 // Redis pipeline batch size
	syncBatchSize = 100  // product_ids per SQL IN query
	syncLockKey   = "lock:image-proxy:sync" // distributed-lock key
)
var maxWorkers = runtime.NumCPU() - 1 // 并发数(留1核给主线程)
// tryAcquireSyncLock takes a best-effort distributed lock so only one
// instance runs a full sync at a time. The 60-minute TTL prevents a
// crashed holder from wedging the lock forever.
func tryAcquireSyncLock() bool {
	ok, err := redisClient.SetNX(ctx, syncLockKey, "1", 60*time.Minute).Result()
	if err != nil {
		// The original swallowed this error, making a Redis outage
		// indistinguishable from "sync already running"; log it.
		log.Printf("获取同步锁失败: %v", err)
		return false
	}
	return ok
}

// releaseSyncLock drops the distributed sync lock.
func releaseSyncLock() {
	redisClient.Del(ctx, syncLockKey)
}
// fullSyncMySQLToRedis copies every unsynced row (sync_status = 0) from
// MySQL into Redis in parallel batches, guarded by a distributed lock so
// concurrent instances don't duplicate work.
//
// Fixes over the original:
//   - the progress-logging goroutine is stopped via `done` when the sync
//     finishes (it previously ticked forever, leaking one goroutine per
//     sync run);
//   - rows.Err() is checked after scanning the ID list;
//   - an empty ID list exits early instead of reaching the percent
//     division with total == 0.
func fullSyncMySQLToRedis() error {
	if !tryAcquireSyncLock() {
		log.Println("同步任务进行中,跳过本次执行")
		return nil
	}
	defer releaseSyncLock()
	log.Println("开始全量同步 MySQL -> Redis(并发模式)...")
	// Count pending rows up front for logging / early exit.
	var totalCount int
	countQuery := fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE sync_status = 0", cfg.MySQLTable)
	if err := mysqlDB.QueryRow(countQuery).Scan(&totalCount); err != nil {
		return fmt.Errorf("获取总数失败: %v", err)
	}
	log.Printf("待处理记录: %d 条,并发数: %d", totalCount, maxWorkers)
	if totalCount == 0 {
		log.Println("没有需要同步的数据")
		return nil
	}
	// 1. Collect every pending product_id.
	rows, err := mysqlDB.Query(fmt.Sprintf(
		"SELECT product_id FROM %s WHERE sync_status = 0", cfg.MySQLTable))
	if err != nil {
		return fmt.Errorf("获取 product_id 列表失败: %v", err)
	}
	defer rows.Close()
	var productIDs []string
	for rows.Next() {
		var pid string
		if err := rows.Scan(&pid); err != nil {
			continue
		}
		productIDs = append(productIDs, pid)
	}
	if err := rows.Err(); err != nil {
		return fmt.Errorf("遍历 product_id 失败: %v", err)
	}
	total := len(productIDs)
	if total == 0 {
		log.Println("没有需要同步的数据")
		return nil
	}
	// 2. Shared progress counters.
	var (
		processedCount atomic.Int64
		successCount   atomic.Int64
		failCount      atomic.Int64
	)
	// 3. Progress reporter, terminated via done when the sync ends.
	done := make(chan struct{})
	go func() {
		ticker := time.NewTicker(5 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				p := processedCount.Load()
				s := successCount.Load()
				f := failCount.Load()
				percent := float64(p) / float64(total) * 100
				log.Printf("进度: %d/%d (%.1f%%) - 成功: %d, 失败: %d", p, total, percent, s, f)
			}
		}
	}()
	// 4. Fan out batches, bounded by maxWorkers via a semaphore channel.
	var wg sync.WaitGroup
	semaphore := make(chan struct{}, maxWorkers)
	for i := 0; i < total; i += pageSize {
		end := i + pageSize
		if end > total {
			end = total
		}
		batch := productIDs[i:end]
		wg.Add(1)
		semaphore <- struct{}{}
		go func(ids []string) {
			defer wg.Done()
			defer func() { <-semaphore }()
			processProductBatch(ids, &processedCount, &successCount, &failCount)
		}(batch)
	}
	wg.Wait()
	close(done)
	log.Printf("全量同步完成!总计: %d, 成功: %d, 失败: %d",
		processedCount.Load(), successCount.Load(), failCount.Load())
	return nil
}
// processProductBatch splits a worker's ids into IN-query-sized chunks
// and syncs each chunk, accumulating results into the shared counters.
func processProductBatch(ids []string, processed, success, fail *atomic.Int64) {
	for start := 0; start < len(ids); start += syncBatchSize {
		stop := start + syncBatchSize
		if stop > len(ids) {
			stop = len(ids)
		}
		processSyncBatch(ids[start:stop], processed, success, fail)
	}
}
// processSyncBatch loads one chunk of products via a single IN query,
// extracts all image/video URLs, writes the mappings to Redis in one
// pipeline, and records per-product sync_status in MySQL
// (1 = synced, 2 = failed).
//
// Fix over the original: when a chunk of rows yielded zero URLs (all
// media fields empty), sync_status was never updated, so those products
// were re-scanned by every subsequent full sync; they are now marked
// processed as well.
func processSyncBatch(ids []string, processed, success, fail *atomic.Int64) {
	if len(ids) == 0 {
		return
	}
	// Build the "?,?,...,?" placeholder list for the IN clause.
	placeholders := strings.Repeat("?,", len(ids)-1) + "?"
	query := fmt.Sprintf(
		"SELECT product_id, main_image_url, sales_link, product_video, "+
			"description_images, main_images, aspect_ratio_images, pic_urls, sku_list "+
			"FROM %s WHERE product_id IN (%s)", cfg.MySQLTable, placeholders)
	// database/sql wants []interface{} for variadic args.
	args := make([]interface{}, len(ids))
	for j, id := range ids {
		args[j] = id
	}
	rows, err := mysqlDB.Query(query, args...)
	if err != nil {
		log.Printf("批量查询失败: %v", err)
		fail.Add(int64(len(ids)))
		return
	}
	defer rows.Close()
	batchMappings := make(map[string]string)
	var syncedIDs []string
	for rows.Next() {
		var productID string
		var mainImageURL, salesLink, productVideo sql.NullString
		var descriptionImages, mainImages, aspectRatioImages, picUrls, skuList sql.NullString
		if err := rows.Scan(&productID, &mainImageURL, &salesLink, &productVideo,
			&descriptionImages, &mainImages, &aspectRatioImages, &picUrls, &skuList); err != nil {
			log.Printf("扫描行失败: %v", err)
			continue
		}
		urls := extractURLs(productID, mainImageURL, salesLink, productVideo,
			descriptionImages, mainImages, aspectRatioImages, picUrls, skuList)
		for filename, originalURL := range urls {
			batchMappings[filename] = originalURL
		}
		syncedIDs = append(syncedIDs, productID)
	}
	// Persist the outcome for every scanned product, even those that
	// produced no URLs.
	if len(syncedIDs) > 0 {
		status := 1
		if len(batchMappings) > 0 {
			if err := batchAddMappingsToRedis(batchMappings); err != nil {
				log.Printf("Redis 批量写入失败: %v", err)
				status = 2
			}
		}
		if status == 1 {
			success.Add(int64(len(syncedIDs)))
		} else {
			fail.Add(int64(len(syncedIDs)))
		}
		for _, pid := range syncedIDs {
			updateSyncStatus(pid, status)
		}
	}
	processed.Add(int64(len(ids)))
}
// updateSyncStatus persists the sync outcome for one product
// (0 = pending, 1 = synced, 2 = failed). Errors are logged rather than
// returned so one bad row does not abort a whole batch.
func updateSyncStatus(productID string, status int) {
	stmt := fmt.Sprintf("UPDATE %s SET sync_status = ? WHERE product_id = ?", cfg.MySQLTable)
	_, err := mysqlDB.Exec(stmt, status, productID)
	if err != nil {
		log.Printf("更新状态失败 (product_id=%s, status=%d): %v", productID, status, err)
	}
}
// getExtFromURL extracts the file extension (with leading dot, e.g.
// ".jpg", ".mp4") from a URL, or "" when the path has no extension.
//
// Fix: query strings and fragments are stripped before calling path.Ext.
// The original passed the raw URL through, so "https://x/a.jpg?size=big"
// produced the bogus extension ".jpg?size=big", which then leaked into
// every generated filename.
func getExtFromURL(rawURL string) string {
	trimmed := rawURL
	if i := strings.IndexAny(trimmed, "?#"); i >= 0 {
		trimmed = trimmed[:i]
	}
	return path.Ext(trimmed)
}
// filenameFor builds the proxied filename "<productID>_<tag><ext>",
// falling back to defaultExt when the URL has no usable extension.
func filenameFor(productID, tag, rawURL, defaultExt string) string {
	ext := getExtFromURL(rawURL)
	if ext == "" {
		ext = defaultExt
	}
	return fmt.Sprintf("%s_%s%s", productID, tag, ext)
}

// addScalarURL registers one scalar URL column (skipped when NULL/empty).
func addScalarURL(result map[string]string, productID, tag string, field sql.NullString, defaultExt string) {
	if field.Valid && field.String != "" {
		result[filenameFor(productID, tag, field.String, defaultExt)] = field.String
	}
}

// addArrayURLs registers a JSON string-array column, naming entries
// "<tag>_<i>". Malformed JSON is silently skipped (same as the original).
func addArrayURLs(result map[string]string, productID, tag string, field sql.NullString) {
	if !field.Valid || field.String == "" {
		return
	}
	var urls []string
	if err := json.Unmarshal([]byte(field.String), &urls); err != nil {
		return
	}
	for i, u := range urls {
		if u != "" {
			result[filenameFor(productID, fmt.Sprintf("%s_%d", tag, i), u, ".jpg")] = u
		}
	}
}

// extractURLs collects every image/video URL found in one product row
// and maps a deterministic proxied filename to each original URL.
// The original inlined seven near-identical blocks; the per-field logic
// now lives in addScalarURL/addArrayURLs, with identical filename output.
func extractURLs(productID string, mainImageURL, salesLink, productVideo sql.NullString, descriptionImages, mainImages, aspectRatioImages, picUrls, skuList sql.NullString) map[string]string {
	result := make(map[string]string)
	// Scalar columns; images default to .jpg, videos keep their own
	// extension defaulting to .mp4.
	addScalarURL(result, productID, "main_image_url", mainImageURL, ".jpg")
	addScalarURL(result, productID, "sales_link", salesLink, ".jpg")
	addScalarURL(result, productID, "product_video", productVideo, ".mp4")
	// JSON string-array columns.
	addArrayURLs(result, productID, "description_images", descriptionImages)
	addArrayURLs(result, productID, "main_images", mainImages)
	addArrayURLs(result, productID, "aspect_ratio_images", aspectRatioImages)
	addArrayURLs(result, productID, "pic_urls", picUrls)
	// sku_list is an object array; only thumbUrl is proxied, keyed by skuId.
	if skuList.Valid && skuList.String != "" {
		var skus []struct {
			ThumbUrl string `json:"thumbUrl"`
			SkuID    int64  `json:"skuId"`
		}
		if err := json.Unmarshal([]byte(skuList.String), &skus); err == nil {
			for _, sku := range skus {
				if sku.ThumbUrl != "" {
					tag := fmt.Sprintf("sku_%d_thumbUrl", sku.SkuID)
					result[filenameFor(productID, tag, sku.ThumbUrl, ".jpg")] = sku.ThumbUrl
				}
			}
		}
	}
	return result
}
// incrementalSync is a placeholder for delta synchronization — e.g.
// tracking a high-water mark (last-synced ID or timestamp) in Redis and
// importing only newer rows. It currently performs no work and always
// succeeds. The original contained a dead `var _ string` declaration,
// removed here.
func incrementalSync() error {
	// TODO: implement per business needs (timestamp or ID watermark).
	return nil
}
// ==================== 代理服务 ====================
// proxyImage handles GET /img/{filename}: it resolves the filename to
// its original URL via Redis and streams the upstream image through.
//
// Fix: upstream response headers are relayed EXCEPT hop-by-hop headers
// (Connection, Transfer-Encoding, …). The original copied every header,
// which can forward a stale Transfer-Encoding/Connection value that
// conflicts with how Go frames this response.
func proxyImage(w http.ResponseWriter, r *http.Request) {
	start := time.Now()
	// Extract and decode the filename from the path.
	filename := strings.TrimPrefix(r.URL.Path, "/img/")
	filename = strings.TrimPrefix(filename, "/")
	if filename == "" {
		http.Error(w, "缺少文件名", http.StatusBadRequest)
		return
	}
	filename, err := url.QueryUnescape(filename)
	if err != nil {
		http.Error(w, "无效的文件名", http.StatusBadRequest)
		return
	}
	// Forward lookup: filename -> original URL.
	originalURL, err := getURLByFilename(filename)
	if err != nil {
		log.Printf("[404] %s - %v", filename, err)
		http.Error(w, "图片不存在", http.StatusNotFound)
		return
	}
	// Fetch upstream via the shared client (connection reuse).
	resp, err := httpClient.Get(originalURL)
	if err != nil {
		log.Printf("[错误] 请求原图失败: %v", err)
		http.Error(w, "无法获取图片", http.StatusBadGateway)
		return
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		log.Printf("[错误] 原图返回状态码: %d", resp.StatusCode)
		http.Error(w, "原图不可用", resp.StatusCode)
		return
	}
	// Relay payload headers; skip hop-by-hop headers (RFC 7230 §6.1).
	for key, values := range resp.Header {
		switch http.CanonicalHeaderKey(key) {
		case "Connection", "Keep-Alive", "Transfer-Encoding", "Upgrade",
			"Proxy-Authenticate", "Proxy-Authorization", "Te", "Trailer":
			continue
		}
		for _, value := range values {
			w.Header().Add(key, value)
		}
	}
	w.Header().Set("X-Original-URL", originalURL)
	w.Header().Set("X-Proxied-By", "ImageProxy-Redis")
	w.Header().Set("Cache-Control", "public, max-age=3600")
	// Stream the body: constant memory regardless of image size.
	if _, err := io.Copy(w, resp.Body); err != nil {
		log.Printf("[错误] 传输图片失败: %v", err)
		return
	}
	log.Printf("[成功] %s -> %s (耗时: %v)", filename, originalURL, time.Since(start))
}
// proxyImageByURL handles GET /proxy?url=...: it verifies via the
// reverse index that the URL is a managed mapping, then streams the
// upstream content to the client.
//
// Fix: hop-by-hop upstream headers (Connection, Transfer-Encoding, …)
// are no longer relayed; the original copied all headers verbatim.
func proxyImageByURL(w http.ResponseWriter, r *http.Request) {
	start := time.Now()
	originalURL := r.URL.Query().Get("url")
	if originalURL == "" {
		http.Error(w, "缺少 url 参数", http.StatusBadRequest)
		return
	}
	// Reverse lookup — only proxy URLs we actually manage.
	filename, err := getFilenameByURL(originalURL)
	if err != nil {
		log.Printf("[404] URL: %s - %v", originalURL, err)
		http.Error(w, "映射不存在", http.StatusNotFound)
		return
	}
	resp, err := httpClient.Get(originalURL)
	if err != nil {
		log.Printf("[错误] 请求原图失败: %v", err)
		http.Error(w, "无法获取图片", http.StatusBadGateway)
		return
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		log.Printf("[错误] 原图返回状态码: %d", resp.StatusCode)
		http.Error(w, "原图不可用", resp.StatusCode)
		return
	}
	// Relay payload headers; skip hop-by-hop headers (RFC 7230 §6.1).
	for key, values := range resp.Header {
		switch http.CanonicalHeaderKey(key) {
		case "Connection", "Keep-Alive", "Transfer-Encoding", "Upgrade",
			"Proxy-Authenticate", "Proxy-Authorization", "Te", "Trailer":
			continue
		}
		for _, value := range values {
			w.Header().Add(key, value)
		}
	}
	w.Header().Set("X-Filename", filename)
	w.Header().Set("X-Proxied-By", "ImageProxy-Redis")
	w.Header().Set("Cache-Control", "public, max-age=3600")
	if _, err := io.Copy(w, resp.Body); err != nil {
		log.Printf("[错误] 传输图片失败: %v", err)
		return
	}
	log.Printf("[反向查询成功] URL: %s -> filename: %s (耗时: %v)", originalURL, filename, time.Since(start))
}
// ==================== API 接口 ====================
// statsHandler reports the number of mappings currently in Redis.
// Fix: the original emitted a JSON body without a Content-Type header.
func statsHandler(w http.ResponseWriter, r *http.Request) {
	count, err := getRedisMappingCount()
	if err != nil {
		http.Error(w, fmt.Sprintf("获取统计失败: %v", err), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	fmt.Fprintf(w, `{"total_mappings": %d, "redis": "%s"}`, count, cfg.RedisAddr)
}
// healthHandler reports service liveness based on a Redis PING.
//
// Fix: an unhealthy state now returns HTTP 503 — the original answered
// 200 in both cases, so load-balancer/k8s health checks could never see
// the failure. A JSON Content-Type is also set.
func healthHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	if _, err := redisClient.Ping(ctx).Result(); err != nil {
		w.WriteHeader(http.StatusServiceUnavailable)
		fmt.Fprintf(w, `{"status": "unhealthy", "redis": "down", "error": "%v"}`, err)
		return
	}
	w.Write([]byte(`{"status": "healthy", "redis": "up"}`))
}
// syncStatusHandler reports how many mappings are currently in Redis
// and which MySQL table feeds the sync.
func syncStatusHandler(w http.ResponseWriter, r *http.Request) {
	synced, _ := getRedisMappingCount() // best-effort: errors read as 0
	body := fmt.Sprintf(`{"synced_count": %d, "mysql_table": "%s", "last_sync": "manual"}`, synced, cfg.MySQLTable)
	w.Write([]byte(body))
}
// triggerSyncHandler kicks off a full MySQL -> Redis sync in the
// background (the distributed lock inside fullSyncMySQLToRedis prevents
// overlapping runs); POST only.
func triggerSyncHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "只支持 POST", http.StatusMethodNotAllowed)
		return
	}
	go func() {
		if syncErr := fullSyncMySQLToRedis(); syncErr != nil {
			log.Printf("同步失败: %v", syncErr)
		}
	}()
	w.Write([]byte(`{"status": "sync_started", "message": "全量同步已启动,请查看日志"}`))
}
// ==================== 主程序 ====================
// main wires up Redis/MySQL, runs an initial full sync, registers the
// HTTP routes, and serves until SIGINT/SIGTERM.
//
// Fixes over the original: shutdown now calls Server.Shutdown (bounded
// by 10s) so in-flight requests finish before Redis/MySQL are closed,
// and the banner documents the real routes — GET /proxy?url=... (the
// original claimed a nonexistent /proxy/{url} path) and /img/{filename}.
func main() {
	log.Println(`
╔══════════════════════════════════════════════════════════════╗
║ 图片代理服务 ║
╚══════════════════════════════════════════════════════════════╝`)
	// 1. Redis is mandatory: it serves every lookup.
	if err := initRedis(); err != nil {
		log.Fatalf("Redis 初始化失败: %v", err)
	}
	// 2. MySQL is mandatory: it is the source of truth for mappings.
	if err := initMySQL(); err != nil {
		log.Fatalf("MySQL 初始化失败: %v", err)
	}
	// 3. Warm Redis from MySQL; failure is logged but not fatal.
	if err := fullSyncMySQLToRedis(); err != nil {
		log.Printf("全量同步失败: %v", err)
		log.Println("服务将继续启动,但 Redis 中可能没有数据")
	}
	// 4. Routes.
	http.HandleFunc("/health", healthHandler)
	http.HandleFunc("/img/", proxyImage)
	http.HandleFunc("/proxy", proxyImageByURL) // reverse lookup: GET /proxy?url=...
	http.HandleFunc("/api/stats", statsHandler)
	http.HandleFunc("/api/sync", triggerSyncHandler)
	http.HandleFunc("/api/sync/status", syncStatusHandler)
	// 5. Serve with graceful shutdown.
	addr := fmt.Sprintf(":%s", cfg.ServerPort)
	log.Printf(`
╔══════════════════════════════════════════════════════════════╗
║ 服务配置信息 ║
╠══════════════════════════════════════════════════════════════╣
║ 代理端口: %s ║
║ Redis: %s ║
║ MySQL: %s/%s ║
║ 表名: %s ║
╠══════════════════════════════════════════════════════════════╣
║ API 接口: ║
║ - GET /img/{filename} 代理图片(按文件名查) ║
║ - GET /proxy?url=... 代理图片(按URL查,O(1)) ║
║ - GET /api/stats 统计信息 ║
║ - POST /api/sync 触发全量同步 ║
║ - GET /api/sync/status 同步状态 ║
║ - GET /health 健康检查 ║
╚══════════════════════════════════════════════════════════════╝`, cfg.ServerPort, cfg.RedisAddr, cfg.MySQLAddr, cfg.MySQLDatabase, cfg.MySQLTable)
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	srv := &http.Server{Addr: addr}
	go func() {
		// ErrServerClosed is the expected result of a graceful Shutdown.
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Fatalf("服务启动失败: %v", err)
		}
	}()
	<-quit
	log.Println("正在关闭服务...")
	// Let in-flight requests finish, then release backing resources.
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := srv.Shutdown(shutdownCtx); err != nil {
		log.Printf("关闭 HTTP 服务异常: %v", err)
	}
	redisClient.Close()
	mysqlDB.Close()
	log.Println("服务已关闭")
}
版本三:多平台微服务隔离
在实际微服务架构中,我们可能要为不同平台(例如某多和某东)分别部署代理服务。怎么让它们在同一个 Redis 实例里和谐共处还不互相干扰?很简单:用不同的 DB 号或 key 前缀做隔离。
下面的代码就是为某东(JD)平台定制的一个变体,它监听在 8081 端口,使用 Redis 的 DB 8,分布式锁的 key 也做了区分。
package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"os/signal"
"path"
"runtime"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
_ "github.com/go-sql-driver/mysql"
"github.com/redis/go-redis/v9"
)
// ==================== 配置 ====================
// Config holds all runtime settings for the JD proxy service.
type Config struct {
	ServerPort    string // HTTP listen port for the proxy
	RedisAddr     string // Redis address (host:port)
	RedisPassword string // Redis password (empty if none)
	RedisDB       int    // Redis DB index
	MySQLAddr     string // MySQL address (host:port)
	MySQLUser     string // MySQL user
	MySQLPassword string // MySQL password
	MySQLDatabase string // MySQL database name
	MySQLTable    string // MySQL table name
	MySQLIDField  string // ID column name (used to build proxied filenames)
}

// getEnv returns the environment variable's value, or defaultVal when
// the variable is unset or empty.
func getEnv(key, defaultVal string) string {
	if val := os.Getenv(key); val != "" {
		return val
	}
	return defaultVal
}

// cfg is the process-wide configuration. Port 8081 and Redis DB 8 keep
// this JD instance isolated from the PDD instance (port 8080, DB 7)
// sharing the same Redis server.
var cfg = Config{
	ServerPort:    "8081", // JD service port (PDD uses 8080)
	RedisAddr:     "localhost:6379",
	RedisPassword: getEnv("REDIS_PASSWORD", ""),
	RedisDB:       8, // JD uses DB8 (PDD uses DB7)
	MySQLAddr:     "",
	MySQLUser:     "root",
	MySQLPassword: getEnv("MYSQL_PASSWORD", ""),
	MySQLDatabase: "jserve",
	MySQLTable:    "", // JD table name (filled per deployment)
	MySQLIDField:  "product_id",
}
// ==================== 全局变量 ====================
var (
	redisClient *redis.Client           // shared Redis client, set by initRedis
	mysqlDB     *sql.DB                 // shared MySQL pool, set by initMySQL
	ctx         = context.Background() // base context for Redis calls
)

// httpClient is shared by all upstream fetches so TCP/TLS connections
// are kept alive and reused instead of being re-dialed per request.
var httpClient = &http.Client{
	Timeout: 30 * time.Second,
	Transport: &http.Transport{
		MaxIdleConns:        100,
		MaxIdleConnsPerHost: 10,
		IdleConnTimeout:     90 * time.Second,
	},
}
// ==================== Redis 操作 ====================
// initRedis creates the global Redis client and verifies connectivity
// with a PING before the service starts taking traffic.
func initRedis() error {
	redisClient = redis.NewClient(&redis.Options{
		Addr:     cfg.RedisAddr,
		Password: cfg.RedisPassword,
		DB:       cfg.RedisDB,
	})
	_, err := redisClient.Ping(ctx).Result()
	if err != nil {
		return fmt.Errorf("Redis 连接失败: %v", err)
	}
	log.Printf("Redis 连接成功: %s", cfg.RedisAddr)
	return nil
}

// addMappingToRedis stores one filename<->URL pair as two keys in a
// single pipeline round trip, both without expiry.
func addMappingToRedis(filename, originalURL string) error {
	pipe := redisClient.Pipeline()
	// Forward index: filename -> url
	pipe.Set(ctx, "img:"+filename, originalURL, 0)
	// Reverse index: url -> filename
	pipe.Set(ctx, "rev:"+originalURL, filename, 0)
	_, err := pipe.Exec(ctx)
	return err
}

// batchAddMappingsToRedis writes many pairs (forward + reverse keys)
// in one pipeline; a no-op for an empty map.
func batchAddMappingsToRedis(mappings map[string]string) error {
	if len(mappings) == 0 {
		return nil
	}
	pipe := redisClient.Pipeline()
	for filename, originalURL := range mappings {
		// Forward index: filename -> url
		pipe.Set(ctx, "img:"+filename, originalURL, 0)
		// Reverse index: url -> filename
		pipe.Set(ctx, "rev:"+originalURL, filename, 0)
	}
	_, err := pipe.Exec(ctx)
	return err
}

// getFilenameByURL resolves a source URL back to its proxied filename
// via the reverse index ("rev:" keys); O(1) in Redis.
func getFilenameByURL(originalURL string) (string, error) {
	key := "rev:" + originalURL
	filename, err := redisClient.Get(ctx, key).Result()
	if err == redis.Nil {
		return "", fmt.Errorf("映射不存在: %s", originalURL)
	}
	if err != nil {
		return "", err
	}
	return filename, nil
}

// getURLByFilename resolves a proxied filename to its source URL via the
// forward index ("img:" keys); O(1) in Redis.
// NOTE(review): the local variable `url` shadows the net/url package
// within this function body.
func getURLByFilename(filename string) (string, error) {
	key := "img:" + filename
	url, err := redisClient.Get(ctx, key).Result()
	if err == redis.Nil {
		return "", fmt.Errorf("映射不存在: %s", filename)
	}
	if err != nil {
		return "", err
	}
	return url, nil
}

// getRedisMappingCount counts forward-index keys ("img:*") by walking
// SCAN cursors in pages of 1000; unlike KEYS, SCAN never blocks Redis.
func getRedisMappingCount() (int64, error) {
	var cursor uint64
	var count int64
	for {
		keys, nextCursor, err := redisClient.Scan(ctx, cursor, "img:*", 1000).Result()
		if err != nil {
			return count, err
		}
		count += int64(len(keys))
		cursor = nextCursor
		if cursor == 0 {
			break
		}
	}
	return count, nil
}
// ==================== MySQL 操作 ====================
// initMySQL opens the global MySQL pool and verifies connectivity.
func initMySQL() error {
	dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
		cfg.MySQLUser, cfg.MySQLPassword, cfg.MySQLAddr, cfg.MySQLDatabase)
	var err error
	mysqlDB, err = sql.Open("mysql", dsn)
	if err != nil {
		return fmt.Errorf("MySQL 连接失败: %v", err)
	}
	// Pool sizing: enough connections for the concurrent sync workers.
	mysqlDB.SetMaxOpenConns(20)
	mysqlDB.SetMaxIdleConns(10)
	mysqlDB.SetConnMaxLifetime(time.Hour)
	// sql.Open is lazy; Ping forces a real round trip.
	if err = mysqlDB.Ping(); err != nil {
		return fmt.Errorf("MySQL Ping 失败: %v", err)
	}
	log.Printf("MySQL 连接成功: %s/%s", cfg.MySQLAddr, cfg.MySQLDatabase)
	return nil
}
// ==================== 同步配置 ====================
// Batch-size and locking knobs for the MySQL -> Redis sync.
const (
	pageSize      = 2000 // product_ids handed to one worker goroutine
	redisBatch    = 5000 // Redis pipeline batch size
	syncBatchSize = 100  // product_ids per SQL IN query
	syncLockKey   = "lock:image-proxy:jd:sync" // distributed-lock key (JD-specific, avoids clashing with PDD)
)
// maxWorkers bounds sync concurrency (one core is left for the server).
// The original used NumCPU()-1 directly, which is 0 on a single-core
// host and would make the worker semaphore a zero-capacity channel —
// the first send in fullSyncMySQLToRedis would deadlock. Clamp to >= 1.
var maxWorkers = func() int {
	if n := runtime.NumCPU() - 1; n > 1 {
		return n
	}
	return 1
}()
// tryAcquireSyncLock takes a best-effort distributed lock so only one
// instance runs a full sync at a time. The 60-minute TTL prevents a
// crashed holder from wedging the lock forever.
func tryAcquireSyncLock() bool {
	ok, err := redisClient.SetNX(ctx, syncLockKey, "1", 60*time.Minute).Result()
	if err != nil {
		// The original swallowed this error, making a Redis outage
		// indistinguishable from "sync already running"; log it.
		log.Printf("获取同步锁失败: %v", err)
		return false
	}
	return ok
}

// releaseSyncLock drops the distributed sync lock.
func releaseSyncLock() {
	redisClient.Del(ctx, syncLockKey)
}
// fullSyncMySQLToRedis copies every unsynced row (sync_status = 0) from
// MySQL into Redis in parallel batches, guarded by a distributed lock so
// concurrent instances don't duplicate work.
//
// Fixes over the original:
//   - the progress-logging goroutine is stopped via `done` when the sync
//     finishes (it previously ticked forever, leaking one goroutine per
//     sync run);
//   - rows.Err() is checked after scanning the ID list;
//   - an empty ID list exits early instead of reaching the percent
//     division with total == 0.
func fullSyncMySQLToRedis() error {
	if !tryAcquireSyncLock() {
		log.Println("同步任务进行中,跳过本次执行")
		return nil
	}
	defer releaseSyncLock()
	log.Println("开始全量同步 MySQL -> Redis(并发模式)...")
	// Count pending rows up front for logging / early exit.
	var totalCount int
	countQuery := fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE sync_status = 0", cfg.MySQLTable)
	if err := mysqlDB.QueryRow(countQuery).Scan(&totalCount); err != nil {
		return fmt.Errorf("获取总数失败: %v", err)
	}
	log.Printf("待处理记录: %d 条,并发数: %d", totalCount, maxWorkers)
	if totalCount == 0 {
		log.Println("没有需要同步的数据")
		return nil
	}
	// 1. Collect every pending product_id.
	rows, err := mysqlDB.Query(fmt.Sprintf(
		"SELECT product_id FROM %s WHERE sync_status = 0", cfg.MySQLTable))
	if err != nil {
		return fmt.Errorf("获取 product_id 列表失败: %v", err)
	}
	defer rows.Close()
	var productIDs []string
	for rows.Next() {
		var pid string
		if err := rows.Scan(&pid); err != nil {
			continue
		}
		productIDs = append(productIDs, pid)
	}
	if err := rows.Err(); err != nil {
		return fmt.Errorf("遍历 product_id 失败: %v", err)
	}
	total := len(productIDs)
	if total == 0 {
		log.Println("没有需要同步的数据")
		return nil
	}
	// 2. Shared progress counters.
	var (
		processedCount atomic.Int64
		successCount   atomic.Int64
		failCount      atomic.Int64
	)
	// 3. Progress reporter, terminated via done when the sync ends.
	done := make(chan struct{})
	go func() {
		ticker := time.NewTicker(5 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				p := processedCount.Load()
				s := successCount.Load()
				f := failCount.Load()
				percent := float64(p) / float64(total) * 100
				log.Printf("进度: %d/%d (%.1f%%) - 成功: %d, 失败: %d", p, total, percent, s, f)
			}
		}
	}()
	// 4. Fan out batches, bounded by maxWorkers via a semaphore channel.
	var wg sync.WaitGroup
	semaphore := make(chan struct{}, maxWorkers)
	for i := 0; i < total; i += pageSize {
		end := i + pageSize
		if end > total {
			end = total
		}
		batch := productIDs[i:end]
		wg.Add(1)
		semaphore <- struct{}{}
		go func(ids []string) {
			defer wg.Done()
			defer func() { <-semaphore }()
			processProductBatch(ids, &processedCount, &successCount, &failCount)
		}(batch)
	}
	wg.Wait()
	close(done)
	log.Printf("全量同步完成!总计: %d, 成功: %d, 失败: %d",
		processedCount.Load(), successCount.Load(), failCount.Load())
	return nil
}
// processProductBatch splits a worker's ids into IN-query-sized chunks
// and syncs each chunk, accumulating results into the shared counters.
func processProductBatch(ids []string, processed, success, fail *atomic.Int64) {
	// Split into smaller chunks for the SQL IN query.
	for i := 0; i < len(ids); i += syncBatchSize {
		end := i + syncBatchSize
		if end > len(ids) {
			end = len(ids)
		}
		batch := ids[i:end]
		processSyncBatch(batch, processed, success, fail)
	}
}
// processSyncBatch loads one batch of products with a single IN query,
// extracts every proxy-filename -> original-URL mapping, bulk-writes them
// to Redis and persists a per-product sync_status in MySQL.
//
// Counters: processed advances by len(ids) on every path (including early
// failures, so the progress printer can reach 100%); success/fail count
// the products actually scanned from MySQL.
func processSyncBatch(ids []string, processed, success, fail *atomic.Int64) {
	if len(ids) == 0 {
		return
	}
	// Count the batch as processed no matter how we exit below.
	defer processed.Add(int64(len(ids)))
	// Build the "?,?,...,?" placeholder list for the IN clause.
	placeholders := strings.Repeat("?,", len(ids)-1) + "?"
	query := fmt.Sprintf(
		"SELECT product_id, main_image_url, sales_link, product_video, "+
			"description_images, main_images, aspect_ratio_images, pic_urls, sku_list "+
			"FROM %s WHERE product_id IN (%s)", cfg.MySQLTable, placeholders)
	// database/sql wants []interface{} for variadic args.
	args := make([]interface{}, len(ids))
	for j, id := range ids {
		args[j] = id
	}
	rows, err := mysqlDB.Query(query, args...)
	if err != nil {
		log.Printf("批量查询失败: %v", err)
		fail.Add(int64(len(ids)))
		return
	}
	defer rows.Close()
	batchMappings := make(map[string]string)
	var syncedIDs []string
	for rows.Next() {
		var productID string
		var mainImageURL, salesLink, productVideo sql.NullString
		var descriptionImages, mainImages, aspectRatioImages, picUrls, skuList sql.NullString
		if err := rows.Scan(&productID, &mainImageURL, &salesLink, &productVideo,
			&descriptionImages, &mainImages, &aspectRatioImages, &picUrls, &skuList); err != nil {
			log.Printf("扫描行失败: %v", err)
			continue
		}
		for filename, originalURL := range extractURLs(productID, mainImageURL, salesLink, productVideo,
			descriptionImages, mainImages, aspectRatioImages, picUrls, skuList) {
			batchMappings[filename] = originalURL
		}
		syncedIDs = append(syncedIDs, productID)
	}
	// Surface mid-stream iteration errors (e.g. a dropped connection).
	if err := rows.Err(); err != nil {
		log.Printf("遍历结果集失败: %v", err)
	}
	if len(syncedIDs) == 0 {
		return
	}
	// Fix vs. previous version: products that produced no URLs are still
	// marked as synced (status 1) — before, they kept sync_status = 0 and
	// were re-scanned on every full sync forever.
	status := 1
	if len(batchMappings) > 0 {
		if err := batchAddMappingsToRedis(batchMappings); err != nil {
			log.Printf("Redis 批量写入失败: %v", err)
			status = 2
		}
	}
	if status == 1 {
		success.Add(int64(len(syncedIDs)))
	} else {
		fail.Add(int64(len(syncedIDs)))
	}
	for _, pid := range syncedIDs {
		updateSyncStatus(pid, status)
	}
}
// updateSyncStatus persists the sync outcome for a single product:
// 1 = synced successfully, 2 = failed (0 means still pending).
// Errors are logged only; a failed status write is not fatal to the sync.
func updateSyncStatus(productID string, status int) {
	stmt := fmt.Sprintf("UPDATE %s SET sync_status = ? WHERE product_id = ?", cfg.MySQLTable)
	_, err := mysqlDB.Exec(stmt, status, productID)
	if err != nil {
		log.Printf("更新状态失败 (product_id=%s, status=%d): %v", productID, status, err)
	}
}
// getExtFromURL extracts the file extension (dot included, e.g. ".jpg",
// ".mp4", ".png") from a raw URL. The query string and fragment are
// stripped first, so "https://cdn/a.png?v=2" yields ".png" instead of the
// previous buggy ".png?v=2". Returns "" when the path has no extension.
func getExtFromURL(rawURL string) string {
	// Cut off "?query" / "#fragment" before inspecting the path.
	if i := strings.IndexAny(rawURL, "?#"); i >= 0 {
		rawURL = rawURL[:i]
	}
	return path.Ext(rawURL)
}
// extractURLs builds the proxy-filename -> original-URL map for one
// product row. Filename formats (must stay stable — they are the Redis
// keys clients request):
//   single columns:  <productID>_<field><ext>
//   array columns:   <productID>_<field>_<index><ext>
//   sku thumbnails:  <productID>_sku_<skuId>_thumbUrl<ext>
// Missing extensions default to ".jpg" (".mp4" for product_video).
func extractURLs(productID string, mainImageURL, salesLink, productVideo sql.NullString, descriptionImages, mainImages, aspectRatioImages, picUrls, skuList sql.NullString) map[string]string {
	result := make(map[string]string)
	// Single-URL columns.
	addSingleURLMapping(result, productID, "main_image_url", ".jpg", mainImageURL)
	addSingleURLMapping(result, productID, "sales_link", ".jpg", salesLink)
	addSingleURLMapping(result, productID, "product_video", ".mp4", productVideo)
	// JSON string-array columns.
	addURLArrayMappings(result, productID, "description_images", descriptionImages)
	addURLArrayMappings(result, productID, "main_images", mainImages)
	addURLArrayMappings(result, productID, "aspect_ratio_images", aspectRatioImages)
	addURLArrayMappings(result, productID, "pic_urls", picUrls)
	// sku_list is an array of objects; keep each SKU's thumbnail URL.
	if skuList.Valid && skuList.String != "" {
		var skus []struct {
			ThumbUrl string `json:"thumbUrl"`
			SkuID    int64  `json:"skuId"`
		}
		// Malformed JSON is ignored (best-effort, as before).
		if err := json.Unmarshal([]byte(skuList.String), &skus); err == nil {
			for _, sku := range skus {
				if sku.ThumbUrl == "" {
					continue
				}
				ext := getExtFromURL(sku.ThumbUrl)
				if ext == "" {
					ext = ".jpg"
				}
				result[fmt.Sprintf("%s_sku_%d_thumbUrl%s", productID, sku.SkuID, ext)] = sku.ThumbUrl
			}
		}
	}
	return result
}

// addSingleURLMapping records one single-valued URL column under the
// filename <productID>_<field><ext>. defaultExt is used when the URL
// carries no extension; NULL/empty values are skipped.
func addSingleURLMapping(result map[string]string, productID, field, defaultExt string, v sql.NullString) {
	if !v.Valid || v.String == "" {
		return
	}
	ext := getExtFromURL(v.String)
	if ext == "" {
		ext = defaultExt
	}
	result[fmt.Sprintf("%s_%s%s", productID, field, ext)] = v.String
}

// addURLArrayMappings decodes a JSON string-array column and records each
// non-empty element under <productID>_<field>_<index><ext> (".jpg" when
// the URL has no extension). Malformed JSON is silently ignored, matching
// the previous best-effort behavior.
func addURLArrayMappings(result map[string]string, productID, field string, v sql.NullString) {
	if !v.Valid || v.String == "" {
		return
	}
	var urls []string
	if err := json.Unmarshal([]byte(v.String), &urls); err != nil {
		return
	}
	for i, u := range urls {
		if u == "" {
			continue
		}
		ext := getExtFromURL(u)
		if ext == "" {
			ext = ".jpg"
		}
		result[fmt.Sprintf("%s_%s_%d%s", productID, field, i, ext)] = u
	}
}
// ==================== 代理服务 ====================
// proxyImage serves GET /img/<filename>: resolves the filename to the
// original URL (Redis lookup), fetches the origin and streams the bytes
// back to the client with a 1h public cache hint.
func proxyImage(w http.ResponseWriter, r *http.Request) {
	start := time.Now()
	// Extract the filename from the path.
	filename := strings.TrimPrefix(r.URL.Path, "/img/")
	filename = strings.TrimPrefix(filename, "/")
	if filename == "" {
		http.Error(w, "缺少文件名", http.StatusBadRequest)
		return
	}
	// The filename may arrive percent-encoded.
	filename, err := url.QueryUnescape(filename)
	if err != nil {
		http.Error(w, "无效的文件名", http.StatusBadRequest)
		return
	}
	// Forward lookup: filename -> original URL.
	originalURL, err := getURLByFilename(filename)
	if err != nil {
		log.Printf("[404] %s - %v", filename, err)
		http.Error(w, "图片不存在", http.StatusNotFound)
		return
	}
	// Fetch the origin via the shared pooled client.
	resp, err := httpClient.Get(originalURL)
	if err != nil {
		log.Printf("[错误] 请求原图失败: %v", err)
		http.Error(w, "无法获取图片", http.StatusBadGateway)
		return
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		log.Printf("[错误] 原图返回状态码: %d", resp.StatusCode)
		http.Error(w, "原图不可用", resp.StatusCode)
		return
	}
	// Forward origin headers, but skip hop-by-hop headers (RFC 7230 §6.1).
	// Fix vs. previous version: forwarding e.g. Transfer-Encoding or
	// Connection from the origin can corrupt our own response framing.
	for key, values := range resp.Header {
		switch key {
		case "Connection", "Keep-Alive", "Proxy-Authenticate", "Proxy-Authorization",
			"Te", "Trailer", "Transfer-Encoding", "Upgrade":
			continue
		}
		for _, value := range values {
			w.Header().Add(key, value)
		}
	}
	// Our own headers (Set overrides anything copied above).
	w.Header().Set("X-Original-URL", originalURL)
	w.Header().Set("X-Proxied-By", "ImageProxy-Redis-JD")
	w.Header().Set("Cache-Control", "public, max-age=3600")
	// Stream the body; headers are flushed on the first write.
	if _, err := io.Copy(w, resp.Body); err != nil {
		log.Printf("[错误] 传输图片失败: %v", err)
		return
	}
	log.Printf("[成功] %s -> %s (耗时: %v)", filename, originalURL, time.Since(start))
}
// proxyImageByURL serves GET /proxy?url=...: looks up the filename that
// maps to the given original URL (reverse lookup), then fetches and
// streams the origin content just like proxyImage.
func proxyImageByURL(w http.ResponseWriter, r *http.Request) {
	start := time.Now()
	// The original URL is supplied as a query parameter.
	originalURL := r.URL.Query().Get("url")
	if originalURL == "" {
		http.Error(w, "缺少 url 参数", http.StatusBadRequest)
		return
	}
	// Reverse lookup: URL -> filename; a miss means the URL was never synced.
	filename, err := getFilenameByURL(originalURL)
	if err != nil {
		log.Printf("[404] URL: %s - %v", originalURL, err)
		http.Error(w, "映射不存在", http.StatusNotFound)
		return
	}
	// Fetch the origin via the shared pooled client.
	resp, err := httpClient.Get(originalURL)
	if err != nil {
		log.Printf("[错误] 请求原图失败: %v", err)
		http.Error(w, "无法获取图片", http.StatusBadGateway)
		return
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		log.Printf("[错误] 原图返回状态码: %d", resp.StatusCode)
		http.Error(w, "原图不可用", resp.StatusCode)
		return
	}
	// Forward origin headers, but skip hop-by-hop headers (RFC 7230 §6.1).
	// Fix vs. previous version: forwarding e.g. Transfer-Encoding or
	// Connection from the origin can corrupt our own response framing.
	for key, values := range resp.Header {
		switch key {
		case "Connection", "Keep-Alive", "Proxy-Authenticate", "Proxy-Authorization",
			"Te", "Trailer", "Transfer-Encoding", "Upgrade":
			continue
		}
		for _, value := range values {
			w.Header().Add(key, value)
		}
	}
	// Our own headers (Set overrides anything copied above).
	w.Header().Set("X-Filename", filename)
	w.Header().Set("X-Proxied-By", "ImageProxy-Redis-JD")
	w.Header().Set("Cache-Control", "public, max-age=3600")
	// Stream the body; headers are flushed on the first write.
	if _, err := io.Copy(w, resp.Body); err != nil {
		log.Printf("[错误] 传输图片失败: %v", err)
		return
	}
	log.Printf("[反向查询成功] URL: %s -> filename: %s (耗时: %v)", originalURL, filename, time.Since(start))
}
// ==================== API 接口 ====================
// statsHandler reports the current mapping count plus Redis connection
// info as JSON (GET /api/stats).
func statsHandler(w http.ResponseWriter, r *http.Request) {
	count, err := getRedisMappingCount()
	if err != nil {
		http.Error(w, fmt.Sprintf("获取统计失败: %v", err), http.StatusInternalServerError)
		return
	}
	// Fixes vs. previous version: Content-Type was missing, and RedisAddr
	// was interpolated unescaped into the JSON body (%q quotes + escapes).
	w.Header().Set("Content-Type", "application/json")
	fmt.Fprintf(w, `{"total_mappings": %d, "redis": %q, "db": %d}`, count, cfg.RedisAddr, cfg.RedisDB)
}
// healthHandler reports service liveness as JSON (GET /health).
// Returns 503 when Redis is unreachable so load balancers and probes can
// act on it (the previous version answered 200 even when unhealthy).
func healthHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	if _, err := redisClient.Ping(ctx).Result(); err != nil {
		w.WriteHeader(http.StatusServiceUnavailable)
		// %q escapes the error text so the body stays valid JSON.
		fmt.Fprintf(w, `{"status": "unhealthy", "redis": "down", "error": %q}`, err.Error())
		return
	}
	w.Write([]byte(`{"status": "healthy", "redis": "up"}`))
}
// syncStatusHandler reports how many mappings are currently in Redis
// (GET /api/sync/status). The previous version silently discarded the
// Redis error and reported a count of 0; now it returns 500 instead.
func syncStatusHandler(w http.ResponseWriter, r *http.Request) {
	count, err := getRedisMappingCount()
	if err != nil {
		http.Error(w, fmt.Sprintf("获取同步状态失败: %v", err), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	fmt.Fprintf(w, `{"synced_count": %d, "mysql_table": %q, "last_sync": "manual"}`, count, cfg.MySQLTable)
}
// triggerSyncHandler kicks off a full MySQL -> Redis sync in the
// background (POST /api/sync) and returns immediately; progress is only
// visible in the service log.
func triggerSyncHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "只支持 POST", http.StatusMethodNotAllowed)
		return
	}
	// Fire-and-forget: the HTTP response does not wait for the sync.
	go func() {
		if err := fullSyncMySQLToRedis(); err != nil {
			log.Printf("同步失败: %v", err)
		}
	}()
	io.WriteString(w, `{"status": "sync_started", "message": "全量同步已启动,请查看日志"}`)
}
// ==================== 主程序 ====================
// main wires everything together: Redis + MySQL initialization, an
// initial full sync, the HTTP routes, then serves until SIGINT/SIGTERM
// and shuts down gracefully (drain in-flight requests, then close stores).
func main() {
	log.Println(`
╔══════════════════════════════════════════════════════════════╗
║ 图片代理服务 [JD] ║
╚══════════════════════════════════════════════════════════════╝`)
	// 1. Redis is mandatory: every proxy lookup goes through it.
	if err := initRedis(); err != nil {
		log.Fatalf("Redis 初始化失败: %v", err)
	}
	// 2. MySQL is the durable source of truth for the mappings.
	if err := initMySQL(); err != nil {
		log.Fatalf("MySQL 初始化失败: %v", err)
	}
	// 3. Best-effort initial sync; the service still starts on failure.
	if err := fullSyncMySQLToRedis(); err != nil {
		log.Printf("全量同步失败: %v", err)
		log.Println("服务将继续启动,但 Redis 中可能没有数据")
	}
	// 4. Routes.
	http.HandleFunc("/health", healthHandler)
	http.HandleFunc("/img/", proxyImage)
	http.HandleFunc("/proxy", proxyImageByURL) // GET /proxy?url=...
	http.HandleFunc("/api/stats", statsHandler)
	http.HandleFunc("/api/sync", triggerSyncHandler)
	http.HandleFunc("/api/sync/status", syncStatusHandler)
	// 5. Serve.
	addr := fmt.Sprintf(":%s", cfg.ServerPort)
	log.Printf(`
╔══════════════════════════════════════════════════════════════╗
║ 服务配置信息 [JD] ║
╠══════════════════════════════════════════════════════════════╣
║ 代理端口: %s ║
║ Redis: %s (DB%d) ║
║ MySQL: %s/%s ║
║ 表名: %s ║
╠══════════════════════════════════════════════════════════════╣
║ API 接口: ║
║ - GET /img/(unknown) 代理图片(按文件名查) ║
║ - GET /proxy?url=... 代理图片(按URL查,O(1)) ║
║ - GET /api/stats 统计信息 ║
║ - POST /api/sync 触发全量同步 ║
║ - GET /api/sync/status 同步状态 ║
║ - GET /health 健康检查 ║
╚══════════════════════════════════════════════════════════════╝`,
		cfg.ServerPort, cfg.RedisAddr, cfg.RedisDB, cfg.MySQLAddr, cfg.MySQLDatabase, cfg.MySQLTable)
	// Own the http.Server so we can call Shutdown on it. Fix vs. previous
	// version: plain ListenAndServe cannot drain in-flight requests, so
	// the old "graceful" exit cut active downloads dead.
	srv := &http.Server{Addr: addr}
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		// ErrServerClosed is the expected result of Shutdown, not a failure.
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Fatalf("服务启动失败: %v", err)
		}
	}()
	<-quit
	log.Println("正在关闭服务...")
	// Give in-flight requests up to 10s before closing the backing stores.
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := srv.Shutdown(shutdownCtx); err != nil {
		log.Printf("HTTP 服务关闭失败: %v", err)
	}
	redisClient.Close()
	mysqlDB.Close()
	log.Println("服务已关闭")
}
从内存版的小 Demo 到最终生产级的微服务,核心思想其实很清晰:用 Go 的并发能力提供高性能 HTTP 服务,用 Redis 极高的读写性能承载海量映射关系,再用 MySQL 充当可回溯的持久层。希望这三个版本的代码能给你一些启发,帮助你在自己的项目中少走弯路。如果你对这类后端架构实战内容感兴趣,云栈社区 里还有更多深度讨论。