JUC 并发工具类应用场景详解
简介
Java 并发工具包(java.util.concurrent,简称 JUC)是 Java 处理并发编程的核心 API,它提供了一系列高性能、线程安全的工具类,用于解决各种并发编程场景的问题。在互联网大厂的高并发、分布式系统环境中,JUC 工具类扮演着至关重要的角色,是构建高效稳定系统的基石。
本文将深入剖析 JUC 并发工具类在实际业务场景中的应用,包括常见的并发控制、线程同步、异步处理等方面的实践经验及优化方案。
基本概念
在深入应用场景之前,我们先简要回顾 JUC 包中的核心组件分类:
线程池工具:ExecutorService、ThreadPoolExecutor、ScheduledThreadPoolExecutor 等
同步工具类:CountDownLatch、CyclicBarrier、Semaphore、Phaser 等
并发集合:ConcurrentHashMap、CopyOnWriteArrayList、BlockingQueue 等
原子操作类:AtomicInteger、AtomicReference、LongAdder 等
锁机制:ReentrantLock、ReadWriteLock、StampedLock 等
CompletableFuture:用于异步编程的工具类
以上工具类各有特点,针对不同的并发场景提供了专业的解决方案。接下来,我们将结合实际业务场景,深入分析这些工具类的应用。
线程池的应用场景
线程池是 JUC 中最常用的工具之一,在大厂的应用中几乎无处不在。线程池通过复用线程、控制并发数量,有效减少线程创建与销毁的开销,提高系统的整体性能。
场景一:电商平台的订单处理系统
在电商平台的订单系统中,下单后需要进行一系列异步操作,如库存扣减、支付确认、物流通知等。这些操作可以并行执行,且对实时性要求较高。
/**
* 电商订单处理线程池配置
*/
@Configuration
public class OrderThreadPoolConfig {
@Bean("orderProcessThreadPool")
public ThreadPoolExecutor orderProcessThreadPool() {
// 核心线程数:根据业务量和服务器配置确定
int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
// 最大线程数:通常为核心线程数的2倍
int maximumPoolSize = corePoolSize * 2;
// 线程存活时间:非核心线程的空闲存活时间
long keepAliveTime = 60L;
// 使用有界队列,防止任务堆积导致OOM
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(1000);
// 自定义线程工厂,便于排查问题
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("order-process-pool-%d")
.setUncaughtExceptionHandler(new LoggingUncaughtExceptionHandler())
.build();
// 拒绝策略:使用CallerRunsPolicy,防止系统崩溃的同时减缓请求速度
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
return new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
workQueue,
threadFactory,
handler);
}
// 自定义异常处理器,用于记录线程池中未捕获的异常
private static class LoggingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
private static final Logger logger = LoggerFactory.getLogger(LoggingUncaughtExceptionHandler.class);
@Override
public void uncaughtException(Thread t, Throwable e) {
logger.error("订单处理线程池发生未捕获异常, 线程名: {}", t.getName(), e);
}
}
}
实际应用分析:
核心线程数设置:基于 CPU 核心数而非固定值,适应不同服务器配置
有界队列:使用 ArrayBlockingQueue 而非 LinkedBlockingQueue,防止无限制接收任务导致 OOM
拒绝策略:选择 CallerRunsPolicy,在系统负载过高时能够起到限流作用
自定义线程工厂:统一的线程命名和异常处理,便于问题定位
订单处理服务中的使用示例:
@Service
public class OrderProcessService {
@Resource(name = "orderProcessThreadPool")
private ThreadPoolExecutor orderProcessThreadPool;
@Resource
private InventoryService inventoryService;
@Resource
private PaymentService paymentService;
@Resource
private LogisticsService logisticsService;
/**
* 处理新订单
*/
public void processNewOrder(Order order) {
// 提交库存操作任务
orderProcessThreadPool.execute(() -> {
try {
inventoryService.deductInventory(order);
} catch (Exception e) {
// 异常处理,可能涉及重试或补偿
log.error("库存扣减失败,订单号: {}", order.getOrderId(), e);
}
});
// 提交支付确认任务
orderProcessThreadPool.execute(() -> {
try {
paymentService.confirmPayment(order);
} catch (Exception e) {
log.error("支付确认失败,订单号: {}", order.getOrderId(), e);
}
});
// 提交物流任务
orderProcessThreadPool.execute(() -> {
try {
logisticsService.createShippingOrder(order);
} catch (Exception e) {
log.error("创建物流单失败,订单号: {}", order.getOrderId(), e);
}
});
}
}
场景二:定时任务调度系统
在大型互联网应用中,定时任务是常见需求,如数据统计、缓存更新、定时通知等。ScheduledThreadPoolExecutor 提供了高精度的定时执行能力。
/**
* 定时任务线程池配置
*/
@Configuration
public class ScheduledTaskConfig {
@Bean
public ScheduledThreadPoolExecutor scheduledTaskExecutor() {
// 核心线程数:基于定时任务数量和执行频率评估
int corePoolSize = 10;
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("scheduled-task-%d")
.setDaemon(true) // 设置为守护线程
.build();
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
corePoolSize,
threadFactory);
// 配置任务取消后从队列中移除
executor.setRemoveOnCancelPolicy(true);
return executor;
}
}
实际使用案例:
@Service
public class DataAnalysisService {
@Resource
private ScheduledThreadPoolExecutor scheduledTaskExecutor;
@Resource
private MetricsRepository metricsRepository;
/**
* 启动数据分析任务
*/
@PostConstruct
public void startAnalysisTasks() {
// 每10分钟执行一次用户行为分析
scheduledTaskExecutor.scheduleAtFixedRate(
this::analyzeUserBehavior,
10,
10,
TimeUnit.MINUTES);
// 每小时执行一次交易数据汇总
scheduledTaskExecutor.scheduleAtFixedRate(
this::aggregateTransactionData,
0,
1,
TimeUnit.HOURS);
// 每天凌晨2点执行日报表生成
scheduledTaskExecutor.scheduleAtFixedRate(
this::generateDailyReport,
getTimeToNextDay(2), // 计算到凌晨2点的延迟
24,
TimeUnit.HOURS);
}
/**
* 计算当前时间到次日指定小时的延迟(秒)
*/
private long getTimeToNextDay(int hour) {
LocalDateTime now = LocalDateTime.now();
LocalDateTime nextRun = now.toLocalDate().plusDays(1).atTime(hour, 0);
if (now.toLocalTime().isAfter(LocalTime.of(hour, 0))) {
nextRun = nextRun.plusDays(1);
}
return ChronoUnit.SECONDS.between(now, nextRun);
}
// 具体任务实现方法...
private void analyzeUserBehavior() {
try {
// 实现用户行为分析逻辑
metricsRepository.analyzeUserMetrics();
} catch (Exception e) {
log.error("用户行为分析任务执行失败", e);
}
}
private void aggregateTransactionData() {
// 交易数据汇总逻辑
}
private void generateDailyReport() {
// 日报表生成逻辑
}
}
最佳实践与优化:
避免任务堆积:ScheduledThreadPoolExecutor 对于固定频率任务,如果前一次执行未完成,会延迟执行下一次任务,但不会并发执行。在业务设计上要控制好单次任务的执行时间
优雅停机:系统关闭时应当调用 scheduledTaskExecutor.shutdown(),并设置合理的等待时间
任务隔离:不同类型、不同优先级的定时任务应使用不同的线程池,避免互相影响
监控告警:实时监控线程池状态,包括活跃线程数、队列深度等指标,发现异常及时告警
场景三:微服务 API 网关的请求处理
在微服务架构中,API 网关负责请求路由、限流、鉴权等功能,需要高效处理大量并发请求。
/**
* API 网关线程池配置
*/
@Configuration
public class GatewayThreadPoolConfig {
@Value("${gateway.thread.core-size:200}")
private int coreSize;
@Value("${gateway.thread.max-size:400}")
private int maxSize;
@Value("${gateway.thread.queue-capacity:1000}")
private int queueCapacity;
@Value("${gateway.thread.keep-alive:60}")
private int keepAliveSeconds;
@Bean("apiGatewayExecutor")
public ThreadPoolExecutor apiGatewayExecutor() {
// 使用自定义队列,支持获取队列剩余容量
ResizableCapacityLinkedBlockingQueue<Runnable> workQueue =
new ResizableCapacityLinkedBlockingQueue<>(queueCapacity);
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("api-gateway-%d")
.build();
// 自定义拒绝策略:记录指标并返回特定错误码
RejectedExecutionHandler handler = new GatewayRejectedExecutionHandler();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
coreSize,
maxSize,
keepAliveSeconds,
TimeUnit.SECONDS,
workQueue,
threadFactory,
handler);
// 允许核心线程超时,提高资源利用率
executor.allowCoreThreadTimeOut(true);
return executor;
}
/**
* 自定义拒绝策略
*/
private static class GatewayRejectedExecutionHandler implements RejectedExecutionHandler {
private final Counter rejectedRequests =
Metrics.counter("gateway.rejected.requests");
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 记录指标
rejectedRequests.increment();
// 抛出特定异常,由全局异常处理器转换为 HTTP 429 响应
throw new ServiceBusyException("服务繁忙,请稍后重试");
}
}
/**
* 可调整容量的 LinkedBlockingQueue
*/
private static class ResizableCapacityLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
public ResizableCapacityLinkedBlockingQueue(int capacity) {
super(capacity);
}
public boolean setCapacity(int capacity) {
// 实现队列容量动态调整
// 省略实现细节
return true;
}
}
}
应用示例:
@Component
public class ApiGatewayHandler {
@Resource(name = "apiGatewayExecutor")
private ThreadPoolExecutor executor;
@Resource
private RouterService routerService;
@Resource
private RequestLimiter requestLimiter;
/**
* 处理 API 请求
*/
public Mono<ServerResponse> handleRequest(ServerRequest request) {
// 限流检查
if (!requestLimiter.tryAcquire()) {
return ServerResponse.status(HttpStatus.TOO_MANY_REQUESTS)
.bodyValue("请求过于频繁,请稍后重试");
}
return Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
// 1. 请求预处理(鉴权、参数校验等)
processRequest(request);
// 2. 路由到目标服务
return routerService.route(request);
}, executor))
.onErrorResume(this::handleError);
}
private Mono<ServerResponse> handleError(Throwable error) {
if (error instanceof ServiceBusyException) {
return ServerResponse.status(HttpStatus.TOO_MANY_REQUESTS)
.bodyValue(error.getMessage());
}
// 其他异常处理
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
.bodyValue("服务器内部错误");
}
// 请求处理逻辑
private void processRequest(ServerRequest request) {
// 实现请求处理逻辑
}
}
关键优化实践:
参数可配置化:线程池核心参数通过配置中心动态调整,适应流量变化
自定义拒绝策略:结合业务需求,实现定制化的拒绝处理
动态队列容量:实现队列容量的动态调整,应对突发流量
监控指标:记录线程池饱和度、拒绝请求数等关键指标
超时控制:为每个任务设置超时控制,避免长时间运行的任务占用线程资源
同步工具类的应用场景
JUC 提供了多种同步工具类,用于协调多个线程之间的协作。这些同步工具类在不同场景下有各自的应用优势,下面是它们在大厂实际应用中的案例。
场景一:CountDownLatch 在分布式任务协调中的应用
CountDownLatch 是一个计数器闭锁,用于等待一组线程完成工作后再继续执行。在大厂环境中,常用于并行任务处理、批量数据导入等场景。
案例:大数据平台的并行 ETL 处理
在数据仓库 ETL 过程中,需要从多个数据源并行抽取数据,全部完成后再进行汇总处理:
/**
* 并行 ETL 处理服务
*/
@Service
public class ParallelETLService {
@Resource
private ThreadPoolExecutor etlThreadPool;
@Resource
private List<DataSourceConnector> dataSourceConnectors;
@Resource
private DataMergeService dataMergeService;
/**
* 执行并行 ETL 任务
*/
public ETLResult performParallelETL(ETLContext context) {
int sourceCount = dataSourceConnectors.size();
// 创建 CountDownLatch,计数器为数据源数量
CountDownLatch latch = new CountDownLatch(sourceCount);
// 用于收集各数据源处理结果
List<DataExtractResult> extractResults =
Collections.synchronizedList(new ArrayList<>(sourceCount));
// 提交并行抽取任务
for (DataSourceConnector connector : dataSourceConnectors) {
etlThreadPool.execute(() -> {
try {
// 执行数据抽取转换
DataExtractResult result = connector.extractAndTransform(context);
extractResults.add(result);
} catch (Exception e) {
log.error("数据源[{}]处理失败", connector.getName(), e);
// 添加错误结果
extractResults.add(new DataExtractResult(connector.getName(), e));
} finally {
// 无论成功失败,都减少计数器
latch.countDown();
}
});
}
try {
// 设置最大等待时间,防止永久阻塞
boolean completed = latch.await(30, TimeUnit.MINUTES);
if (!completed) {
log.warn("ETL 任务执行超时,已完成{}/{}个数据源处理",
sourceCount - latch.getCount(), sourceCount);
}
// 执行数据合并处理
return dataMergeService.mergeAndLoad(extractResults, context);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("ETL 任务被中断", e);
return new ETLResult(ETLStatus.INTERRUPTED, "任务被中断");
}
}
}
实际应用分析:
超时控制:设置了 await 的超时时间,防止因某个数据源处理异常导致永久等待
结果收集:使用线程安全的集合收集各任务执行结果
异常处理:在 finally 块中执行 countDown,确保计数器正确递减
状态汇总:根据任务完成情况,返回相应的处理结果
场景二:CyclicBarrier 在并行计算中的应用
CyclicBarrier 允许一组线程相互等待,直到所有线程都到达屏障点,适用于需要分阶段执行的并行算法。
案例:推荐系统的并行特征计算
在推荐系统中,用户特征计算通常分为多个阶段,每个阶段都依赖上一阶段的结果:
/**
* 用户特征并行计算服务
*/
@Service
public class UserFeatureCalculationService {
@Resource
private ThreadPoolExecutor computeThreadPool;
@Resource
private FeatureRepository featureRepository;
/**
* 并行计算用户特征
*/
public UserFeatureModel calculateUserFeatures(String userId, FeatureContext context) {
int threadCount = 4; // 并行线程数
// 创建循环栅栏,所有线程到达后执行合并操作
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
// 每个阶段结束时执行的操作,如特征归一化
log.info("所有线程已完成当前阶段计算,执行特征合并");
context.normalizeFeatures();
});
// 创建并行计算任务
List<Future<FeatureSegment>> futures = new ArrayList<>(threadCount);
// 第一阶段:基础特征计算
for (int i = 0; i < threadCount; i++) {
final int segmentIndex = i;
futures.add(computeThreadPool.submit(() -> {
// 计算基础特征
FeatureSegment segment = calculateBasicFeatures(userId, segmentIndex, context);
try {
// 等待所有线程完成基础特征计算
barrier.await(10, TimeUnit.MINUTES);
} catch (Exception e) {
log.error("等待基础特征计算完成时发生错误", e);
throw new FeatureCalculationException("基础特征计算等待失败", e);
}
// 第二阶段:行为特征计算
calculateBehaviorFeatures(segment, context);
try {
// 等待所有线程完成行为特征计算
barrier.await(10, TimeUnit.MINUTES);
} catch (Exception e) {
log.error("等待行为特征计算完成时发生错误", e);
throw new FeatureCalculationException("行为特征计算等待失败", e);
}
// 第三阶段:社交特征计算
calculateSocialFeatures(segment, context);
try {
// 等待所有线程完成社交特征计算
barrier.await(10, TimeUnit.MINUTES);
} catch (Exception e) {
log.error("等待社交特征计算完成时发生错误", e);
throw new FeatureCalculationException("社交特征计算等待失败", e);
}
return segment;
}));
}
// 收集所有计算结果
List<FeatureSegment> segments = new ArrayList<>(threadCount);
for (Future<FeatureSegment> future : futures) {
try {
segments.add(future.get());
} catch (Exception e) {
log.error("获取特征计算结果失败", e);
throw new FeatureCalculationException("特征计算失败", e);
}
}
// 合并最终结果
return featureRepository.mergeFeatureSegments(segments, context);
}
// 计算基础特征
private FeatureSegment calculateBasicFeatures(String userId, int segmentIndex, FeatureContext context) {
// 实现基础特征计算逻辑
return new FeatureSegment();
}
// 计算行为特征
private void calculateBehaviorFeatures(FeatureSegment segment, FeatureContext context) {
// 实现行为特征计算逻辑
}
// 计算社交特征
private void calculateSocialFeatures(FeatureSegment segment, FeatureContext context) {
// 实现社交特征计算逻辑
}
}
实际应用分析:
分段计算:将特征计算任务划分为多个段,利用并行提高计算效率
阶段同步:使用 CyclicBarrier 确保所有线程完成当前阶段计算后再进入下一阶段
归一化处理:在栅栏动作中执行特征归一化,保证各阶段特征的一致性
超时控制:为每个 await 设置超时时间,防止因个别线程问题导致整体计算阻塞
场景三:Semaphore 在资源控制中的应用
Semaphore 用于控制同时访问特定资源的线程数量,在限流、连接池管理等场景中广泛应用。
案例:外部接口调用限流器
在微服务系统中,调用外部 API 时需要控制并发请求数,避免对外部系统造成过大压力:
/**
* 外部 API 接口调用限流器
*/
@Component
public class ExternalApiLimiter {
// 外部 API 限流配置
private static class ApiConfig {
private final String apiName;
private final int maxConcurrency;
private final Semaphore semaphore;
public ApiConfig(String apiName, int maxConcurrency) {
this.apiName = apiName;
this.maxConcurrency = maxConcurrency;
this.semaphore = new Semaphore(maxConcurrency, true); // 公平模式
}
}
// API 配置映射
private final Map<String, ApiConfig> apiConfigMap = new ConcurrentHashMap<>();
// 监控指标
private final Counter rejectedRequests = Metrics.counter("api.rejected.requests");
private final Counter totalRequests = Metrics.counter("api.total.requests");
/**
* 初始化 API 配置
*/
@PostConstruct
public void init() {
// 从配置中心获取各 API 的限流配置
apiConfigMap.put("payment", new ApiConfig("payment", 50));
apiConfigMap.put("inventory", new ApiConfig("inventory", 100));
apiConfigMap.put("logistics", new ApiConfig("logistics", 30));
// 可通过配置中心动态更新
}
/**
* 执行受限流控制的 API 调用
*/
public <T> T executeWithRateLimit(String apiName, Supplier<T> apiCall, long timeout, TimeUnit unit)
throws ApiRateLimitException {
totalRequests.increment();
ApiConfig config = apiConfigMap.get(apiName);
if (config == null) {
// 未配置的 API 默认使用较小的并发限制
config = new ApiConfig(apiName, 10);
apiConfigMap.put(apiName, config);
}
boolean acquired = false;
try {
// 尝试获取许可
acquired = config.semaphore.tryAcquire(timeout, unit);
if (!acquired) {
rejectedRequests.increment();
throw new ApiRateLimitException("调用 API[" + apiName + "]被限流");
}
// 执行实际 API 调用
return apiCall.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ApiRateLimitException("获取 API 调用许可时被中断", e);
} finally {
// 释放许可
if (acquired) {
config.semaphore.release();
}
}
}
/**
* 更新 API 限流配置
*/
public void updateApiConfig(String apiName, int maxConcurrency) {
ApiConfig oldConfig = apiConfigMap.get(apiName);
if (oldConfig != null) {
// 替换为新的配置
apiConfigMap.put(apiName, new ApiConfig(apiName, maxConcurrency));
}
}
}
使用示例:
@Service
public class ExternalServiceClient {
@Resource
private ExternalApiLimiter apiLimiter;
@Resource
private RestTemplate restTemplate;
/**
* 调用支付服务
*/
public PaymentResult processPayment(PaymentRequest request) {
try {
return apiLimiter.executeWithRateLimit(
"payment",
() -> restTemplate.postForObject(
"https://payment-api.example.com/process",
request,
PaymentResult.class
),
5, // 等待超时时间
TimeUnit.SECONDS
);
} catch (ApiRateLimitException e) {
log.warn("支付请求被限流", e);
throw new ServiceBusyException("支付服务繁忙,请稍后再试");
}
}
}
实际应用分析:
API 粒度限流:为不同 API 设置不同的并发限制,精细化控制资源
超时控制:使用 tryAcquire 设置获取许可的超时时间,避免长时间等待
公平模式:使用公平模式创建 Semaphore,避免线程饥饿
动态配置:支持运行时动态调整限流参数
监控指标:记录请求总数和被拒绝的请求数,方便系统监控
并发容器的应用场景
JUC 包提供了丰富的并发容器,如 ConcurrentHashMap、CopyOnWriteArrayList、BlockingQueue 等,这些容器在大厂的高并发场景中有着广泛应用。
场景一:ConcurrentHashMap 在缓存系统中的应用
ConcurrentHashMap 是使用最广泛的并发容器之一,它在高并发读多写少的场景中表现优秀,常用于各类缓存实现。
案例:分布式系统的本地缓存实现
/**
* 基于 ConcurrentHashMap 的本地缓存实现
*/
@Component
public class LocalCacheManager<K, V> {
// 缓存数据存储
private final ConcurrentHashMap<K, CacheItem<V>> cacheMap;
// 缓存统计信息
private final LongAdder hitCount = new LongAdder();
private final LongAdder missCount = new LongAdder();
private final LongAdder evictionCount = new LongAdder();
// 缓存清理线程
private final ScheduledExecutorService cleanerExecutor;
/**
* 缓存项,包含值和过期时间
*/
private static class CacheItem<V> {
private final V value;
private final long expireTime;
public CacheItem(V value, long ttlMillis) {
this.value = value;
this.expireTime = System.currentTimeMillis() + ttlMillis;
}
public boolean isExpired() {
return System.currentTimeMillis() > expireTime;
}
public V getValue() {
return value;
}
}
/**
* 构造函数
* @param initialCapacity 初始容量
* @param cleanInterval 清理间隔(秒)
*/
public LocalCacheManager(int initialCapacity, int cleanInterval) {
this.cacheMap = new ConcurrentHashMap<>(initialCapacity);
this.cleanerExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("cache-cleaner-%d")
.setDaemon(true)
.build());
// 启动定期清理任务
this.cleanerExecutor.scheduleAtFixedRate(
this::cleanExpiredItems,
cleanInterval,
cleanInterval,
TimeUnit.SECONDS);
}
/**
* 放入缓存项
* @param key 键
* @param value 值
* @param ttlMillis 存活时间(毫秒)
*/
public void put(K key, V value, long ttlMillis) {
if (key == null || value == null) {
throw new IllegalArgumentException("缓存的键和值不能为 null");
}
cacheMap.put(key, new CacheItem<>(value, ttlMillis));
}
/**
* 获取缓存项
* @param key 键
* @return 缓存的值,如果不存在或已过期则返回 null
*/
public V get(K key) {
if (key == null) {
return null;
}
CacheItem<V> item = cacheMap.get(key);
// 缓存未命中
if (item == null) {
missCount.increment();
return null;
}
// 检查是否过期
if (item.isExpired()) {
// 惰性删除过期项
cacheMap.remove(key);
missCount.increment();
return null;
}
// 缓存命中
hitCount.increment();
return item.getValue();
}
/**
* 获取缓存项,如果不存在则通过指定函数加载并缓存
*/
public V getOrLoad(K key, Function<K, V> loadFunction, long ttlMillis) {
V value = get(key);
if (value != null) {
return value;
}
// 缓存未命中,需要加载
// 使用 computeIfAbsent 保证并发情况下只有一个线程会执行加载
CacheItem<V> computedItem = cacheMap.computeIfAbsent(key, k -> {
V newValue = loadFunction.apply(k);
if (newValue == null) {
// 不缓存 null 值
return null;
}
return new CacheItem<>(newValue, ttlMillis);
});
// 如果计算结果为 null,表示加载函数返回 null
return computedItem != null ? computedItem.getValue() : null;
}
/**
* 移除缓存项
*/
public void remove(K key) {
cacheMap.remove(key);
}
/**
* 清空缓存
*/
public void clear() {
cacheMap.clear();
}
/**
* 获取缓存大小
*/
public int size() {
return cacheMap.size();
}
/**
* 清理过期项
*/
private void cleanExpiredItems() {
try {
long now = System.currentTimeMillis();
int cleanedCount = 0;
// 遍历所有缓存项,清理过期的
for (Iterator<Map.Entry<K, CacheItem<V>>> iterator = cacheMap.entrySet().iterator();
iterator.hasNext();) {
Map.Entry<K, CacheItem<V>> entry = iterator.next();
if (entry.getValue().expireTime < now) {
iterator.remove();
cleanedCount++;
}
}
if (cleanedCount > 0) {
evictionCount.add(cleanedCount);
log.debug("已清理 {} 个过期缓存项", cleanedCount);
}
} catch (Exception e) {
log.error("清理过期缓存项时发生错误", e);
}
}
/**
* 获取缓存统计信息
*/
public Map<String, Long> getStats() {
Map<String, Long> stats = new HashMap<>();
stats.put("size", (long) cacheMap.size());
stats.put("hitCount", hitCount.sum());
stats.put("missCount", missCount.sum());
stats.put("evictionCount", evictionCount.sum());
return stats;
}
/**
* 关闭缓存,停止清理线程
*/
public void shutdown() {
cleanerExecutor.shutdown();
}
}
使用示例:
@Service
public class ProductService {
@Resource
private LocalCacheManager<Long, ProductInfo> productCache;
@Resource
private ProductRepository productRepository;
/**
* 获取商品信息
*/
public ProductInfo getProductInfo(Long productId) {
// 首先尝试从缓存获取,如果不存在则从数据库加载
return productCache.getOrLoad(
productId,
this::loadProductFromDb,
TimeUnit.MINUTES.toMillis(10) // 缓存 10 分钟
);
}
/**
* 从数据库加载商品信息
*/
private ProductInfo loadProductFromDb(Long productId) {
log.info("从数据库加载商品信息: {}", productId);
return productRepository.findById(productId)
.orElse(null);
}
/**
* 更新商品信息
*/
@Transactional
public void updateProduct(ProductInfo product) {
// 更新数据库
productRepository.save(product);
// 更新缓存或使缓存失效
productCache.put(product.getId(), product, TimeUnit.MINUTES.toMillis(10));
}
}
实际应用分析:
高性能读取:ConcurrentHashMap 适合读多写少的场景,提供了近乎无锁的读取性能
原子操作:利用 computeIfAbsent 实现原子的"检查并计算",避免缓存击穿问题
过期清理策略:结合定时清理和惰性清理,平衡 CPU 和内存资源
统计指标:使用 LongAdder 收集命中率等指标,最小化内存占用和性能影响
缓存一致性:通过更新或失效策略保持缓存与数据源的一致性
场景二:BlockingQueue 在生产者-消费者模式中的应用
BlockingQueue 是实现生产者-消费者模式的理想选择,在任务队列、消息缓冲等场景中广泛应用。
案例:日志异步处理系统
/**
* 异步日志处理系统
*/
@Component
public class AsyncLogProcessor {
// 日志处理队列
private final BlockingQueue<LogEntry> logQueue;
// 消费者线程池
private final ExecutorService consumerPool;
// 是否正在运行
private volatile boolean running = true;
/**
* 日志条目
*/
@Data
@Builder
public static class LogEntry {
private String traceId;
private String serviceId;
private Level level;
private String message;
private Map<String, String> context;
private long timestamp;
private String threadName;
}
/**
* 日志处理器接口
*/
public interface LogHandler {
void handleLog(List<LogEntry> logBatch);
}
@Resource
private List<LogHandler> logHandlers;
/**
* 构造函数
* @param queueCapacity 队列容量
* @param consumerCount 消费者线程数
*/
public AsyncLogProcessor(int queueCapacity, int consumerCount) {
// 使用LinkedBlockingQueue作为日志队列
this.logQueue = new LinkedBlockingQueue<>(queueCapacity);
// 创建消费者线程池
this.consumerPool = Executors.newFixedThreadPool(
consumerCount,
new ThreadFactoryBuilder()
.setNameFormat("log-consumer-%d")
.setDaemon(true)
.build());
// 启动消费者线程
for (int i = 0; i < consumerCount; i++) {
consumerPool.submit(this::consumeLogs);
}
}
/**
* 提交日志条目
* @param logEntry 日志条目
* @return 是否提交成功
*/
public boolean submit(LogEntry logEntry) {
if (!running) {
return false;
}
try {
// 尝试将日志放入队列,超时则放弃
return logQueue.offer(logEntry, 100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
/**
* 消费日志任务
*/
private void consumeLogs() {
final int batchSize = 100; // 批处理大小
final long maxWaitTime = 200; // 最大等待时间(毫秒)
List<LogEntry> logBatch = new ArrayList<>(batchSize);
while (running) {
try {
// 获取第一个日志条目,可能会阻塞
LogEntry firstLog = logQueue.poll(maxWaitTime, TimeUnit.MILLISECONDS);
if (firstLog == null) {
// 超时未获取到日志,继续下一轮
continue;
}
// 添加到批处理集合
logBatch.add(firstLog);
// 尝试非阻塞地获取更多日志,直到达到批处理大小
logQueue.drainTo(logBatch, batchSize - 1);
// 处理日志批次
processBatch(logBatch);
// 清空当前批次
logBatch.clear();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("日志消费线程被中断");
break;
} catch (Exception e) {
log.error("处理日志批次时发生错误", e);
// 继续处理下一批
}
}
}
/**
* 处理日志批次
*/
private void processBatch(List<LogEntry> logBatch) {
if (logBatch.isEmpty()) {
return;
}
// 调用所有注册的日志处理器
for (LogHandler handler : logHandlers) {
try {
handler.handleLog(logBatch);
} catch (Exception e) {
log.error("日志处理器[{}]处理失败", handler.getClass().getSimpleName(), e);
// 继续执行其他处理器
}
}
}
/**
* 关闭处理器
*/
public void shutdown() {
running = false;
// 停止接收新的日志
consumerPool.shutdown();
try {
// 等待处理完当前日志
if (!consumerPool.awaitTermination(30, TimeUnit.SECONDS)) {
consumerPool.shutdownNow();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
consumerPool.shutdownNow();
}
}
/**
* 获取队列状态
*/
public Map<String, Integer> getQueueStats() {
Map<String, Integer> stats = new HashMap<>();
stats.put("queueSize", logQueue.size());
stats.put("remainingCapacity", logQueue.remainingCapacity());
return stats;
}
}
日志处理器实现示例:
/**
* ElasticSearch 日志处理器
*/
@Component
public class ElasticsearchLogHandler implements AsyncLogProcessor.LogHandler {
@Resource
private ElasticsearchClient esClient;
/**
* 处理日志批次
*/
@Override
public void handleLog(List<AsyncLogProcessor.LogEntry> logBatch) {
if (logBatch.isEmpty()) {
return;
}
// 转换为ES批量操作
BulkRequest.Builder bulkRequest = new BulkRequest.Builder();
for (AsyncLogProcessor.LogEntry log : logBatch) {
// 根据日期和服务确定索引名
String indexName = "logs-" + log.getServiceId() + "-" + formatDate(log.getTimestamp());
// 添加到批量请求
bulkRequest.operations(op -> op
.index(idx -> idx
.index(indexName)
.document(log)
)
);
}
try {
// 执行批量写入
BulkResponse response = esClient.bulk(bulkRequest.build());
if (response.errors()) {
// 处理写入错误
for (BulkResponseItem item : response.items()) {
if (item.error() != null) {
log.error("日志写入ES失败: {}", item.error().reason());
}
}
}
} catch (Exception e) {
log.error("批量写入ES失败", e);
// 可以考虑重试或写入备用存储
}
}
private String formatDate(long timestamp) {
return DateTimeFormatter.ofPattern("yyyy-MM-dd")
.format(Instant.ofEpochMilli(timestamp)
.atZone(ZoneId.systemDefault())
.toLocalDate());
}
}
实际应用分析:
解耦与缓冲:使用BlockingQueue解耦日志生产和消费,缓冲突发日志
批量处理:通过drainTo方法高效收集批量日志,减少I/O操作频率
多消费者模式:使用线程池并行处理日志,提高吞吐量
可靠性设计:
生产者超时提交,避免阻塞业务线程
消费者异常隔离,单个处理器失败不影响整体
优雅关闭,确保已提交日志得到处理
监控指标:提供队列大小和剩余容量指标,便于监控系统状态
场景三:CopyOnWriteArrayList 在配置管理中的应用
CopyOnWriteArrayList 适用于读多写少的场景,特别是对实时性要求不高的配置管理、监听器列表等场景。
案例:动态配置中心的本地订阅者管理
/**
* 配置中心客户端
*/
@Component
public class ConfigCenterClient {
/**
* 配置变更监听器
*/
public interface ConfigChangeListener {
/**
* 配置变更回调
* @param key 配置键
* @param oldValue 旧值
* @param newValue 新值
*/
void onConfigChanged(String key, String oldValue, String newValue);
}
// 当前配置缓存
private final ConcurrentHashMap<String, String> configCache = new ConcurrentHashMap<>();
// 监听器映射,key 为配置键,value 为该配置的所有监听器
private final ConcurrentHashMap<String, CopyOnWriteArrayList<ConfigChangeListener>> listeners =
new ConcurrentHashMap<>();
// 配置中心客户端
@Resource
private ConfigCenterApiClient apiClient;
/**
* 初始化客户端
*/
@PostConstruct
public void init() {
// 拉取初始配置
refreshConfigs();
// 启动长轮询或 WebSocket 连接监听配置变更
startConfigChangeListener();
}
/**
* 添加配置变更监听器
* @param key 配置键
* @param listener 监听器
*/
public void addListener(String key, ConfigChangeListener listener) {
// 获取指定key的监听器列表,如果不存在则创建
listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>())
.addIfAbsent(listener);
}
/**
* 移除配置变更监听器
*/
public void removeListener(String key, ConfigChangeListener listener) {
CopyOnWriteArrayList<ConfigChangeListener> keyListeners = listeners.get(key);
if (keyListeners != null) {
keyListeners.remove(listener);
}
}
/**
* 获取配置值
*/
public String getConfig(String key, String defaultValue) {
return configCache.getOrDefault(key, defaultValue);
}
/**
* 获取整型配置
*/
public int getIntConfig(String key, int defaultValue) {
String value = configCache.get(key);
if (value == null) {
return defaultValue;
}
try {
return Integer.parseInt(value);
} catch (NumberFormatException e) {
log.warn("配置[{}]的值[{}]不是有效的整数,使用默认值{}", key, value, defaultValue);
return defaultValue;
}
}
/**
* 刷新配置
*/
public void refreshConfigs() {
try {
// 调用API获取最新配置
Map<String, String> latestConfigs = apiClient.getAllConfigs();
// 更新本地缓存并触发变更事件
updateConfigs(latestConfigs);
} catch (Exception e) {
log.error("刷新配置失败", e);
}
}
/**
* 启动配置变更监听
*/
private void startConfigChangeListener() {
// 使用长轮询或WebSocket监听配置变更
// 实现省略...
}
/**
* 处理接收到的配置变更
*/
public void handleConfigChanges(Map<String, String> changedConfigs) {
updateConfigs(changedConfigs);
}
/**
* 更新配置并触发监听器
*/
private void updateConfigs(Map<String, String> newConfigs) {
for (Map.Entry<String, String> entry : newConfigs.entrySet()) {
String key = entry.getKey();
String newValue = entry.getValue();
String oldValue = configCache.put(key, newValue);
// 如果值已变更,触发监听器
if (!Objects.equals(oldValue, newValue)) {
notifyListeners(key, oldValue, newValue);
}
}
}
/**
* 通知监听器
*/
private void notifyListeners(String key, String oldValue, String newValue) {
// 获取该配置的所有监听器
CopyOnWriteArrayList<ConfigChangeListener> keyListeners = listeners.get(key);
if (keyListeners != null && !keyListeners.isEmpty()) {
for (ConfigChangeListener listener : keyListeners) {
try {
// 异步通知,避免监听器执行耗时操作阻塞
CompletableFuture.runAsync(() -> {
try {
listener.onConfigChanged(key, oldValue, newValue);
} catch (Exception e) {
log.error("配置变更监听器执行异常", e);
}
});
} catch (Exception e) {
log.error("触发配置变更监听器异常", e);
}
}
}
}
}
使用示例:
@Service
public class UserServiceImpl implements UserService {
@Resource
private ConfigCenterClient configClient;
// 用户缓存过期时间(默认5分钟)
private volatile int userCacheExpireMinutes = 5;
@PostConstruct
public void init() {
// 初始化配置值
userCacheExpireMinutes = configClient.getIntConfig("user.cache.expireMinutes", 5);
// 监听配置变更
configClient.addListener("user.cache.expireMinutes",
(key, oldValue, newValue) -> {
try {
int newExpireMinutes = Integer.parseInt(newValue);
log.info("用户缓存过期时间已更新: {} -> {}", oldValue, newValue);
userCacheExpireMinutes = newExpireMinutes;
// 可以在这里刷新现有缓存的过期时间
} catch (NumberFormatException e) {
log.error("无效的缓存过期时间配置: {}", newValue);
}
});
}
@Override
public UserInfo getUserInfo(Long userId) {
// 使用当前的缓存过期时间
return userCache.getOrLoad(userId,
this::loadUserFromDb,
TimeUnit.MINUTES.toMillis(userCacheExpireMinutes));
}
private UserInfo loadUserFromDb(Long userId) {
// 从数据库加载用户信息
return userRepository.findById(userId).orElse(null);
}
}
实际应用分析:
读写分离:CopyOnWriteArrayList适合读多写少的场景,写入时复制整个数组,不影响读操作
线程安全迭代:无需额外同步,迭代器不会抛出ConcurrentModificationException
动态配置:可在运行时添加/移除监听器,无需重启应用
异步通知:监听器回调使用CompletableFuture异步执行,避免阻塞关键线程
配置热更新:实现了配置的动态更新和实时生效
原子操作类的应用场景
原子操作类(Atomic 包)提供了在单个变量上执行原子操作的工具,包括基本类型、数组元素和引用等。这些类基于底层的 CAS(Compare-And-Swap)机制,避免了锁的开销,在高并发场景下有着广泛应用。
场景一:高性能计数器
在大型系统中,计数器是一个常见需求,如统计 API 调用次数、记录用户行为、流量监控等。使用原子类可以实现高性能的计数器。
案例:电商平台的实时访问统计
/**
* 实时访问量统计服务
*/
@Service
public class RealTimeStatisticsService {
// 常规访问量计数器
private final ConcurrentHashMap<String, LongAdder> pageViewCounters = new ConcurrentHashMap<>();
// 独立访客计数器(按页面)
private final ConcurrentHashMap<String, LongAdder> uniqueVisitorCounters = new ConcurrentHashMap<>();
// 已访问用户集合(按页面)
private final ConcurrentHashMap<String, Set<String>> visitedUsers = new ConcurrentHashMap<>();
// 总订单金额统计
private final LongAdder totalOrderAmount = new LongAdder();
// 订单数统计
private final LongAdder totalOrderCount = new LongAdder();
/**
* 记录页面访问
*
* @param pageKey 页面标识
* @param userId 用户ID,可能为null(未登录用户)
* @param visitorId 访客ID(如Cookie标识)
*/
public void recordPageView(String pageKey, String userId, String visitorId) {
// 增加页面访问计数
pageViewCounters.computeIfAbsent(pageKey, k -> new LongAdder()).increment();
// 使用 visitorId 计算独立访客
if (visitorId != null) {
String visitorKey = pageKey + ":" + visitorId;
Set<String> visitors = visitedUsers.computeIfAbsent(pageKey,
k -> ConcurrentHashMap.newKeySet());
// 如果是新访客,增加独立访客计数
if (visitors.add(visitorId)) {
uniqueVisitorCounters.computeIfAbsent(pageKey, k -> new LongAdder()).increment();
}
}
// 其他统计逻辑...
}
/**
* 记录订单金额
* @param orderAmount 订单金额(分)
*/
public void recordOrder(long orderAmount) {
// 增加订单数
totalOrderCount.increment();
// 增加订单总金额
totalOrderAmount.add(orderAmount);
}
/**
* 获取页面访问统计
*/
public Map<String, Long> getPageViewStats() {
Map<String, Long> result = new HashMap<>();
pageViewCounters.forEach((key, counter) -> result.put(key, counter.sum()));
return result;
}
/**
* 获取页面独立访客统计
*/
public Map<String, Long> getUniqueVisitorStats() {
Map<String, Long> result = new HashMap<>();
uniqueVisitorCounters.forEach((key, counter) -> result.put(key, counter.sum()));
return result;
}
/**
* 获取订单统计
*/
public OrderStats getOrderStats() {
return new OrderStats(
totalOrderCount.sum(),
totalOrderAmount.sum()
);
}
/**
* 重置统计数据(如每日统计重置)
*/
public void resetStatistics() {
pageViewCounters.clear();
uniqueVisitorCounters.clear();
visitedUsers.clear();
// 重置累加器
while (totalOrderAmount.sum() > 0) {
totalOrderAmount.add(-totalOrderAmount.sum());
}
while (totalOrderCount.sum() > 0) {
totalOrderCount.add(-totalOrderCount.sum());
}
}
@Data
@AllArgsConstructor
public static class OrderStats {
private long orderCount;
private long orderAmount;
}
}
实际应用分析:
高并发支持:LongAdder 专为高并发计数场景设计,内部使用分段计数减小竞争
低开销:相比 AtomicLong,在高并发下 LongAdder 有更低的竞争开销
组合应用:结合 ConcurrentHashMap 实现多维度统计
灵活统计:支持增量计数和精确读取,满足实时监控需求
重置功能:支持统计周期性重置,适用于按天/小时统计场景
场景二:AtomicReference 在无锁数据结构中的应用
AtomicReference 提供了对引用类型的原子操作,常用于实现无锁数据结构,提高并发性能。
案例:高性能库存预占系统
在电商秒杀场景中,库存抢占是一个典型的高并发场景。使用 AtomicReference 可以实现无锁的库存预占机制。
/**
* 高性能商品库存服务
*/
@Service
public class StockService {
/**
* 库存状态类,包含可用库存和已预占库存
*/
@Data
@AllArgsConstructor
private static class StockState {
// 可用库存
private int availableStock;
// 已预占库存
private int reservedStock;
// 版本号,用于乐观锁
private long version;
// 创建新的库存状态
public StockState createNext(int availableDelta, int reservedDelta) {
return new StockState(
availableStock + availableDelta,
reservedStock + reservedDelta,
version + 1
);
}
}
// 商品库存映射,商品ID -> 库存状态
private final ConcurrentHashMap<Long, AtomicReference<StockState>> stockCache =
new ConcurrentHashMap<>();
@Resource
private StockRepository stockRepository;
/**
* 初始化商品库存
*/
@PostConstruct
public void init() {
// 从数据库加载初始库存
List<ProductStock> stocks = stockRepository.findAll();
for (ProductStock stock : stocks) {
StockState state = new StockState(
stock.getAvailableStock(),
stock.getReservedStock(),
0
);
stockCache.put(stock.getProductId(), new AtomicReference<>(state));
}
}
/**
* 尝试预占库存
* @param productId 商品ID
* @param quantity 预占数量
* @return 是否预占成功
*/
public boolean tryReserveStock(Long productId, int quantity) {
// 获取商品库存引用
AtomicReference<StockState> reference = stockCache.get(productId);
if (reference == null) {
throw new IllegalArgumentException("商品不存在: " + productId);
}
// 使用CAS操作尝试预占库存
boolean reserved = false;
int attempts = 0;
int maxAttempts = 5; // 最大重试次数
while (!reserved && attempts < maxAttempts) {
StockState current = reference.get();
// 检查库存是否充足
if (current.availableStock < quantity) {
log.warn("商品{}库存不足,当前可用{},需要{}",
productId, current.availableStock, quantity);
return false;
}
// 创建新的库存状态
StockState next = current.createNext(-quantity, quantity);
// 尝试原子更新
reserved = reference.compareAndSet(current, next);
attempts++;
if (!reserved && attempts < maxAttempts) {
// 短暂等待后重试
LockSupport.parkNanos(10);
}
}
if (reserved) {
// 异步更新数据库
updateStockAsync(productId, reference.get());
} else {
log.warn("商品{}库存预占失败,已重试{}次", productId, attempts);
}
return reserved;
}
/**
* 释放预占库存
*/
public boolean releaseReservedStock(Long productId, int quantity) {
AtomicReference<StockState> reference = stockCache.get(productId);
if (reference == null) {
throw new IllegalArgumentException("商品不存在: " + productId);
}
boolean released = false;
int attempts = 0;
int maxAttempts = 5;
while (!released && attempts < maxAttempts) {
StockState current = reference.get();
// 检查预占库存是否足够
if (current.reservedStock < quantity) {
log.warn("商品{}预占库存不足,当前已预占{},需要释放{}",
productId, current.reservedStock, quantity);
return false;
}
// 创建新的库存状态
StockState next = current.createNext(quantity, -quantity);
// 尝试原子更新
released = reference.compareAndSet(current, next);
attempts++;
if (!released && attempts < maxAttempts) {
LockSupport.parkNanos(10);
}
}
if (released) {
// 异步更新数据库
updateStockAsync(productId, reference.get());
}
return released;
}
/**
* 获取当前商品库存状态
*/
public StockInfo getStockInfo(Long productId) {
AtomicReference<StockState> reference = stockCache.get(productId);
if (reference == null) {
throw new IllegalArgumentException("商品不存在: " + productId);
}
StockState state = reference.get();
return new StockInfo(
productId,
state.availableStock,
state.reservedStock,
state.availableStock + state.reservedStock
);
}
/**
* 异步更新数据库库存
*/
private void updateStockAsync(Long productId, StockState state) {
CompletableFuture.runAsync(() -> {
try {
// 乐观锁更新数据库
int updated = stockRepository.updateStock(
productId,
state.availableStock,
state.reservedStock,
state.version);
if (updated == 0) {
log.warn("数据库库存更新失败,可能发生并发冲突,商品ID: {}", productId);
}
} catch (Exception e) {
log.error("更新商品{}库存失败", productId, e);
}
});
}
@Data
@AllArgsConstructor
public static class StockInfo {
private Long productId;
private int availableStock;
private int reservedStock;
private int totalStock;
}
}
实际应用分析:
无锁设计:使用 CAS 操作代替传统锁,减少线程阻塞
乐观并发控制:采用乐观锁思想,在冲突较少的情况下性能更佳
内存与数据库分离:内存操作与数据库更新解耦,提高性能
异步持久化:库存变更异步写入数据库,减少关键路径延迟
有限重试:设置最大重试次数,避免活锁
场景三:AtomicStampedReference 解决 ABA 问题
在使用 CAS 操作时,可能遇到 ABA 问题(即值从 A 变为 B,又变回 A,导致 CAS 操作误判为没有变化)。AtomicStampedReference 通过引入版本号解决这个问题。
案例:分布式节点状态管理器
在分布式系统中,需要维护集群节点的状态,并且需要避免由于网络延迟等因素导致的错误状态判断。
/**
* 分布式集群节点状态管理器
*/
@Component
public class ClusterNodeManager {
/**
* 节点状态枚举
*/
public enum NodeStatus {
ONLINE, // 在线
OFFLINE, // 离线
SUSPECTED, // 疑似故障
MAINTAINING; // 维护中
}
/**
* 节点状态类
*/
@Data
@AllArgsConstructor
private static class NodeState {
private NodeStatus status;
private long lastHeartbeatTime;
@Override
public String toString() {
return status + "@" + lastHeartbeatTime;
}
}
// 节点状态映射,key为节点ID
private final ConcurrentHashMap<String, AtomicStampedReference<NodeState>> nodeStates =
new ConcurrentHashMap<>();
// 心跳超时时间(毫秒)
private static final long HEARTBEAT_TIMEOUT = 30000;
/**
* 节点注册
*/
public void registerNode(String nodeId) {
NodeState initialState = new NodeState(NodeStatus.ONLINE, System.currentTimeMillis());
// 初始版本号为0
nodeStates.put(nodeId, new AtomicStampedReference<>(initialState, 0));
log.info("节点已注册: {}, 状态: {}", nodeId, initialState);
}
/**
* 更新节点心跳
*/
public boolean updateHeartbeat(String nodeId, long currentTimestamp) {
AtomicStampedReference<NodeState> reference = nodeStates.get(nodeId);
if (reference == null) {
log.warn("未知节点心跳更新: {}", nodeId);
return false;
}
int maxAttempts = 3;
for (int i = 0; i < maxAttempts; i++) {
NodeState currentState = reference.getReference();
int currentStamp = reference.getStamp();
// 已离线节点不接受心跳更新
if (currentState.status == NodeStatus.OFFLINE) {
log.warn("离线节点心跳更新被拒绝: {}", nodeId);
return false;
}
// 忽略过期的心跳
if (currentTimestamp < currentState.lastHeartbeatTime) {
log.debug("忽略过期心跳: {}, current={}, received={}",
nodeId, currentState.lastHeartbeatTime, currentTimestamp);
return false;
}
// 创建新状态
NodeState newState = new NodeState(
// 如果节点之前是疑似故障状态,恢复为在线状态
currentState.status == NodeStatus.SUSPECTED
? NodeStatus.ONLINE : currentState.status,
currentTimestamp
);
// 使用CAS更新,带版本戳
if (reference.compareAndSet(currentState, newState, currentStamp, currentStamp + 1)) {
log.debug("节点{}心跳已更新: {} -> {}, stamp: {}",
nodeId, currentState, newState, currentStamp + 1);
return true;
}
// CAS失败,稍后重试
LockSupport.parkNanos(100);
}
log.warn("节点{}心跳更新失败,已达到最大重试次数", nodeId);
return false;
}
/**
* 将节点标记为疑似故障
*/
public boolean markNodeSuspected(String nodeId) {
return updateNodeStatus(nodeId, NodeStatus.SUSPECTED);
}
/**
* 将节点标记为维护中
*/
public boolean markNodeMaintaining(String nodeId) {
return updateNodeStatus(nodeId, NodeStatus.MAINTAINING);
}
/**
* 将节点标记为离线
*/
public boolean markNodeOffline(String nodeId) {
return updateNodeStatus(nodeId, NodeStatus.OFFLINE);
}
/**
* 更新节点状态
*/
private boolean updateNodeStatus(String nodeId, NodeStatus newStatus) {
AtomicStampedReference<NodeState> reference = nodeStates.get(nodeId);
if (reference == null) {
log.warn("未知节点状态更新: {}", nodeId);
return false;
}
int maxAttempts = 3;
for (int i = 0; i < maxAttempts; i++) {
NodeState currentState = reference.getReference();
int currentStamp = reference.getStamp();
// 已离线节点状态不可更改
if (currentState.status == NodeStatus.OFFLINE && newStatus != NodeStatus.OFFLINE) {
log.warn("离线节点状态更新被拒绝: {}", nodeId);
return false;
}
// 创建新状态
NodeState newState = new NodeState(newStatus, currentState.lastHeartbeatTime);
// 使用CAS更新,带版本戳
if (reference.compareAndSet(currentState, newState, currentStamp, currentStamp + 1)) {
log.info("节点{}状态已更新: {} -> {}, stamp: {}",
nodeId, currentState.status, newStatus, currentStamp + 1);
return true;
}
// CAS失败,稍后重试
LockSupport.parkNanos(100);
}
log.warn("节点{}状态更新失败,已达到最大重试次数", nodeId);
return false;
}
/**
* 检查节点状态
*/
public NodeStatusInfo getNodeStatus(String nodeId) {
AtomicStampedReference<NodeState> reference = nodeStates.get(nodeId);
if (reference == null) {
return null;
}
// 获取当前状态和版本戳
NodeState state = reference.getReference();
int stamp = reference.getStamp();
return new NodeStatusInfo(
nodeId,
state.status,
state.lastHeartbeatTime,
stamp,
System.currentTimeMillis() - state.lastHeartbeatTime > HEARTBEAT_TIMEOUT
);
}
/**
* 定期检查节点心跳,将超时节点标记为疑似故障
*/
@Scheduled(fixedRate = 10000) // 每10秒检查一次
public void checkNodeHeartbeats() {
long now = System.currentTimeMillis();
for (Map.Entry<String, AtomicStampedReference<NodeState>> entry : nodeStates.entrySet()) {
String nodeId = entry.getKey();
AtomicStampedReference<NodeState> reference = entry.getValue();
NodeState state = reference.getReference();
// 检查是否超时
if (state.status == NodeStatus.ONLINE &&
now - state.lastHeartbeatTime > HEARTBEAT_TIMEOUT) {
// 标记为疑似故障
if (markNodeSuspected(nodeId)) {
log.warn("节点{}心跳超时,已标记为疑似故障", nodeId);
// 触发故障检测或告警
triggerNodeFailureDetection(nodeId);
}
}
}
}
/**
* 触发节点故障检测
*/
private void triggerNodeFailureDetection(String nodeId) {
// 实现故障检测逻辑,如发送告警等
}
@Data
@AllArgsConstructor
public static class NodeStatusInfo {
private String nodeId;
private NodeStatus status;
private long lastHeartbeatTime;
private int versionStamp;
private boolean heartbeatTimeout;
}
}
实际应用分析:
版本戳控制:使用 AtomicStampedReference 的版本戳解决 ABA 问题
心跳机制:通过心跳更新和超时检测维护节点状态
可靠状态转换:防止非法的状态转换,如离线节点被误标记为在线
乐观并发控制:使用 CAS 操作进行无锁更新
透明版本追踪:通过版本戳追踪状态变更历史
锁机制的应用场景
JUC 提供了比 synchronized 更加灵活的锁机制,如 ReentrantLock、ReadWriteLock 和 StampedLock 等。这些锁在功能、性能特性上各有优势,在大厂的复杂业务场景中有着广泛应用。
场景一:ReentrantLock 实现细粒度锁
ReentrantLock 相比 synchronized 提供了更多的灵活性,包括尝试获取锁、可中断锁等特性,常用于需要精细控制的场景。
案例:余额支付系统的并发控制
在金融系统中,账户余额操作需要严格的并发控制,确保数据一致性。
/**
* 账户余额服务
*/
@Service
public class AccountBalanceService {
/**
* 账户余额锁映射
* 使用 ConcurrentHashMap 存储每个账户的锁对象
*/
private final ConcurrentHashMap<Long, ReentrantLock> accountLocks = new ConcurrentHashMap<>();
/**
* 账户余额缓存
*/
private final ConcurrentHashMap<Long, BigDecimal> balanceCache = new ConcurrentHashMap<>();
@Resource
private AccountRepository accountRepository;
@Resource
private TransactionLogRepository transactionLogRepository;
/**
* 获取指定账户的锁
*/
private ReentrantLock getAccountLock(Long accountId) {
return accountLocks.computeIfAbsent(accountId, k -> new ReentrantLock(true));
}
/**
* 充值金额
* @param accountId 账户ID
* @param amount 充值金额
* @param transactionId 交易ID
* @return 操作结果
*/
public OperationResult deposit(Long accountId, BigDecimal amount, String transactionId) {
if (amount.compareTo(BigDecimal.ZERO) <= 0) {
return OperationResult.fail("充值金额必须大于0");
}
// 获取账户锁
ReentrantLock lock = getAccountLock(accountId);
try {
// 尝试获取锁,设置超时时间
if (!lock.tryLock(5, TimeUnit.SECONDS)) {
return OperationResult.fail("账户操作繁忙,请稍后再试");
}
try {
// 检查交易ID是否重复
if (transactionLogRepository.existsByTransactionId(transactionId)) {
return OperationResult.fail("交易已存在,请勿重复提交");
}
// 读取当前余额
BigDecimal currentBalance = getBalance(accountId);
// 计算新余额
BigDecimal newBalance = currentBalance.add(amount);
// 执行更新操作
int updated = accountRepository.updateBalance(accountId, newBalance);
if (updated != 1) {
return OperationResult.fail("余额更新失败,账户可能不存在");
}
// 更新缓存
balanceCache.put(accountId, newBalance);
// 记录交易日志
TransactionLog log = new TransactionLog();
log.setAccountId(accountId);
log.setTransactionId(transactionId);
log.setAmount(amount);
log.setType(TransactionType.DEPOSIT);
log.setBalance(newBalance);
log.setCreateTime(new Date());
transactionLogRepository.save(log);
return OperationResult.success("充值成功", newBalance);
} finally {
// 释放锁
lock.unlock();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return OperationResult.fail("操作被中断");
} catch (Exception e) {
log.error("充值操作异常", e);
return OperationResult.fail("系统异常: " + e.getMessage());
}
}
/**
* 扣款操作
* @param accountId 账户ID
* @param amount 扣款金额
* @param transactionId 交易ID
* @return 操作结果
*/
public OperationResult withdraw(Long accountId, BigDecimal amount, String transactionId) {
if (amount.compareTo(BigDecimal.ZERO) <= 0) {
return OperationResult.fail("扣款金额必须大于0");
}
// 获取账户锁
ReentrantLock lock = getAccountLock(accountId);
try {
// 尝试获取锁,设置超时时间
if (!lock.tryLock(5, TimeUnit.SECONDS)) {
return OperationResult.fail("账户操作繁忙,请稍后再试");
}
try {
// 检查交易ID是否重复
if (transactionLogRepository.existsByTransactionId(transactionId)) {
return OperationResult.fail("交易已存在,请勿重复提交");
}
// 读取当前余额
BigDecimal currentBalance = getBalance(accountId);
// 检查余额是否充足
if (currentBalance.compareTo(amount) < 0) {
return OperationResult.fail("账户余额不足");
}
// 计算新余额
BigDecimal newBalance = currentBalance.subtract(amount);
// 执行更新操作
int updated = accountRepository.updateBalance(accountId, newBalance);
if (updated != 1) {
return OperationResult.fail("余额更新失败,账户可能不存在");
}
// 更新缓存
balanceCache.put(accountId, newBalance);
// 记录交易日志
TransactionLog log = new TransactionLog();
log.setAccountId(accountId);
log.setTransactionId(transactionId);
log.setAmount(amount.negate());
log.setType(TransactionType.WITHDRAW);
log.setBalance(newBalance);
log.setCreateTime(new Date());
transactionLogRepository.save(log);
return OperationResult.success("扣款成功", newBalance);
} finally {
// 释放锁
lock.unlock();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return OperationResult.fail("操作被中断");
} catch (Exception e) {
log.error("扣款操作异常", e);
return OperationResult.fail("系统异常: " + e.getMessage());
}
}
/**
* 转账操作
*/
public OperationResult transfer(Long fromAccountId, Long toAccountId,
BigDecimal amount, String transactionId) {
if (amount.compareTo(BigDecimal.ZERO) <= 0) {
return OperationResult.fail("转账金额必须大于0");
}
if (fromAccountId.equals(toAccountId)) {
return OperationResult.fail("不能转账给自己");
}
// 按账户ID排序,避免死锁
Long firstLockAccount = Math.min(fromAccountId, toAccountId);
Long secondLockAccount = Math.max(fromAccountId, toAccountId);
ReentrantLock firstLock = getAccountLock(firstLockAccount);
ReentrantLock secondLock = getAccountLock(secondLockAccount);
try {
// 尝试获取第一个锁
if (!firstLock.tryLock(5, TimeUnit.SECONDS)) {
return OperationResult.fail("账户操作繁忙,请稍后再试");
}
try {
// 尝试获取第二个锁
if (!secondLock.tryLock(5, TimeUnit.SECONDS)) {
return OperationResult.fail("账户操作繁忙,请稍后再试");
}
try {
// 检查交易ID是否重复
if (transactionLogRepository.existsByTransactionId(transactionId)) {
return OperationResult.fail("交易已存在,请勿重复提交");
}
// 读取转出账户余额
BigDecimal fromBalance = getBalance(fromAccountId);
// 检查余额是否充足
if (fromBalance.compareTo(amount) < 0) {
return OperationResult.fail("转出账户余额不足");
}
// 读取转入账户余额
BigDecimal toBalance = getBalance(toAccountId);
// 计算新余额
BigDecimal newFromBalance = fromBalance.subtract(amount);
BigDecimal newToBalance = toBalance.add(amount);
// 执行更新操作(事务保证)
boolean success = accountRepository.transferBalance(
fromAccountId, toAccountId,
newFromBalance, newToBalance);
if (!success) {
return OperationResult.fail("转账失败,请稍后再试");
}
// 更新缓存
balanceCache.put(fromAccountId, newFromBalance);
balanceCache.put(toAccountId, newToBalance);
// 记录交易日志
saveTransferLogs(fromAccountId, toAccountId, amount, transactionId,
newFromBalance, newToBalance);
return OperationResult.success("转账成功", newFromBalance);
} finally {
secondLock.unlock();
}
} finally {
firstLock.unlock();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return OperationResult.fail("操作被中断");
} catch (Exception e) {
log.error("转账操作异常", e);
return OperationResult.fail("系统异常: " + e.getMessage());
}
}
/**
* 获取账户余额
*/
public BigDecimal getBalance(Long accountId) {
// 优先从缓存获取
BigDecimal cachedBalance = balanceCache.get(accountId);
if (cachedBalance != null) {
return cachedBalance;
}
// 缓存未命中,从数据库加载
BigDecimal balance = accountRepository.findBalanceById(accountId);
if (balance != null) {
balanceCache.put(accountId, balance);
}
return balance != null ? balance : BigDecimal.ZERO;
}
/**
* 保存转账日志
*/
private void saveTransferLogs(Long fromAccountId, Long toAccountId,
BigDecimal amount, String transactionId,
BigDecimal fromBalance, BigDecimal toBalance) {
// 转出日志
TransactionLog fromLog = new TransactionLog();
fromLog.setAccountId(fromAccountId);
fromLog.setTransactionId(transactionId + "-from");
fromLog.setAmount(amount.negate());
fromLog.setType(TransactionType.TRANSFER_OUT);
fromLog.setRelatedAccountId(toAccountId);
fromLog.setBalance(fromBalance);
fromLog.setCreateTime(new Date());
// 转入日志
TransactionLog toLog = new TransactionLog();
toLog.setAccountId(toAccountId);
toLog.setTransactionId(transactionId + "-to");
toLog.setAmount(amount);
toLog.setType(TransactionType.TRANSFER_IN);
toLog.setRelatedAccountId(fromAccountId);
toLog.setBalance(toBalance);
toLog.setCreateTime(new Date());
// 批量保存
transactionLogRepository.saveAll(Arrays.asList(fromLog, toLog));
}
/**
* 清理长时间不使用的锁
*/
@Scheduled(fixedRate = 3600000) // 每小时执行一次
public void cleanupUnusedLocks() {
// 实际实现可能需要额外的锁使用跟踪逻辑
// 这里简化处理,仅模拟清理过程
int count = 0;
for (Iterator<Map.Entry<Long, ReentrantLock>> it = accountLocks.entrySet().iterator();
it.hasNext();) {
Map.Entry<Long, ReentrantLock> entry = it.next();
ReentrantLock lock = entry.getValue();
if (!lock.isLocked()) {
it.remove();
count++;
}
}
if (count > 0) {
log.info("已清理{}个未使用的账户锁", count);
}
}
@Data
@AllArgsConstructor
public static class OperationResult {
private boolean success;
private String message;
private BigDecimal balance;
public static OperationResult success(String message, BigDecimal balance) {
return new OperationResult(true, message, balance);
}
public static OperationResult fail(String message) {
return new OperationResult(false, message, null);
}
}
public enum TransactionType {
DEPOSIT, // 充值
WITHDRAW, // 提现
TRANSFER_OUT, // 转出
TRANSFER_IN // 转入
}
}
实际应用分析:
细粒度锁控制:按账户ID创建锁,避免不同账户间的相互影响
公平锁设置:使用公平锁模式,确保先请求的线程先获得锁,防止饥饿
锁超时控制:通过 tryLock 设置超时时间,避免长时间等待导致系统卡顿
有序锁获取:转账时按ID排序获取锁,防止死锁
锁清理机制:定期清理不再使用的锁对象,避免内存泄漏
场景二:ReadWriteLock 实现高并发读写分离
ReadWriteLock 提供了读写分离的锁机制,允许多个线程同时读取共享资源,但写操作需要独占锁。这种机制在读多写少的场景中能显著提高并发性能。
案例:分布式配置中心的本地缓存
在分布式系统中,配置中心常需要高性能的本地缓存,以减少配置查询的延迟。
/**
* 配置中心本地缓存
*/
@Component
public class ConfigurationCache {
// 配置数据存储
private final Map<String, String> configMap = new HashMap<>();
// 读写锁
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
// 配置版本号
private volatile long configVersion = 0;
@Resource
private RemoteConfigService remoteConfigService;
/**
* 初始化加载配置
*/
@PostConstruct
public void init() {
refreshConfigs();
}
/**
* 获取配置项
* @param key 配置键
* @param defaultValue 默认值
* @return 配置值
*/
public String getConfig(String key, String defaultValue) {
readLock.lock();
try {
return configMap.getOrDefault(key, defaultValue);
} finally {
readLock.unlock();
}
}
/**
* 批量获取配置项
* @param keys 配置键集合
* @return 配置映射
*/
public Map<String, String> getConfigs(Collection<String> keys) {
Map<String, String> result = new HashMap<>();
readLock.lock();
try {
for (String key : keys) {
String value = configMap.get(key);
if (value != null) {
result.put(key, value);
}
}
return result;
} finally {
readLock.unlock();
}
}
/**
* 获取当前配置版本
*/
public long getConfigVersion() {
return configVersion;
}
/**
* 本地更新配置
* @param key 配置键
* @param value 配置值
*/
public void updateConfig(String key, String value) {
writeLock.lock();
try {
configMap.put(key, value);
configVersion++;
// 异步同步到远程配置中心
CompletableFuture.runAsync(() -> {
try {
remoteConfigService.updateConfig(key, value);
} catch (Exception e) {
log.error("同步配置到远程服务失败: {}", key, e);
}
});
} finally {
writeLock.unlock();
}
}
/**
* 本地批量更新配置
* @param configs 配置映射
*/
public void updateConfigs(Map<String, String> configs) {
if (configs == null || configs.isEmpty()) {
return;
}
writeLock.lock();
try {
configMap.putAll(configs);
configVersion++;
// 异步同步到远程配置中心
CompletableFuture.runAsync(() -> {
try {
remoteConfigService.updateConfigs(configs);
} catch (Exception e) {
log.error("批量同步配置到远程服务失败", e);
}
});
} finally {
writeLock.unlock();
}
}
/**
* 从远程配置中心刷新配置
* @return 是否刷新成功
*/
public boolean refreshConfigs() {
try {
// 获取远程配置和版本号
RemoteConfigResult result = remoteConfigService.getAllConfigs();
// 检查版本号,如果本地版本已经是最新,则无需更新
if (result.getVersion() <= configVersion) {
log.debug("本地配置已是最新版本,无需更新, 当前版本: {}", configVersion);
return true;
}
writeLock.lock();
try {
// 再次检查版本号(双重检查,避免锁等待期间版本已更新)
if (result.getVersion() <= configVersion) {
return true;
}
// 清空并重新加载所有配置
configMap.clear();
configMap.putAll(result.getConfigs());
configVersion = result.getVersion();
log.info("配置已刷新,当前版本: {}", configVersion);
return true;
} finally {
writeLock.unlock();
}
} catch (Exception e) {
log.error("刷新配置失败", e);
return false;
}
}
/**
* 远程配置结果
*/
@Data
@AllArgsConstructor
public static class RemoteConfigResult {
private Map<String, String> configs;
private long version;
}
}
实际应用分析:
读写分离:使用 ReadWriteLock 允许多个线程同时读取配置,提高读取性能
写入独占:更新配置时获取写锁,确保数据一致性
版本控制:使用版本号跟踪配置变更,避免不必要的更新
双重检查:获取写锁前后都检查版本号,减少锁竞争
异步同步:配置更新异步同步到远程服务,不阻塞本地操作
场景三:StampedLock 实现乐观读取
StampedLock 是 Java 8 引入的一种新型锁机制,提供了乐观读模式,可以在不获取读锁的情况下读取数据,在读多写极少的场景下性能更优。
案例:地理位置服务的坐标缓存
在地图服务中,需要高性能地读取和更新位置坐标,而更新频率远低于读取频率。
/**
* 位置坐标缓存
*/
@Component
public class LocationCache {
/**
* 位置数据
*/
@Data
@AllArgsConstructor
public static class LocationData {
private double latitude; // 纬度
private double longitude; // 经度
private String address; // 地址描述
private long updateTime; // 更新时间
// 深拷贝
public LocationData copy() {
return new LocationData(latitude, longitude, address, updateTime);
}
}
// 位置缓存
private final ConcurrentHashMap<String, LocationData> locationCache = new ConcurrentHashMap<>();
// 使用 StampedLock 实现高性能读取
private final ConcurrentHashMap<String, StampedLock> locationLocks = new ConcurrentHashMap<>();
// 缓存过期时间(毫秒)
private static final long CACHE_EXPIRE_TIME = TimeUnit.MINUTES.toMillis(30);
/**
* 获取位置锁
*/
private StampedLock getLocationLock(String id) {
return locationLocks.computeIfAbsent(id, k -> new StampedLock());
}
/**
* 获取位置数据
* @param id 位置ID
* @param loadFunction 加载函数,当缓存未命中或失效时调用
* @return 位置数据
*/
public LocationData getLocation(String id, Function<String, LocationData> loadFunction) {
// 1. 尝试从缓存获取
LocationData cachedData = locationCache.get(id);
if (cachedData != null && !isExpired(cachedData)) {
return cachedData.copy();
}
// 2. 缓存未命中或已过期,加载数据
StampedLock lock = getLocationLock(id);
// 3. 尝试乐观读
long stamp = lock.tryOptimisticRead();
cachedData = locationCache.get(id);
// 4. 验证乐观读有效性,并检查是否已有其他线程更新了缓存
if (stamp != 0 && lock.validate(stamp) && cachedData != null && !isExpired(cachedData)) {
return cachedData.copy();
}
// 5. 乐观读失败,转换为悲观读
stamp = lock.readLock();
try {
// 再次检查缓存
cachedData = locationCache.get(id);
if (cachedData != null && !isExpired(cachedData)) {
return cachedData.copy();
}
// 6. 缓存仍未命中或已过期,释放读锁,获取写锁
long ws = lock.tryConvertToWriteLock(stamp);
if (ws != 0) {
// 写锁获取成功
stamp = ws;
// 加载数据
LocationData newData = loadFunction.apply(id);
if (newData != null) {
locationCache.put(id, newData);
return newData.copy();
} else {
return null;
}
} else {
// 转换为写锁失败,手动释放读锁并获取写锁
lock.unlockRead(stamp);
stamp = lock.writeLock();
try {
// 再次检查缓存(可能已有其他线程更新)
cachedData = locationCache.get(id);
if (cachedData != null && !isExpired(cachedData)) {
return cachedData.copy();
}
// 加载数据
LocationData newData = loadFunction.apply(id);
if (newData != null) {
locationCache.put(id, newData);
return newData.copy();
} else {
return null;
}
} finally {
lock.unlockWrite(stamp);
}
}
} finally {
// 如果仍持有读锁,释放它
if (StampedLock.isReadLockStamp(stamp)) {
lock.unlockRead(stamp);
}
}
}
/**
* 更新位置数据
* @param id 位置ID
* @param location 新的位置数据
*/
public void updateLocation(String id, LocationData location) {
if (id == null || location == null) {
throw new IllegalArgumentException("位置ID和数据不能为空");
}
// 设置更新时间
location.setUpdateTime(System.currentTimeMillis());
// 获取写锁
StampedLock lock = getLocationLock(id);
long stamp = lock.writeLock();
try {
// 更新缓存
locationCache.put(id, location);
} finally {
lock.unlockWrite(stamp);
}
}
/**
* 检查数据是否过期
*/
private boolean isExpired(LocationData data) {
return System.currentTimeMillis() - data.getUpdateTime() > CACHE_EXPIRE_TIME;
}
/**
* 清理过期缓存
*/
@Scheduled(fixedRate = 300000) // 每5分钟执行一次
public void cleanupExpiredCache() {
int removedCount = 0;
for (Iterator<Map.Entry<String, LocationData>> it = locationCache.entrySet().iterator();
it.hasNext();) {
Map.Entry<String, LocationData> entry = it.next();
if (isExpired(entry.getValue())) {
it.remove();
removedCount++;
}
}
if (removedCount > 0) {
log.info("已清理{}个过期位置缓存", removedCount);
}
// 清理不再使用的锁
for (Iterator<Map.Entry<String, StampedLock>> it = locationLocks.entrySet().iterator();
it.hasNext();) {
Map.Entry<String, StampedLock> entry = it.next();
if (!locationCache.containsKey(entry.getKey())) {
it.remove();
}
}
}
}
使用示例:
@Service
public class LocationService {
@Resource
private LocationCache locationCache;
@Resource
private LocationRepository locationRepository;
/**
* 获取位置信息
*/
public LocationInfo getLocationInfo(String locationId) {
LocationCache.LocationData location = locationCache.getLocation(
locationId,
this::loadLocationFromDb
);
if (location == null) {
return null;
}
return new LocationInfo(
locationId,
location.getLatitude(),
location.getLongitude(),
location.getAddress()
);
}
/**
* 从数据库加载位置数据
*/
private LocationCache.LocationData loadLocationFromDb(String locationId) {
LocationEntity entity = locationRepository.findById(locationId).orElse(null);
if (entity == null) {
return null;
}
return new LocationCache.LocationData(
entity.getLatitude(),
entity.getLongitude(),
entity.getAddress(),
System.currentTimeMillis()
);
}
/**
* 更新位置信息
*/
@Transactional
public void updateLocation(LocationUpdateRequest request) {
// 更新数据库
LocationEntity entity = locationRepository.findById(request.getLocationId())
.orElseThrow(() -> new NotFoundException("位置信息不存在"));
entity.setLatitude(request.getLatitude());
entity.setLongitude(request.getLongitude());
entity.setAddress(request.getAddress());
entity.setUpdateTime(new Date());
locationRepository.save(entity);
// 更新缓存
LocationCache.LocationData locationData = new LocationCache.LocationData(
request.getLatitude(),
request.getLongitude(),
request.getAddress(),
System.currentTimeMillis()
);
locationCache.updateLocation(request.getLocationId(), locationData);
}
}
实际应用分析:
乐观读优先:首先尝试乐观读,在大多数情况下无需加锁,提高并发性能
锁升级机制:失败时逐级升级,从乐观读到悲观读,再到写锁
锁转换:尝试将读锁直接转换为写锁,减少锁释放和获取的开销
深拷贝返回:返回对象的副本而非直接引用,避免外部修改影响缓存
定期清理:定期清理过期缓存和不再使用的锁对象,防止内存泄漏
结论与最佳实践
JUC 并发工具类在大厂的实际应用中展现出强大的性能和灵活性。通过本文的案例分析,我们可以总结出以下最佳实践:
线程池最佳实践
合理设置参数:核心线程数、最大线程数和队列容量应当根据业务特性和服务器资源合理配置
区分业务类型:不同业务类型使用独立的线程池,避免互相影响
监控和告警:实时监控线程池状态,包括活跃线程数、队列深度、拒绝次数等指标
优雅关闭:系统停止时,应当给予线程池足够的时间处理完已提交的任务
自定义拒绝策略:根据业务需求实现自定义的拒绝策略,如延迟重试、降级处理等
同步工具类最佳实践
超时控制:使用带超时参数的 await 方法,避免永久等待
异常处理:在 finally 块中确保 countDown 或释放信号量,避免死锁
合理使用:根据场景选择合适的同步工具,如需多次使用选择 CyclicBarrier,一次性等待选择 CountDownLatch
避免嵌套:避免在持有一个同步工具的情况下请求另一个,可能导致死锁
并发容器最佳实践
场景匹配:根据读写比例选择合适的容器,如读多写少场景优先考虑 ConcurrentHashMap 和 CopyOnWriteArrayList
批量操作:优先使用批量添加/获取方法,如 putAll、getAll 等,减少操作次数
原子操作利用:使用容器提供的原子操作方法,如 computeIfAbsent、putIfAbsent 等
容量预估:合理预估初始容量,减少扩容开销
原子操作类最佳实践
选择合适的实现:高并发计数场景优先考虑 LongAdder 而非 AtomicLong
避免忙等待:使用带 BackOff 策略的重试机制,而非简单循环尝试 CAS
复合操作注意:多个原子变量的操作不能保证整体原子性,必要时配合锁使用
版本控制:使用 AtomicStampedReference 解决 ABA 问题
锁机制最佳实践
最小化锁范围:尽量减小锁的粒度和持有时间
读写锁区分:在读多写少场景中使用 ReadWriteLock 或 StampedLock
避免死锁:获取多个锁时保持一致的顺序,使用 tryLock 避免无限等待
锁分离:使用不同的锁对象控制不同的资源,提高并发性
锁升级策略:优先使用乐观策略,失败时再升级为悲观锁
实际应用中,通常需要综合使用多种并发工具类,根据具体场景选择最合适的组合。更重要的是,需要持续监控系统性能,识别潜在的并发瓶颈,并不断优化线程模型和并发策略。
在大厂的工程实践中,除了正确使用 JUC 工具类外,还需要结合微服务架构、分布式系统设计原则,以及 DevOps 最佳实践,才能构建出真正高性能、高可用的并发系统。