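The worst practice is to let "online main-path tasks" share a thread pool with "offline compensation, reporting, and batch tasks".

4.2 Using Little's Law for coarse-grained capacity estimation

In high-concurrency scenarios, a very practical estimation formula is Little's Law:

For example:

The steady-state concurrency is then approximately: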
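Little's Law relates the average number of requests in the system L to the arrival rate λ and the average time in the system W as L = λ × W. A minimal sketch of that estimate follows; the class name and the sample inputs (1,000 requests per second, 200ms average latency) are illustrative assumptions, not measurements from this article.

```java
/** Hypothetical helper: coarse steady-state concurrency via Little's Law. */
public final class LittlesLawEstimate {

    private LittlesLawEstimate() {
    }

    /**
     * L = lambda * W, with lambda in requests per second and W converted
     * from milliseconds to seconds.
     */
    public static double steadyStateConcurrency(double requestsPerSecond, double avgLatencyMillis) {
        return requestsPerSecond * avgLatencyMillis / 1000.0;
    }

    public static void main(String[] args) {
        // Assumed sample inputs: 1,000 req/s at 200 ms average latency.
        double inFlight = steadyStateConcurrency(1_000, 200);
        System.out.println("estimated in-flight requests: " + inFlight);
    }
}
```

With those assumed inputs the estimate is 200 requests in flight. It is a steady-state average, so bursts and tail latency still need headroom on top of it.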
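If this is an IO-bound thread pool, the thread count should not simply follow the CPU core count; it should be designed conservatively, taking into account:

4.3 A more practical configuration method

Rule of thumb for CPU-bound thread pools:

Suitable for:

Rule of thumb for IO-bound thread pools:

However, this range cannot be applied mechanically; it has to be adjusted for the share of time spent waiting: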
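A widely cited heuristic for this adjustment, from Java Concurrency in Practice, is threads ≈ cores × target CPU utilization × (1 + wait time / compute time). The sketch below applies it; the class name and sample timings are assumptions, and the result is a starting point for load testing rather than a final value.

```java
/** Hypothetical helper: IO-bound pool sizing from the wait/compute ratio. */
public final class IoPoolSizing {

    private IoPoolSizing() {
    }

    /**
     * threads = cores * utilization * (1 + wait / compute), rounded up.
     * waitMillis and computeMillis are per-task averages taken from profiling.
     */
    public static int suggestedThreads(int cores, double targetCpuUtilization,
                                       double waitMillis, double computeMillis) {
        if (cores <= 0 || computeMillis <= 0 || waitMillis < 0
                || targetCpuUtilization <= 0 || targetCpuUtilization > 1) {
            throw new IllegalArgumentException("invalid sizing inputs");
        }
        double raw = cores * targetCpuUtilization * (1 + waitMillis / computeMillis);
        return Math.max(1, (int) Math.ceil(raw));
    }

    public static void main(String[] args) {
        // Assumed profile: 8 cores, tasks wait 90 ms on IO and compute for 10 ms.
        System.out.println(suggestedThreads(8, 1.0, 90, 10));
    }
}
```

For the assumed profile the heuristic suggests 80 threads. Downstream capacity and connection pool limits usually cap the number well before the formula does.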
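4.4 How large should the queue be

Many production incidents are caused not by too few threads but by an oversized queue that produces a "slow-motion avalanche". The larger the queue, the more it implies: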
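One consequence can be made concrete with a back-of-the-envelope estimate: when every worker is busy, a task entering a queue that already holds Q tasks waits roughly Q × average task time / worker count before it even starts. The sketch below is a simplification that assumes uniform task durations; the class name and the 100ms task time are illustrative.

```java
import java.time.Duration;

/** Hypothetical helper: rough queueing delay for a saturated pool. */
public final class QueueDelayEstimate {

    private QueueDelayEstimate() {
    }

    /** Approximate wait before a newly queued task starts running. */
    public static Duration worstCaseQueueWait(int queuedTasks, int workerThreads, Duration avgTaskTime) {
        if (queuedTasks < 0 || workerThreads <= 0) {
            throw new IllegalArgumentException("invalid queue or worker count");
        }
        long nanos = avgTaskTime.toNanos() * queuedTasks / workerThreads;
        return Duration.ofNanos(nanos);
    }

    public static void main(String[] args) {
        // 800 queued tasks, 64 workers, assumed 100 ms per task.
        System.out.println(worstCaseQueueWait(800, 64, Duration.ofMillis(100)).toMillis() + " ms");
    }
}
```

With a queue of 800 and 64 workers (the sizes used by remoteIoExecutor in the example configuration later in this article) and an assumed 100ms per task, the estimate is 1250ms of waiting, which is longer than a 1200ms SLA before the task has started executing.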
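Therefore, the usual recommendations for production are:

4.5 How to choose a rejection policy

For online interfaces, many scenarios are better served by: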
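As one illustration of a fail-fast choice for online traffic, the sketch below counts each rejection and throws immediately so the caller can degrade instead of queueing. The handler name, the counter, and the demo method are assumptions made for this example; only RejectedExecutionHandler itself is JDK API.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical fail-fast policy: count the rejection, then throw. */
public final class CountingAbortPolicy implements RejectedExecutionHandler {

    private final AtomicLong rejected = new AtomicLong();

    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        rejected.incrementAndGet();
        throw new RejectedExecutionException("pool saturated: active=" + executor.getActiveCount()
                + ", queued=" + executor.getQueue().size());
    }

    public long rejectedCount() {
        return rejected.get();
    }

    /** Demo: one worker plus one queue slot, so the third submission is rejected. */
    public static long rejectionsWhenSaturated() {
        CountingAbortPolicy policy = new CountingAbortPolicy();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), policy);
        CountDownLatch gate = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // taken by the single worker
            pool.execute(blocker); // fills the only queue slot
            pool.execute(blocker); // rejected immediately
        } catch (RejectedExecutionException expected) {
            // A real caller would return a degraded response here.
        } finally {
            gate.countDown();
            pool.shutdown();
        }
        return policy.rejectedCount();
    }
}
```

The counter can be exported as a metric so that rejections show up on dashboards rather than only in logs.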
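5. The value of CompletableFuture goes beyond "asynchronous"

5.1 What core problem does it solve

The core value of CompletableFuture has three layers:

For example, an order preview page needs to query the following in parallel:

If the calls are made serially, the total time is roughly the sum of the four dependencies' RTs; if they run in parallel, the total time is roughly the RT of the slowest dependency plus a small scheduling overhead.

5.2 Common orchestration patterns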
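Typical building blocks are thenApply (transform a result), thenCompose (chain a dependent async call), thenCombine (merge two independent results), and allOf (wait for a group). The sketch below shows each with an explicit executor; the load methods are hypothetical stand-ins for remote calls, and the pool sizes are placeholders.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public final class CompositionPatterns {

    // Hypothetical stand-ins for remote lookups.
    static String loadUser(long id) { return "user-" + id; }
    static String loadLevel(String user) { return user + ":gold"; }
    static int loadPrice(long sku) { return 100; }
    static int loadDiscount(long sku) { return 15; }

    /** thenCompose chains a dependent call; thenApply transforms its result. */
    public static String levelOf(long userId, Executor pool) {
        return CompletableFuture.supplyAsync(() -> loadUser(userId), pool)
                .thenCompose(user -> CompletableFuture.supplyAsync(() -> loadLevel(user), pool))
                .thenApply(String::toUpperCase)
                .join();
    }

    /** thenCombine merges two independent calls that run in parallel. */
    public static int payable(long sku, Executor pool) {
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> loadPrice(sku), pool);
        CompletableFuture<Integer> discount = CompletableFuture.supplyAsync(() -> loadDiscount(sku), pool);
        return price.thenCombine(discount, (p, d) -> p - d).join();
    }

    /** allOf waits for a whole group, after which each join returns at once. */
    public static List<String> loadAll(List<Long> ids, Executor pool) {
        List<CompletableFuture<String>> futures = ids.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> loadUser(id), pool))
                .collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()))
                .join();
    }

    public static String demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(16));
        try {
            return levelOf(7, pool) + " | " + payable(1L, pool) + " | " + loadAll(List.of(1L, 2L), pool);
        } finally {
            pool.shutdown();
        }
    }
}
```

Every async stage here names its executor explicitly, so none of the work lands on a shared default pool.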
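5.3 Common misconceptions in production

Misconception 1: not specifying a thread pool

By default, the async methods use ForkJoinPool.commonPool().

Misconception 2: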
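| Dependency | Type | Timeout | Failure policy |
|---|---|---|---|
| Product service | Strong dependency | 200ms | Return immediately on failure |
| Inventory service | Strong dependency | 150ms | Return immediately on failure |
| Price service | Strong dependency | 120ms | Return immediately on failure |
| Coupon service | Weak dependency | 80ms | On timeout, default to no coupon |
| Marketing service | Weak dependency | 80ms | On timeout, do not show the tag |
| Risk-control service | Quasi-strong dependency | 200ms | Synchronous or asynchronous depending on the scenario |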
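Assuming the interface's overall SLA is 1200ms, the budget can be allocated like this:

100ms, 50ms, 300ms, 150ms, 200ms, 400ms

Note that the budget is not split evenly; it is allocated in layers according to: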
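One way to enforce a layered budget in code is to carry a single per-request deadline and clamp each dependency's configured timeout to whatever time is left. The sketch below illustrates that idea; RequestBudget and its methods are hypothetical names, and the clock is injectable only so the behaviour can be checked deterministically.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

/** Hypothetical per-request deadline that caps each downstream timeout. */
public final class RequestBudget {

    private final LongSupplier nanoClock;
    private final long deadlineNanos;

    public RequestBudget(Duration total, LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
        this.deadlineNanos = nanoClock.getAsLong() + total.toNanos();
    }

    /** Starts a budget against the JVM's monotonic clock. */
    public static RequestBudget start(Duration total) {
        return new RequestBudget(total, System::nanoTime);
    }

    /** Time left before the overall deadline, never negative. */
    public Duration remaining() {
        long left = deadlineNanos - nanoClock.getAsLong();
        return left <= 0 ? Duration.ZERO : Duration.ofNanos(left);
    }

    /** The configured cap for one call, but never more than what is left overall. */
    public Duration timeoutFor(Duration configuredCap) {
        Duration left = remaining();
        return left.compareTo(configuredCap) < 0 ? left : configuredCap;
    }
}
```

A request handler would call RequestBudget.start(Duration.ofMillis(1200)) once and pass budget.timeoutFor(Duration.ofMillis(200)) to each strong dependency, so a slow early stage automatically shortens the timeouts of later ones.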
Below is a set of examples that can be used directly in a Spring Boot project. Among other things, the code focuses on CompletableFuture concurrency orchestration.

```java
package com.example.concurrent.config;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import com.example.concurrent.support.FutureHelper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Configuration
public class ExecutorConfig {

    // Small scheduler used only to fire timeout callbacks.
    @Bean(destroyMethod = "shutdown")
    public ScheduledExecutorService timeoutScheduler() {
        return Executors.newScheduledThreadPool(
                2,
                namedThreadFactory("timeout-scheduler")
        );
    }

    // Pool for remote (IO-bound) calls: bounded queue, fail fast when saturated.
    @Bean(destroyMethod = "shutdown")
    public ThreadPoolExecutor remoteIoExecutor(MeterRegistry meterRegistry) {
        ThreadPoolExecutor executor = buildExecutor(
                "remote-io",
                32,
                64,
                60,
                new ArrayBlockingQueue<>(800),
                new ThreadPoolExecutor.AbortPolicy()
        );
        bindMetrics("remote_io", executor, meterRegistry);
        return executor;
    }

    // Pool for CPU-bound assembly work, sized from the visible core count.
    @Bean(destroyMethod = "shutdown")
    public ThreadPoolExecutor bizCpuExecutor(MeterRegistry meterRegistry) {
        int cores = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = buildExecutor(
                "biz-cpu",
                Math.max(cores, 4),
                Math.max(cores + 1, 8),
                30,
                new ArrayBlockingQueue<>(200),
                new CallerRunsAndLogPolicy("biz-cpu")
        );
        bindMetrics("biz_cpu", executor, meterRegistry);
        return executor;
    }

    // Pool for degradable work: overflow tasks are silently dropped.
    @Bean(destroyMethod = "shutdown")
    public ThreadPoolExecutor degradeExecutor(MeterRegistry meterRegistry) {
        ThreadPoolExecutor executor = buildExecutor(
                "degrade",
                4,
                8,
                30,
                new ArrayBlockingQueue<>(200),
                new ThreadPoolExecutor.DiscardPolicy()
        );
        bindMetrics("degrade", executor, meterRegistry);
        return executor;
    }

    @Bean
    public FutureHelper remoteFutureHelper(
            ThreadPoolExecutor remoteIoExecutor,
            ScheduledExecutorService timeoutScheduler
    ) {
        return new FutureHelper(remoteIoExecutor, timeoutScheduler);
    }

    private ThreadPoolExecutor buildExecutor(
            String poolName,
            int corePoolSize,
            int maximumPoolSize,
            long keepAliveSeconds,
            BlockingQueue<Runnable> queue,
            RejectedExecutionHandler rejectedExecutionHandler
    ) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveSeconds,
                TimeUnit.SECONDS,
                queue,
                namedThreadFactory(poolName),
                rejectedExecutionHandler
        );
        // Core threads stay alive; only threads above the core size time out.
        executor.allowCoreThreadTimeOut(false);
        return executor;
    }

    // Named threads make thread dumps and logs attributable to a pool.
    private ThreadFactory namedThreadFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger(1);
        return r -> {
            Thread thread = new Thread(r);
            thread.setName(prefix + "-" + counter.getAndIncrement());
            thread.setDaemon(false);
            thread.setUncaughtExceptionHandler((t, ex) ->
                    System.err.println("uncaught exception in " + t.getName() + ": " + ex.getMessage()));
            return thread;
        };
    }

    // Exposes pool size, active threads, queue depth and completed tasks as gauges.
    private void bindMetrics(String poolTag, ThreadPoolExecutor executor, MeterRegistry registry) {
        Gauge.builder("app.thread.pool.size", executor, ThreadPoolExecutor::getPoolSize)
                .tag("pool", poolTag)
                .register(registry);
        Gauge.builder("app.thread.active.count", executor, ThreadPoolExecutor::getActiveCount)
                .tag("pool", poolTag)
                .register(registry);
        Gauge.builder("app.thread.queue.size", executor, e -> e.getQueue().size())
                .tag("pool", poolTag)
                .register(registry);
        Gauge.builder("app.thread.completed.count", executor, e -> (double) e.getCompletedTaskCount())
                .tag("pool", poolTag)
                .register(registry);
    }

    // Runs the rejected task on the submitting thread and logs the overload.
    static class CallerRunsAndLogPolicy implements RejectedExecutionHandler {

        private final String poolName;

        CallerRunsAndLogPolicy(String poolName) {
            this.poolName = poolName;
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (executor.isShutdown()) {
                throw new RejectedExecutionException(poolName + " executor has been shutdown");
            }
            System.err.println("thread pool overload, fallback caller-runs, pool=" + poolName
                    + ", active=" + executor.getActiveCount()
                    + ", queue=" + executor.getQueue().size());
            r.run();
        }
    }
}
```
This configuration illustrates several key points:

```java
package com.example.concurrent.support;

import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class FutureHelper {

    private final ExecutorService executor;
    private final ScheduledExecutorService scheduler;

    public FutureHelper(ExecutorService executor, ScheduledExecutorService scheduler) {
        this.executor = executor;
        this.scheduler = scheduler;
    }

    public <T> CompletableFuture<T> supplyAsyncWithTimeout(
            Callable<T> task,
            Duration timeout
    ) {
        CompletableFuture<T> result = new CompletableFuture<>();

        Future<?> worker;
        try {
            worker = executor.submit(() -> {
                try {
                    T value = task.call();
                    result.complete(value);
                } catch (Throwable ex) {
                    result.completeExceptionally(ex);
                }
            });
        } catch (RejectedExecutionException ex) {
            // Report pool saturation through the future, so that callers'
            // exceptionally(...) fallbacks still apply.
            result.completeExceptionally(ex);
            return result;
        }

        ScheduledFuture<?> timer = scheduler.schedule(() -> {
            if (!result.isDone()) {
                // Best effort: interrupt the worker, then fail the future.
                worker.cancel(true);
                result.completeExceptionally(
                        new DependencyTimeoutException("task timeout after " + timeout.toMillis() + "ms")
                );
            }
        }, timeout.toMillis(), TimeUnit.MILLISECONDS);

        // Drop the pending timer once the result is settled, so finished
        // calls do not leave tasks behind in the scheduler queue.
        result.whenComplete((value, ex) -> timer.cancel(false));
        return result;
    }

    // Joins a future and unwraps CompletionException to the original cause.
    public static <T> T joinUnchecked(CompletableFuture<T> future) {
        try {
            return future.join();
        } catch (CompletionException ex) {
            Throwable cause = ex.getCause() == null ? ex : ex.getCause();
            if (cause instanceof RuntimeException runtimeException) {
                throw runtimeException;
            }
            throw new RuntimeException(cause);
        }
    }
}
```

```java
package com.example.concurrent.support;

public class DependencyTimeoutException extends RuntimeException {

    public DependencyTimeoutException(String message) {
        super(message);
    }
}
```
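Many articles only use orTimeout() and never deal with cancelling the underlying task. The implementation above uses Future.cancel(true) to try to interrupt the executing thread, and converges timeouts into a single runtime exception that business code can recognize, which is closer to what production needs.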
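The difference can be observed directly. The sketch below is a standalone demo (the class and method names are made up for illustration): the first method shows that orTimeout fails the future while the worker thread keeps running, and the second shows Future.cancel(true) delivering an interrupt to a worker that is blocked in an interruptible call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public final class TimeoutVsCancelDemo {

    /** orTimeout fails the future, but the worker runs on until it is released. */
    public static boolean orTimeoutLeavesWorkerRunning() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean ranToCompletion = new AtomicBoolean(false);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            if (await(release)) {
                ranToCompletion.set(true); // reached only if never interrupted
            }
            return "late result";
        }, pool).orTimeout(50, TimeUnit.MILLISECONDS);

        boolean timedOut = false;
        try {
            future.join();
        } catch (CompletionException ex) {
            timedOut = ex.getCause() instanceof TimeoutException;
        }
        release.countDown(); // only now does the worker get to finish
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return timedOut && ranToCompletion.get();
    }

    /** Future.cancel(true) interrupts a worker blocked in an interruptible call. */
    public static boolean cancelInterruptsWorker() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        try {
            Future<?> worker = pool.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(10_000);
                } catch (InterruptedException e) {
                    interrupted.countDown();
                }
            });
            await(started);
            worker.cancel(true);
            return await(interrupted);
        } finally {
            pool.shutdownNow();
        }
    }

    private static boolean await(CountDownLatch latch) {
        try {
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Interruption is cooperative: it only helps when the task is blocked in an interruptible call or checks its interrupt flag.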
Of course, keep in mind:
```java
package com.example.order.service;

import com.example.concurrent.support.FutureHelper;
import com.example.order.model.CouponInfo;
import com.example.order.model.InventoryInfo;
import com.example.order.model.OrderPreview;
import com.example.order.model.PriceInfo;
import com.example.order.model.ProductInfo;
import com.example.order.model.SubmitOrderRequest;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;

@Service
public class OrderPreviewService {

    private final ProductService productService;
    private final InventoryService inventoryService;
    private final PriceService priceService;
    private final CouponService couponService;
    private final MarketingService marketingService;
    private final FutureHelper remoteFutureHelper;
    private final ThreadPoolExecutor bizCpuExecutor;

    public OrderPreviewService(
            ProductService productService,
            InventoryService inventoryService,
            PriceService priceService,
            CouponService couponService,
            MarketingService marketingService,
            FutureHelper remoteFutureHelper,
            ThreadPoolExecutor bizCpuExecutor
    ) {
        this.productService = productService;
        this.inventoryService = inventoryService;
        this.priceService = priceService;
        this.couponService = couponService;
        this.marketingService = marketingService;
        this.remoteFutureHelper = remoteFutureHelper;
        this.bizCpuExecutor = bizCpuExecutor;
    }

    public OrderPreview previewOrder(SubmitOrderRequest request) {
        // Strong dependencies: a failure or timeout fails the whole preview.
        CompletableFuture<ProductInfo> productFuture =
                remoteFutureHelper.supplyAsyncWithTimeout(
                        () -> productService.queryProduct(request.productId()),
                        Duration.ofMillis(200)
                );

        CompletableFuture<InventoryInfo> inventoryFuture =
                remoteFutureHelper.supplyAsyncWithTimeout(
                        () -> inventoryService.queryInventory(request.productId(), request.quantity()),
                        Duration.ofMillis(150)
                );

        CompletableFuture<PriceInfo> priceFuture =
                remoteFutureHelper.supplyAsyncWithTimeout(
                        () -> priceService.queryPrice(request.productId(), request.userId(), request.quantity()),
                        Duration.ofMillis(120)
                );

        // Weak dependencies: fall back to a default instead of failing.
        CompletableFuture<CouponInfo> couponFuture =
                remoteFutureHelper.supplyAsyncWithTimeout(
                        () -> couponService.queryBestCoupon(request.userId(), request.productId()),
                        Duration.ofMillis(80)
                ).exceptionally(ex -> CouponInfo.empty("coupon service timeout or failed"));

        CompletableFuture<String> marketingTagFuture =
                remoteFutureHelper.supplyAsyncWithTimeout(
                        () -> marketingService.queryTag(request.productId()),
                        Duration.ofMillis(80)
                ).exceptionally(ex -> "DEFAULT_TAG");

        CompletableFuture<Void> allCoreDone =
                CompletableFuture.allOf(productFuture, inventoryFuture, priceFuture);

        // Assemble on the CPU pool once the strong dependencies have succeeded.
        CompletableFuture<OrderPreview> previewFuture = allCoreDone.thenApplyAsync(ignored -> {
            ProductInfo product = FutureHelper.joinUnchecked(productFuture);
            InventoryInfo inventory = FutureHelper.joinUnchecked(inventoryFuture);
            PriceInfo price = FutureHelper.joinUnchecked(priceFuture);
            // The weak futures settle (value or fallback) at roughly their 80ms
            // timeout at the latest, so these two joins are bounded.
            CouponInfo coupon = FutureHelper.joinUnchecked(couponFuture);
            String marketingTag = FutureHelper.joinUnchecked(marketingTagFuture);

            if (!inventory.available()) {
                throw new IllegalStateException("inventory not enough");
            }

            return new OrderPreview(
                    request.productId(),
                    product.productName(),
                    request.quantity(),
                    price.originAmount(),
                    coupon.discountAmount(),
                    price.originAmount().subtract(coupon.discountAmount()),
                    marketingTag,
                    coupon.degradeReason()
            );
        }, bizCpuExecutor);

        return FutureHelper.joinUnchecked(previewFuture);
    }
}
```
This code has three production-grade characteristics:

```java
package com.example.order.controller;

import com.example.order.model.OrderPreview;
import com.example.order.model.SubmitOrderRequest;
import com.example.order.service.OrderPreviewService;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Map;

import com.example.concurrent.support.DependencyTimeoutException;

@RestController
@RequestMapping("/api/orders")
public class OrderController {

    private final OrderPreviewService orderPreviewService;

    public OrderController(OrderPreviewService orderPreviewService) {
        this.orderPreviewService = orderPreviewService;
    }

    @PostMapping("/preview")
    public ResponseEntity<OrderPreview> preview(@RequestBody SubmitOrderRequest request) {
        return ResponseEntity.ok(orderPreviewService.previewOrder(request));
    }

    // A strong dependency timed out: respond with 504.
    @ExceptionHandler(DependencyTimeoutException.class)
    public ResponseEntity<Map<String, Object>> handleTimeout(DependencyTimeoutException ex) {
        return ResponseEntity.status(HttpStatus.GATEWAY_TIMEOUT)
                .body(Map.of(
                        "code", "ORDER_PREVIEW_TIMEOUT",
                        "message", ex.getMessage()
                ));
    }

    // Business rule violation (for example, insufficient inventory): respond with 409.
    @ExceptionHandler(IllegalStateException.class)
    public ResponseEntity<Map<String, Object>> handleBizException(IllegalStateException ex) {
        return ResponseEntity.status(HttpStatus.CONFLICT)
                .body(Map.of(
                        "code", "ORDER_PREVIEW_FAILED",
                        "message", ex.getMessage()
                ));
    }
}
```
```java
package com.example.order.model;

import java.math.BigDecimal;

// The records are shown together for brevity. Java allows only one public
// top-level type per source file, so each record goes in its own file
// in this package (SubmitOrderRequest.java, ProductInfo.java, and so on).

public record SubmitOrderRequest(
        Long userId,
        Long productId,
        Integer quantity
) {}

public record ProductInfo(
        Long productId,
        String productName
) {}

public record InventoryInfo(
        Long productId,
        Integer availableStock,
        boolean available
) {}

public record PriceInfo(
        BigDecimal originAmount
) {}

public record CouponInfo(
        BigDecimal discountAmount,
        String degradeReason
) {
    // Fallback used when the coupon service times out or fails.
    public static CouponInfo empty(String reason) {
        return new CouponInfo(BigDecimal.ZERO, reason);
    }
}

public record OrderPreview(
        Long productId,
        String productName,
        Integer quantity,
        BigDecimal originAmount,
        BigDecimal discountAmount,
        BigDecimal payableAmount,
        String marketingTag,
        String couponRemark
) {}
```
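Thread pools are only one part of concurrency governance. A production system also needs the following supporting capabilities.

If a downstream service is already unstable, relying on timeouts alone means this service keeps attempting requests that will fail, which builds into a "retry storm". In that case, layer on:

These can be combined with:

For read-heavy, write-light scenarios with highly repetitive requests, combine with:

Otherwise even the best thread pool only pushes requests onto a slow downstream faster.

In concurrent scenarios the common problem is not "the record cannot be found" but "the operation ran more than once":

So it must be paired with:

At a minimum, include:

- poolSize
- activeCount
- queueSize
- completedTaskCount

If you only watch CPU and memory, it is easy to miss the real concurrency bottleneck.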
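These four values can all be read from ThreadPoolExecutor getters. A minimal sketch of a point-in-time snapshot follows; the PoolSnapshot record, the derived queue-usage ratio, and any threshold passed to queueAbove are assumptions for illustration rather than recommended values.

```java
import java.util.concurrent.ThreadPoolExecutor;

/** Hypothetical point-in-time view of a pool, suitable for logging or alert checks. */
public record PoolSnapshot(
        int poolSize,
        int activeCount,
        int queueSize,
        int queueCapacity,
        long completedTaskCount
) {

    public static PoolSnapshot of(ThreadPoolExecutor executor) {
        int queued = executor.getQueue().size();
        // For a bounded queue, size + remainingCapacity is its total capacity.
        int capacity = queued + executor.getQueue().remainingCapacity();
        return new PoolSnapshot(
                executor.getPoolSize(),
                executor.getActiveCount(),
                queued,
                capacity,
                executor.getCompletedTaskCount()
        );
    }

    /** Fraction of the queue in use, 0.0 for zero-capacity queues. */
    public double queueUsage() {
        return queueCapacity == 0 ? 0.0 : (double) queueSize / queueCapacity;
    }

    /** True when queue usage exceeds the given threshold (0.0 to 1.0). */
    public boolean queueAbove(double threshold) {
        return queueUsage() > threshold;
    }
}
```

The getters return approximate values while tasks are in flight, so a snapshot is better suited to trends and thresholds than to exact accounting.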
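Alerts such as the following can be set:

Load testing is not just about QPS; the focus should be on observing:

If you only check that "the system still holds up", it is easy to misjudge the optimization as a success.

A content platform used Executors.newFixedThreadPool(200) to run aggregation queries for its recommendation API. Everything was fine day to day, but when campaign traffic rose, the downstream tag service's RT climbed from 30ms to 400ms. The thread pool did not reject any requests, yet the queue kept backing up, the API's RT soared from 150ms to 6s, and in the end large numbers of users timed out.

Root cause:

After the fix:

As a result, the main recommendation path recovered and stayed stable; even when the tag service jitters, it no longer drags down the whole API.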
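The "no rejections, endless backlog" behaviour follows from how Executors.newFixedThreadPool is built: its work queue is an unbounded LinkedBlockingQueue. The sketch below checks that and shows a bounded alternative; the class name is made up, and the sizes passed to boundedPool are placeholders rather than the values this team used.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public final class QueueCapacityCheck {

    private QueueCapacityCheck() {
    }

    /** Remaining queue capacity of an idle pool created by newFixedThreadPool. */
    public static int fixedPoolQueueCapacity(int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // In OpenJDK the returned object is a ThreadPoolExecutor.
            return ((ThreadPoolExecutor) pool).getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    /** Same thread count, but with a bounded queue and fail-fast rejection. */
    public static ThreadPoolExecutor boundedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy()
        );
    }

    public static void main(String[] args) {
        System.out.println(fixedPoolQueueCapacity(2) == Integer.MAX_VALUE);
    }
}
```

With a bounded queue the pool starts rejecting once it is saturated, which turns a silent latency build-up into an explicit signal that the caller can degrade on.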
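CompletableFuture sharing commonPool, causing cross-contamination

A risk-control system put asynchronous rule computation, asynchronous log flushing, and profile queries all on the default ForkJoinPool.commonPool(). During load testing, rule computation tasks piled up heavily, log tasks competed for execution resources, and the main flow actually became slower.

Root cause:

After the fix:

- biz-cpu-pool
- remote-io-pool
- degrade-pool

The system's tail latency dropped significantly.

An aggregation API used CompletableFuture.orTimeout(300, MILLISECONDS). It looked as though a timeout was in place, but the underlying HTTP client's read timeout was still 5 seconds. As a result, the application layer had already returned a timeout while the underlying connection was still waiting, and the connection pool gradually filled up.

Root cause:

After the fix: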
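A sketch of what aligned timeouts can look like, assuming the JDK's built-in java.net.http client (the article does not say which HTTP client was involved; other clients expose equivalent connect and read timeout settings). The endpoint URL and class name are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public final class AlignedTimeouts {

    // Hypothetical endpoint, used only for illustration.
    static final URI TAG_SERVICE = URI.create("http://tag-service.internal/tags");

    private AlignedTimeouts() {
    }

    /** Client-level bound on establishing a connection. */
    public static HttpClient client(Duration connectTimeout) {
        return HttpClient.newBuilder()
                .connectTimeout(connectTimeout)
                .build();
    }

    /** Request-level timeout, so the transport gives up along with the caller. */
    public static HttpRequest tagRequest(Duration requestTimeout) {
        return HttpRequest.newBuilder(TAG_SERVICE)
                .timeout(requestTimeout)
                .GET()
                .build();
    }

    /** The application-level orTimeout uses the same budget as the request itself. */
    public static CompletableFuture<String> fetchTags(HttpClient client, Duration budget) {
        return client.sendAsync(tagRequest(budget), HttpResponse.BodyHandlers.ofString())
                .thenApply(HttpResponse::body)
                .orTimeout(budget.toMillis(), TimeUnit.MILLISECONDS);
    }
}
```

The point of the design is that no layer below the future is allowed to wait longer than the future itself, so a timed-out call also releases its connection.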
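In container environments, availableProcessors() does not necessarily equal the host's physical core count. Thread pool configuration has to be evaluated against the container's actual CPU limit; otherwise the thread count may end up noticeably too high.

Many teams treat scaling out as a concurrency optimization, but if a single instance's thread pools and timeout policies are out of control, scaling out only produces more unstable replicas.

The correct order should be:

The following settings are best wired to a configuration center for dynamic adjustment:

Note, however, that dynamic configuration must support gradual rollout and rollback, so that a mistaken change in production does not create even greater risk.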
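ThreadPoolExecutor allows its core and maximum sizes to be changed at runtime through setCorePoolSize and setMaximumPoolSize, but the order of the two calls matters because the executor rejects states where core exceeds maximum. The helper below is a hypothetical sketch of applying a pushed configuration safely; how the new values arrive from a configuration center is left out.

```java
import java.util.concurrent.ThreadPoolExecutor;

/** Hypothetical helper for applying new pool sizes from dynamic configuration. */
public final class PoolResizer {

    private PoolResizer() {
    }

    /**
     * Applies new sizes in an order that never leaves core > maximum:
     * raise the ceiling first when growing, lower the core first when shrinking.
     */
    public static ThreadPoolExecutor resize(ThreadPoolExecutor executor, int newCore, int newMax) {
        if (newCore < 0 || newMax <= 0 || newCore > newMax) {
            throw new IllegalArgumentException("invalid sizes: core=" + newCore + ", max=" + newMax);
        }
        if (newMax >= executor.getMaximumPoolSize()) {
            executor.setMaximumPoolSize(newMax);
            executor.setCorePoolSize(newCore);
        } else {
            executor.setCorePoolSize(newCore);
            executor.setMaximumPoolSize(newMax);
        }
        return executor;
    }
}
```

Validating the values before applying them, and keeping the previous pair around, gives the rollback path a known-good state to return to.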
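Before going live, confirm each of the following:

- ThreadPoolExecutor
- Every CompletableFuture specifies a thread pool

Thread pool tuning, timeout control, and CompletableFuture orchestration look like three independent topics on the surface, but they all address the same problem:

How to keep a system fast in a complex environment of high concurrency and many dependencies, without letting it lose control under pressure and jitter.

A truly production-grade solution is never as simple as "make the thread pool bigger" or "add a timeout to the future"; it has to form a complete closed loop:

- Use CompletableFuture for parallel orchestration and exception convergence

Once these five things are tied together, a high-concurrency system truly has engineering-grade stability.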