Skip to content

JUC 并发工具类应用场景详解

简介

Java 并发工具包(java.util.concurrent,简称 JUC)是 Java 处理并发编程的核心 API,它提供了一系列高性能、线程安全的工具类,用于解决各种并发编程场景的问题。在互联网大厂的高并发、分布式系统环境中,JUC 工具类扮演着至关重要的角色,是构建高效稳定系统的基石。

本文将深入剖析 JUC 并发工具类在实际业务场景中的应用,包括常见的并发控制、线程同步、异步处理等方面的实践经验及优化方案。

基本概念

在深入应用场景之前,我们先简要回顾 JUC 包中的核心组件分类:

  1. 线程池工具:ExecutorService、ThreadPoolExecutor、ScheduledThreadPoolExecutor 等

  2. 同步工具类:CountDownLatch、CyclicBarrier、Semaphore、Phaser 等

  3. 并发集合:ConcurrentHashMap、CopyOnWriteArrayList、BlockingQueue 等

  4. 原子操作类:AtomicInteger、AtomicReference、LongAdder 等

  5. 锁机制:ReentrantLock、ReadWriteLock、StampedLock 等

  6. CompletableFuture:用于异步编程的工具类

以上工具类各有特点,针对不同的并发场景提供了专业的解决方案。接下来,我们将结合实际业务场景,深入分析这些工具类的应用。

线程池的应用场景

线程池是 JUC 中最常用的工具之一,在大厂的应用中几乎无处不在。线程池通过复用线程、控制并发数量,有效减少线程创建与销毁的开销,提高系统的整体性能。

场景一:电商平台的订单处理系统

在电商平台的订单系统中,下单后需要进行一系列异步操作,如库存扣减、支付确认、物流通知等。这些操作可以并行执行,且对实时性要求较高。

java
/**
 * 电商订单处理线程池配置
 */
@Configuration
public class OrderThreadPoolConfig {

    @Bean("orderProcessThreadPool")
    public ThreadPoolExecutor orderProcessThreadPool() {
        // 核心线程数:根据业务量和服务器配置确定
        int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
        // 最大线程数:通常为核心线程数的2倍
        int maximumPoolSize = corePoolSize * 2;
        // 线程存活时间:非核心线程的空闲存活时间
        long keepAliveTime = 60L;
        // 使用有界队列,防止任务堆积导致OOM
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(1000);
        // 自定义线程工厂,便于排查问题
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("order-process-pool-%d")
                .setUncaughtExceptionHandler(new LoggingUncaughtExceptionHandler())
                .build();
        // 拒绝策略:使用CallerRunsPolicy,防止系统崩溃的同时减缓请求速度
        RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
        
        return new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                TimeUnit.SECONDS,
                workQueue,
                threadFactory,
                handler);
    }
    
    // 自定义异常处理器,用于记录线程池中未捕获的异常
    private static class LoggingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
        private static final Logger logger = LoggerFactory.getLogger(LoggingUncaughtExceptionHandler.class);
        
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            logger.error("订单处理线程池发生未捕获异常, 线程名: {}", t.getName(), e);
        }
    }
}

实际应用分析

  1. 核心线程数设置:基于 CPU 核心数而非固定值,适应不同服务器配置

  2. 有界队列:使用 ArrayBlockingQueue 而非 LinkedBlockingQueue,防止无限制接收任务导致 OOM

  3. 拒绝策略:选择 CallerRunsPolicy,在系统负载过高时能够起到限流作用

  4. 自定义线程工厂:统一的线程命名和异常处理,便于问题定位

订单处理服务中的使用示例:

java
@Service
public class OrderProcessService {
    
    @Resource(name = "orderProcessThreadPool")
    private ThreadPoolExecutor orderProcessThreadPool;
    
    @Resource
    private InventoryService inventoryService;
    
    @Resource
    private PaymentService paymentService;
    
    @Resource
    private LogisticsService logisticsService;
    
    /**
     * 处理新订单
     */
    public void processNewOrder(Order order) {
        // 提交库存操作任务
        orderProcessThreadPool.execute(() -> {
            try {
                inventoryService.deductInventory(order);
            } catch (Exception e) {
                // 异常处理,可能涉及重试或补偿
                log.error("库存扣减失败,订单号: {}", order.getOrderId(), e);
            }
        });
        
        // 提交支付确认任务
        orderProcessThreadPool.execute(() -> {
            try {
                paymentService.confirmPayment(order);
            } catch (Exception e) {
                log.error("支付确认失败,订单号: {}", order.getOrderId(), e);
            }
        });
        
        // 提交物流任务
        orderProcessThreadPool.execute(() -> {
            try {
                logisticsService.createShippingOrder(order);
            } catch (Exception e) {
                log.error("创建物流单失败,订单号: {}", order.getOrderId(), e);
            }
        });
    }
}

场景二:定时任务调度系统

在大型互联网应用中,定时任务是常见需求,如数据统计、缓存更新、定时通知等。ScheduledThreadPoolExecutor 提供了高精度的定时执行能力。

java
/**
 * 定时任务线程池配置
 */
@Configuration
public class ScheduledTaskConfig {

    @Bean
    public ScheduledThreadPoolExecutor scheduledTaskExecutor() {
        // 核心线程数:基于定时任务数量和执行频率评估
        int corePoolSize = 10;
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("scheduled-task-%d")
                .setDaemon(true) // 设置为守护线程
                .build();
                
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
                corePoolSize, 
                threadFactory);
                
        // 配置任务取消后从队列中移除
        executor.setRemoveOnCancelPolicy(true);
        
        return executor;
    }
}

实际使用案例:

java
@Service
public class DataAnalysisService {

    @Resource
    private ScheduledThreadPoolExecutor scheduledTaskExecutor;
    
    @Resource
    private MetricsRepository metricsRepository;
    
    /**
     * 启动数据分析任务
     */
    @PostConstruct
    public void startAnalysisTasks() {
        // 每10分钟执行一次用户行为分析
        scheduledTaskExecutor.scheduleAtFixedRate(
                this::analyzeUserBehavior,
                10, 
                10, 
                TimeUnit.MINUTES);
                
        // 每小时执行一次交易数据汇总
        scheduledTaskExecutor.scheduleAtFixedRate(
                this::aggregateTransactionData,
                0, 
                1, 
                TimeUnit.HOURS);
                
        // 每天凌晨2点执行日报表生成
        scheduledTaskExecutor.scheduleAtFixedRate(
                this::generateDailyReport,
                getTimeToNextDay(2), // 计算到凌晨2点的延迟
                24, 
                TimeUnit.HOURS);
    }
    
    /**
     * 计算当前时间到次日指定小时的延迟(秒)
     */
    private long getTimeToNextDay(int hour) {
        LocalDateTime now = LocalDateTime.now();
        LocalDateTime nextRun = now.toLocalDate().plusDays(1).atTime(hour, 0);
        if (now.toLocalTime().isAfter(LocalTime.of(hour, 0))) {
            nextRun = nextRun.plusDays(1);
        }
        return ChronoUnit.SECONDS.between(now, nextRun);
    }
    
    // 具体任务实现方法...
    private void analyzeUserBehavior() {
        try {
            // 实现用户行为分析逻辑
            metricsRepository.analyzeUserMetrics();
        } catch (Exception e) {
            log.error("用户行为分析任务执行失败", e);
        }
    }
    
    private void aggregateTransactionData() {
        // 交易数据汇总逻辑
    }
    
    private void generateDailyReport() {
        // 日报表生成逻辑
    }
}

最佳实践与优化

  1. 避免任务堆积:ScheduledThreadPoolExecutor 对于固定频率任务,如果前一次执行未完成,会延迟执行下一次任务,但不会并发执行。在业务设计上要控制好单次任务的执行时间

  2. 优雅停机:系统关闭时应当调用 scheduledTaskExecutor.shutdown(),并设置合理的等待时间

  3. 任务隔离:不同类型、不同优先级的定时任务应使用不同的线程池,避免互相影响

  4. 监控告警:实时监控线程池状态,包括活跃线程数、队列深度等指标,发现异常及时告警

场景三:微服务 API 网关的请求处理

在微服务架构中,API 网关负责请求路由、限流、鉴权等功能,需要高效处理大量并发请求。

java
/**
 * API 网关线程池配置
 */
@Configuration
public class GatewayThreadPoolConfig {

    @Value("${gateway.thread.core-size:200}")
    private int coreSize;
    
    @Value("${gateway.thread.max-size:400}")
    private int maxSize;
    
    @Value("${gateway.thread.queue-capacity:1000}")
    private int queueCapacity;
    
    @Value("${gateway.thread.keep-alive:60}")
    private int keepAliveSeconds;

    @Bean("apiGatewayExecutor")
    public ThreadPoolExecutor apiGatewayExecutor() {
        // 使用自定义队列,支持获取队列剩余容量
        ResizableCapacityLinkedBlockingQueue<Runnable> workQueue = 
                new ResizableCapacityLinkedBlockingQueue<>(queueCapacity);
                
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("api-gateway-%d")
                .build();
                
        // 自定义拒绝策略:记录指标并返回特定错误码
        RejectedExecutionHandler handler = new GatewayRejectedExecutionHandler();
        
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                coreSize,
                maxSize,
                keepAliveSeconds,
                TimeUnit.SECONDS,
                workQueue,
                threadFactory,
                handler);
                
        // 允许核心线程超时,提高资源利用率
        executor.allowCoreThreadTimeOut(true);
        
        return executor;
    }
    
    /**
     * 自定义拒绝策略
     */
    private static class GatewayRejectedExecutionHandler implements RejectedExecutionHandler {
        private final Counter rejectedRequests = 
                Metrics.counter("gateway.rejected.requests");
                
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // 记录指标
            rejectedRequests.increment();
            
            // 抛出特定异常,由全局异常处理器转换为 HTTP 429 响应
            throw new ServiceBusyException("服务繁忙,请稍后重试");
        }
    }
    
    /**
     * 可调整容量的 LinkedBlockingQueue
     */
    private static class ResizableCapacityLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
        public ResizableCapacityLinkedBlockingQueue(int capacity) {
            super(capacity);
        }
        
        public boolean setCapacity(int capacity) {
            // 实现队列容量动态调整
            // 省略实现细节
            return true;
        }
    }
}

应用示例:

java
@Component
public class ApiGatewayHandler {

    @Resource(name = "apiGatewayExecutor")
    private ThreadPoolExecutor executor;
    
    @Resource
    private RouterService routerService;
    
    @Resource
    private RequestLimiter requestLimiter;
    
    /**
     * 处理 API 请求
     */
    public Mono<ServerResponse> handleRequest(ServerRequest request) {
        // 限流检查
        if (!requestLimiter.tryAcquire()) {
            return ServerResponse.status(HttpStatus.TOO_MANY_REQUESTS)
                    .bodyValue("请求过于频繁,请稍后重试");
        }
        
        return Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
            // 1. 请求预处理(鉴权、参数校验等)
            processRequest(request);
            
            // 2. 路由到目标服务
            return routerService.route(request);
        }, executor))
        .onErrorResume(this::handleError);
    }
    
    private Mono<ServerResponse> handleError(Throwable error) {
        if (error instanceof ServiceBusyException) {
            return ServerResponse.status(HttpStatus.TOO_MANY_REQUESTS)
                    .bodyValue(error.getMessage());
        }
        // 其他异常处理
        return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .bodyValue("服务器内部错误");
    }
    
    // 请求处理逻辑
    private void processRequest(ServerRequest request) {
        // 实现请求处理逻辑
    }
}

关键优化实践

  1. 参数可配置化:线程池核心参数通过配置中心动态调整,适应流量变化

  2. 自定义拒绝策略:结合业务需求,实现定制化的拒绝处理

  3. 动态队列容量:实现队列容量的动态调整,应对突发流量

  4. 监控指标:记录线程池饱和度、拒绝请求数等关键指标

  5. 超时控制:为每个任务设置超时控制,避免长时间运行的任务占用线程资源

同步工具类的应用场景

JUC 提供了多种同步工具类,用于协调多个线程之间的协作。这些同步工具类在不同场景下有各自的应用优势,下面是它们在大厂实际应用中的案例。

场景一:CountDownLatch 在分布式任务协调中的应用

CountDownLatch 是一个计数器闭锁,用于等待一组线程完成工作后再继续执行。在大厂环境中,常用于并行任务处理、批量数据导入等场景。

案例:大数据平台的并行 ETL 处理

在数据仓库 ETL 过程中,需要从多个数据源并行抽取数据,全部完成后再进行汇总处理:

java
/**
 * 并行 ETL 处理服务
 */
@Service
public class ParallelETLService {

    @Resource
    private ThreadPoolExecutor etlThreadPool;
    
    @Resource
    private List<DataSourceConnector> dataSourceConnectors;
    
    @Resource
    private DataMergeService dataMergeService;
    
    /**
     * 执行并行 ETL 任务
     */
    public ETLResult performParallelETL(ETLContext context) {
        int sourceCount = dataSourceConnectors.size();
        // 创建 CountDownLatch,计数器为数据源数量
        CountDownLatch latch = new CountDownLatch(sourceCount);
        
        // 用于收集各数据源处理结果
        List<DataExtractResult> extractResults = 
                Collections.synchronizedList(new ArrayList<>(sourceCount));
        
        // 提交并行抽取任务
        for (DataSourceConnector connector : dataSourceConnectors) {
            etlThreadPool.execute(() -> {
                try {
                    // 执行数据抽取转换
                    DataExtractResult result = connector.extractAndTransform(context);
                    extractResults.add(result);
                } catch (Exception e) {
                    log.error("数据源[{}]处理失败", connector.getName(), e);
                    // 添加错误结果
                    extractResults.add(new DataExtractResult(connector.getName(), e));
                } finally {
                    // 无论成功失败,都减少计数器
                    latch.countDown();
                }
            });
        }
        
        try {
            // 设置最大等待时间,防止永久阻塞
            boolean completed = latch.await(30, TimeUnit.MINUTES);
            if (!completed) {
                log.warn("ETL 任务执行超时,已完成{}/{}个数据源处理", 
                        sourceCount - latch.getCount(), sourceCount);
            }
            
            // 执行数据合并处理
            return dataMergeService.mergeAndLoad(extractResults, context);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("ETL 任务被中断", e);
            return new ETLResult(ETLStatus.INTERRUPTED, "任务被中断");
        }
    }
}

实际应用分析

  1. 超时控制:设置了 await 的超时时间,防止因某个数据源处理异常导致永久等待

  2. 结果收集:使用线程安全的集合收集各任务执行结果

  3. 异常处理:在 finally 块中执行 countDown,确保计数器正确递减

  4. 状态汇总:根据任务完成情况,返回相应的处理结果

场景二:CyclicBarrier 在并行计算中的应用

CyclicBarrier 允许一组线程相互等待,直到所有线程都到达屏障点,适用于需要分阶段执行的并行算法。

案例:推荐系统的并行特征计算

在推荐系统中,用户特征计算通常分为多个阶段,每个阶段都依赖上一阶段的结果:

java
/**
 * 用户特征并行计算服务
 */
@Service
public class UserFeatureCalculationService {

    @Resource
    private ThreadPoolExecutor computeThreadPool;
    
    @Resource
    private FeatureRepository featureRepository;
    
    /**
     * 并行计算用户特征
     */
    public UserFeatureModel calculateUserFeatures(String userId, FeatureContext context) {
        int threadCount = 4; // 并行线程数
        
        // 创建循环栅栏,所有线程到达后执行合并操作
        CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
            // 每个阶段结束时执行的操作,如特征归一化
            log.info("所有线程已完成当前阶段计算,执行特征合并");
            context.normalizeFeatures();
        });
        
        // 创建并行计算任务
        List<Future<FeatureSegment>> futures = new ArrayList<>(threadCount);
        
        // 第一阶段:基础特征计算
        for (int i = 0; i < threadCount; i++) {
            final int segmentIndex = i;
            futures.add(computeThreadPool.submit(() -> {
                // 计算基础特征
                FeatureSegment segment = calculateBasicFeatures(userId, segmentIndex, context);
                try {
                    // 等待所有线程完成基础特征计算
                    barrier.await(10, TimeUnit.MINUTES);
                } catch (Exception e) {
                    log.error("等待基础特征计算完成时发生错误", e);
                    throw new FeatureCalculationException("基础特征计算等待失败", e);
                }
                
                // 第二阶段:行为特征计算
                calculateBehaviorFeatures(segment, context);
                try {
                    // 等待所有线程完成行为特征计算
                    barrier.await(10, TimeUnit.MINUTES);
                } catch (Exception e) {
                    log.error("等待行为特征计算完成时发生错误", e);
                    throw new FeatureCalculationException("行为特征计算等待失败", e);
                }
                
                // 第三阶段:社交特征计算
                calculateSocialFeatures(segment, context);
                try {
                    // 等待所有线程完成社交特征计算
                    barrier.await(10, TimeUnit.MINUTES);
                } catch (Exception e) {
                    log.error("等待社交特征计算完成时发生错误", e);
                    throw new FeatureCalculationException("社交特征计算等待失败", e);
                }
                
                return segment;
            }));
        }
        
        // 收集所有计算结果
        List<FeatureSegment> segments = new ArrayList<>(threadCount);
        for (Future<FeatureSegment> future : futures) {
            try {
                segments.add(future.get());
            } catch (Exception e) {
                log.error("获取特征计算结果失败", e);
                throw new FeatureCalculationException("特征计算失败", e);
            }
        }
        
        // 合并最终结果
        return featureRepository.mergeFeatureSegments(segments, context);
    }
    
    // 计算基础特征
    private FeatureSegment calculateBasicFeatures(String userId, int segmentIndex, FeatureContext context) {
        // 实现基础特征计算逻辑
        return new FeatureSegment();
    }
    
    // 计算行为特征
    private void calculateBehaviorFeatures(FeatureSegment segment, FeatureContext context) {
        // 实现行为特征计算逻辑
    }
    
    // 计算社交特征
    private void calculateSocialFeatures(FeatureSegment segment, FeatureContext context) {
        // 实现社交特征计算逻辑
    }
}

实际应用分析

  1. 分段计算:将特征计算任务划分为多个段,利用并行提高计算效率

  2. 阶段同步:使用 CyclicBarrier 确保所有线程完成当前阶段计算后再进入下一阶段

  3. 归一化处理:在栅栏动作中执行特征归一化,保证各阶段特征的一致性

  4. 超时控制:为每个 await 设置超时时间,防止因个别线程问题导致整体计算阻塞

场景三:Semaphore 在资源控制中的应用

Semaphore 用于控制同时访问特定资源的线程数量,在限流、连接池管理等场景中广泛应用。

案例:外部接口调用限流器

在微服务系统中,调用外部 API 时需要控制并发请求数,避免对外部系统造成过大压力:

java
/**
 * 外部 API 接口调用限流器
 */
@Component
public class ExternalApiLimiter {

    // 外部 API 限流配置
    private static class ApiConfig {
        private final String apiName;
        private final int maxConcurrency;
        private final Semaphore semaphore;
        
        public ApiConfig(String apiName, int maxConcurrency) {
            this.apiName = apiName;
            this.maxConcurrency = maxConcurrency;
            this.semaphore = new Semaphore(maxConcurrency, true); // 公平模式
        }
    }
    
    // API 配置映射
    private final Map<String, ApiConfig> apiConfigMap = new ConcurrentHashMap<>();
    
    // 监控指标
    private final Counter rejectedRequests = Metrics.counter("api.rejected.requests");
    private final Counter totalRequests = Metrics.counter("api.total.requests");
    
    /**
     * 初始化 API 配置
     */
    @PostConstruct
    public void init() {
        // 从配置中心获取各 API 的限流配置
        apiConfigMap.put("payment", new ApiConfig("payment", 50));
        apiConfigMap.put("inventory", new ApiConfig("inventory", 100));
        apiConfigMap.put("logistics", new ApiConfig("logistics", 30));
        // 可通过配置中心动态更新
    }
    
    /**
     * 执行受限流控制的 API 调用
     */
    public <T> T executeWithRateLimit(String apiName, Supplier<T> apiCall, long timeout, TimeUnit unit) 
            throws ApiRateLimitException {
        
        totalRequests.increment();
        
        ApiConfig config = apiConfigMap.get(apiName);
        if (config == null) {
            // 未配置的 API 默认使用较小的并发限制
            config = new ApiConfig(apiName, 10);
            apiConfigMap.put(apiName, config);
        }
        
        boolean acquired = false;
        try {
            // 尝试获取许可
            acquired = config.semaphore.tryAcquire(timeout, unit);
            if (!acquired) {
                rejectedRequests.increment();
                throw new ApiRateLimitException("调用 API[" + apiName + "]被限流");
            }
            
            // 执行实际 API 调用
            return apiCall.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ApiRateLimitException("获取 API 调用许可时被中断", e);
        } finally {
            // 释放许可
            if (acquired) {
                config.semaphore.release();
            }
        }
    }
    
    /**
     * 更新 API 限流配置
     */
    public void updateApiConfig(String apiName, int maxConcurrency) {
        ApiConfig oldConfig = apiConfigMap.get(apiName);
        if (oldConfig != null) {
            // 替换为新的配置
            apiConfigMap.put(apiName, new ApiConfig(apiName, maxConcurrency));
        }
    }
}

使用示例:

java
@Service
public class ExternalServiceClient {

    @Resource
    private ExternalApiLimiter apiLimiter;
    
    @Resource
    private RestTemplate restTemplate;
    
    /**
     * 调用支付服务
     */
    public PaymentResult processPayment(PaymentRequest request) {
        try {
            return apiLimiter.executeWithRateLimit(
                "payment",
                () -> restTemplate.postForObject(
                        "https://payment-api.example.com/process",
                        request,
                        PaymentResult.class
                ),
                5, // 等待超时时间
                TimeUnit.SECONDS
            );
        } catch (ApiRateLimitException e) {
            log.warn("支付请求被限流", e);
            throw new ServiceBusyException("支付服务繁忙,请稍后再试");
        }
    }
}

实际应用分析

  1. API 粒度限流:为不同 API 设置不同的并发限制,精细化控制资源

  2. 超时控制:使用 tryAcquire 设置获取许可的超时时间,避免长时间等待

  3. 公平模式:使用公平模式创建 Semaphore,避免线程饥饿

  4. 动态配置:支持运行时动态调整限流参数

  5. 监控指标:记录请求总数和被拒绝的请求数,方便系统监控

并发容器的应用场景

JUC 包提供了丰富的并发容器,如 ConcurrentHashMap、CopyOnWriteArrayList、BlockingQueue 等,这些容器在大厂的高并发场景中有着广泛应用。

场景一:ConcurrentHashMap 在缓存系统中的应用

ConcurrentHashMap 是使用最广泛的并发容器之一,它在高并发读多写少的场景中表现优秀,常用于各类缓存实现。

案例:分布式系统的本地缓存实现

java
/**
 * 基于 ConcurrentHashMap 的本地缓存实现
 */
@Component
public class LocalCacheManager<K, V> {

    // 缓存数据存储
    private final ConcurrentHashMap<K, CacheItem<V>> cacheMap;
    
    // 缓存统计信息
    private final LongAdder hitCount = new LongAdder();
    private final LongAdder missCount = new LongAdder();
    private final LongAdder evictionCount = new LongAdder();
    
    // 缓存清理线程
    private final ScheduledExecutorService cleanerExecutor;
    
    /**
     * 缓存项,包含值和过期时间
     */
    private static class CacheItem<V> {
        private final V value;
        private final long expireTime;
        
        public CacheItem(V value, long ttlMillis) {
            this.value = value;
            this.expireTime = System.currentTimeMillis() + ttlMillis;
        }
        
        public boolean isExpired() {
            return System.currentTimeMillis() > expireTime;
        }
        
        public V getValue() {
            return value;
        }
    }
    
    /**
     * 构造函数
     * @param initialCapacity 初始容量
     * @param cleanInterval 清理间隔(秒)
     */
    public LocalCacheManager(int initialCapacity, int cleanInterval) {
        this.cacheMap = new ConcurrentHashMap<>(initialCapacity);
        this.cleanerExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder()
                        .setNameFormat("cache-cleaner-%d")
                        .setDaemon(true)
                        .build());
                        
        // 启动定期清理任务
        this.cleanerExecutor.scheduleAtFixedRate(
                this::cleanExpiredItems,
                cleanInterval,
                cleanInterval,
                TimeUnit.SECONDS);
    }
    
    /**
     * 放入缓存项
     * @param key
     * @param value
     * @param ttlMillis 存活时间(毫秒)
     */
    public void put(K key, V value, long ttlMillis) {
        if (key == null || value == null) {
            throw new IllegalArgumentException("缓存的键和值不能为 null");
        }
        
        cacheMap.put(key, new CacheItem<>(value, ttlMillis));
    }
    
    /**
     * 获取缓存项
     * @param key
     * @return 缓存的值,如果不存在或已过期则返回 null
     */
    public V get(K key) {
        if (key == null) {
            return null;
        }
        
        CacheItem<V> item = cacheMap.get(key);
        
        // 缓存未命中
        if (item == null) {
            missCount.increment();
            return null;
        }
        
        // 检查是否过期
        if (item.isExpired()) {
            // 惰性删除过期项
            cacheMap.remove(key);
            missCount.increment();
            return null;
        }
        
        // 缓存命中
        hitCount.increment();
        return item.getValue();
    }
    
    /**
     * 获取缓存项,如果不存在则通过指定函数加载并缓存
     */
    public V getOrLoad(K key, Function<K, V> loadFunction, long ttlMillis) {
        V value = get(key);
        if (value != null) {
            return value;
        }
        
        // 缓存未命中,需要加载
        // 使用 computeIfAbsent 保证并发情况下只有一个线程会执行加载
        CacheItem<V> computedItem = cacheMap.computeIfAbsent(key, k -> {
            V newValue = loadFunction.apply(k);
            if (newValue == null) {
                // 不缓存 null 值
                return null;
            }
            return new CacheItem<>(newValue, ttlMillis);
        });
        
        // 如果计算结果为 null,表示加载函数返回 null
        return computedItem != null ? computedItem.getValue() : null;
    }
    
    /**
     * 移除缓存项
     */
    public void remove(K key) {
        cacheMap.remove(key);
    }
    
    /**
     * 清空缓存
     */
    public void clear() {
        cacheMap.clear();
    }
    
    /**
     * 获取缓存大小
     */
    public int size() {
        return cacheMap.size();
    }
    
    /**
     * 清理过期项
     */
    private void cleanExpiredItems() {
        try {
            long now = System.currentTimeMillis();
            int cleanedCount = 0;
            
            // 遍历所有缓存项,清理过期的
            for (Iterator<Map.Entry<K, CacheItem<V>>> iterator = cacheMap.entrySet().iterator(); 
                    iterator.hasNext();) {
                Map.Entry<K, CacheItem<V>> entry = iterator.next();
                if (entry.getValue().expireTime < now) {
                    iterator.remove();
                    cleanedCount++;
                }
            }
            
            if (cleanedCount > 0) {
                evictionCount.add(cleanedCount);
                log.debug("已清理 {} 个过期缓存项", cleanedCount);
            }
        } catch (Exception e) {
            log.error("清理过期缓存项时发生错误", e);
        }
    }
    
    /**
     * 获取缓存统计信息
     */
    public Map<String, Long> getStats() {
        Map<String, Long> stats = new HashMap<>();
        stats.put("size", (long) cacheMap.size());
        stats.put("hitCount", hitCount.sum());
        stats.put("missCount", missCount.sum());
        stats.put("evictionCount", evictionCount.sum());
        return stats;
    }
    
    /**
     * 关闭缓存,停止清理线程
     */
    public void shutdown() {
        cleanerExecutor.shutdown();
    }
}

使用示例:

java
@Service
public class ProductService {

    @Resource
    private LocalCacheManager<Long, ProductInfo> productCache;
    
    @Resource
    private ProductRepository productRepository;
    
    /**
     * 获取商品信息
     */
    public ProductInfo getProductInfo(Long productId) {
        // 首先尝试从缓存获取,如果不存在则从数据库加载
        return productCache.getOrLoad(
                productId,
                this::loadProductFromDb,
                TimeUnit.MINUTES.toMillis(10) // 缓存 10 分钟
        );
    }
    
    /**
     * 从数据库加载商品信息
     */
    private ProductInfo loadProductFromDb(Long productId) {
        log.info("从数据库加载商品信息: {}", productId);
        return productRepository.findById(productId)
                .orElse(null);
    }
    
    /**
     * 更新商品信息
     */
    @Transactional
    public void updateProduct(ProductInfo product) {
        // 更新数据库
        productRepository.save(product);
        // 更新缓存或使缓存失效
        productCache.put(product.getId(), product, TimeUnit.MINUTES.toMillis(10));
    }
}

实际应用分析

  1. 高性能读取:ConcurrentHashMap 适合读多写少的场景,提供了近乎无锁的读取性能

  2. 原子操作:利用 computeIfAbsent 实现原子的"检查并计算",避免缓存击穿问题

  3. 过期清理策略:结合定时清理和惰性清理,平衡 CPU 和内存资源

  4. 统计指标:使用 LongAdder 收集命中率等指标,最小化内存占用和性能影响

  5. 缓存一致性:通过更新或失效策略保持缓存与数据源的一致性

场景二:BlockingQueue 在生产者-消费者模式中的应用

BlockingQueue 是实现生产者-消费者模式的理想选择,在任务队列、消息缓冲等场景中广泛应用。

案例:日志异步处理系统

java
/**
 * 异步日志处理系统
 */
@Component
public class AsyncLogProcessor {

    // 日志处理队列
    private final BlockingQueue<LogEntry> logQueue;
    
    // 消费者线程池
    private final ExecutorService consumerPool;
    
    // 是否正在运行
    private volatile boolean running = true;
    
    /**
     * 日志条目
     */
    @Data
    @Builder
    public static class LogEntry {
        private String traceId;
        private String serviceId;
        private Level level;
        private String message;
        private Map<String, String> context;
        private long timestamp;
        private String threadName;
    }
    
    /**
     * 日志处理器接口
     */
    public interface LogHandler {
        void handleLog(List<LogEntry> logBatch);
    }
    
    @Resource
    private List<LogHandler> logHandlers;
    
    /**
     * 构造函数
     * @param queueCapacity 队列容量
     * @param consumerCount 消费者线程数
     */
    public AsyncLogProcessor(int queueCapacity, int consumerCount) {
        // 使用LinkedBlockingQueue作为日志队列
        this.logQueue = new LinkedBlockingQueue<>(queueCapacity);
        
        // 创建消费者线程池
        this.consumerPool = Executors.newFixedThreadPool(
                consumerCount,
                new ThreadFactoryBuilder()
                        .setNameFormat("log-consumer-%d")
                        .setDaemon(true)
                        .build());
                        
        // 启动消费者线程
        for (int i = 0; i < consumerCount; i++) {
            consumerPool.submit(this::consumeLogs);
        }
    }
    
    /**
     * 提交日志条目
     * @param logEntry 日志条目
     * @return 是否提交成功
     */
    public boolean submit(LogEntry logEntry) {
        if (!running) {
            return false;
        }
        
        try {
            // 尝试将日志放入队列,超时则放弃
            return logQueue.offer(logEntry, 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
    
    /**
     * 消费日志任务
     */
    private void consumeLogs() {
        final int batchSize = 100; // 批处理大小
        final long maxWaitTime = 200; // 最大等待时间(毫秒)
        
        List<LogEntry> logBatch = new ArrayList<>(batchSize);
        
        while (running) {
            try {
                // 获取第一个日志条目,可能会阻塞
                LogEntry firstLog = logQueue.poll(maxWaitTime, TimeUnit.MILLISECONDS);
                if (firstLog == null) {
                    // 超时未获取到日志,继续下一轮
                    continue;
                }
                
                // 添加到批处理集合
                logBatch.add(firstLog);
                
                // 尝试非阻塞地获取更多日志,直到达到批处理大小
                logQueue.drainTo(logBatch, batchSize - 1);
                
                // 处理日志批次
                processBatch(logBatch);
                
                // 清空当前批次
                logBatch.clear();
                
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn("日志消费线程被中断");
                break;
            } catch (Exception e) {
                log.error("处理日志批次时发生错误", e);
                // 继续处理下一批
            }
        }
    }
    
    /**
     * 处理日志批次
     */
    private void processBatch(List<LogEntry> logBatch) {
        if (logBatch.isEmpty()) {
            return;
        }
        
        // 调用所有注册的日志处理器
        for (LogHandler handler : logHandlers) {
            try {
                handler.handleLog(logBatch);
            } catch (Exception e) {
                log.error("日志处理器[{}]处理失败", handler.getClass().getSimpleName(), e);
                // 继续执行其他处理器
            }
        }
    }
    
    /**
     * 关闭处理器
     */
    public void shutdown() {
        running = false;
        
        // 停止接收新的日志
        consumerPool.shutdown();
        
        try {
            // 等待处理完当前日志
            if (!consumerPool.awaitTermination(30, TimeUnit.SECONDS)) {
                consumerPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            consumerPool.shutdownNow();
        }
    }
    
    /**
     * 获取队列状态
     */
    public Map<String, Integer> getQueueStats() {
        Map<String, Integer> stats = new HashMap<>();
        stats.put("queueSize", logQueue.size());
        stats.put("remainingCapacity", logQueue.remainingCapacity());
        return stats;
    }
}

日志处理器实现示例:

java
/**
 * ElasticSearch 日志处理器
 */
@Component
public class ElasticsearchLogHandler implements AsyncLogProcessor.LogHandler {

    @Resource
    private ElasticsearchClient esClient;
    
    /**
     * 处理日志批次
     */
    @Override
    public void handleLog(List<AsyncLogProcessor.LogEntry> logBatch) {
        if (logBatch.isEmpty()) {
            return;
        }
        
        // 转换为ES批量操作
        BulkRequest.Builder bulkRequest = new BulkRequest.Builder();
        
        for (AsyncLogProcessor.LogEntry log : logBatch) {
            // 根据日期和服务确定索引名
            String indexName = "logs-" + log.getServiceId() + "-" + formatDate(log.getTimestamp());
            
            // 添加到批量请求
            bulkRequest.operations(op -> op
                    .index(idx -> idx
                            .index(indexName)
                            .document(log)
                    )
            );
        }
        
        try {
            // 执行批量写入
            BulkResponse response = esClient.bulk(bulkRequest.build());
            if (response.errors()) {
                // 处理写入错误
                for (BulkResponseItem item : response.items()) {
                    if (item.error() != null) {
                        log.error("日志写入ES失败: {}", item.error().reason());
                    }
                }
            }
        } catch (Exception e) {
            log.error("批量写入ES失败", e);
            // 可以考虑重试或写入备用存储
        }
    }
    
    private String formatDate(long timestamp) {
        return DateTimeFormatter.ofPattern("yyyy-MM-dd")
                .format(Instant.ofEpochMilli(timestamp)
                        .atZone(ZoneId.systemDefault())
                        .toLocalDate());
    }
}

实际应用分析

  1. 解耦与缓冲:使用BlockingQueue解耦日志生产和消费,缓冲突发日志

  2. 批量处理:通过drainTo方法高效收集批量日志,减少I/O操作频率

  3. 多消费者模式:使用线程池并行处理日志,提高吞吐量

  4. 可靠性设计

    • 生产者超时提交,避免阻塞业务线程

    • 消费者异常隔离,单个处理器失败不影响整体

    • 优雅关闭,确保已提交日志得到处理

  5. 监控指标:提供队列大小和剩余容量指标,便于监控系统状态

场景三:CopyOnWriteArrayList 在配置管理中的应用

CopyOnWriteArrayList 适用于读多写少的场景,特别是对实时性要求不高的配置管理、监听器列表等场景。

案例:动态配置中心的本地订阅者管理

java
/**
 * 配置中心客户端
 */
@Component
public class ConfigCenterClient {

    /**
     * 配置变更监听器
     */
    public interface ConfigChangeListener {
        /**
         * 配置变更回调
         * @param key 配置键
         * @param oldValue 旧值
         * @param newValue 新值
         */
        void onConfigChanged(String key, String oldValue, String newValue);
    }
    
    // 当前配置缓存
    private final ConcurrentHashMap<String, String> configCache = new ConcurrentHashMap<>();
    
    // 监听器映射,key 为配置键,value 为该配置的所有监听器
    private final ConcurrentHashMap<String, CopyOnWriteArrayList<ConfigChangeListener>> listeners = 
            new ConcurrentHashMap<>();
    
    // 配置中心客户端
    @Resource
    private ConfigCenterApiClient apiClient;
    
    /**
     * 初始化客户端
     */
    @PostConstruct
    public void init() {
        // 拉取初始配置
        refreshConfigs();
        
        // 启动长轮询或 WebSocket 连接监听配置变更
        startConfigChangeListener();
    }
    
    /**
     * 添加配置变更监听器
     * @param key 配置键
     * @param listener 监听器
     */
    public void addListener(String key, ConfigChangeListener listener) {
        // 获取指定key的监听器列表,如果不存在则创建
        listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>())
                .addIfAbsent(listener);
    }
    
    /**
     * 移除配置变更监听器
     */
    public void removeListener(String key, ConfigChangeListener listener) {
        CopyOnWriteArrayList<ConfigChangeListener> keyListeners = listeners.get(key);
        if (keyListeners != null) {
            keyListeners.remove(listener);
        }
    }
    
    /**
     * 获取配置值
     */
    public String getConfig(String key, String defaultValue) {
        return configCache.getOrDefault(key, defaultValue);
    }
    
    /**
     * 获取整型配置
     */
    public int getIntConfig(String key, int defaultValue) {
        String value = configCache.get(key);
        if (value == null) {
            return defaultValue;
        }
        
        try {
            return Integer.parseInt(value);
        } catch (NumberFormatException e) {
            log.warn("配置[{}]的值[{}]不是有效的整数,使用默认值{}", key, value, defaultValue);
            return defaultValue;
        }
    }
    
    /**
     * 刷新配置
     */
    public void refreshConfigs() {
        try {
            // 调用API获取最新配置
            Map<String, String> latestConfigs = apiClient.getAllConfigs();
            
            // 更新本地缓存并触发变更事件
            updateConfigs(latestConfigs);
        } catch (Exception e) {
            log.error("刷新配置失败", e);
        }
    }
    
    /**
     * 启动配置变更监听
     */
    private void startConfigChangeListener() {
        // 使用长轮询或WebSocket监听配置变更
        // 实现省略...
    }
    
    /**
     * 处理接收到的配置变更
     */
    public void handleConfigChanges(Map<String, String> changedConfigs) {
        updateConfigs(changedConfigs);
    }
    
    /**
     * 更新配置并触发监听器
     */
    private void updateConfigs(Map<String, String> newConfigs) {
        for (Map.Entry<String, String> entry : newConfigs.entrySet()) {
            String key = entry.getKey();
            String newValue = entry.getValue();
            String oldValue = configCache.put(key, newValue);
            
            // 如果值已变更,触发监听器
            if (!Objects.equals(oldValue, newValue)) {
                notifyListeners(key, oldValue, newValue);
            }
        }
    }
    
    /**
     * 通知监听器
     */
    private void notifyListeners(String key, String oldValue, String newValue) {
        // 获取该配置的所有监听器
        CopyOnWriteArrayList<ConfigChangeListener> keyListeners = listeners.get(key);
        if (keyListeners != null && !keyListeners.isEmpty()) {
            for (ConfigChangeListener listener : keyListeners) {
                try {
                    // 异步通知,避免监听器执行耗时操作阻塞
                    CompletableFuture.runAsync(() -> {
                        try {
                            listener.onConfigChanged(key, oldValue, newValue);
                        } catch (Exception e) {
                            log.error("配置变更监听器执行异常", e);
                        }
                    });
                } catch (Exception e) {
                    log.error("触发配置变更监听器异常", e);
                }
            }
        }
    }
}

使用示例:

java
@Service
public class UserServiceImpl implements UserService {

    @Resource
    private ConfigCenterClient configClient;
    
    // 用户缓存过期时间(默认5分钟)
    private volatile int userCacheExpireMinutes = 5;
    
    @PostConstruct
    public void init() {
        // 初始化配置值
        userCacheExpireMinutes = configClient.getIntConfig("user.cache.expireMinutes", 5);
        
        // 监听配置变更
        configClient.addListener("user.cache.expireMinutes", 
                (key, oldValue, newValue) -> {
                    try {
                        int newExpireMinutes = Integer.parseInt(newValue);
                        log.info("用户缓存过期时间已更新: {} -> {}", oldValue, newValue);
                        userCacheExpireMinutes = newExpireMinutes;
                        // 可以在这里刷新现有缓存的过期时间
                    } catch (NumberFormatException e) {
                        log.error("无效的缓存过期时间配置: {}", newValue);
                    }
                });
    }
    
    @Override
    public UserInfo getUserInfo(Long userId) {
        // 使用当前的缓存过期时间
        return userCache.getOrLoad(userId, 
                this::loadUserFromDb, 
                TimeUnit.MINUTES.toMillis(userCacheExpireMinutes));
    }
    
    private UserInfo loadUserFromDb(Long userId) {
        // 从数据库加载用户信息
        return userRepository.findById(userId).orElse(null);
    }
}

实际应用分析

  1. 读写分离:CopyOnWriteArrayList适合读多写少的场景,写入时复制整个数组,不影响读操作

  2. 线程安全迭代:无需额外同步,迭代器不会抛出ConcurrentModificationException

  3. 动态配置:可在运行时添加/移除监听器,无需重启应用

  4. 异步通知:监听器回调使用CompletableFuture异步执行,避免阻塞关键线程

  5. 配置热更新:实现了配置的动态更新和实时生效

原子操作类的应用场景

原子操作类(Atomic 包)提供了在单个变量上执行原子操作的工具,包括基本类型、数组元素和引用等。这些类基于底层的 CAS(Compare-And-Swap)机制,避免了锁的开销,在高并发场景下有着广泛应用。

场景一:高性能计数器

在大型系统中,计数器是一个常见需求,如统计 API 调用次数、记录用户行为、流量监控等。使用原子类可以实现高性能的计数器。

案例:电商平台的实时访问统计

java
/**
 * 实时访问量统计服务
 */
@Service
public class RealTimeStatisticsService {

    // 常规访问量计数器
    private final ConcurrentHashMap<String, LongAdder> pageViewCounters = new ConcurrentHashMap<>();
    
    // 独立访客计数器(按页面)
    private final ConcurrentHashMap<String, LongAdder> uniqueVisitorCounters = new ConcurrentHashMap<>();
    
    // 已访问用户集合(按页面)
    private final ConcurrentHashMap<String, Set<String>> visitedUsers = new ConcurrentHashMap<>();
    
    // 总订单金额统计
    private final LongAdder totalOrderAmount = new LongAdder();
    
    // 订单数统计
    private final LongAdder totalOrderCount = new LongAdder();
    
    /**
     * 记录页面访问
     * 
     * @param pageKey 页面标识
     * @param userId 用户ID,可能为null(未登录用户)
     * @param visitorId 访客ID(如Cookie标识)
     */
    public void recordPageView(String pageKey, String userId, String visitorId) {
        // 增加页面访问计数
        pageViewCounters.computeIfAbsent(pageKey, k -> new LongAdder()).increment();
        
        // 使用 visitorId 计算独立访客
        if (visitorId != null) {
            String visitorKey = pageKey + ":" + visitorId;
            Set<String> visitors = visitedUsers.computeIfAbsent(pageKey, 
                    k -> ConcurrentHashMap.newKeySet());
                    
            // 如果是新访客,增加独立访客计数
            if (visitors.add(visitorId)) {
                uniqueVisitorCounters.computeIfAbsent(pageKey, k -> new LongAdder()).increment();
            }
        }
        
        // 其他统计逻辑...
    }
    
    /**
     * 记录订单金额
     * @param orderAmount 订单金额(分)
     */
    public void recordOrder(long orderAmount) {
        // 增加订单数
        totalOrderCount.increment();
        
        // 增加订单总金额
        totalOrderAmount.add(orderAmount);
    }
    
    /**
     * 获取页面访问统计
     */
    public Map<String, Long> getPageViewStats() {
        Map<String, Long> result = new HashMap<>();
        pageViewCounters.forEach((key, counter) -> result.put(key, counter.sum()));
        return result;
    }
    
    /**
     * 获取页面独立访客统计
     */
    public Map<String, Long> getUniqueVisitorStats() {
        Map<String, Long> result = new HashMap<>();
        uniqueVisitorCounters.forEach((key, counter) -> result.put(key, counter.sum()));
        return result;
    }
    
    /**
     * 获取订单统计
     */
    public OrderStats getOrderStats() {
        return new OrderStats(
                totalOrderCount.sum(),
                totalOrderAmount.sum()
        );
    }
    
    /**
     * 重置统计数据(如每日统计重置)
     */
    public void resetStatistics() {
        pageViewCounters.clear();
        uniqueVisitorCounters.clear();
        visitedUsers.clear();
        
        // 重置累加器
        while (totalOrderAmount.sum() > 0) {
            totalOrderAmount.add(-totalOrderAmount.sum());
        }
        while (totalOrderCount.sum() > 0) {
            totalOrderCount.add(-totalOrderCount.sum());
        }
    }
    
    @Data
    @AllArgsConstructor
    public static class OrderStats {
        private long orderCount;
        private long orderAmount;
    }
}

实际应用分析

  1. 高并发支持:LongAdder 专为高并发计数场景设计,内部使用分段计数减小竞争

  2. 低开销:相比 AtomicLong,在高并发下 LongAdder 有更低的竞争开销

  3. 组合应用:结合 ConcurrentHashMap 实现多维度统计

  4. 灵活统计:支持增量计数和精确读取,满足实时监控需求

  5. 重置功能:支持统计周期性重置,适用于按天/小时统计场景

场景二:AtomicReference 在无锁数据结构中的应用

AtomicReference 提供了对引用类型的原子操作,常用于实现无锁数据结构,提高并发性能。

案例:高性能库存预占系统

在电商秒杀场景中,库存抢占是一个典型的高并发场景。使用 AtomicReference 可以实现无锁的库存预占机制。

java
/**
 * 高性能商品库存服务
 */
@Service
public class StockService {

    /**
     * 库存状态类,包含可用库存和已预占库存
     */
    @Data
    @AllArgsConstructor
    private static class StockState {
        // 可用库存
        private int availableStock;
        // 已预占库存
        private int reservedStock;
        // 版本号,用于乐观锁
        private long version;
        
        // 创建新的库存状态
        public StockState createNext(int availableDelta, int reservedDelta) {
            return new StockState(
                    availableStock + availableDelta,
                    reservedStock + reservedDelta,
                    version + 1
            );
        }
    }
    
    // 商品库存映射,商品ID -> 库存状态
    private final ConcurrentHashMap<Long, AtomicReference<StockState>> stockCache = 
            new ConcurrentHashMap<>();
    
    @Resource
    private StockRepository stockRepository;
    
    /**
     * 初始化商品库存
     */
    @PostConstruct
    public void init() {
        // 从数据库加载初始库存
        List<ProductStock> stocks = stockRepository.findAll();
        for (ProductStock stock : stocks) {
            StockState state = new StockState(
                    stock.getAvailableStock(),
                    stock.getReservedStock(),
                    0
            );
            stockCache.put(stock.getProductId(), new AtomicReference<>(state));
        }
    }
    
    /**
     * 尝试预占库存
     * @param productId 商品ID
     * @param quantity 预占数量
     * @return 是否预占成功
     */
    public boolean tryReserveStock(Long productId, int quantity) {
        // 获取商品库存引用
        AtomicReference<StockState> reference = stockCache.get(productId);
        if (reference == null) {
            throw new IllegalArgumentException("商品不存在: " + productId);
        }
        
        // 使用CAS操作尝试预占库存
        boolean reserved = false;
        int attempts = 0;
        int maxAttempts = 5; // 最大重试次数
        
        while (!reserved && attempts < maxAttempts) {
            StockState current = reference.get();
            
            // 检查库存是否充足
            if (current.availableStock < quantity) {
                log.warn("商品{}库存不足,当前可用{},需要{}", 
                        productId, current.availableStock, quantity);
                return false;
            }
            
            // 创建新的库存状态
            StockState next = current.createNext(-quantity, quantity);
            
            // 尝试原子更新
            reserved = reference.compareAndSet(current, next);
            attempts++;
            
            if (!reserved && attempts < maxAttempts) {
                // 短暂等待后重试
                LockSupport.parkNanos(10);
            }
        }
        
        if (reserved) {
            // 异步更新数据库
            updateStockAsync(productId, reference.get());
        } else {
            log.warn("商品{}库存预占失败,已重试{}次", productId, attempts);
        }
        
        return reserved;
    }
    
    /**
     * 释放预占库存
     */
    public boolean releaseReservedStock(Long productId, int quantity) {
        AtomicReference<StockState> reference = stockCache.get(productId);
        if (reference == null) {
            throw new IllegalArgumentException("商品不存在: " + productId);
        }
        
        boolean released = false;
        int attempts = 0;
        int maxAttempts = 5;
        
        while (!released && attempts < maxAttempts) {
            StockState current = reference.get();
            
            // 检查预占库存是否足够
            if (current.reservedStock < quantity) {
                log.warn("商品{}预占库存不足,当前已预占{},需要释放{}", 
                        productId, current.reservedStock, quantity);
                return false;
            }
            
            // 创建新的库存状态
            StockState next = current.createNext(quantity, -quantity);
            
            // 尝试原子更新
            released = reference.compareAndSet(current, next);
            attempts++;
            
            if (!released && attempts < maxAttempts) {
                LockSupport.parkNanos(10);
            }
        }
        
        if (released) {
            // 异步更新数据库
            updateStockAsync(productId, reference.get());
        }
        
        return released;
    }
    
    /**
     * 获取当前商品库存状态
     */
    public StockInfo getStockInfo(Long productId) {
        AtomicReference<StockState> reference = stockCache.get(productId);
        if (reference == null) {
            throw new IllegalArgumentException("商品不存在: " + productId);
        }
        
        StockState state = reference.get();
        return new StockInfo(
                productId,
                state.availableStock,
                state.reservedStock,
                state.availableStock + state.reservedStock
        );
    }
    
    /**
     * 异步更新数据库库存
     */
    private void updateStockAsync(Long productId, StockState state) {
        CompletableFuture.runAsync(() -> {
            try {
                // 乐观锁更新数据库
                int updated = stockRepository.updateStock(
                        productId, 
                        state.availableStock, 
                        state.reservedStock,
                        state.version);
                
                if (updated == 0) {
                    log.warn("数据库库存更新失败,可能发生并发冲突,商品ID: {}", productId);
                }
            } catch (Exception e) {
                log.error("更新商品{}库存失败", productId, e);
            }
        });
    }
    
    @Data
    @AllArgsConstructor
    public static class StockInfo {
        private Long productId;
        private int availableStock;
        private int reservedStock;
        private int totalStock;
    }
}

实际应用分析

  1. 无锁设计:使用 CAS 操作代替传统锁,减少线程阻塞

  2. 乐观并发控制:采用乐观锁思想,在冲突较少的情况下性能更佳

  3. 内存与数据库分离:内存操作与数据库更新解耦,提高性能

  4. 异步持久化:库存变更异步写入数据库,减少关键路径延迟

  5. 有限重试:设置最大重试次数,避免活锁

场景三:AtomicStampedReference 解决 ABA 问题

在使用 CAS 操作时,可能遇到 ABA 问题(即值从 A 变为 B,又变回 A,导致 CAS 操作误判为没有变化)。AtomicStampedReference 通过引入版本号解决这个问题。

案例:分布式节点状态管理器

在分布式系统中,需要维护集群节点的状态,并且需要避免由于网络延迟等因素导致的错误状态判断。

java
/**
 * 分布式集群节点状态管理器
 */
@Component
public class ClusterNodeManager {

    /**
     * 节点状态枚举
     */
    public enum NodeStatus {
        ONLINE,       // 在线
        OFFLINE,      // 离线
        SUSPECTED,    // 疑似故障
        MAINTAINING;  // 维护中
    }
    
    /**
     * 节点状态类
     */
    @Data
    @AllArgsConstructor
    private static class NodeState {
        private NodeStatus status;
        private long lastHeartbeatTime;
        
        @Override
        public String toString() {
            return status + "@" + lastHeartbeatTime;
        }
    }
    
    // 节点状态映射,key为节点ID
    private final ConcurrentHashMap<String, AtomicStampedReference<NodeState>> nodeStates = 
            new ConcurrentHashMap<>();
    
    // 心跳超时时间(毫秒)
    private static final long HEARTBEAT_TIMEOUT = 30000;
    
    /**
     * 节点注册
     */
    public void registerNode(String nodeId) {
        NodeState initialState = new NodeState(NodeStatus.ONLINE, System.currentTimeMillis());
        // 初始版本号为0
        nodeStates.put(nodeId, new AtomicStampedReference<>(initialState, 0));
        log.info("节点已注册: {}, 状态: {}", nodeId, initialState);
    }
    
    /**
     * 更新节点心跳
     */
    public boolean updateHeartbeat(String nodeId, long currentTimestamp) {
        AtomicStampedReference<NodeState> reference = nodeStates.get(nodeId);
        if (reference == null) {
            log.warn("未知节点心跳更新: {}", nodeId);
            return false;
        }
        
        int maxAttempts = 3;
        for (int i = 0; i < maxAttempts; i++) {
            NodeState currentState = reference.getReference();
            int currentStamp = reference.getStamp();
            
            // 已离线节点不接受心跳更新
            if (currentState.status == NodeStatus.OFFLINE) {
                log.warn("离线节点心跳更新被拒绝: {}", nodeId);
                return false;
            }
            
            // 忽略过期的心跳
            if (currentTimestamp < currentState.lastHeartbeatTime) {
                log.debug("忽略过期心跳: {}, current={}, received={}", 
                        nodeId, currentState.lastHeartbeatTime, currentTimestamp);
                return false;
            }
            
            // 创建新状态
            NodeState newState = new NodeState(
                    // 如果节点之前是疑似故障状态,恢复为在线状态
                    currentState.status == NodeStatus.SUSPECTED 
                            ? NodeStatus.ONLINE : currentState.status,
                    currentTimestamp
            );
            
            // 使用CAS更新,带版本戳
            if (reference.compareAndSet(currentState, newState, currentStamp, currentStamp + 1)) {
                log.debug("节点{}心跳已更新: {} -> {}, stamp: {}", 
                        nodeId, currentState, newState, currentStamp + 1);
                return true;
            }
            
            // CAS失败,稍后重试
            LockSupport.parkNanos(100);
        }
        
        log.warn("节点{}心跳更新失败,已达到最大重试次数", nodeId);
        return false;
    }
    
    /**
     * 将节点标记为疑似故障
     */
    public boolean markNodeSuspected(String nodeId) {
        return updateNodeStatus(nodeId, NodeStatus.SUSPECTED);
    }
    
    /**
     * 将节点标记为维护中
     */
    public boolean markNodeMaintaining(String nodeId) {
        return updateNodeStatus(nodeId, NodeStatus.MAINTAINING);
    }
    
    /**
     * 将节点标记为离线
     */
    public boolean markNodeOffline(String nodeId) {
        return updateNodeStatus(nodeId, NodeStatus.OFFLINE);
    }
    
    /**
     * 更新节点状态
     */
    private boolean updateNodeStatus(String nodeId, NodeStatus newStatus) {
        AtomicStampedReference<NodeState> reference = nodeStates.get(nodeId);
        if (reference == null) {
            log.warn("未知节点状态更新: {}", nodeId);
            return false;
        }
        
        int maxAttempts = 3;
        for (int i = 0; i < maxAttempts; i++) {
            NodeState currentState = reference.getReference();
            int currentStamp = reference.getStamp();
            
            // 已离线节点状态不可更改
            if (currentState.status == NodeStatus.OFFLINE && newStatus != NodeStatus.OFFLINE) {
                log.warn("离线节点状态更新被拒绝: {}", nodeId);
                return false;
            }
            
            // 创建新状态
            NodeState newState = new NodeState(newStatus, currentState.lastHeartbeatTime);
            
            // 使用CAS更新,带版本戳
            if (reference.compareAndSet(currentState, newState, currentStamp, currentStamp + 1)) {
                log.info("节点{}状态已更新: {} -> {}, stamp: {}", 
                        nodeId, currentState.status, newStatus, currentStamp + 1);
                return true;
            }
            
            // CAS失败,稍后重试
            LockSupport.parkNanos(100);
        }
        
        log.warn("节点{}状态更新失败,已达到最大重试次数", nodeId);
        return false;
    }
    
    /**
     * 检查节点状态
     */
    public NodeStatusInfo getNodeStatus(String nodeId) {
        AtomicStampedReference<NodeState> reference = nodeStates.get(nodeId);
        if (reference == null) {
            return null;
        }
        
        // 获取当前状态和版本戳
        NodeState state = reference.getReference();
        int stamp = reference.getStamp();
        
        return new NodeStatusInfo(
                nodeId,
                state.status,
                state.lastHeartbeatTime,
                stamp,
                System.currentTimeMillis() - state.lastHeartbeatTime > HEARTBEAT_TIMEOUT
        );
    }
    
    /**
     * 定期检查节点心跳,将超时节点标记为疑似故障
     */
    @Scheduled(fixedRate = 10000) // 每10秒检查一次
    public void checkNodeHeartbeats() {
        long now = System.currentTimeMillis();
        
        for (Map.Entry<String, AtomicStampedReference<NodeState>> entry : nodeStates.entrySet()) {
            String nodeId = entry.getKey();
            AtomicStampedReference<NodeState> reference = entry.getValue();
            
            NodeState state = reference.getReference();
            
            // 检查是否超时
            if (state.status == NodeStatus.ONLINE && 
                    now - state.lastHeartbeatTime > HEARTBEAT_TIMEOUT) {
                
                // 标记为疑似故障
                if (markNodeSuspected(nodeId)) {
                    log.warn("节点{}心跳超时,已标记为疑似故障", nodeId);
                    
                    // 触发故障检测或告警
                    triggerNodeFailureDetection(nodeId);
                }
            }
        }
    }
    
    /**
     * 触发节点故障检测
     */
    private void triggerNodeFailureDetection(String nodeId) {
        // 实现故障检测逻辑,如发送告警等
    }
    
    @Data
    @AllArgsConstructor
    public static class NodeStatusInfo {
        private String nodeId;
        private NodeStatus status;
        private long lastHeartbeatTime;
        private int versionStamp;
        private boolean heartbeatTimeout;
    }
}

实际应用分析

  1. 版本戳控制:使用 AtomicStampedReference 的版本戳解决 ABA 问题

  2. 心跳机制:通过心跳更新和超时检测维护节点状态

  3. 可靠状态转换:防止非法的状态转换,如离线节点被误标记为在线

  4. 乐观并发控制:使用 CAS 操作进行无锁更新

  5. 透明版本追踪:通过版本戳追踪状态变更历史

锁机制的应用场景

JUC 提供了比 synchronized 更加灵活的锁机制,如 ReentrantLock、ReadWriteLock 和 StampedLock 等。这些锁在功能、性能特性上各有优势,在大厂的复杂业务场景中有着广泛应用。

场景一:ReentrantLock 实现细粒度锁

ReentrantLock 相比 synchronized 提供了更多的灵活性,包括尝试获取锁、可中断锁等特性,常用于需要精细控制的场景。

案例:余额支付系统的并发控制

在金融系统中,账户余额操作需要严格的并发控制,确保数据一致性。

java
/**
 * 账户余额服务
 */
@Service
public class AccountBalanceService {

    /**
     * 账户余额锁映射
     * 使用 ConcurrentHashMap 存储每个账户的锁对象
     */
    private final ConcurrentHashMap<Long, ReentrantLock> accountLocks = new ConcurrentHashMap<>();
    
    /**
     * 账户余额缓存
     */
    private final ConcurrentHashMap<Long, BigDecimal> balanceCache = new ConcurrentHashMap<>();
    
    @Resource
    private AccountRepository accountRepository;
    
    @Resource
    private TransactionLogRepository transactionLogRepository;
    
    /**
     * 获取指定账户的锁
     */
    private ReentrantLock getAccountLock(Long accountId) {
        return accountLocks.computeIfAbsent(accountId, k -> new ReentrantLock(true));
    }
    
    /**
     * 充值金额
     * @param accountId 账户ID
     * @param amount 充值金额
     * @param transactionId 交易ID
     * @return 操作结果
     */
    public OperationResult deposit(Long accountId, BigDecimal amount, String transactionId) {
        if (amount.compareTo(BigDecimal.ZERO) <= 0) {
            return OperationResult.fail("充值金额必须大于0");
        }
        
        // 获取账户锁
        ReentrantLock lock = getAccountLock(accountId);
        
        try {
            // 尝试获取锁,设置超时时间
            if (!lock.tryLock(5, TimeUnit.SECONDS)) {
                return OperationResult.fail("账户操作繁忙,请稍后再试");
            }
            
            try {
                // 检查交易ID是否重复
                if (transactionLogRepository.existsByTransactionId(transactionId)) {
                    return OperationResult.fail("交易已存在,请勿重复提交");
                }
                
                // 读取当前余额
                BigDecimal currentBalance = getBalance(accountId);
                
                // 计算新余额
                BigDecimal newBalance = currentBalance.add(amount);
                
                // 执行更新操作
                int updated = accountRepository.updateBalance(accountId, newBalance);
                if (updated != 1) {
                    return OperationResult.fail("余额更新失败,账户可能不存在");
                }
                
                // 更新缓存
                balanceCache.put(accountId, newBalance);
                
                // 记录交易日志
                TransactionLog log = new TransactionLog();
                log.setAccountId(accountId);
                log.setTransactionId(transactionId);
                log.setAmount(amount);
                log.setType(TransactionType.DEPOSIT);
                log.setBalance(newBalance);
                log.setCreateTime(new Date());
                transactionLogRepository.save(log);
                
                return OperationResult.success("充值成功", newBalance);
            } finally {
                // 释放锁
                lock.unlock();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return OperationResult.fail("操作被中断");
        } catch (Exception e) {
            log.error("充值操作异常", e);
            return OperationResult.fail("系统异常: " + e.getMessage());
        }
    }
    
    /**
     * 扣款操作
     * @param accountId 账户ID
     * @param amount 扣款金额
     * @param transactionId 交易ID
     * @return 操作结果
     */
    public OperationResult withdraw(Long accountId, BigDecimal amount, String transactionId) {
        if (amount.compareTo(BigDecimal.ZERO) <= 0) {
            return OperationResult.fail("扣款金额必须大于0");
        }
        
        // 获取账户锁
        ReentrantLock lock = getAccountLock(accountId);
        
        try {
            // 尝试获取锁,设置超时时间
            if (!lock.tryLock(5, TimeUnit.SECONDS)) {
                return OperationResult.fail("账户操作繁忙,请稍后再试");
            }
            
            try {
                // 检查交易ID是否重复
                if (transactionLogRepository.existsByTransactionId(transactionId)) {
                    return OperationResult.fail("交易已存在,请勿重复提交");
                }
                
                // 读取当前余额
                BigDecimal currentBalance = getBalance(accountId);
                
                // 检查余额是否充足
                if (currentBalance.compareTo(amount) < 0) {
                    return OperationResult.fail("账户余额不足");
                }
                
                // 计算新余额
                BigDecimal newBalance = currentBalance.subtract(amount);
                
                // 执行更新操作
                int updated = accountRepository.updateBalance(accountId, newBalance);
                if (updated != 1) {
                    return OperationResult.fail("余额更新失败,账户可能不存在");
                }
                
                // 更新缓存
                balanceCache.put(accountId, newBalance);
                
                // 记录交易日志
                TransactionLog log = new TransactionLog();
                log.setAccountId(accountId);
                log.setTransactionId(transactionId);
                log.setAmount(amount.negate());
                log.setType(TransactionType.WITHDRAW);
                log.setBalance(newBalance);
                log.setCreateTime(new Date());
                transactionLogRepository.save(log);
                
                return OperationResult.success("扣款成功", newBalance);
            } finally {
                // 释放锁
                lock.unlock();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return OperationResult.fail("操作被中断");
        } catch (Exception e) {
            log.error("扣款操作异常", e);
            return OperationResult.fail("系统异常: " + e.getMessage());
        }
    }
    
    /**
     * 转账操作
     */
    public OperationResult transfer(Long fromAccountId, Long toAccountId, 
            BigDecimal amount, String transactionId) {
        if (amount.compareTo(BigDecimal.ZERO) <= 0) {
            return OperationResult.fail("转账金额必须大于0");
        }
        
        if (fromAccountId.equals(toAccountId)) {
            return OperationResult.fail("不能转账给自己");
        }
        
        // 按账户ID排序,避免死锁
        Long firstLockAccount = Math.min(fromAccountId, toAccountId);
        Long secondLockAccount = Math.max(fromAccountId, toAccountId);
        
        ReentrantLock firstLock = getAccountLock(firstLockAccount);
        ReentrantLock secondLock = getAccountLock(secondLockAccount);
        
        try {
            // 尝试获取第一个锁
            if (!firstLock.tryLock(5, TimeUnit.SECONDS)) {
                return OperationResult.fail("账户操作繁忙,请稍后再试");
            }
            
            try {
                // 尝试获取第二个锁
                if (!secondLock.tryLock(5, TimeUnit.SECONDS)) {
                    return OperationResult.fail("账户操作繁忙,请稍后再试");
                }
                
                try {
                    // 检查交易ID是否重复
                    if (transactionLogRepository.existsByTransactionId(transactionId)) {
                        return OperationResult.fail("交易已存在,请勿重复提交");
                    }
                    
                    // 读取转出账户余额
                    BigDecimal fromBalance = getBalance(fromAccountId);
                    
                    // 检查余额是否充足
                    if (fromBalance.compareTo(amount) < 0) {
                        return OperationResult.fail("转出账户余额不足");
                    }
                    
                    // 读取转入账户余额
                    BigDecimal toBalance = getBalance(toAccountId);
                    
                    // 计算新余额
                    BigDecimal newFromBalance = fromBalance.subtract(amount);
                    BigDecimal newToBalance = toBalance.add(amount);
                    
                    // 执行更新操作(事务保证)
                    boolean success = accountRepository.transferBalance(
                            fromAccountId, toAccountId, 
                            newFromBalance, newToBalance);
                            
                    if (!success) {
                        return OperationResult.fail("转账失败,请稍后再试");
                    }
                    
                    // 更新缓存
                    balanceCache.put(fromAccountId, newFromBalance);
                    balanceCache.put(toAccountId, newToBalance);
                    
                    // 记录交易日志
                    saveTransferLogs(fromAccountId, toAccountId, amount, transactionId, 
                            newFromBalance, newToBalance);
                    
                    return OperationResult.success("转账成功", newFromBalance);
                } finally {
                    secondLock.unlock();
                }
            } finally {
                firstLock.unlock();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return OperationResult.fail("操作被中断");
        } catch (Exception e) {
            log.error("转账操作异常", e);
            return OperationResult.fail("系统异常: " + e.getMessage());
        }
    }
    
    /**
     * 获取账户余额
     */
    public BigDecimal getBalance(Long accountId) {
        // 优先从缓存获取
        BigDecimal cachedBalance = balanceCache.get(accountId);
        if (cachedBalance != null) {
            return cachedBalance;
        }
        
        // 缓存未命中,从数据库加载
        BigDecimal balance = accountRepository.findBalanceById(accountId);
        if (balance != null) {
            balanceCache.put(accountId, balance);
        }
        
        return balance != null ? balance : BigDecimal.ZERO;
    }
    
    /**
     * 保存转账日志
     */
    private void saveTransferLogs(Long fromAccountId, Long toAccountId, 
            BigDecimal amount, String transactionId,
            BigDecimal fromBalance, BigDecimal toBalance) {
        // 转出日志
        TransactionLog fromLog = new TransactionLog();
        fromLog.setAccountId(fromAccountId);
        fromLog.setTransactionId(transactionId + "-from");
        fromLog.setAmount(amount.negate());
        fromLog.setType(TransactionType.TRANSFER_OUT);
        fromLog.setRelatedAccountId(toAccountId);
        fromLog.setBalance(fromBalance);
        fromLog.setCreateTime(new Date());
        
        // 转入日志
        TransactionLog toLog = new TransactionLog();
        toLog.setAccountId(toAccountId);
        toLog.setTransactionId(transactionId + "-to");
        toLog.setAmount(amount);
        toLog.setType(TransactionType.TRANSFER_IN);
        toLog.setRelatedAccountId(fromAccountId);
        toLog.setBalance(toBalance);
        toLog.setCreateTime(new Date());
        
        // 批量保存
        transactionLogRepository.saveAll(Arrays.asList(fromLog, toLog));
    }
    
    /**
     * 清理长时间不使用的锁
     */
    @Scheduled(fixedRate = 3600000) // 每小时执行一次
    public void cleanupUnusedLocks() {
        // 实际实现可能需要额外的锁使用跟踪逻辑
        // 这里简化处理,仅模拟清理过程
        int count = 0;
        for (Iterator<Map.Entry<Long, ReentrantLock>> it = accountLocks.entrySet().iterator(); 
                it.hasNext();) {
            Map.Entry<Long, ReentrantLock> entry = it.next();
            ReentrantLock lock = entry.getValue();
            if (!lock.isLocked()) {
                it.remove();
                count++;
            }
        }
        if (count > 0) {
            log.info("已清理{}个未使用的账户锁", count);
        }
    }
    
    @Data
    @AllArgsConstructor
    public static class OperationResult {
        private boolean success;
        private String message;
        private BigDecimal balance;
        
        public static OperationResult success(String message, BigDecimal balance) {
            return new OperationResult(true, message, balance);
        }
        
        public static OperationResult fail(String message) {
            return new OperationResult(false, message, null);
        }
    }
    
    public enum TransactionType {
        DEPOSIT,        // 充值
        WITHDRAW,       // 提现
        TRANSFER_OUT,   // 转出
        TRANSFER_IN     // 转入
    }
}

实际应用分析

  1. 细粒度锁控制:按账户ID创建锁,避免不同账户间的相互影响

  2. 公平锁设置:使用公平锁模式,确保先请求的线程先获得锁,防止饥饿

  3. 锁超时控制:通过 tryLock 设置超时时间,避免长时间等待导致系统卡顿

  4. 有序锁获取:转账时按ID排序获取锁,防止死锁

  5. 锁清理机制:定期清理不再使用的锁对象,避免内存泄漏

场景二:ReadWriteLock 实现高并发读写分离

ReadWriteLock 提供了读写分离的锁机制,允许多个线程同时读取共享资源,但写操作需要独占锁。这种机制在读多写少的场景中能显著提高并发性能。

案例:分布式配置中心的本地缓存

在分布式系统中,配置中心常需要高性能的本地缓存,以减少配置查询的延迟。

java
/**
 * 配置中心本地缓存
 */
@Component
public class ConfigurationCache {

    // 配置数据存储
    private final Map<String, String> configMap = new HashMap<>();
    
    // 读写锁
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = rwLock.readLock();
    private final Lock writeLock = rwLock.writeLock();
    
    // 配置版本号
    private volatile long configVersion = 0;
    
    @Resource
    private RemoteConfigService remoteConfigService;
    
    /**
     * 初始化加载配置
     */
    @PostConstruct
    public void init() {
        refreshConfigs();
    }
    
    /**
     * 获取配置项
     * @param key 配置键
     * @param defaultValue 默认值
     * @return 配置值
     */
    public String getConfig(String key, String defaultValue) {
        readLock.lock();
        try {
            return configMap.getOrDefault(key, defaultValue);
        } finally {
            readLock.unlock();
        }
    }
    
    /**
     * 批量获取配置项
     * @param keys 配置键集合
     * @return 配置映射
     */
    public Map<String, String> getConfigs(Collection<String> keys) {
        Map<String, String> result = new HashMap<>();
        
        readLock.lock();
        try {
            for (String key : keys) {
                String value = configMap.get(key);
                if (value != null) {
                    result.put(key, value);
                }
            }
            return result;
        } finally {
            readLock.unlock();
        }
    }
    
    /**
     * 获取当前配置版本
     */
    public long getConfigVersion() {
        return configVersion;
    }
    
    /**
     * 本地更新配置
     * @param key 配置键
     * @param value 配置值
     */
    public void updateConfig(String key, String value) {
        writeLock.lock();
        try {
            configMap.put(key, value);
            configVersion++;
            
            // 异步同步到远程配置中心
            CompletableFuture.runAsync(() -> {
                try {
                    remoteConfigService.updateConfig(key, value);
                } catch (Exception e) {
                    log.error("同步配置到远程服务失败: {}", key, e);
                }
            });
        } finally {
            writeLock.unlock();
        }
    }
    
    /**
     * 本地批量更新配置
     * @param configs 配置映射
     */
    public void updateConfigs(Map<String, String> configs) {
        if (configs == null || configs.isEmpty()) {
            return;
        }
        
        writeLock.lock();
        try {
            configMap.putAll(configs);
            configVersion++;
            
            // 异步同步到远程配置中心
            CompletableFuture.runAsync(() -> {
                try {
                    remoteConfigService.updateConfigs(configs);
                } catch (Exception e) {
                    log.error("批量同步配置到远程服务失败", e);
                }
            });
        } finally {
            writeLock.unlock();
        }
    }
    
    /**
     * 从远程配置中心刷新配置
     * @return 是否刷新成功
     */
    public boolean refreshConfigs() {
        try {
            // 获取远程配置和版本号
            RemoteConfigResult result = remoteConfigService.getAllConfigs();
            
            // 检查版本号,如果本地版本已经是最新,则无需更新
            if (result.getVersion() <= configVersion) {
                log.debug("本地配置已是最新版本,无需更新, 当前版本: {}", configVersion);
                return true;
            }
            
            writeLock.lock();
            try {
                // 再次检查版本号(双重检查,避免锁等待期间版本已更新)
                if (result.getVersion() <= configVersion) {
                    return true;
                }
                
                // 清空并重新加载所有配置
                configMap.clear();
                configMap.putAll(result.getConfigs());
                configVersion = result.getVersion();
                
                log.info("配置已刷新,当前版本: {}", configVersion);
                return true;
            } finally {
                writeLock.unlock();
            }
        } catch (Exception e) {
            log.error("刷新配置失败", e);
            return false;
        }
    }
    
    /**
     * 远程配置结果
     */
    @Data
    @AllArgsConstructor
    public static class RemoteConfigResult {
        private Map<String, String> configs;
        private long version;
    }
}

实际应用分析

  1. 读写分离:使用 ReadWriteLock 允许多个线程同时读取配置,提高读取性能

  2. 写入独占:更新配置时获取写锁,确保数据一致性

  3. 版本控制:使用版本号跟踪配置变更,避免不必要的更新

  4. 双重检查:获取写锁前后都检查版本号,减少锁竞争

  5. 异步同步:配置更新异步同步到远程服务,不阻塞本地操作

场景三:StampedLock 实现乐观读取

StampedLock 是 Java 8 引入的一种新型锁机制,提供了乐观读模式,可以在不获取读锁的情况下读取数据,在读多写极少的场景下性能更优。

案例:地理位置服务的坐标缓存

在地图服务中,需要高性能地读取和更新位置坐标,而更新频率远低于读取频率。

java
/**
 * 位置坐标缓存
 */
@Component
public class LocationCache {

    /**
     * 位置数据
     */
    @Data
    @AllArgsConstructor
    public static class LocationData {
        private double latitude;   // 纬度
        private double longitude;  // 经度
        private String address;    // 地址描述
        private long updateTime;   // 更新时间
        
        // 深拷贝
        public LocationData copy() {
            return new LocationData(latitude, longitude, address, updateTime);
        }
    }
    
    // 位置缓存
    private final ConcurrentHashMap<String, LocationData> locationCache = new ConcurrentHashMap<>();
    
    // 使用 StampedLock 实现高性能读取
    private final ConcurrentHashMap<String, StampedLock> locationLocks = new ConcurrentHashMap<>();
    
    // 缓存过期时间(毫秒)
    private static final long CACHE_EXPIRE_TIME = TimeUnit.MINUTES.toMillis(30);
    
    /**
     * 获取位置锁
     */
    private StampedLock getLocationLock(String id) {
        return locationLocks.computeIfAbsent(id, k -> new StampedLock());
    }
    
    /**
     * 获取位置数据
     * @param id 位置ID
     * @param loadFunction 加载函数,当缓存未命中或失效时调用
     * @return 位置数据
     */
    public LocationData getLocation(String id, Function<String, LocationData> loadFunction) {
        // 1. 尝试从缓存获取
        LocationData cachedData = locationCache.get(id);
        if (cachedData != null && !isExpired(cachedData)) {
            return cachedData.copy();
        }
        
        // 2. 缓存未命中或已过期,加载数据
        StampedLock lock = getLocationLock(id);
        
        // 3. 尝试乐观读
        long stamp = lock.tryOptimisticRead();
        cachedData = locationCache.get(id);
        
        // 4. 验证乐观读有效性,并检查是否已有其他线程更新了缓存
        if (stamp != 0 && lock.validate(stamp) && cachedData != null && !isExpired(cachedData)) {
            return cachedData.copy();
        }
        
        // 5. 乐观读失败,转换为悲观读
        stamp = lock.readLock();
        try {
            // 再次检查缓存
            cachedData = locationCache.get(id);
            if (cachedData != null && !isExpired(cachedData)) {
                return cachedData.copy();
            }
            
            // 6. 缓存仍未命中或已过期,释放读锁,获取写锁
            long ws = lock.tryConvertToWriteLock(stamp);
            if (ws != 0) {
                // 写锁获取成功
                stamp = ws;
                
                // 加载数据
                LocationData newData = loadFunction.apply(id);
                if (newData != null) {
                    locationCache.put(id, newData);
                    return newData.copy();
                } else {
                    return null;
                }
            } else {
                // 转换为写锁失败,手动释放读锁并获取写锁
                lock.unlockRead(stamp);
                stamp = lock.writeLock();
                try {
                    // 再次检查缓存(可能已有其他线程更新)
                    cachedData = locationCache.get(id);
                    if (cachedData != null && !isExpired(cachedData)) {
                        return cachedData.copy();
                    }
                    
                    // 加载数据
                    LocationData newData = loadFunction.apply(id);
                    if (newData != null) {
                        locationCache.put(id, newData);
                        return newData.copy();
                    } else {
                        return null;
                    }
                } finally {
                    lock.unlockWrite(stamp);
                }
            }
        } finally {
            // 如果仍持有读锁,释放它
            if (StampedLock.isReadLockStamp(stamp)) {
                lock.unlockRead(stamp);
            }
        }
    }
    
    /**
     * 更新位置数据
     * @param id 位置ID
     * @param location 新的位置数据
     */
    public void updateLocation(String id, LocationData location) {
        if (id == null || location == null) {
            throw new IllegalArgumentException("位置ID和数据不能为空");
        }
        
        // 设置更新时间
        location.setUpdateTime(System.currentTimeMillis());
        
        // 获取写锁
        StampedLock lock = getLocationLock(id);
        long stamp = lock.writeLock();
        try {
            // 更新缓存
            locationCache.put(id, location);
        } finally {
            lock.unlockWrite(stamp);
        }
    }
    
    /**
     * 检查数据是否过期
     */
    private boolean isExpired(LocationData data) {
        return System.currentTimeMillis() - data.getUpdateTime() > CACHE_EXPIRE_TIME;
    }
    
    /**
     * 清理过期缓存
     */
    @Scheduled(fixedRate = 300000) // 每5分钟执行一次
    public void cleanupExpiredCache() {
        int removedCount = 0;
        for (Iterator<Map.Entry<String, LocationData>> it = locationCache.entrySet().iterator(); 
                it.hasNext();) {
            Map.Entry<String, LocationData> entry = it.next();
            if (isExpired(entry.getValue())) {
                it.remove();
                removedCount++;
            }
        }
        
        if (removedCount > 0) {
            log.info("已清理{}个过期位置缓存", removedCount);
        }
        
        // 清理不再使用的锁
        for (Iterator<Map.Entry<String, StampedLock>> it = locationLocks.entrySet().iterator(); 
                it.hasNext();) {
            Map.Entry<String, StampedLock> entry = it.next();
            if (!locationCache.containsKey(entry.getKey())) {
                it.remove();
            }
        }
    }
}

使用示例:

java
@Service
public class LocationService {

    @Resource
    private LocationCache locationCache;
    
    @Resource
    private LocationRepository locationRepository;
    
    /**
     * 获取位置信息
     */
    public LocationInfo getLocationInfo(String locationId) {
        LocationCache.LocationData location = locationCache.getLocation(
                locationId,
                this::loadLocationFromDb
        );
        
        if (location == null) {
            return null;
        }
        
        return new LocationInfo(
                locationId,
                location.getLatitude(),
                location.getLongitude(),
                location.getAddress()
        );
    }
    
    /**
     * 从数据库加载位置数据
     */
    private LocationCache.LocationData loadLocationFromDb(String locationId) {
        LocationEntity entity = locationRepository.findById(locationId).orElse(null);
        if (entity == null) {
            return null;
        }
        
        return new LocationCache.LocationData(
                entity.getLatitude(),
                entity.getLongitude(),
                entity.getAddress(),
                System.currentTimeMillis()
        );
    }
    
    /**
     * 更新位置信息
     */
    @Transactional
    public void updateLocation(LocationUpdateRequest request) {
        // 更新数据库
        LocationEntity entity = locationRepository.findById(request.getLocationId())
                .orElseThrow(() -> new NotFoundException("位置信息不存在"));
                
        entity.setLatitude(request.getLatitude());
        entity.setLongitude(request.getLongitude());
        entity.setAddress(request.getAddress());
        entity.setUpdateTime(new Date());
        
        locationRepository.save(entity);
        
        // 更新缓存
        LocationCache.LocationData locationData = new LocationCache.LocationData(
                request.getLatitude(),
                request.getLongitude(),
                request.getAddress(),
                System.currentTimeMillis()
        );
        
        locationCache.updateLocation(request.getLocationId(), locationData);
    }
}

实际应用分析

  1. 乐观读优先:首先尝试乐观读,在大多数情况下无需加锁,提高并发性能

  2. 锁升级机制:失败时逐级升级,从乐观读到悲观读,再到写锁

  3. 锁转换:尝试将读锁直接转换为写锁,减少锁释放和获取的开销

  4. 深拷贝返回:返回对象的副本而非直接引用,避免外部修改影响缓存

  5. 定期清理:定期清理过期缓存和不再使用的锁对象,防止内存泄漏

结论与最佳实践

JUC 并发工具类在大厂的实际应用中展现出强大的性能和灵活性。通过本文的案例分析,我们可以总结出以下最佳实践:

线程池最佳实践

  1. 合理设置参数:核心线程数、最大线程数和队列容量应当根据业务特性和服务器资源合理配置

  2. 区分业务类型:不同业务类型使用独立的线程池,避免互相影响

  3. 监控和告警:实时监控线程池状态,包括活跃线程数、队列深度、拒绝次数等指标

  4. 优雅关闭:系统停止时,应当给予线程池足够的时间处理完已提交的任务

  5. 自定义拒绝策略:根据业务需求实现自定义的拒绝策略,如延迟重试、降级处理等

同步工具类最佳实践

  1. 超时控制:使用带超时参数的 await 方法,避免永久等待

  2. 异常处理:在 finally 块中确保 countDown 或释放信号量,避免死锁

  3. 合理使用:根据场景选择合适的同步工具,如需多次使用选择 CyclicBarrier,一次性等待选择 CountDownLatch

  4. 避免嵌套:避免在持有一个同步工具的情况下请求另一个,可能导致死锁

并发容器最佳实践

  1. 场景匹配:根据读写比例选择合适的容器,如读多写少场景优先考虑 ConcurrentHashMap 和 CopyOnWriteArrayList

  2. 批量操作:优先使用批量添加/获取方法,如 putAll、getAll 等,减少操作次数

  3. 原子操作利用:使用容器提供的原子操作方法,如 computeIfAbsent、putIfAbsent 等

  4. 容量预估:合理预估初始容量,减少扩容开销

原子操作类最佳实践

  1. 选择合适的实现:高并发计数场景优先考虑 LongAdder 而非 AtomicLong

  2. 避免忙等待:使用带 BackOff 策略的重试机制,而非简单循环尝试 CAS

  3. 复合操作注意:多个原子变量的操作不能保证整体原子性,必要时配合锁使用

  4. 版本控制:使用 AtomicStampedReference 解决 ABA 问题

锁机制最佳实践

  1. 最小化锁范围:尽量减小锁的粒度和持有时间

  2. 读写锁区分:在读多写少场景中使用 ReadWriteLock 或 StampedLock

  3. 避免死锁:获取多个锁时保持一致的顺序,使用 tryLock 避免无限等待

  4. 锁分离:使用不同的锁对象控制不同的资源,提高并发性

  5. 锁升级策略:优先使用乐观策略,失败时再升级为悲观锁

实际应用中,通常需要综合使用多种并发工具类,根据具体场景选择最合适的组合。更重要的是,需要持续监控系统性能,识别潜在的并发瓶颈,并不断优化线程模型和并发策略。

在大厂的工程实践中,除了正确使用 JUC 工具类外,还需要结合微服务架构、分布式系统设计原则,以及 DevOps 最佳实践,才能构建出真正高性能、高可用的并发系统。