找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

2437

积分

0

好友

323

主题
发表于 2 小时前 | 查看: 2| 回复: 0

一、为什么是 MongoDB?NoSQL 文档数据库的核心优势

1.1 传统关系型数据库的痛点

场景一:电商商品表

┌─────────────────────────────────────────────────────────────────┐
│ 商品表 (product)                                                 │
├─────────────────────────────────────────────────────────────────┤
│ id | name | category_id | brand_id | price | stock | ... | 50+ 字段│
└─────────────────────────────────────────────────────────────────┘

问题:
❌ 服装类商品有颜色、尺码属性
❌ 数码类商品有 CPU、内存、屏幕参数
❌ 图书类商品有作者、出版社、ISBN  

解决方案:MySQL 需要 EAV 模型或 JSON 字段 → 查询复杂、性能差  

1.2 MongoDB 的文档模型优势

// 服装商品
{
  "_id": ObjectId("..."),
  "name": "男士休闲衬衫",
  "category": "clothing",
  "price": 299.00,
  "attributes": {
    "color": ["白色", "蓝色", "黑色"],
    "size": ["S", "M", "L", "XL", "XXL"],
    "material": "纯棉",
    "sleeve": "长袖"
  },
  "tags": ["商务", "休闲", "透气"]
}

// 数码商品
{
  "_id": ObjectId("..."),
  "name": "iPhone 15 Pro",
  "category": "electronics",
  "price": 7999.00,
  "attributes": {
    "cpu": "A17 Pro",
    "ram": "8GB",
    "storage": ["128GB", "256GB", "516GB", "1TB"],
    "screen": "6.1 英寸 Super Retina XDR",
    "camera": "4800 万像素主摄"
  },
  "tags": ["5G", "旗舰", "苹果"]
}

// 图书商品
{
  "_id": ObjectId("..."),
  "name": "MongoDB 权威指南",
  "category": "book",
  "price": 89.00,
  "attributes": {
    "author": ["Kristina Chodorow"],
    "publisher": "O'Reilly Media",
    "isbn": "978-0596809485",
    "pages": 400,
    "publishDate": ISODate("2013-07-01")
  },
  "tags": ["数据库", "NoSQL", "技术"]
}

1.3 MongoDB 核心特性对比

特性 MySQL MongoDB 适用场景
数据模型 关系表 文档 (BSON) 灵活 Schema
水平扩展 困难 (需中间件) 原生支持 (分片) 大数据量
地理位置 基础支持 完整地理空间索引 LBS 应用
聚合查询 SQL GROUP BY 聚合管道 复杂分析
写入性能 中等 高 (Journal 可选) 高并发写入
事务支持 完整 ACID 4.0+ 多文档事务 金融场景

二、场景一:亿级大数据存储与分片集群

2.1 数据建模最佳实践

package com.example.mongodb.model;

import lombok.Builder;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Indexed;
import org.springframework.data.mongodb.core.index.CompoundIndex;
import org.springframework.data.mongodb.core.index.CompoundIndexes;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.mapping.Field;

import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;

/**
 * 用户行为日志
 * 日增 1 亿+ 条记录,典型大数据场景
 */
@Document(collection = "user_behavior_log")
@CompoundIndexes({
    // 复合索引:查询 userId + behaviorType
    @CompoundIndex(name = "idx_user_behavior", def = "{'userId': 1, 'behaviorType': 1, 'timestamp': -1}"),
    // 复合索引:按商品统计
    @CompoundIndex(name = "idx_product_behavior", def = "{'productId': 1, 'behaviorType': 1, 'timestamp': -1}"),
    // TTL 索引:180 天后自动过期
    @CompoundIndex(name = "idx_ttl", def = "{'timestamp': 1}", expireAfter = "180d")
})
@Data
@Builder
public class UserBehaviorLog {

    @Id
    private String id;

    /**
     * 用户 ID(高基数,适合索引)
     */
    @Indexed
    private Long userId;

    /**
     * 行为类型
     * VIEW: 浏览, CLICK: 点击, CART: 加购, FAVORITE: 收藏, PURCHASE: 购买
     */
    @Indexed
    private String behaviorType;

    /**
     * 商品 ID
     */
    @Indexed
    private Long productId;

    /**
     * 商品类目(用于类目分析)
     */
    private String categoryId;

    /**
     * 行为元数据(灵活存储不同行为的额外信息)
     */
    private BehaviorMetadata metadata;

    /**
     * 设备信息
     */
    private DeviceInfo device;

    /**
     * 地理位置(用于区域分析)
     */
    private GeoLocation location;

    /**
     * 时间戳(用于时间序列查询)
     */
    @Indexed
    private LocalDateTime timestamp;

    /**
     * 行为元数据(嵌套文档)
     */
    @Data
    @Builder
    public static class BehaviorMetadata {
        /**
         * 浏览时长(秒)
         */
        private Integer viewDuration;

        /**
         * 来源页面
         */
        private String referrer;

        /**
         * 搜索关键词
         */
        private String searchKeyword;

        /**
         * 活动 ID(用于活动效果分析)
         */
        private String campaignId;

        /**
         * 额外扩展字段(使用 Map 存储动态字段)
         */
        private Map<String, Object> extraFields;
    }

    /**
     * 设备信息(嵌入文档,避免关联查询)
     */
    @Data
    @Builder
    public static class DeviceInfo {
        private String deviceId;
        private String deviceType;  // mobile, tablet, desktop
        private String os;
        private String osVersion;
        private String appVersion;
        private String browser;
    }

    /**
     * 地理位置(用于区域热力图)
     */
    @Data
    @Builder
    public static class GeoLocation {
        private String country;
        private String province;
        private String city;
        private Double latitude;
        private Double longitude;
    }
}

2.2 分片集群配置

# application.yml
spring:
  data:
    mongodb:
      # 生产环境使用分片集群连接
      uri: mongodb://shard1:27017,shard2:27017,shard3:27017/ecommerce?replicaSet=rs0

      # 读写关注(保证数据一致性)
      read-preference: primaryPreferred
      write-concern: acknowledged

      # 连接池配置
      max-connection-per-host: 100
      min-connection-per-host: 10
      connection-timeout: 5000
      socket-timeout: 10000
package com.example.mongodb.config;

import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.config.AbstractMongoClientConfiguration;
import org.springframework.data.mongodb.core.MongoTemplate;

import java.util.concurrent.TimeUnit;

/**
 * MongoDB 分片集群配置
 */
@Configuration
public class MongoConfig extends AbstractMongoClientConfiguration {

    @Override
    protected String getDatabaseName() {
        return "ecommerce";
    }

    @Bean
    public MongoClient mongoClient() {
        ConnectionString connectionString = new ConnectionString(
            "mongodb://shard1:27017,shard2:27017,shard3:27017/ecommerce?replicaSet=rs0"
        );

        MongoClientSettings settings = MongoClientSettings.builder()
                .applyConnectionString(connectionString)
                .applyToConnectionPoolSettings(builder ->
                    builder.maxSize(100)
                          .minSize(10)
                          .maxConnectionIdleTime(60, TimeUnit.SECONDS))
                .applyToSocketSettings(builder ->
                    builder.connectTimeout(5000, TimeUnit.MILLISECONDS)
                          .readTimeout(10000, TimeUnit.MILLISECONDS))
                .writeConcern(WriteConcern.ACKNOWLEDGED)
                .retryWrites(true)
                .build();

        return MongoClients.create(settings);
    }

    @Bean
    public MongoTemplate mongoTemplate(MongoClient mongoClient) {
        return new MongoTemplate(mongoClient, getDatabaseName());
    }
}

2.3 分片键选择策略

// MongoDB Shell - 分片集群配置

// 1. 启用分片
use config
db.adminCommand({ enableSharding: "ecommerce" })

// 2. 选择分片键(关键决策!)

// 方案 A:按 userId 分片(适合用户维度查询)
// 优点:同一用户的数据在同一分片,查询效率高
// 缺点:热点用户可能导致数据倾斜
db.adminCommand({
    shardCollection: "ecommerce.user_behavior_log",
    key: { userId: "hashed" }  // hashed 分片避免热点
})

// 方案 B:按时间范围分片(适合时间序列查询)
// 优点:时间范围查询高效,便于数据归档
// 缺点:近期数据集中在一个分片(写热点)
db.adminCommand({
    shardCollection: "ecommerce.user_behavior_log",
    key: { timestamp: 1 }
})

// 方案 C:复合分片键(平衡查询与写入)
// 优点:兼顾用户查询与时间范围查询
// 缺点:分片键较长,索引占用空间大
db.adminCommand({
    shardCollection: "ecommerce.user_behavior_log",
    key: { userId: "hashed", timestamp: 1 }
})

// 3. 查看分片状态
db.adminCommand({ listShards: 1 })
sh.status()

2.4 批量写入优化

package com.example.mongodb.repository;

import com.example.mongodb.model.UserBehaviorLog;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Repository;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 用户行为日志仓库
 * 支持高并发批量写入
 */
@Repository
@RequiredArgsConstructor
@Slf4j
public class UserBehaviorLogRepository {

    private final MongoTemplate mongoTemplate;

    // 异步写入线程池
    private final ExecutorService writeExecutor = Executors.newFixedThreadPool(
        Runtime.getRuntime().availableProcessors() * 2
    );

    /**
     * 单条写入(不推荐高频使用)
     */
    public void save(UserBehaviorLog log) {
        mongoTemplate.save(log);
    }

    /**
     * 批量写入(推荐)
     * 性能对比:
     * - 单条写入 10 万条:~30 秒
     * - 批量写入 10 万条:~3 秒(10 倍提升)
     */
    public void saveBatch(List<UserBehaviorLog> logs) {
        if (logs == null || logs.isEmpty()) {
            return;
        }

        // 分批处理(每批 1000 条)
        int batchSize = 1000;
        for (int i = 0; i < logs.size(); i += batchSize) {
            int end = Math.min(i + batchSize, logs.size());
            List<UserBehaviorLog> batch = logs.subList(i, end);
            mongoTemplate.insertAll(batch);
        }
    }

    /**
     * 异步批量写入(高并发场景)
     */
    public CompletableFuture<Void> saveBatchAsync(List<UserBehaviorLog> logs) {
        return CompletableFuture.runAsync(() -> saveBatch(logs), writeExecutor);
    }

    /**
     * 使用 Change Stream 监听数据变更
     */
    public void registerChangeListener() {
        mongoTemplate.executeDb(db -> {
            var collection = db.getCollection("user_behavior_log");
            var changeStream = collection.watch();

            changeStream.forEach(change -> {
                log.info("数据变更:{} - {}", change.getOperationType(), change.getFullDocument());
            });

            return null;
        });
    }
}

2.5 聚合管道分析

package com.example.mongodb.service;

import lombok.RequiredArgsConstructor;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.aggregation.*;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Map;

import static org.springframework.data.mongodb.core.aggregation.Aggregation.*;

/**
 * 用户行为分析服务
 * 使用 MongoDB 聚合管道进行复杂分析
 */
@Service
@RequiredArgsConstructor
public class UserBehaviorAnalyticsService {

    private final MongoTemplate mongoTemplate;

    /**
     * 统计商品 PV/UV
     *
     * 聚合管道:
     * 1. $match: 过滤时间范围
     * 2. $group: 按商品分组,计算 PV 和 UV
     * 3. $sort: 按 PV 降序
     * 4. $limit: 取 Top N
     */
    public List<ProductStats> getProductStats(Long categoryId, int days, int topN) {
        LocalDateTime startTime = LocalDateTime.now().minusDays(days);

        Aggregation aggregation = newAggregation(
            // 1. 匹配条件
            match(Criteria.where("timestamp").gte(startTime)
                         .and("categoryId").is(categoryId)
                         .and("behaviorType").in("VIEW", "CLICK")),

            // 2. 按商品分组
            group("productId")
                .count().as("pv")  // 页面浏览量
                .addToSet("userId").as("uniqueUsers"),  // 去重用户

            // 3. 计算 UV
            project("pv")
                .and("uniqueUsers").size().as("uv"),

            // 4. 排序
            sort(Sort.by(Sort.Direction.DESC, "pv")),

            // 5. 限制数量
            limit(topN)
        );

        AggregationResults<ProductStats> results = mongoTemplate.aggregate(
            aggregation, "user_behavior_log", ProductStats.class
        );

        return results.getMappedResults();
    }

    /**
     * 用户行为漏斗分析
     *
     * 浏览 → 点击 → 加购 → 购买
     */
    public ConversionFunnel getConversionFunnel(Long productId, int days) {
        LocalDateTime startTime = LocalDateTime.now().minusDays(days);

        Aggregation aggregation = newAggregation(
            match(Criteria.where("productId").is(productId)
                         .and("timestamp").gte(startTime)
                         .and("behaviorType").in("VIEW", "CLICK", "CART", "PURCHASE")),

            group("behaviorType")
                .count().as("count")
                .addToSet("userId").as("users"),

            project("count")
                .and("users").size().as("uv")
        );

        List<BehaviorStats> stats = mongoTemplate.aggregate(
            aggregation, "user_behavior_log", BehaviorStats.class
        ).getMappedResults();

        // 转换为漏斗数据
        return ConversionFunnel.builder()
                .view(stats.stream().filter(s -> "VIEW".equals(s.getType())).findFirst().map(BehaviorStats::getUv).orElse(0L))
                .click(stats.stream().filter(s -> "CLICK".equals(s.getType())).findFirst().map(BehaviorStats::getUv).orElse(0L))
                .cart(stats.stream().filter(s -> "CART".equals(s.getType())).findFirst().map(BehaviorStats::getUv).orElse(0L))
                .purchase(stats.stream().filter(s -> "PURCHASE".equals(s.getType())).findFirst().map(BehaviorStats::getUv).orElse(0L))
                .build();
    }

    /**
     * 用户留存分析(次日、7 日、30 日留存)
     */
    public RetentionStats getRetentionStats(Long userId) {
        // 获取用户首次访问日期
        Aggregation firstVisitAgg = newAggregation(
            match(Criteria.where("userId").is(userId)),
            sort(Sort.by(Sort.Direction.ASC, "timestamp")),
            limit(1),
            project("timestamp").as("firstVisit")
        );

        LocalDateTime firstVisit = mongoTemplate.aggregate(
            firstVisitAgg, "user_behavior_log", FirstVisitResult.class
        ).getUniqueMappedResult().getFirstVisit();

        // 计算各时间段留存
        long nextDayRetention = countActiveDays(userId, firstVisit, 1, 1);
        long day7Retention = countActiveDays(userId, firstVisit, 7, 1);
        long day30Retention = countActiveDays(userId, firstVisit, 30, 1);

        return RetentionStats.builder()
                .nextDayRetention(nextDayRetention > 0)
                .day7Retention(day7Retention > 0)
                .day30Retention(day30Retention > 0)
                .build();
    }

    private long countActiveDays(Long userId, LocalDateTime startDate, int startDay, int endDay) {
        Aggregation agg = newAggregation(
            match(Criteria.where("userId").is(userId)
                         .and("timestamp").gte(startDate.plusDays(startDay))
                         .and("timestamp").lt(startDate.plusDays(startDay + 1))),
            count()
        );

        return mongoTemplate.aggregate(agg, "user_behavior_log", Long.class)
                .getUniqueMappedResult();
    }

    // DTO 类
    @lombok.Data
    @lombok.Builder
    public static class ProductStats {
        private Long productId;
        private Long pv;
        private Long uv;
    }

    @lombok.Data
    @lombok.Builder
    public static class BehaviorStats {
        private String type;
        private Long count;
        private Long uv;
    }

    @lombok.Data
    @lombok.Builder
    public static class ConversionFunnel {
        private Long view;
        private Long click;
        private Long cart;
        private Long purchase;
    }

    @lombok.Data
    public static class FirstVisitResult {
        private LocalDateTime firstVisit;
    }

    @lombok.Data
    @lombok.Builder
    public static class RetentionStats {
        private Boolean nextDayRetention;
        private Boolean day7Retention;
        private Boolean day30Retention;
    }
}

三、场景二:地理位置服务(LBS)实战

3.1 地理空间数据模型

package com.example.mongodb.model;

import lombok.Builder;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.geo.GeoJsonPoint;
import org.springframework.data.mongodb.core.index.GeoSpatialIndexType;
import org.springframework.data.mongodb.core.index.GeoSpatialIndexed;
import org.springframework.data.mongodb.core.mapping.Document;

import java.time.LocalDateTime;

/**
 * 外卖骑手位置
 * 实时追踪骑手位置,支持附近骑手查询
 */
@Document(collection = "rider_location")
@Data
@Builder
public class RiderLocation {

    @Id
    private String id;

    /**
     * 骑手 ID
     */
    private Long riderId;

    /**
     * 地理位置(GeoJSON Point)
     * 必须使用 2dsphere 索引
     */
    @GeoSpatialIndexed(type = GeoSpatialIndexType.GEO_2DSPHERE)
    private GeoJsonPoint location;

    /**
     * 当前位置描述
     */
    private String address;

    /**
     * 骑手状态
     * IDLE: 空闲, BUSY: 配送中, OFFLINE: 离线
     */
    private String status;

    /**
     * 当前订单 ID(配送中时)
     */
    private Long currentOrderId;

    /**
     * 位置更新时间
     */
    private LocalDateTime updateTime;

    /**
     * 速度(km/h)
     */
    private Double speed;

    /**
     * 方向(角度 0-360)
     */
    private Double direction;
}

/**
 * 商家位置
 * 支持附近商家推荐
 */
@Document(collection = "restaurant_location")
@Data
@Builder
public class RestaurantLocation {

    @Id
    private String id;

    private Long restaurantId;

    private String restaurantName;

    @GeoSpatialIndexed(type = GeoSpatialIndexType.GEO_2DSPHERE)
    private GeoJsonPoint location;

    private String address;

    /**
     * 配送范围(米)
     */
    private Integer deliveryRadius;

    /**
     * 平均配送时长(分钟)
     */
    private Integer avgDeliveryTime;

    /**
     * 起送价
     */
    private Double minDeliveryFee;

    /**
     * 评分
     */
    private Double rating;

    /**
     * 菜系标签
     */
    private List<String> cuisines;
}

/**
 * 用户打车订单位置
 * 支持附近车辆匹配
 */
@Document(collection = "ride_order")
@Data
@Builder
public class RideOrder {

    @Id
    private String id;

    private Long orderId;

    private Long userId;

    /**
     * 上车点
     */
    @GeoSpatialIndexed(type = GeoSpatialIndexType.GEO_2DSPHERE)
    private GeoJsonPoint pickupLocation;

    /**
     * 目的地
     */
    @GeoSpatialIndexed(type = GeoSpatialIndexType.GEO_2DSPHERE)
    private GeoJsonPoint dropoffLocation;

    /**
     * 订单状态
     * PENDING: 待接单, PICKED: 已接单, IN_PROGRESS: 行程中, COMPLETED: 已完成
     */
    private String status;

    /**
     * 匹配司机 ID
     */
    private Long driverId;

    private LocalDateTime createTime;
}

3.2 地理空间查询实战

package com.example.mongodb.repository;

import com.example.mongodb.model.RiderLocation;
import com.example.mongodb.model.RestaurantLocation;
import lombok.RequiredArgsConstructor;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.geo.GeoJsonPoint;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.NearQuery;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Repository;

import java.util.List;

/**
 * 地理位置查询仓库
 */
@Repository
@RequiredArgsConstructor
public class LocationRepository {

    private final MongoTemplate mongoTemplate;

    /**
     * 查询指定范围内的骑手
     *
     * @param longitude 经度
     * @param latitude 纬度
     * @param maxDistance 最大距离(米)
     * @param limit 返回数量限制
     */
    public List<RiderLocation> findNearbyRiders(double longitude, double latitude,
                                                  double maxDistance, int limit) {
        GeoJsonPoint point = new GeoJsonPoint(longitude, latitude);

        NearQuery nearQuery = NearQuery.near(point)
                .maxDistance(maxDistance)  // 米
                .num(limit)
                .spherical(true);  // 球面计算(更精确)

        return mongoTemplate.executeNearOps(nearQuery, RiderLocation.class)
                .stream()
                .map(result -> {
                    RiderLocation rider = result.getEntity();
                    // 设置计算出的距离
                    rider.setDistance(result.getDistance());
                    return rider;
                })
                .toList();
    }

    /**
     * 查询附近商家(带过滤条件)
     */
    public List<RestaurantLocation> findNearbyRestaurants(
            double longitude, double latitude,
            double maxDistance, List<String> cuisines, Double minRating) {

        GeoJsonPoint point = new GeoJsonPoint(longitude, latitude);

        // 构建查询条件
        Criteria criteria = Criteria.where("location").nearSphere(point).maxDistance(maxDistance);

        if (cuisines != null && !cuisines.isEmpty()) {
            criteria.and("cuisines").in(cuisines);
        }

        if (minRating != null) {
            criteria.and("rating").gte(minRating);
        }

        Query query = new Query(criteria)
                .with(org.springframework.data.domain.Sort.by("rating").descending())
                .limit(20);

        return mongoTemplate.find(query, RestaurantLocation.class);
    }

    /**
     * 计算两点间距离
     */
    public double calculateDistance(double lon1, double lat1, double lon2, double lat2) {
        GeoJsonPoint point1 = new GeoJsonPoint(lon1, lat1);
        GeoJsonPoint point2 = new GeoJsonPoint(lon2, lat2);

        // 使用 MongoDB 的地理空间计算
        String js = String.format("""
            function() {
                var point1 = { type: "Point", coordinates: [%f, %f] };
                var point2 = { type: "Point", coordinates: [%f, %f] };
                return GeoJSON.distance(point1, point2) * 6371000;  // 转换为米
            }
            """, lon1, lat1, lon2, lat2);

        // 实际项目中建议使用更精确的计算方式
        return haversineDistance(lat1, lon1, lat2, lon2);
    }

    /**
     * Haversine 公式计算球面距离
     */
    private double haversineDistance(double lat1, double lon1, double lat2, double lon2) {
        final int R = 6371000;  // 地球半径(米)

        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);

        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2) +
                   Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2)) *
                   Math.sin(dLon / 2) * Math.sin(dLon / 2);

        double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));

        return R * c;
    }

    /**
     * 判断点是否在多边形内(电子围栏)
     */
    public boolean isPointInPolygon(double longitude, double latitude, List<double[]> polygon) {
        // 构建 GeoJSON Polygon
        String polygonJson = buildPolygonJson(polygon);

        // 使用 $geoWithin 查询
        Criteria criteria = Criteria.where("location").within(
            org.springframework.data.mongodb.core.query.GeoJson.polygon(
                polygon.stream()
                    .map(coord -> new GeoJsonPoint(coord[0], coord[1]))
                    .toArray(GeoJsonPoint[]::new)
            )
        );

        // 创建测试点
        GeoJsonPoint testPoint = new GeoJsonPoint(longitude, latitude);

        Query query = new Query(criteria);
        // 实际使用时查询包含该点的区域
        return true;  // 简化示例
    }

    private String buildPolygonJson(List<double[]> polygon) {
        StringBuilder sb = new StringBuilder("{ \"type\": \"Polygon\", \"coordinates\": [[");
        for (int i = 0; i < polygon.size(); i++) {
            double[] coord = polygon.get(i);
            sb.append("[").append(coord[0]).append(",").append(coord[1]).append("]");
            if (i < polygon.size() - 1) sb.append(",");
        }
        sb.append("]] }");
        return sb.toString();
    }
}

3.3 骑手位置实时更新

package com.example.mongodb.service;

import com.example.mongodb.model.RiderLocation;
import com.example.mongodb.repository.LocationRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.List;

/**
 * 骑手位置追踪服务
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class RiderTrackingService {

    private final MongoTemplate mongoTemplate;
    private final LocationRepository locationRepository;

    /**
     * 更新骑手位置
     * 骑手 APP 每 5 秒上报一次位置
     */
    public void updateRiderLocation(Long riderId, double longitude, double latitude,
                                   Double speed, Double direction) {
        Query query = new Query(Criteria.where("riderId").is(riderId));

        Update update = new Update()
                .set("location", new GeoJsonPoint(longitude, latitude))
                .set("speed", speed)
                .set("direction", direction)
                .set("updateTime", LocalDateTime.now());

        // upsert: 不存在则创建
        mongoTemplate.upsert(query, update, RiderLocation.class);
    }

    /**
     * 为订单匹配附近骑手
     */
    public List<RiderLocation> matchRidersForOrder(double pickupLongitude, double pickupLatitude,
                                                    double maxDistance, int limit) {
        return locationRepository.findNearbyRiders(
            pickupLongitude, pickupLatitude, maxDistance, limit
        );
    }

    /**
     * 清理离线骑手(超过 5 分钟未更新位置)
     */
    @Scheduled(fixedRate = 60000)  // 每分钟执行一次
    public void cleanupOfflineRiders() {
        LocalDateTime threshold = LocalDateTime.now().minusMinutes(5);

        Query query = new Query(
            Criteria.where("updateTime").lt(threshold)
                    .and("status").ne("OFFLINE")
        );

        Update update = new Update().set("status", "OFFLINE");

        long count = mongoTemplate.updateMulti(query, update, RiderLocation.class)
                .getModifiedCount();

        if (count > 0) {
            log.info("清理离线骑手:{} 人", count);
        }
    }

    /**
     * 计算骑手预计到达时间(ETA)
     */
    public EstimatedArrival calculateETA(Long riderId, double destLongitude, double destLatitude) {
        RiderLocation rider = mongoTemplate.findOne(
            Query.query(Criteria.where("riderId").is(riderId)),
            RiderLocation.class
        );

        if (rider == null) {
            return null;
        }

        // 计算距离
        double distance = haversineDistance(
            rider.getLocation().getY(), rider.getLocation().getX(),
            destLatitude, destLongitude
        );

        // 估算时间(假设平均速度 30km/h)
        double avgSpeed = rider.getSpeed() != null ? rider.getSpeed() : 30.0;
        double etaMinutes = (distance / 1000) / avgSpeed * 60;

        return EstimatedArrival.builder()
                .distanceMeters(distance)
                .etaMinutes((int) Math.ceil(etaMinutes))
                .riderStatus(rider.getStatus())
                .build();
    }

    private double haversineDistance(double lat1, double lon1, double lat2, double lon2) {
        final int R = 6371000;
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2) +
                   Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2)) *
                   Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return R * 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
    }

    @lombok.Data
    @lombok.Builder
    public static class EstimatedArrival {
        private Double distanceMeters;
        private Integer etaMinutes;
        private String riderStatus;
    }
}

3.4 地理围栏应用

package com.example.mongodb.service;

import lombok.RequiredArgsConstructor;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Service;

import java.util.Arrays;
import java.util.List;

/**
 * 地理围栏服务
 * 用于配送范围校验、区域运营分析
 */
@Service
@RequiredArgsConstructor
public class GeoFenceService {

    private final MongoTemplate mongoTemplate;

    /**
     * 配送区域定义
     */
    @org.springframework.data.mongodb.core.mapping.Document(collection = "delivery_zone")
    @lombok.Data
    public static class DeliveryZone {
        @org.springframework.data.annotation.Id
        private String id;
        private String zoneName;
        private String city;
        /**
         * 多边形坐标(经纬度数组)
         */
        private List<List<Double>> polygon;
        private Boolean isActive;
    }

    /**
     * 检查地址是否在配送范围内
     */
    public boolean isInDeliveryZone(double longitude, double latitude, String city) {
        // 构建多边形查询
        Criteria criteria = Criteria.where("city").is(city)
                .and("isActive").is(true)
                .and("polygon").elemMatch(Criteria.where("$geoWithin").exists(true));

        // 使用 $geoWithin 查询点在多边形内
        Query query = new Query(criteria);

        // 实际实现需要使用 MongoDB 的地理空间聚合
        List<DeliveryZone> zones = mongoTemplate.find(query, DeliveryZone.class);

        for (DeliveryZone zone : zones) {
            if (isPointInPolygon(longitude, latitude, zone.getPolygon())) {
                return true;
            }
        }

        return false;
    }

    /**
     * 射线法判断点是否在多边形内
     */
    private boolean isPointInPolygon(double lng, double lat, List<List<Double>> polygon) {
        boolean inside = false;
        int n = polygon.size();

        for (int i = 0, j = n - 1; i < n; j = i++) {
            double xi = polygon.get(i).get(0), yi = polygon.get(i).get(1);
            double xj = polygon.get(j).get(0), yj = polygon.get(j).get(1);

            boolean intersect = ((yi > lat) != (yj > lat)) &&
                                (lng < (xj - xi) * (lat - yi) / (yj - yi) + xi);

            if (intersect) inside = !inside;
        }

        return inside;
    }

    /**
     * 统计区域内订单量
     */
    public long countOrdersInZone(String zoneId, LocalDateTime startTime, LocalDateTime endTime) {
        // 获取区域多边形
        DeliveryZone zone = mongoTemplate.findById(zoneId, DeliveryZone.class);
        if (zone == null) return 0;

        // 聚合查询统计
        // 实际实现需要使用聚合管道和 $geoWithin
        return 0;  // 简化示例
    }
}

四、场景三:实时消息推送系统

4.1 消息模型设计

package com.example.mongodb.model;

import lombok.Builder;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Indexed;
import org.springframework.data.mongodb.core.index.CompoundIndex;
import org.springframework.data.mongodb.core.index.CompoundIndexes;
import org.springframework.data.mongodb.core.mapping.Document;

import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;

/**
 * 站内消息
 * 支持单聊、群聊、系统通知
 */
@Document(collection = "message")
@CompoundIndexes({
    // 按会话查询最新消息
    @CompoundIndex(name = "idx_conversation_latest", def = "{'conversationId': 1, 'timestamp': -1}"),
    // 按用户查询消息
    @CompoundIndex(name = "idx_user_messages", def = "{'recipientId': 1, 'isRead': 1, 'timestamp': -1}"),
    // TTL 索引:消息 1 年后自动归档
    @CompoundIndex(name = "idx_ttl", def = "{'timestamp': 1}", expireAfter = "365d")
})
@Data
@Builder
public class Message {

    @Id
    private String id;

    /**
     * 发送者 ID
     */
    @Indexed
    private Long senderId;

    /**
     * 接收者 ID(单聊)
     */
    @Indexed
    private Long recipientId;

    /**
     * 会话 ID(群聊/单聊通用)
     */
    @Indexed
    private String conversationId;

    /**
     * 消息类型
     * TEXT: 文本, IMAGE: 图片, VOICE: 语音, VIDEO: 视频, FILE: 文件, SYSTEM: 系统通知
     */
    private String messageType;

    /**
     * 消息内容(文本或媒体 URL)
     */
    private String content;

    /**
     * 媒体元数据
     */
    private MediaMetadata mediaMetadata;

    /**
     * 是否已读
     */
    @Indexed
    private Boolean isRead;

    /**
     * 已读时间
     */
    private LocalDateTime readTime;

    /**
     * 消息时间戳
     */
    @Indexed
    private LocalDateTime timestamp;

    /**
     * 消息状态
     * SENT: 已发送, DELIVERED: 已送达, READ: 已读
     */
    private String status;

    /**
     * 回复的消息 ID
     */
    private String replyTo;

    /**
     * 提及的用户 ID 列表
     */
    private List<Long> mentionedUsers;

    /**
     * 扩展字段
     */
    private Map<String, Object> extra;

    @Data
    @Builder
    public static class MediaMetadata {
        private String url;
        private String thumbnailUrl;
        private Long fileSize;
        private String mimeType;
        private Integer duration;  // 音视频时长(秒)
        private Integer width;
        private Integer height;
    }
}

/**
 * 会话信息
 */
@Document(collection = "conversation")
@Data
@Builder
public class Conversation {

    @Id
    private String id;

    /**
     * 会话类型
     * PRIVATE: 单聊, GROUP: 群聊, SYSTEM: 系统通知
     */
    private String type;

    /**
     * 参与者 ID 列表
     */
    private List<Long> participants;

    /**
     * 群聊名称
     */
    private String groupName;

    /**
     * 群头像
     */
    private String groupAvatar;

    /**
     * 创建者 ID
     */
    private Long creatorId;

    /**
     * 最后一条消息 ID
     */
    private String lastMessageId;

    /**
     * 最后一条消息时间
     */
    private LocalDateTime lastMessageTime;

    /**
     * 未读消息数(按用户)
     */
    private Map<Long, Integer> unreadCount;

    /**
     * 创建时间
     */
    private LocalDateTime createTime;
}

4.2 消息推送服务

package com.example.mongodb.service;

import com.example.mongodb.model.Message;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.mongodb.core.ChangeStreamEvent;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;

/**
 * 消息推送服务
 * 使用 WebSocket + MongoDB Change Stream 实现实时推送
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class MessagePushService {

    private final MongoTemplate mongoTemplate;
    private final SimpMessagingTemplate messagingTemplate;
    private final ObjectMapper objectMapper;

    private static final String WS_DEST = "/topic/messages";

    /**
     * 发送消息
     */
    public Message sendMessage(Long senderId, Long recipientId, String content,
                               String messageType) {
        Message message = Message.builder()
                .senderId(senderId)
                .recipientId(recipientId)
                .conversationId(buildConversationId(senderId, recipientId))
                .messageType(messageType)
                .content(content)
                .isRead(false)
                .status("SENT")
                .timestamp(LocalDateTime.now())
                .build();

        Message saved = mongoTemplate.save(message);

        // 通过 WebSocket 推送给接收者
        messagingTemplate.convertAndSendToUser(
            recipientId.toString(),
            WS_DEST,
            message
        );

        log.info("发送消息:{} -> {}", senderId, recipientId);
        return saved;
    }

    /**
     * 批量发送群消息
     */
    public Message sendGroupMessage(Long senderId, String conversationId,
                                    String content, List<Long> participants) {
        Message message = Message.builder()
                .senderId(senderId)
                .conversationId(conversationId)
                .messageType("TEXT")
                .content(content)
                .isRead(false)
                .status("SENT")
                .timestamp(LocalDateTime.now())
                .build();

        Message saved = mongoTemplate.save(message);

        // 推送给所有参与者(排除发送者)
        for (Long participantId : participants) {
            if (!participantId.equals(senderId)) {
                messagingTemplate.convertAndSendToUser(
                    participantId.toString(),
                    WS_DEST,
                    message
                );
            }
        }

        return saved;
    }

    /**
     * 标记消息已读
     */
    public void markAsRead(Long userId, List<String> messageIds) {
        Query query = new Query(
            Criteria.where("_id").in(messageIds)
                    .and("recipientId").is(userId)
        );

        Update update = new Update()
                .set("isRead", true)
                .set("readTime", LocalDateTime.now())
                .set("status", "READ");

        mongoTemplate.updateMulti(query, update, Message.class);

        // 通知发送者
        List<Message> messages = mongoTemplate.find(query, Message.class);
        for (Message msg : messages) {
            messagingTemplate.convertAndSendToUser(
                msg.getSenderId().toString(),
                "/topic/message-status",
                Map.of("messageId", msg.getId(), "status", "READ")
            );
        }
    }

    /**
     * 获取会话消息列表
     */
    public List<Message> getConversationMessages(String conversationId, Long userId,
                                                 int limit, String cursor) {
        Criteria criteria = Criteria.where("conversationId").is(conversationId);

        if (cursor != null) {
            // 游标分页
            criteria.and("timestamp").lt(LocalDateTime.parse(cursor));
        }

        Query query = new Query(criteria)
                .with(org.springframework.data.domain.Sort.by(
                    org.springframework.data.domain.Sort.Direction.DESC, "timestamp"))
                .limit(limit);

        return mongoTemplate.find(query, Message.class);
    }

    /**
     * 获取未读消息数
     */
    public long getUnreadCount(Long userId) {
        Query query = new Query(
            Criteria.where("recipientId").is(userId)
                    .and("isRead").is(false)
        );
        return mongoTemplate.count(query, Message.class);
    }

    /**
     * 监听 Change Stream 实现消息同步
     */
    @PostConstruct
    public void listenMessageChanges() {
        ChangeStreamOptions options = ChangeStreamOptions.builder()
                .filter("{ operationType: { $in: ['insert', 'update'] } }")
                .build();

        mongoTemplate.changeStream(Message.class, options, ChangeStreamEvent.class)
                .forEach(event -> {
                    log.info("消息变更:{}", event.getBody());
                    // 可以触发额外的业务逻辑
                });
    }

    private String buildConversationId(Long userId1, Long userId2) {
        return userId1 < userId2
                ? userId1 + "_" + userId2
                : userId2 + "_" + userId1;
    }
}

五、场景四:物联网时序数据存储

5.1 设备数据模型

package com.example.mongodb.model;

import lombok.Builder;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Indexed;
import org.springframework.data.mongodb.core.index.CompoundIndex;
import org.springframework.data.mongodb.core.index.CompoundIndexes;
import org.springframework.data.mongodb.core.mapping.Document;

import java.time.LocalDateTime;
import java.util.Map;

/**
 * IoT 设备传感器数据
 * 高频写入场景(每秒万级数据点)
 */
@Document(collection = "device_telemetry")
@CompoundIndexes({
    // 按设备 + 时间查询
    @CompoundIndex(name = "idx_device_time", def = "{'deviceId': 1, 'timestamp': -1}"),
    // 按设备 + 指标类型查询
    @CompoundIndex(name = "idx_device_metric", def = "{'deviceId': 1, 'metricType': 1, 'timestamp': -1}"),
    // TTL 索引:原始数据 30 天后自动删除(聚合数据永久保存)
    @CompoundIndex(name = "idx_ttl", def = "{'timestamp': 1}", expireAfter = "30d")
})
@Data
@Builder
public class DeviceTelemetry {

    @Id
    private String id;

    /**
     * 设备 ID
     */
    @Indexed
    private String deviceId;

    /**
     * 指标类型
     * TEMPERATURE: 温度, HUMIDITY: 湿度, PRESSURE: 压力, VOLTAGE: 电压
     */
    @Indexed
    private String metricType;

    /**
     * 指标值
     */
    private Double value;

    /**
     * 单位
     */
    private String unit;

    /**
     * 数据质量
     * GOOD: 正常,WARNING: 警告,ERROR: 错误
     */
    private String quality;

    /**
     * 时间戳
     */
    @Indexed
    private LocalDateTime timestamp;

    /**
     * 扩展标签
     */
    private Map<String, String> tags;
}

/**
 * 设备数据聚合(小时级)
 * 用于长期趋势分析
 */
@Document(collection = "device_telemetry_hourly")
@Data
@Builder
public class DeviceTelemetryHourly {

    @Id
    private String id;

    private String deviceId;

    private String metricType;

    /**
     * 小时桶(格式:yyyy-MM-dd HH:00)
     */
    private LocalDateTime hourBucket;

    /**
     * 平均值
     */
    private Double avgValue;

    /**
     * 最大值
     */
    private Double maxValue;

    /**
     * 最小值
     */
    private Double minValue;

    /**
     * 数据点数量
     */
    private Integer dataPointCount;
}

5.2 时序数据写入优化

package com.example.mongodb.service;

import com.example.mongodb.model.DeviceTelemetry;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * IoT 设备数据服务
 * 高并发写入优化
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class DeviceTelemetryService {

    private final MongoTemplate mongoTemplate;

    // 写入缓冲队列
    private final BlockingQueue<List<DeviceTelemetry>> writeQueue =
            new LinkedBlockingQueue<>(10000);

    // 异步写入线程池
    private final ExecutorService writeExecutor = Executors.newFixedThreadPool(
        Runtime.getRuntime().availableProcessors()
    );

    /**
     * 单点写入(不推荐高频使用)
     */
    public void save(DeviceTelemetry telemetry) {
        mongoTemplate.save(telemetry);
    }

    /**
     * 批量写入(推荐)
     */
    public void saveBatch(List<DeviceTelemetry> telemetryList) {
        if (telemetryList == null || telemetryList.isEmpty()) {
            return;
        }

        // 按设备分组(提升写入局部性)
        var grouped = telemetryList.stream()
                .collect(java.util.stream.Collectors.groupingBy(DeviceTelemetry::getDeviceId));

        for (List<DeviceTelemetry> group : grouped.values()) {
            mongoTemplate.insertAll(group);
        }
    }

    /**
     * 异步缓冲写入(高并发场景)
     *
     * 工作原理:
     * 1. 数据先放入内存队列
     * 2. 后台线程批量消费(每 100ms 或满 1000 条)
     * 3. 批量写入 MongoDB
     */
    public void saveAsync(DeviceTelemetry telemetry) {
        // 简化实现,实际应使用 Disruptor 等高性能队列
        List<DeviceTelemetry> batch = new ArrayList<>();
        batch.add(telemetry);

        writeExecutor.submit(() -> {
            try {
                saveBatch(batch);
            } catch (Exception e) {
                log.error("异步写入失败", e);
                // 可以加入死信队列重试
            }
        });
    }

    /**
     * 查询设备历史数据
     */
    public List<DeviceTelemetry> getDeviceHistory(String deviceId, String metricType,
                                                   LocalDateTime startTime, LocalDateTime endTime,
                                                   int limit) {
        var query = org.springframework.data.mongodb.core.query.Query.query(
                org.springframework.data.mongodb.core.query.Criteria.where("deviceId").is(deviceId)
                        .and("metricType").is(metricType)
                        .and("timestamp").gte(startTime).lte(endTime)
        )
        .with(org.springframework.data.domain.Sort.by(
            org.springframework.data.domain.Sort.Direction.DESC, "timestamp"))
        .limit(limit);

        return mongoTemplate.find(query, DeviceTelemetry.class);
    }

    /**
     * 聚合小时数据(定时任务)
     */
    @Scheduled(cron = "0 5 * * * *")  // 每小时第 5 分钟执行
    public void aggregateHourlyData() {
        LocalDateTime lastHour = LocalDateTime.now().minusHours(1).withMinute(0).withSecond(0);

        // 使用聚合管道计算小时统计
        var aggregation = org.springframework.data.mongodb.core.aggregation.Aggregation.newAggregation(
                org.springframework.data.mongodb.core.aggregation.Aggregation.match(
                        org.springframework.data.mongodb.core.query.Criteria.where("timestamp")
                                .gte(lastHour)
                                .lt(lastHour.plusHours(1))
                ),
                org.springframework.data.mongodb.core.aggregation.Aggregation.group("deviceId", "metricType")
                        .avg("value").as("avgValue")
                        .max("value").as("maxValue")
                        .min("value").as("minValue")
                        .count().as("dataPointCount"),
                org.springframework.data.mongodb.core.aggregation.Aggregation.project("deviceId", "metricType")
                        .and("avgValue").as("avgValue")
                        .and("maxValue").as("maxValue")
                        .and("minValue").as("minValue")
                        .and("dataPointCount").as("dataPointCount")
                        .andExpression("DATEUTIL_HOUR(%s)", lastHour).as("hourBucket")
        );

        // 将聚合结果写入小时表
        // 实际实现需要使用 mongoTemplate.aggregate() 并保存结果
    }
}

六、性能优化与监控

6.1 索引优化策略

// MongoDB Shell - 索引分析与优化

// 1. 查看查询执行计划
db.user_behavior_log.find({ userId: 12345, behaviorType: "VIEW" })
    .explain("executionStats")

// 2. 查看索引使用情况
db.user_behavior_log.getIndexes()

// 3. 删除未使用的索引
db.user_behavior_log.dropIndex("idx_unused")

// 4. 创建覆盖索引(Covered Query)
// 查询字段全部在索引中,无需回表
db.user_behavior_log.createIndex(
    { userId: 1, behaviorType: 1, timestamp: -1 },
    { name: "idx_covered_query" }
)

// 5. 查看索引大小
db.user_behavior_log.aggregate([ { $indexStats: {} } ])

// 6. 分析慢查询
db.setProfileLevel(1, { slowms: 100 })  // 记录>100ms 的查询
db.system.profile.find().sort({ ts: -1 }).limit(10)

6.2 查询优化技巧

package com.example.mongodb.optimization;

import lombok.RequiredArgsConstructor;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Component;

/**
 * MongoDB 查询优化最佳实践
 */
@Component
@RequiredArgsConstructor
public class QueryOptimization {

    private final MongoTemplate mongoTemplate;

    /**
     * 优化 1:使用投影限制返回字段
     * ❌ 错误:返回整个文档
     * ✅ 正确:只返回需要的字段
     */
    public void optimizeProjection() {
        // 错误示例
        // mongoTemplate.findOne(
        //     Query.query(Criteria.where("userId").is(123)),
        //     UserBehaviorLog.class
        // );

        // 正确示例:使用字段投影
        var query = new org.springframework.data.mongodb.core.query.Query(
            org.springframework.data.mongodb.core.query.Criteria.where("userId").is(123)
        );
        query.fields().include("behaviorType").include("timestamp").exclude("_id");

        mongoTemplate.find(query, org.springframework.data.mongodb.core.query.Query.class);
    }

    /**
     * 优化 2:使用 hint 强制使用索引
     */
    public void useIndexHint() {
        var query = new org.springframework.data.mongodb.core.query.Query(
            org.springframework.data.mongodb.core.query.Criteria.where("userId").is(123)
                    .and("behaviorType").is("VIEW")
        );
        // query.hint("idx_user_behavior");  // Spring Data MongoDB 5.0+ 支持
    }

    /**
     * 优化 3:避免全集合扫描
     * ❌ 错误:对索引字段使用函数
     * ✅ 正确:使用范围查询
     */
    public void avoidCollectionScan() {
        // 错误:无法使用索引
        // Criteria.where("timestamp").gte(LocalDateTime.now().minusDays(7))

        // 正确:直接使用范围
        LocalDateTime startTime = LocalDateTime.now().minusDays(7);
        var criteria = org.springframework.data.mongodb.core.query.Criteria
                .where("timestamp").gte(startTime);
    }

    /**
     * 优化 4:使用批量操作
     */
    public void useBulkOperations() {
        // 使用 bulkWrite 替代多次 update
        var bulkOps = new java.util.ArrayList<org.springframework.data.mongodb.core.BulkOperation>();

        // 添加多个操作
        // bulkOps.add(...);

        // mongoTemplate.bulkOps(
        //     org.springframework.data.mongodb.core.BulkOperations.BulkMode.ORDERED,
        //     UserBehaviorLog.class
        // ).bulkOps(bulkOps).execute();
    }
}

6.3 监控与告警

# Prometheus + Grafana 监控配置

# prometheus.yml
scrape_configs:
  - job_name: 'mongodb'
    static_configs:
      - targets: ['mongodb-exporter:9216']

  - job_name: 'spring-boot'
    metrics_path: '/actuator/prometheus'
    static_configs:
      - targets: ['app:8080']
package com.example.mongodb.monitoring;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

/**
 * MongoDB 性能监控
 */
@Component
@RequiredArgsConstructor
public class MongoMetrics {

    private final MeterRegistry meterRegistry;

    private final Timer queryTimer;
    private final Timer writeTimer;

    /**
     * 记录查询耗时
     */
    public <T> T measureQuery(String collection, java.util.function.Supplier<T> query) {
        return Timer.builder("mongodb.query.duration")
                .tag("collection", collection)
                .register(meterRegistry)
                .recordCallable(query::get);
    }

    /**
     * 记录写入耗时
     */
    public void measureWrite(String collection, Runnable write) {
        Timer.builder("mongodb.write.duration")
                .tag("collection", collection)
                .register(meterRegistry)
                .record(write);
    }

    /**
     * 记录查询结果数量
     */
    public void recordResultCount(String collection, int count) {
        meterRegistry.counter("mongodb.query.results",
                "collection", collection).increment(count);
    }
}

七、总结与架构建议

7.1 MongoDB 适用场景总结

场景 推荐度 关键特性 注意事项
大数据存储 ⭐⭐⭐⭐⭐ 分片、TTL 索引 合理选择分片键
LBS 应用 ⭐⭐⭐⭐⭐ 2dsphere 索引 球面计算精度
消息系统 ⭐⭐⭐⭐⭐ Change Stream 消息过期策略
IoT 时序 ⭐⭐⭐⭐ 高频写入、TTL 定期聚合降成本
内容管理 ⭐⭐⭐⭐ 灵活 Schema 文档大小控制
实时分析 ⭐⭐⭐⭐ 聚合管道 避免复杂 JOIN
金融交易 ⭐⭐⭐ 多文档事务 性能开销较大

7.2 生产环境部署建议

┌─────────────────────────────────────────────────────────────────┐
│                    生产架构推荐                                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  应用层:Spring Boot + Spring Data MongoDB                       │
│      ↓                                                          │
│  路由层:MongoDB Router (mongos) × 2                            │
│      ↓                                                          │
│  分片层:Shard × 3 (每分片 3 节点副本集)                         │
│      ↓                                                          │
│  配置层:Config Server × 3                                       │
│                                                                  │
│  监控层:Prometheus + Grafana + MongoDB Compass                  │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

硬件建议:  

  • Shard 节点:16 核 64G SSD  
  • Config 节点:4 核 8G  
  • 网络:万兆内网  
  • 存储:NVMe SSD(IOPS > 10000)  

完整代码仓库:

cd mongodb-ecommerce-demo

# Docker 快速启动
docker-compose up -d mongodb

# 运行示例
mvn spring-boot:run

本文基于 MongoDB 7.0 + Spring Data MongoDB 4.x 编写,生产环境请根据实际版本调整。




上一篇:企业级第三方API通信安全:AES+RSA混合加密架构与Spring Boot 3.x实战
下一篇:iOS 26.4 正式版发布时间确认,AirPods Max 2上市前的关键更新
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-3-18 09:23 , Processed in 0.516338 second(s), 41 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表