找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

5226

积分

0

好友

718

主题
发表于 3 小时前 | 查看: 4| 回复: 0

代理服务是客户端与目标服务器之间的中间服务器,负责转发请求、响应并提供附加能力(如缓存、过滤、隐藏 IP、负载均衡等)。按部署位置分为正向代理(代表客户端)与反向代理(代表服务器)两大类。

现实业务中,我们经常会遇到海量异构图片链接的“收归”管理问题。比如从各大电商平台抓取的商品素材,它们的原始链接域名各异、格式混乱。如果想统一对外提供,就需要搭建一个高性能的代理服务,用我们自己的域名封装这些原始链接。

针对这种需求,常见做法有这几种:

方案 原理 优点 缺点
本地代理服务 自己起一个服务把新URL请求代理到原 URL 完全可控灵活 需要跑一个服务
Nginx 反向代理 用 Nginx 配置重写规则 高性能稳定 需要配 Nginx
在线图片代理 用现成的代理服务中转 零部署 受限于第三方
下载后本地托管 先下到本地,再起本地服务器 离线可用 图片要存本地

对于千万级映射 + 高性能要求,最佳方案是什么?我们推荐:Redis + 高性能代理服务

为什么是 Redis?因为它在这方面拥有天然优势:

指标 说明
容量 支持亿级 key,内存存储
速度 毫秒级 O(1) 查询,单机 QPS 10万+
可靠 支持持久化(RDB+AOF)
简单 简单 KV 查询,无需复杂索引

在具体的技术选型上,大概有这几条路可以走:

方案 性能 复杂度 推荐场景
Go + Redis ⭐⭐⭐⭐⭐ 首选,高并发,部署简单
Nginx + Lua + Redis ⭐⭐⭐⭐⭐ 已有 Nginx 基础设施
Python + Redis ⭐⭐⭐ 快速原型,QPS < 1000
Tornado/FastAPI + Redis ⭐⭐⭐ 熟悉 Python,QPS < 5000

可以看到,Go 语言凭借其轻量的 Goroutine 和出色的并发表现,成为自建代理服务的首选。接下来,我会分享三个版本的 Go 代理服务实现,一步步升级打怪,来看看如何优雅地应对千万级数据量的挑战。

版本一:基于内存的基础版

万丈高楼平地起。在引入外部中间件之前,我们先来看看一个纯粹基于内存的图片代理服务是怎么玩的。它虽简单,却五脏俱全,特别适合映射数据量较少、不要求持久化的场景。

package main

import (
    "fmt"
    "io"
    "log"
    "net/http"
    "net/url"
    "os"
    "os/signal"
    "path/filepath"
    "strings"
    "sync"
    "syscall"
    "time"
)

var (
    mappings   = make(map[string]string)
    mappingsMu sync.RWMutex
    reqTotal   int64
    reqHit     int64
    reqMiss    int64
    statsMu    sync.Mutex
)

func addMapping(filename, originalURL string) {
    mappingsMu.Lock()
    defer mappingsMu.Unlock()
    mappings[filename] = originalURL
}

func getOriginalURL(filename string) (string, bool) {
    mappingsMu.RLock()
    defer mappingsMu.RUnlock()
    u, ok := mappings[filename]
    return u, ok
}

func getMappingCount() int {
    mappingsMu.RLock()
    defer mappingsMu.RUnlock()
    return len(mappings)
}

func getAllMappings() map[string]string {
    mappingsMu.RLock()
    defer mappingsMu.RUnlock()
    result := make(map[string]string, len(mappings))
    for k, v := range mappings {
        result[k] = v
    }
    return result
}

// 根据文件扩展名获取 Content-Type
func getContentType(filename string) string {
    ext := strings.ToLower(filepath.Ext(filename))
    switch ext {
    case ".jpg", ".jpeg":
        return "image/jpeg"
    case ".png":
        return "image/png"
    case ".gif":
        return "image/gif"
    case ".webp":
        return "image/webp"
    case ".svg":
        return "image/svg+xml"
    case ".bmp":
        return "image/bmp"
    case ".ico":
        return "image/x-icon"
    case ".pdf":
        return "application/pdf"
    default:
        return "application/octet-stream"
    }
}

func proxyImage(w http.ResponseWriter, r *http.Request) {
    start := time.Now()
    filename := strings.TrimPrefix(r.URL.Path, "/img/")
    if filename == "" {
        http.Error(w, "缺少文件名", http.StatusBadRequest)
        return
    }
    var err error
    filename, err = url.QueryUnescape(filename)
    if err != nil {
        http.Error(w, "无效的文件名", http.StatusBadRequest)
        return
    }

    statsMu.Lock()
    reqTotal++
    statsMu.Unlock()

    originalURL, ok := getOriginalURL(filename)
    if !ok {
        statsMu.Lock()
        reqMiss++
        statsMu.Unlock()
        log.Printf("[404] %s - 映射不存在", filename)
        http.Error(w, "图片不存在", http.StatusNotFound)
        return
    }

    statsMu.Lock()
    reqHit++
    statsMu.Unlock()

    client := &http.Client{Timeout: 30 * time.Second}
    resp, err := client.Get(originalURL)
    if err != nil {
        log.Printf("[错误] 请求原图失败: %v", err)
        http.Error(w, "无法获取图片", http.StatusBadGateway)
        return
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        log.Printf("[错误] 原图返回状态码: %d", resp.StatusCode)
        http.Error(w, "原图不可用", resp.StatusCode)
        return
    }

    // 先读取全部内容
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        log.Printf("[错误] 读取原图失败: %v", err)
        http.Error(w, "读取图片失败", http.StatusBadGateway)
        return
    }

    // 设置 Content-Type(优先用扩展名判断,其次用原图响应头)
    contentType := getContentType(filename)
    if ct := resp.Header.Get("Content-Type"); ct != "" && ct != "application/octet-stream" {
        contentType = ct
    }
    w.Header().Set("Content-Type", contentType)

    // 设置缓存和代理头
    w.Header().Set("X-Proxied-By", "ImageProxy-Memory")
    w.Header().Set("Cache-Control", "public, max-age=3600")
    w.Header().Set("Content-Length", fmt.Sprintf("%d", len(body)))

    // 写入响应
    w.Write(body)
    log.Printf("[成功] %s -> %s (耗时: %v, 类型: %s, 大小: %d bytes)",
        filename, originalURL, time.Since(start), contentType, len(body))
}

func addHandler(w http.ResponseWriter, r *http.Request) {
    if r.Method != http.MethodPost {
        http.Error(w, "只支持 POST", http.StatusMethodNotAllowed)
        return
    }
    filename := r.URL.Query().Get("filename")
    originalURL := r.URL.Query().Get("url")
    if filename == "" || originalURL == "" {
        http.Error(w, "缺少参数 filename 或 url", http.StatusBadRequest)
        return
    }
    addMapping(filename, originalURL)
    log.Printf("[添加] %s -> %s", filename, originalURL)
    w.Header().Set("Content-Type", "application/json")
    fmt.Fprintf(w, `{"status":"ok","filename":"%s","total":%d}`, filename, getMappingCount())
}

func queryHandler(w http.ResponseWriter, r *http.Request) {
    filename := r.URL.Query().Get("filename")
    if filename == "" {
        http.Error(w, "缺少参数 filename", http.StatusBadRequest)
        return
    }
    w.Header().Set("Content-Type", "application/json")
    u, ok := getOriginalURL(filename)
    if !ok {
        w.WriteHeader(http.StatusNotFound)
        fmt.Fprintf(w, `{"status":"not_found","filename":"%s"}`, filename)
        return
    }
    fmt.Fprintf(w, `{"status":"ok","filename":"%s","url":"%s"}`, filename, u)
}

func listHandler(w http.ResponseWriter, r *http.Request) {
    all := getAllMappings()
    w.Header().Set("Content-Type", "application/json")
    var sb strings.Builder
    sb.WriteString(fmt.Sprintf(`{"total":%d,"mappings":{`, len(all)))
    first := true
    for k, v := range all {
        if !first {
            sb.WriteString(",")
        }
        sb.WriteString(fmt.Sprintf(`"%s":"%s"`, k, v))
        first = false
    }
    sb.WriteString("}}")
    fmt.Fprint(w, sb.String())
}

func statsHandler(w http.ResponseWriter, r *http.Request) {
    statsMu.Lock()
    t, h, m := reqTotal, reqHit, reqMiss
    statsMu.Unlock()
    w.Header().Set("Content-Type", "application/json")
    fmt.Fprintf(w, `{"total_mappings":%d,"total_requests":%d,"hit_count":%d,"miss_count":%d}`,
        getMappingCount(), t, h, m)
}

func healthHandler(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "application/json")
    fmt.Fprintf(w, `{"status":"healthy","total_mappings":%d}`, getMappingCount())
}

func loadDemoData() {
    demos := map[string]string{
        "demo1.jpg": "https://picsum.photos/id/1018/800/600.jpg",
        "demo2.jpg": "https://picsum.photos/id/1015/800/600.jpg",
        "demo3.jpg": "https://picsum.photos/id/1019/800/600.jpg",
    }
    for k, v := range demos {
        addMapping(k, v)
    }
    log.Printf("已加载 %d 条演示数据", len(demos))
}

func main() {
    log.Println("图片代理服务 (内存版) 启动中...")
    loadDemoData()

    http.HandleFunc("/health", healthHandler)
    http.HandleFunc("/img/", proxyImage)
    http.HandleFunc("/api/add", addHandler)
    http.HandleFunc("/api/query", queryHandler)
    http.HandleFunc("/api/list", listHandler)
    http.HandleFunc("/api/stats", statsHandler)

    port := os.Getenv("PORT")
    if port == "" {
        port = "8080"
    }
    addr := ":" + port
    log.Printf("服务地址: http://0.0.0.0%s", addr)
    log.Println("接口:")
    log.Println("  GET  /img/{filename}              代理图片")
    log.Println("  POST /api/add?filename=&url=      添加映射")
    log.Println("  GET  /api/query?filename=         查询映射")
    log.Println("  GET  /api/list                    所有映射")
    log.Println("  GET  /api/stats                   统计信息")
    log.Println("  GET  /health                      健康检查")

    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        if err := http.ListenAndServe(addr, nil); err != nil {
            log.Fatalf("启动失败: %v", err)
        }
    }()
    <-quit
    log.Println("服务已关闭")
}

版本二:Redis + MySQL 化茧成蝶

当我们的系统规模膨胀到几百万甚至上千万张图片时,内存版服务就有点力不从心了。重启服务就等于数据清空,这谁受得了?此时,就需要一个更可靠的持久化方案。我们可以利用 高性能代理服务 的思路,引入 Redis 作为高速缓存,MySQL 作为持久化的数据源,实现一个生产级的图片代理。

这套方案在架构上多了两个关键点:

  1. 双向索引:在 Redis 中,我们不仅要从 filenameurl(正向),还要支持从 url 反查 filename(反向),两种查询都是 O(1) 复杂度。
  2. 数据同步:主动从 MySQL 拉取数据到 Redis 进行预热,并支持增量更新。

请看代码实现:

package main

import (
    "context"
    "database/sql"
    "encoding/json"
    "fmt"
    "io"
    "log"
    "net/http"
    "net/url"
    "os"
    "os/signal"
    "path"
    "runtime"
    "strings"
    "sync"
    "sync/atomic"
    "syscall"
    "time"

    _ "github.com/go-sql-driver/mysql"
    "github.com/redis/go-redis/v9"
)

// ==================== 配置 ====================
type Config struct {
    ServerPort    string // 代理服务端口
    RedisAddr     string // Redis 地址
    RedisPassword string // Redis 密码(无则留空)
    RedisDB       int    // Redis DB

    MySQLAddr     string // MySQL 地址
    MySQLUser     string // MySQL 用户
    MySQLPassword string // MySQL 密码
    MySQLDatabase string // MySQL 数据库
    MySQLTable    string // MySQL 表名
    MySQLIDField  string // ID 字段名(用于生成新链接的文件名)
}

func getEnv(key, defaultVal string) string {
    if val := os.Getenv(key); val != "" {
        return val
    }
    return defaultVal
}

var cfg = Config{
    ServerPort:    "8080",
    RedisAddr:     "localhost:6379",
    RedisPassword: getEnv("REDIS_PASSWORD", ""),
    RedisDB:       7,

    MySQLAddr:     "",
    MySQLUser:     "root",
    MySQLPassword: getEnv("MYSQL_PASSWORD", ""),
    MySQLDatabase: "jserve",
    MySQLTable:    "",
    MySQLIDField:  "",
}

// ==================== 全局变量 ====================
var (
    redisClient *redis.Client
    mysqlDB     *sql.DB
    ctx         = context.Background()
)

// 全局 HTTP Client(连接复用,提升性能)
var httpClient = &http.Client{
    Timeout: 30 * time.Second,
    Transport: &http.Transport{
        MaxIdleConns:        100,
        MaxIdleConnsPerHost: 10,
        IdleConnTimeout:     90 * time.Second,
    },
}

// ==================== Redis 操作 ====================
func initRedis() error {
    redisClient = redis.NewClient(&redis.Options{
        Addr:     cfg.RedisAddr,
        Password: cfg.RedisPassword,
        DB:       cfg.RedisDB,
    })

    // 测试连接
    _, err := redisClient.Ping(ctx).Result()
    if err != nil {
        return fmt.Errorf("Redis 连接失败: %v", err)
    }

    log.Printf("Redis 连接成功: %s", cfg.RedisAddr)
    return nil
}

// 添加单条映射到 Redis(双向索引)
func addMappingToRedis(filename, originalURL string) error {
    pipe := redisClient.Pipeline()
    // 正向索引:filename -> url
    pipe.Set(ctx, "img:"+filename, originalURL, 0)
    // 反向索引:url -> filename
    pipe.Set(ctx, "rev:"+originalURL, filename, 0)
    _, err := pipe.Exec(ctx)
    return err
}

// 批量添加映射到 Redis(双向索引)
func batchAddMappingsToRedis(mappings map[string]string) error {
    if len(mappings) == 0 {
        return nil
    }

    pipe := redisClient.Pipeline()
    for filename, originalURL := range mappings {
        // 正向索引:filename -> url
        pipe.Set(ctx, "img:"+filename, originalURL, 0)
        // 反向索引:url -> filename
        pipe.Set(ctx, "rev:"+originalURL, filename, 0)
    }
    _, err := pipe.Exec(ctx)
    return err
}

// 根据 URL 查询 filename(O(1))
func getFilenameByURL(originalURL string) (string, error) {
    key := "rev:" + originalURL
    filename, err := redisClient.Get(ctx, key).Result()
    if err == redis.Nil {
        return "", fmt.Errorf("映射不存在: %s", originalURL)
    }
    if err != nil {
        return "", err
    }
    return filename, nil
}

// 根据 filename 查询 URL(O(1))
func getURLByFilename(filename string) (string, error) {
    key := "img:" + filename
    url, err := redisClient.Get(ctx, key).Result()
    if err == redis.Nil {
        return "", fmt.Errorf("映射不存在: %s", filename)
    }
    if err != nil {
        return "", err
    }
    return url, nil
}

// 获取 Redis 中的映射总数
func getRedisMappingCount() (int64, error) {
    var cursor uint64
    var count int64
    for {
        keys, nextCursor, err := redisClient.Scan(ctx, cursor, "img:*", 1000).Result()
        if err != nil {
            return count, err
        }
        count += int64(len(keys))
        cursor = nextCursor
        if cursor == 0 {
            break
        }
    }
    return count, nil
}

// ==================== MySQL 操作 ====================
func initMySQL() error {
    dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
        cfg.MySQLUser, cfg.MySQLPassword, cfg.MySQLAddr, cfg.MySQLDatabase)

    var err error
    mysqlDB, err = sql.Open("mysql", dsn)
    if err != nil {
        return fmt.Errorf("MySQL 连接失败: %v", err)
    }

    // 配置连接池(增大以支持并发)
    mysqlDB.SetMaxOpenConns(20)
    mysqlDB.SetMaxIdleConns(10)
    mysqlDB.SetConnMaxLifetime(time.Hour)

    // 测试连接
    if err = mysqlDB.Ping(); err != nil {
        return fmt.Errorf("MySQL Ping 失败: %v", err)
    }

    log.Printf("MySQL 连接成功: %s/%s", cfg.MySQLAddr, cfg.MySQLDatabase)
    return nil
}

// ==================== 同步配置 ====================
const (
    pageSize      = 2000                       // 每批处理的 product_id 数量
    redisBatch    = 5000                       // Redis Pipeline 批量大小
    syncBatchSize = 100                        // 每批查询的 product_id 数量(IN 查询限制)
    syncLockKey   = "lock:image-proxy:sync"     // 分布式锁 key
)

var maxWorkers = runtime.NumCPU() - 1 // 并发数(留1核给主线程)

// 尝试获取同步锁
func tryAcquireSyncLock() bool {
    ok, _ := redisClient.SetNX(ctx, syncLockKey, "1", 60*time.Minute).Result()
    return ok
}

// 释放同步锁
func releaseSyncLock() {
    redisClient.Del(ctx, syncLockKey)
}

// 全量同步:从 MySQL 导入所有数据到 Redis(并发版)
func fullSyncMySQLToRedis() error {
    // 获取分布式锁
    if !tryAcquireSyncLock() {
        log.Println("同步任务进行中,跳过本次执行")
        return nil
    }
    defer releaseSyncLock()

    log.Println("开始全量同步 MySQL -> Redis(并发模式)...")

    // 获取总待处理数
    var totalCount int
    countQuery := fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE sync_status = 0", cfg.MySQLTable)
    if err := mysqlDB.QueryRow(countQuery).Scan(&totalCount); err != nil {
        return fmt.Errorf("获取总数失败: %v", err)
    }
    log.Printf("待处理记录: %d 条,并发数: %d", totalCount, maxWorkers)

    if totalCount == 0 {
        log.Println("没有需要同步的数据")
        return nil
    }

    // 1. 获取所有待处理的 product_id
    rows, err := mysqlDB.Query(fmt.Sprintf(
        "SELECT product_id FROM %s WHERE sync_status = 0", cfg.MySQLTable))
    if err != nil {
        return fmt.Errorf("获取 product_id 列表失败: %v", err)
    }
    defer rows.Close()

    var productIDs []string
    for rows.Next() {
        var pid string
        if err := rows.Scan(&pid); err != nil {
            continue
        }
        productIDs = append(productIDs, pid)
    }

    total := len(productIDs)

    // 2. 统计计数器
    var (
        processedCount atomic.Int64
        successCount   atomic.Int64
        failCount      atomic.Int64
    )

    // 3. 进度打印 goroutine
    go func() {
        ticker := time.NewTicker(5 * time.Second)
        defer ticker.Stop()
        for range ticker.C {
            p := processedCount.Load()
            s := successCount.Load()
            f := failCount.Load()
            percent := float64(p) / float64(total) * 100
            log.Printf("进度: %d/%d (%.1f%%) - 成功: %d, 失败: %d", p, total, percent, s, f)
        }
    }()

    // 4. 并发处理
    var wg sync.WaitGroup
    semaphore := make(chan struct{}, maxWorkers)

    // 按 pageSize 分批
    for i := 0; i < total; i += pageSize {
        end := i + pageSize
        if end > total {
            end = total
        }
        batch := productIDs[i:end]

        wg.Add(1)
        semaphore <- struct{}{}
        go func(ids []string) {
            defer wg.Done()
            defer func() { <-semaphore }()
            processProductBatch(ids, &processedCount, &successCount, &failCount)
        }(batch)
    }

    wg.Wait()

    log.Printf("全量同步完成!总计: %d, 成功: %d, 失败: %d",
        processedCount.Load(), successCount.Load(), failCount.Load())
    return nil
}

// 处理一批 product_id
func processProductBatch(ids []string, processed, success, fail *atomic.Int64) {
    // 分成更小的批次进行 IN 查询
    for i := 0; i < len(ids); i += syncBatchSize {
        end := i + syncBatchSize
        if end > len(ids) {
            end = len(ids)
        }
        batch := ids[i:end]
        processSyncBatch(batch, processed, success, fail)
    }
}

// 处理单个同步批次(IN 查询)
func processSyncBatch(ids []string, processed, success, fail *atomic.Int64) {
    if len(ids) == 0 {
        return
    }

    // 构造 IN 查询
    placeholders := strings.Repeat("?,", len(ids)-1) + "?"
    query := fmt.Sprintf(
        "SELECT product_id, main_image_url, sales_link, product_video, "+
            "description_images, main_images, aspect_ratio_images, pic_urls, sku_list "+
            "FROM %s WHERE product_id IN (%s)", cfg.MySQLTable, placeholders)

    // 将 []string 转为 []interface{}
    args := make([]interface{}, len(ids))
    for j, id := range ids {
        args[j] = id
    }

    rows, err := mysqlDB.Query(query, args...)
    if err != nil {
        log.Printf("批量查询失败: %v", err)
        fail.Add(int64(len(ids)))
        return
    }
    defer rows.Close()

    var batchMappings = make(map[string]string)
    var syncedIDs []string

    for rows.Next() {
        var productID string
        var mainImageURL, salesLink, productVideo sql.NullString
        var descriptionImages, mainImages, aspectRatioImages, picUrls, skuList sql.NullString

        if err := rows.Scan(&productID, &mainImageURL, &salesLink, &productVideo,
            &descriptionImages, &mainImages, &aspectRatioImages, &picUrls, &skuList); err != nil {
            log.Printf("扫描行失败: %v", err)
            continue
        }

        urls := extractURLs(productID, mainImageURL, salesLink, productVideo,
            descriptionImages, mainImages, aspectRatioImages, picUrls, skuList)

        for filename, originalURL := range urls {
            batchMappings[filename] = originalURL
        }
        syncedIDs = append(syncedIDs, productID)
    }

    // 批量写入 Redis
    if len(batchMappings) > 0 {
        if err := batchAddMappingsToRedis(batchMappings); err != nil {
            log.Printf("Redis 批量写入失败: %v", err)
            fail.Add(int64(len(syncedIDs)))
            for _, pid := range syncedIDs {
                updateSyncStatus(pid, 2)
            }
        } else {
            success.Add(int64(len(syncedIDs)))
            for _, pid := range syncedIDs {
                updateSyncStatus(pid, 1)
            }
        }
    }

    processed.Add(int64(len(ids)))
}

// 更新同步状态
func updateSyncStatus(productID string, status int) {
    query := fmt.Sprintf("UPDATE %s SET sync_status = ? WHERE product_id = ?", cfg.MySQLTable)
    if _, err := mysqlDB.Exec(query, status, productID); err != nil {
        log.Printf("更新状态失败 (product_id=%s, status=%d): %v", productID, status, err)
    }
}

// 从 JSON 字段中提取所有 URL
// 从 URL 中提取扩展名(带点),如 .jpg .mp4 .png
func getExtFromURL(rawURL string) string {
    ext := path.Ext(rawURL)
    if ext == "" {
        return ""
    }
    return ext
}

func extractURLs(productID string, mainImageURL, salesLink, productVideo sql.NullString, descriptionImages, mainImages, aspectRatioImages, picUrls, skuList sql.NullString) map[string]string {
    result := make(map[string]string)

    // 1. main_image_url (单字符串)
    if mainImageURL.Valid && mainImageURL.String != "" {
        ext := getExtFromURL(mainImageURL.String)
        if ext == "" {
            ext = ".jpg"
        }
        filename := fmt.Sprintf("%s_main_image_url%s", productID, ext)
        result[filename] = mainImageURL.String
    }

    // 2. sales_link (单字符串)
    if salesLink.Valid && salesLink.String != "" {
        ext := getExtFromURL(salesLink.String)
        if ext == "" {
            ext = ".jpg"
        }
        filename := fmt.Sprintf("%s_sales_link%s", productID, ext)
        result[filename] = salesLink.String
    }

    // 2.5 product_video (单字符串,保持原后缀)
    if productVideo.Valid && productVideo.String != "" {
        ext := getExtFromURL(productVideo.String)
        if ext == "" {
            ext = ".mp4"
        }
        filename := fmt.Sprintf("%s_product_video%s", productID, ext)
        result[filename] = productVideo.String
    }

    // 3. description_images (字符串数组)
    if descriptionImages.Valid && descriptionImages.String != "" {
        var urls []string
        if err := json.Unmarshal([]byte(descriptionImages.String), &urls); err == nil {
            for i, u := range urls {
                if u != "" {
                    ext := getExtFromURL(u)
                    if ext == "" {
                        ext = ".jpg"
                    }
                    filename := fmt.Sprintf("%s_description_images_%d%s", productID, i, ext)
                    result[filename] = u
                }
            }
        }
    }

    // 4. main_images (字符串数组)
    if mainImages.Valid && mainImages.String != "" {
        var urls []string
        if err := json.Unmarshal([]byte(mainImages.String), &urls); err == nil {
            for i, u := range urls {
                if u != "" {
                    ext := getExtFromURL(u)
                    if ext == "" {
                        ext = ".jpg"
                    }
                    filename := fmt.Sprintf("%s_main_images_%d%s", productID, i, ext)
                    result[filename] = u
                }
            }
        }
    }

    // 5. aspect_ratio_images (字符串数组)
    if aspectRatioImages.Valid && aspectRatioImages.String != "" {
        var urls []string
        if err := json.Unmarshal([]byte(aspectRatioImages.String), &urls); err == nil {
            for i, u := range urls {
                if u != "" {
                    ext := getExtFromURL(u)
                    if ext == "" {
                        ext = ".jpg"
                    }
                    filename := fmt.Sprintf("%s_aspect_ratio_images_%d%s", productID, i, ext)
                    result[filename] = u
                }
            }
        }
    }

    // 6. pic_urls (字符串数组)
    if picUrls.Valid && picUrls.String != "" {
        var urls []string
        if err := json.Unmarshal([]byte(picUrls.String), &urls); err == nil {
            for i, u := range urls {
                if u != "" {
                    ext := getExtFromURL(u)
                    if ext == "" {
                        ext = ".jpg"
                    }
                    filename := fmt.Sprintf("%s_pic_urls_%d%s", productID, i, ext)
                    result[filename] = u
                }
            }
        }
    }

    // 7. sku_list (对象数组,提取 thumbUrl)
    if skuList.Valid && skuList.String != "" {
        var skus []struct {
            ThumbUrl string `json:"thumbUrl"`
            SkuID    int64  `json:"skuId"`
        }
        if err := json.Unmarshal([]byte(skuList.String), &skus); err == nil {
            for _, sku := range skus {
                if sku.ThumbUrl != "" {
                    ext := getExtFromURL(sku.ThumbUrl)
                    if ext == "" {
                        ext = ".jpg"
                    }
                    filename := fmt.Sprintf("%s_sku_%d_thumbUrl%s", productID, sku.SkuID, ext)
                    result[filename] = sku.ThumbUrl
                }
            }
        }
    }

    return result
}

// 增量同步:检查新增数据并同步到 Redis
func incrementalSync() error {
    // 获取 Redis 中已有的最大 ID
    var _ string
    // 这里简化处理,实际可以用 Redis Sorted Set 或定期记录同步位置
    // 为了演示,我们用时间戳方式

    return nil // 增量同步需要根据业务逻辑实现
}

// ==================== 代理服务 ====================

// 代理图片请求(按 filename 查询,返回原图)
func proxyImage(w http.ResponseWriter, r *http.Request) {
    start := time.Now()

    // 提取文件名
    filename := strings.TrimPrefix(r.URL.Path, "/img/")
    filename = strings.TrimPrefix(filename, "/")
    if filename == "" {
        http.Error(w, "缺少文件名", http.StatusBadRequest)
        return
    }

    // URL 解码
    filename, err := url.QueryUnescape(filename)
    if err != nil {
        http.Error(w, "无效的文件名", http.StatusBadRequest)
        return
    }

    // 根据 filename 查询 URL
    originalURL, err := getURLByFilename(filename)
    if err != nil {
        log.Printf("[404] %s - %v", filename, err)
        http.Error(w, "图片不存在", http.StatusNotFound)
        return
    }

    // 请求原图(复用全局 httpClient)
    resp, err := httpClient.Get(originalURL)
    if err != nil {
        log.Printf("[错误] 请求原图失败: %v", err)
        http.Error(w, "无法获取图片", http.StatusBadGateway)
        return
    }
    defer resp.Body.Close()

    // 检查响应状态
    if resp.StatusCode != http.StatusOK {
        log.Printf("[错误] 原图返回状态码: %d", resp.StatusCode)
        http.Error(w, "原图不可用", resp.StatusCode)
        return
    }

    // 复制响应头
    for key, values := range resp.Header {
        for _, value := range values {
            w.Header().Add(key, value)
        }
    }

    // 设置必要的响应头
    w.Header().Set("X-Original-URL", originalURL)
    w.Header().Set("X-Proxied-By", "ImageProxy-Redis")
    w.Header().Set("Cache-Control", "public, max-age=3600")

    // 返回图片内容
    _, err = io.Copy(w, resp.Body)
    if err != nil {
        log.Printf("[错误] 传输图片失败: %v", err)
        return
    }

    log.Printf("[成功] %s -> %s (耗时: %v)", filename, originalURL, time.Since(start))
}

// 按 URL 查询并代理图片(新增:反向查询)
func proxyImageByURL(w http.ResponseWriter, r *http.Request) {
    start := time.Now()

    // 从 Query 参数获取 URL(格式:/proxy?url=https://...)
    originalURL := r.URL.Query().Get("url")
    if originalURL == "" {
        http.Error(w, "缺少 url 参数", http.StatusBadRequest)
        return
    }

    // 根据 URL 查询 filename(反向查询)
    filename, err := getFilenameByURL(originalURL)
    if err != nil {
        log.Printf("[404] URL: %s - %v", originalURL, err)
        http.Error(w, "映射不存在", http.StatusNotFound)
        return
    }

    // 请求原图(复用全局 httpClient)
    resp, err := httpClient.Get(originalURL)
    if err != nil {
        log.Printf("[错误] 请求原图失败: %v", err)
        http.Error(w, "无法获取图片", http.StatusBadGateway)
        return
    }
    defer resp.Body.Close()

    // 检查响应状态
    if resp.StatusCode != http.StatusOK {
        log.Printf("[错误] 原图返回状态码: %d", resp.StatusCode)
        http.Error(w, "原图不可用", resp.StatusCode)
        return
    }

    // 复制响应头
    for key, values := range resp.Header {
        for _, value := range values {
            w.Header().Add(key, value)
        }
    }

    // 设置必要的响应头
    w.Header().Set("X-Filename", filename)
    w.Header().Set("X-Proxied-By", "ImageProxy-Redis")
    w.Header().Set("Cache-Control", "public, max-age=3600")

    // 返回图片内容
    _, err = io.Copy(w, resp.Body)
    if err != nil {
        log.Printf("[错误] 传输图片失败: %v", err)
        return
    }

    log.Printf("[反向查询成功] URL: %s -> filename: %s (耗时: %v)", originalURL, filename, time.Since(start))
}

// ==================== API 接口 ====================

// 统计信息
func statsHandler(w http.ResponseWriter, r *http.Request) {
    count, err := getRedisMappingCount()
    if err != nil {
        http.Error(w, fmt.Sprintf("获取统计失败: %v", err), http.StatusInternalServerError)
        return
    }
    w.Write([]byte(fmt.Sprintf(`{"total_mappings": %d, "redis": "%s"}`, count, cfg.RedisAddr)))
}

// 健康检查
func healthHandler(w http.ResponseWriter, r *http.Request) {
    // 检查 Redis
    if _, err := redisClient.Ping(ctx).Result(); err != nil {
        w.Write([]byte(fmt.Sprintf(`{"status": "unhealthy", "redis": "down", "error": "%v"}`, err)))
        return
    }
    w.Write([]byte(`{"status": "healthy", "redis": "up"}`))
}

// 同步状态
func syncStatusHandler(w http.ResponseWriter, r *http.Request) {
    count, _ := getRedisMappingCount()
    w.Write([]byte(fmt.Sprintf(`{"synced_count": %d, "mysql_table": "%s", "last_sync": "manual"}`, count, cfg.MySQLTable)))
}

// 手动触发同步
func triggerSyncHandler(w http.ResponseWriter, r *http.Request) {
    if r.Method != http.MethodPost {
        http.Error(w, "只支持 POST", http.StatusMethodNotAllowed)
        return
    }

    go func() {
        if err := fullSyncMySQLToRedis(); err != nil {
            log.Printf("同步失败: %v", err)
        }
    }()

    w.Write([]byte(`{"status": "sync_started", "message": "全量同步已启动,请查看日志"}`))
}

// ==================== 主程序 ====================
func main() {
    log.Println(`
╔══════════════════════════════════════════════════════════════╗
║              图片代理服务            ║
╚══════════════════════════════════════════════════════════════╝`)

    // 1. 初始化 Redis
    if err := initRedis(); err != nil {
        log.Fatalf("Redis 初始化失败: %v", err)
    }

    // 2. 初始化 MySQL
    if err := initMySQL(); err != nil {
        log.Fatalf("MySQL 初始化失败: %v", err)
    }

    // 3. 全量同步
    if err := fullSyncMySQLToRedis(); err != nil {
        log.Printf("全量同步失败: %v", err)
        log.Println("服务将继续启动,但 Redis 中可能没有数据")
    }

    // 4. 设置路由
    http.HandleFunc("/health", healthHandler)
    http.HandleFunc("/img/", proxyImage)
    http.HandleFunc("/proxy", proxyImageByURL) // 新增:按URL反向查询(GET /proxy?url=...)
    http.HandleFunc("/api/stats", statsHandler)
    http.HandleFunc("/api/sync", triggerSyncHandler)
    http.HandleFunc("/api/sync/status", syncStatusHandler)

    // 5. 启动服务
    addr := fmt.Sprintf(":%s", cfg.ServerPort)
    log.Printf(`
╔══════════════════════════════════════════════════════════════╗
║                    服务配置信息                              ║
╠══════════════════════════════════════════════════════════════╣
║  代理端口:   %s                                         ║
║  Redis:     %s                                   ║
║  MySQL:     %s/%s                           ║
║  表名:      %s                                     ║
╠══════════════════════════════════════════════════════════════╣
║  API 接口:                                                  ║
║  - GET  /img/{filename}   代理图片(按文件名查)           ║
║  - GET  /proxy/{url}       代理图片(按URL查,O(1))         ║
║  - GET  /api/stats        统计信息                          ║
║  - POST /api/sync         触发全量同步                      ║
║  - GET  /api/sync/status  同步状态                          ║
║  - GET  /health           健康检查                          ║
╚══════════════════════════════════════════════════════════════╝`, cfg.ServerPort, cfg.RedisAddr, cfg.MySQLAddr, cfg.MySQLDatabase, cfg.MySQLTable)

    // 优雅退出
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

    go func() {
        if err := http.ListenAndServe(addr, nil); err != nil {
            log.Fatalf("服务启动失败: %v", err)
        }
    }()

    <-quit
    log.Println("正在关闭服务...")

    redisClient.Close()
    mysqlDB.Close()
    log.Println("服务已关闭")
}

版本三:多平台微服务隔离

在实际微服务架构中,我们可能要为不同平台(例如某多和某东)分别部署代理服务。怎么让它们在同一个 Redis 实例里和谐共处还不互相干扰?很简单:用不同的 DB 号或 key 前缀做隔离。

下面的代码就是为某东(JD)平台定制的一个变体,它监听在 8081 端口,使用 Redis 的 DB 8,分布式锁的 key 也做了区分。

package main

import (
    "context"
    "database/sql"
    "encoding/json"
    "fmt"
    "io"
    "log"
    "net/http"
    "net/url"
    "os"
    "os/signal"
    "path"
    "runtime"
    "strings"
    "sync"
    "sync/atomic"
    "syscall"
    "time"

    _ "github.com/go-sql-driver/mysql"
    "github.com/redis/go-redis/v9"
)

// ==================== 配置 ====================
type Config struct {
    ServerPort    string // 代理服务端口
    RedisAddr     string // Redis 地址
    RedisPassword string // Redis 密码(无则留空)
    RedisDB       int    // Redis DB

    MySQLAddr     string // MySQL 地址
    MySQLUser     string // MySQL 用户
    MySQLPassword string // MySQL 密码
    MySQLDatabase string // MySQL 数据库
    MySQLTable    string // MySQL 表名
    MySQLIDField  string // ID 字段名(用于生成新链接的文件名)
}

func getEnv(key, defaultVal string) string {
    if val := os.Getenv(key); val != "" {
        return val
    }
    return defaultVal
}

var cfg = Config{
    ServerPort:    "8081",                              // JD 服务端口(区别于 PDD 的 8080)
    RedisAddr:     "localhost:6379",
    RedisPassword: getEnv("REDIS_PASSWORD", ""),
    RedisDB:       8,                                   // JD 使用 DB8(PDD 使用 DB7)

    MySQLAddr:     "",
    MySQLUser:     "root",
    MySQLPassword: getEnv("MYSQL_PASSWORD", ""),
    MySQLDatabase: "jserve",
    MySQLTable:    "",                   // JD 表名
    MySQLIDField:  "product_id",
}

// ==================== 全局变量 ====================
var (
    redisClient *redis.Client
    mysqlDB     *sql.DB
    ctx         = context.Background()
)

// 全局 HTTP Client(连接复用,提升性能)
var httpClient = &http.Client{
    Timeout: 30 * time.Second,
    Transport: &http.Transport{
        MaxIdleConns:        100,
        MaxIdleConnsPerHost: 10,
        IdleConnTimeout:     90 * time.Second,
    },
}

// ==================== Redis 操作 ====================
func initRedis() error {
    redisClient = redis.NewClient(&redis.Options{
        Addr:     cfg.RedisAddr,
        Password: cfg.RedisPassword,
        DB:       cfg.RedisDB,
    })

    // 测试连接
    _, err := redisClient.Ping(ctx).Result()
    if err != nil {
        return fmt.Errorf("Redis 连接失败: %v", err)
    }

    log.Printf("Redis 连接成功: %s", cfg.RedisAddr)
    return nil
}

// 添加单条映射到 Redis(双向索引)
func addMappingToRedis(filename, originalURL string) error {
    pipe := redisClient.Pipeline()
    // 正向索引:filename -> url
    pipe.Set(ctx, "img:"+filename, originalURL, 0)
    // 反向索引:url -> filename
    pipe.Set(ctx, "rev:"+originalURL, filename, 0)
    _, err := pipe.Exec(ctx)
    return err
}

// 批量添加映射到 Redis(双向索引)
func batchAddMappingsToRedis(mappings map[string]string) error {
    if len(mappings) == 0 {
        return nil
    }

    pipe := redisClient.Pipeline()
    for filename, originalURL := range mappings {
        // 正向索引:filename -> url
        pipe.Set(ctx, "img:"+filename, originalURL, 0)
        // 反向索引:url -> filename
        pipe.Set(ctx, "rev:"+originalURL, filename, 0)
    }
    _, err := pipe.Exec(ctx)
    return err
}

// 根据 URL 查询 filename(O(1))
func getFilenameByURL(originalURL string) (string, error) {
    key := "rev:" + originalURL
    filename, err := redisClient.Get(ctx, key).Result()
    if err == redis.Nil {
        return "", fmt.Errorf("映射不存在: %s", originalURL)
    }
    if err != nil {
        return "", err
    }
    return filename, nil
}

// 根据 filename 查询 URL(O(1))
func getURLByFilename(filename string) (string, error) {
    key := "img:" + filename
    url, err := redisClient.Get(ctx, key).Result()
    if err == redis.Nil {
        return "", fmt.Errorf("映射不存在: %s", filename)
    }
    if err != nil {
        return "", err
    }
    return url, nil
}

// 获取 Redis 中的映射总数
func getRedisMappingCount() (int64, error) {
    var cursor uint64
    var count int64
    for {
        keys, nextCursor, err := redisClient.Scan(ctx, cursor, "img:*", 1000).Result()
        if err != nil {
            return count, err
        }
        count += int64(len(keys))
        cursor = nextCursor
        if cursor == 0 {
            break
        }
    }
    return count, nil
}

// ==================== MySQL 操作 ====================
func initMySQL() error {
    dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
        cfg.MySQLUser, cfg.MySQLPassword, cfg.MySQLAddr, cfg.MySQLDatabase)

    var err error
    mysqlDB, err = sql.Open("mysql", dsn)
    if err != nil {
        return fmt.Errorf("MySQL 连接失败: %v", err)
    }

    // 配置连接池(增大以支持并发)
    mysqlDB.SetMaxOpenConns(20)
    mysqlDB.SetMaxIdleConns(10)
    mysqlDB.SetConnMaxLifetime(time.Hour)

    // 测试连接
    if err = mysqlDB.Ping(); err != nil {
        return fmt.Errorf("MySQL Ping 失败: %v", err)
    }

    log.Printf("MySQL 连接成功: %s/%s", cfg.MySQLAddr, cfg.MySQLDatabase)
    return nil
}

// ==================== 同步配置 ====================
const (
    pageSize      = 2000                         // 每批处理的 product_id 数量
    redisBatch    = 5000                         // Redis Pipeline 批量大小
    syncBatchSize = 100                          // 每批查询的 product_id 数量(IN 查询限制)
    syncLockKey   = "lock:image-proxy:jd:sync"   // 分布式锁 key(JD 专用,避免与 PDD 冲突)
)

var maxWorkers = runtime.NumCPU() - 1 // 并发数(留1核给主线程)

// 尝试获取同步锁
func tryAcquireSyncLock() bool {
    ok, _ := redisClient.SetNX(ctx, syncLockKey, "1", 60*time.Minute).Result()
    return ok
}

// 释放同步锁
func releaseSyncLock() {
    redisClient.Del(ctx, syncLockKey)
}

// 全量同步:从 MySQL 导入所有数据到 Redis(并发版)
func fullSyncMySQLToRedis() error {
    // 获取分布式锁
    if !tryAcquireSyncLock() {
        log.Println("同步任务进行中,跳过本次执行")
        return nil
    }
    defer releaseSyncLock()

    log.Println("开始全量同步 MySQL -> Redis(并发模式)...")

    // 获取总待处理数
    var totalCount int
    countQuery := fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE sync_status = 0", cfg.MySQLTable)
    if err := mysqlDB.QueryRow(countQuery).Scan(&totalCount); err != nil {
        return fmt.Errorf("获取总数失败: %v", err)
    }
    log.Printf("待处理记录: %d 条,并发数: %d", totalCount, maxWorkers)

    if totalCount == 0 {
        log.Println("没有需要同步的数据")
        return nil
    }

    // 1. 获取所有待处理的 product_id
    rows, err := mysqlDB.Query(fmt.Sprintf(
        "SELECT product_id FROM %s WHERE sync_status = 0", cfg.MySQLTable))
    if err != nil {
        return fmt.Errorf("获取 product_id 列表失败: %v", err)
    }
    defer rows.Close()

    var productIDs []string
    for rows.Next() {
        var pid string
        if err := rows.Scan(&pid); err != nil {
            continue
        }
        productIDs = append(productIDs, pid)
    }

    total := len(productIDs)

    // 2. 统计计数器
    var (
        processedCount atomic.Int64
        successCount   atomic.Int64
        failCount      atomic.Int64
    )

    // 3. 进度打印 goroutine
    go func() {
        ticker := time.NewTicker(5 * time.Second)
        defer ticker.Stop()
        for range ticker.C {
            p := processedCount.Load()
            s := successCount.Load()
            f := failCount.Load()
            percent := float64(p) / float64(total) * 100
            log.Printf("进度: %d/%d (%.1f%%) - 成功: %d, 失败: %d", p, total, percent, s, f)
        }
    }()

    // 4. 并发处理
    var wg sync.WaitGroup
    semaphore := make(chan struct{}, maxWorkers)

    // 按 pageSize 分批
    for i := 0; i < total; i += pageSize {
        end := i + pageSize
        if end > total {
            end = total
        }
        batch := productIDs[i:end]

        wg.Add(1)
        semaphore <- struct{}{}
        go func(ids []string) {
            defer wg.Done()
            defer func() { <-semaphore }()
            processProductBatch(ids, &processedCount, &successCount, &failCount)
        }(batch)
    }

    wg.Wait()

    log.Printf("全量同步完成!总计: %d, 成功: %d, 失败: %d",
        processedCount.Load(), successCount.Load(), failCount.Load())
    return nil
}

// 处理一批 product_id
func processProductBatch(ids []string, processed, success, fail *atomic.Int64) {
    // 分成更小的批次进行 IN 查询
    for i := 0; i < len(ids); i += syncBatchSize {
        end := i + syncBatchSize
        if end > len(ids) {
            end = len(ids)
        }
        batch := ids[i:end]
        processSyncBatch(batch, processed, success, fail)
    }
}

// 处理单个同步批次(IN 查询)
func processSyncBatch(ids []string, processed, success, fail *atomic.Int64) {
    if len(ids) == 0 {
        return
    }

    // 构造 IN 查询
    placeholders := strings.Repeat("?,", len(ids)-1) + "?"
    query := fmt.Sprintf(
        "SELECT product_id, main_image_url, sales_link, product_video, "+
            "description_images, main_images, aspect_ratio_images, pic_urls, sku_list "+
            "FROM %s WHERE product_id IN (%s)", cfg.MySQLTable, placeholders)

    // 将 []string 转为 []interface{}
    args := make([]interface{}, len(ids))
    for j, id := range ids {
        args[j] = id
    }

    rows, err := mysqlDB.Query(query, args...)
    if err != nil {
        log.Printf("批量查询失败: %v", err)
        fail.Add(int64(len(ids)))
        return
    }
    defer rows.Close()

    var batchMappings = make(map[string]string)
    var syncedIDs []string

    for rows.Next() {
        var productID string
        var mainImageURL, salesLink, productVideo sql.NullString
        var descriptionImages, mainImages, aspectRatioImages, picUrls, skuList sql.NullString

        if err := rows.Scan(&productID, &mainImageURL, &salesLink, &productVideo,
            &descriptionImages, &mainImages, &aspectRatioImages, &picUrls, &skuList); err != nil {
            log.Printf("扫描行失败: %v", err)
            continue
        }

        urls := extractURLs(productID, mainImageURL, salesLink, productVideo,
            descriptionImages, mainImages, aspectRatioImages, picUrls, skuList)

        for filename, originalURL := range urls {
            batchMappings[filename] = originalURL
        }
        syncedIDs = append(syncedIDs, productID)
    }

    // 批量写入 Redis
    if len(batchMappings) > 0 {
        if err := batchAddMappingsToRedis(batchMappings); err != nil {
            log.Printf("Redis 批量写入失败: %v", err)
            fail.Add(int64(len(syncedIDs)))
            for _, pid := range syncedIDs {
                updateSyncStatus(pid, 2)
            }
        } else {
            success.Add(int64(len(syncedIDs)))
            for _, pid := range syncedIDs {
                updateSyncStatus(pid, 1)
            }
        }
    }

    processed.Add(int64(len(ids)))
}

// 更新同步状态
func updateSyncStatus(productID string, status int) {
    query := fmt.Sprintf("UPDATE %s SET sync_status = ? WHERE product_id = ?", cfg.MySQLTable)
    if _, err := mysqlDB.Exec(query, status, productID); err != nil {
        log.Printf("更新状态失败 (product_id=%s, status=%d): %v", productID, status, err)
    }
}

// 从 URL 中提取扩展名(带点),如 .jpg .mp4 .png
func getExtFromURL(rawURL string) string {
    ext := path.Ext(rawURL)
    if ext == "" {
        return ""
    }
    return ext
}

func extractURLs(productID string, mainImageURL, salesLink, productVideo sql.NullString, descriptionImages, mainImages, aspectRatioImages, picUrls, skuList sql.NullString) map[string]string {
    result := make(map[string]string)

    // 1. main_image_url (单字符串)
    if mainImageURL.Valid && mainImageURL.String != "" {
        ext := getExtFromURL(mainImageURL.String)
        if ext == "" {
            ext = ".jpg"
        }
        filename := fmt.Sprintf("%s_main_image_url%s", productID, ext)
        result[filename] = mainImageURL.String
    }

    // 2. sales_link (单字符串)
    if salesLink.Valid && salesLink.String != "" {
        ext := getExtFromURL(salesLink.String)
        if ext == "" {
            ext = ".jpg"
        }
        filename := fmt.Sprintf("%s_sales_link%s", productID, ext)
        result[filename] = salesLink.String
    }

    // 3. product_video (单字符串,保持原后缀)
    if productVideo.Valid && productVideo.String != "" {
        ext := getExtFromURL(productVideo.String)
        if ext == "" {
            ext = ".mp4"
        }
        filename := fmt.Sprintf("%s_product_video%s", productID, ext)
        result[filename] = productVideo.String
    }

    // 4. description_images (字符串数组)
    if descriptionImages.Valid && descriptionImages.String != "" {
        var urls []string
        if err := json.Unmarshal([]byte(descriptionImages.String), &urls); err == nil {
            for i, u := range urls {
                if u != "" {
                    ext := getExtFromURL(u)
                    if ext == "" {
                        ext = ".jpg"
                    }
                    filename := fmt.Sprintf("%s_description_images_%d%s", productID, i, ext)
                    result[filename] = u
                }
            }
        }
    }

    // 5. main_images (字符串数组)
    if mainImages.Valid && mainImages.String != "" {
        var urls []string
        if err := json.Unmarshal([]byte(mainImages.String), &urls); err == nil {
            for i, u := range urls {
                if u != "" {
                    ext := getExtFromURL(u)
                    if ext == "" {
                        ext = ".jpg"
                    }
                    filename := fmt.Sprintf("%s_main_images_%d%s", productID, i, ext)
                    result[filename] = u
                }
            }
        }
    }

    // 6. aspect_ratio_images (字符串数组)
    if aspectRatioImages.Valid && aspectRatioImages.String != "" {
        var urls []string
        if err := json.Unmarshal([]byte(aspectRatioImages.String), &urls); err == nil {
            for i, u := range urls {
                if u != "" {
                    ext := getExtFromURL(u)
                    if ext == "" {
                        ext = ".jpg"
                    }
                    filename := fmt.Sprintf("%s_aspect_ratio_images_%d%s", productID, i, ext)
                    result[filename] = u
                }
            }
        }
    }

    // 7. pic_urls (字符串数组)
    if picUrls.Valid && picUrls.String != "" {
        var urls []string
        if err := json.Unmarshal([]byte(picUrls.String), &urls); err == nil {
            for i, u := range urls {
                if u != "" {
                    ext := getExtFromURL(u)
                    if ext == "" {
                        ext = ".jpg"
                    }
                    filename := fmt.Sprintf("%s_pic_urls_%d%s", productID, i, ext)
                    result[filename] = u
                }
            }
        }
    }

    // 8. sku_list (对象数组,提取 thumbUrl)
    if skuList.Valid && skuList.String != "" {
        var skus []struct {
            ThumbUrl string `json:"thumbUrl"`
            SkuID    int64  `json:"skuId"`
        }
        if err := json.Unmarshal([]byte(skuList.String), &skus); err == nil {
            for _, sku := range skus {
                if sku.ThumbUrl != "" {
                    ext := getExtFromURL(sku.ThumbUrl)
                    if ext == "" {
                        ext = ".jpg"
                    }
                    filename := fmt.Sprintf("%s_sku_%d_thumbUrl%s", productID, sku.SkuID, ext)
                    result[filename] = sku.ThumbUrl
                }
            }
        }
    }

    return result
}

// ==================== 代理服务 ====================

// 代理图片请求(按 filename 查询,返回原图)
func proxyImage(w http.ResponseWriter, r *http.Request) {
    start := time.Now()

    // 提取文件名
    filename := strings.TrimPrefix(r.URL.Path, "/img/")
    filename = strings.TrimPrefix(filename, "/")
    if filename == "" {
        http.Error(w, "缺少文件名", http.StatusBadRequest)
        return
    }

    // URL 解码
    filename, err := url.QueryUnescape(filename)
    if err != nil {
        http.Error(w, "无效的文件名", http.StatusBadRequest)
        return
    }

    // 根据 filename 查询 URL
    originalURL, err := getURLByFilename(filename)
    if err != nil {
        log.Printf("[404] %s - %v", filename, err)
        http.Error(w, "图片不存在", http.StatusNotFound)
        return
    }

    // 请求原图(复用全局 httpClient)
    resp, err := httpClient.Get(originalURL)
    if err != nil {
        log.Printf("[错误] 请求原图失败: %v", err)
        http.Error(w, "无法获取图片", http.StatusBadGateway)
        return
    }
    defer resp.Body.Close()

    // 检查响应状态
    if resp.StatusCode != http.StatusOK {
        log.Printf("[错误] 原图返回状态码: %d", resp.StatusCode)
        http.Error(w, "原图不可用", resp.StatusCode)
        return
    }

    // 复制响应头
    for key, values := range resp.Header {
        for _, value := range values {
            w.Header().Add(key, value)
        }
    }

    // 设置必要的响应头
    w.Header().Set("X-Original-URL", originalURL)
    w.Header().Set("X-Proxied-By", "ImageProxy-Redis-JD")
    w.Header().Set("Cache-Control", "public, max-age=3600")

    // 返回图片内容
    _, err = io.Copy(w, resp.Body)
    if err != nil {
        log.Printf("[错误] 传输图片失败: %v", err)
        return
    }

    log.Printf("[成功] %s -> %s (耗时: %v)", filename, originalURL, time.Since(start))
}

// 按 URL 查询并代理图片(反向查询)
func proxyImageByURL(w http.ResponseWriter, r *http.Request) {
    start := time.Now()

    // 从 Query 参数获取 URL(格式:/proxy?url=https://...)
    originalURL := r.URL.Query().Get("url")
    if originalURL == "" {
        http.Error(w, "缺少 url 参数", http.StatusBadRequest)
        return
    }

    // 根据 URL 查询 filename(反向查询)
    filename, err := getFilenameByURL(originalURL)
    if err != nil {
        log.Printf("[404] URL: %s - %v", originalURL, err)
        http.Error(w, "映射不存在", http.StatusNotFound)
        return
    }

    // 请求原图(复用全局 httpClient)
    resp, err := httpClient.Get(originalURL)
    if err != nil {
        log.Printf("[错误] 请求原图失败: %v", err)
        http.Error(w, "无法获取图片", http.StatusBadGateway)
        return
    }
    defer resp.Body.Close()

    // 检查响应状态
    if resp.StatusCode != http.StatusOK {
        log.Printf("[错误] 原图返回状态码: %d", resp.StatusCode)
        http.Error(w, "原图不可用", resp.StatusCode)
        return
    }

    // 复制响应头
    for key, values := range resp.Header {
        for _, value := range values {
            w.Header().Add(key, value)
        }
    }

    // 设置必要的响应头
    w.Header().Set("X-Filename", filename)
    w.Header().Set("X-Proxied-By", "ImageProxy-Redis-JD")
    w.Header().Set("Cache-Control", "public, max-age=3600")

    // 返回图片内容
    _, err = io.Copy(w, resp.Body)
    if err != nil {
        log.Printf("[错误] 传输图片失败: %v", err)
        return
    }

    log.Printf("[反向查询成功] URL: %s -> filename: %s (耗时: %v)", originalURL, filename, time.Since(start))
}

// ==================== API 接口 ====================

// 统计信息
func statsHandler(w http.ResponseWriter, r *http.Request) {
    count, err := getRedisMappingCount()
    if err != nil {
        http.Error(w, fmt.Sprintf("获取统计失败: %v", err), http.StatusInternalServerError)
        return
    }
    w.Write([]byte(fmt.Sprintf(`{"total_mappings": %d, "redis": "%s", "db": %d}`, count, cfg.RedisAddr, cfg.RedisDB)))
}

// 健康检查
func healthHandler(w http.ResponseWriter, r *http.Request) {
    // 检查 Redis
    if _, err := redisClient.Ping(ctx).Result(); err != nil {
        w.Write([]byte(fmt.Sprintf(`{"status": "unhealthy", "redis": "down", "error": "%v"}`, err)))
        return
    }
    w.Write([]byte(`{"status": "healthy", "redis": "up"}`))
}

// 同步状态
func syncStatusHandler(w http.ResponseWriter, r *http.Request) {
    count, _ := getRedisMappingCount()
    w.Write([]byte(fmt.Sprintf(`{"synced_count": %d, "mysql_table": "%s", "last_sync": "manual"}`, count, cfg.MySQLTable)))
}

// 手动触发同步
func triggerSyncHandler(w http.ResponseWriter, r *http.Request) {
    if r.Method != http.MethodPost {
        http.Error(w, "只支持 POST", http.StatusMethodNotAllowed)
        return
    }

    go func() {
        if err := fullSyncMySQLToRedis(); err != nil {
            log.Printf("同步失败: %v", err)
        }
    }()

    w.Write([]byte(`{"status": "sync_started", "message": "全量同步已启动,请查看日志"}`))
}

// ==================== 主程序 ====================
func main() {
    log.Println(`
╔══════════════════════════════════════════════════════════════╗
║           图片代理服务 [JD]          ║
╚══════════════════════════════════════════════════════════════╝`)

    // 1. 初始化 Redis
    if err := initRedis(); err != nil {
        log.Fatalf("Redis 初始化失败: %v", err)
    }

    // 2. 初始化 MySQL
    if err := initMySQL(); err != nil {
        log.Fatalf("MySQL 初始化失败: %v", err)
    }

    // 3. 全量同步
    if err := fullSyncMySQLToRedis(); err != nil {
        log.Printf("全量同步失败: %v", err)
        log.Println("服务将继续启动,但 Redis 中可能没有数据")
    }

    // 4. 设置路由
    http.HandleFunc("/health", healthHandler)
    http.HandleFunc("/img/", proxyImage)
    http.HandleFunc("/proxy", proxyImageByURL) // GET /proxy?url=...
    http.HandleFunc("/api/stats", statsHandler)
    http.HandleFunc("/api/sync", triggerSyncHandler)
    http.HandleFunc("/api/sync/status", syncStatusHandler)

    // 5. 启动服务
    addr := fmt.Sprintf(":%s", cfg.ServerPort)
    log.Printf(`
╔══════════════════════════════════════════════════════════════╗
║                    服务配置信息 [JD]                         ║
╠══════════════════════════════════════════════════════════════╣
║  代理端口:   %s                                         ║
║  Redis:     %s (DB%d)                             ║
║  MySQL:     %s/%s                           ║
║  表名:      %s                                  ║
╠══════════════════════════════════════════════════════════════╣
║  API 接口:                                                  ║
║  - GET  /img/{filename}   代理图片(按文件名查)           ║
║  - GET  /proxy?url=...    代理图片(按URL查,O(1))          ║
║  - GET  /api/stats        统计信息                          ║
║  - POST /api/sync         触发全量同步                      ║
║  - GET  /api/sync/status  同步状态                          ║
║  - GET  /health           健康检查                          ║
╚══════════════════════════════════════════════════════════════╝`,
        cfg.ServerPort, cfg.RedisAddr, cfg.RedisDB, cfg.MySQLAddr, cfg.MySQLDatabase, cfg.MySQLTable)

    // 优雅退出
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

    go func() {
        if err := http.ListenAndServe(addr, nil); err != nil {
            log.Fatalf("服务启动失败: %v", err)
        }
    }()

    <-quit
    log.Println("正在关闭服务...")

    redisClient.Close()
    mysqlDB.Close()
    log.Println("服务已关闭")
}

从内存版的小 Demo 到最终生产级的微服务,核心思想其实很清晰:用 Go 的并发能力提供高性能 HTTP 服务,用 Redis 的无敌读写速度承载海量映射关系,再用 MySQL 充当可回溯的持久层。希望这三个版本的代码能给你一些启发,帮助你在自己的项目中少走弯路。如果你对这类后端架构实战内容感兴趣,云栈社区 里还有更多深度讨论。




上一篇:Spring IOC/DI 实战详解:从XML到注解的装配、注入与作用域全解析
下一篇:法国零售商开售损坏RTX 5090显卡,售后不退不换
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-5-2 05:08 , Processed in 0.789500 second(s), 42 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表