工作概述


Activity

contributed projects

一、FMS

  • 梅山换电站

change

密钥交换

AES

内部模拟站控

监控心跳

交换密钥

二、api-server

onlineTrucks.stream().parallel().forEach(onlineTruck -> {
          StopPoint stopPoint = finalStopPointsMap.get(onlineTruck.getTRUCK_NO());
          guideTruckJob(onlineTruck, stopPoint);
});

基本原理:

  • parallelStream() 把任务拆分成多个子任务(split),通过 分而治之 的方式处理。
  • 使用 ForkJoinPool.commonPool 共享线程池,默认并行度为 CPU核心数 - 1)。
  • 将每个子任务提交到线程池并行执行。
  • 最后将每个子任务的结果合并(join)。

缺点:

  • 线程数固定:默认线程数可能不足或过多,且难以按任务调优,每秒要稳定处理几十甚至上百辆车,可能线程数不够,会处理不完拖延到下一帧
  • 难以监控和调试:排查某个车处理慢了、卡住了、没执行,parallelStream 会让整个过程比较黑箱化;用线程池可以做细粒度监控、限速、统计等。
  • 公共线程池不稳定:和系统中其他并发任务共享资源,造成处理不及时,延迟抖动

异步任务调度 + 并发控制 + 批处理机制

1.生产者-消费者模型(Producer-Consumer Pattern)

  • messageService.getOnlineTrucks() 是数据“生产”;
  • guideTruckJob(…) 是对每辆车的业务处理,“消费”;
  • 中间用线程池 + 并发控制机制完成调度。

2.线程池线程管理(Thread Pool)

  • 用 ExecutorService 控制线程资源复用;
  • 避免每次都创建线程,降低系统开销。

3.任务并发限流(Concurrency Throttling)

  • Semaphore 控制“并发窗口”,避免同时跑太多任务导致资源爆炸;
  • 类似 限流器 作用。

4.异步任务编排(Async Orchestration)

  • CompletableFuture 提供非阻塞异步执行;
  • CompletableFuture.allOf(…).join() 聚合结果,统一等待结束。
  1. 调度循环 + 周期控制(Looping Scheduler)
  • 整体是一个 while(true) 轮询框架;
  • 每帧执行后用 sleep 控制调度频率(典型的周期性调度器写法)。
import java.util.concurrent.*;

private static final int THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
private static final ExecutorService truckJobExecutor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
private static final int MAX_CONCURRENT_JOBS = THREAD_POOL_SIZE; // 限制同时运行的任务数
private static final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_JOBS);

public void guideJob() {
    while (true) {
        try {
            long beginTimeMs = System.currentTimeMillis();

            List<VehicleWiStatus> onlineTrucks = messageService.getOnlineTrucks();
            Map<String, StopPoint> finalStopPointsMap = messageService.fetchStopPoints();

            long beginTimeMs3 = System.currentTimeMillis();

            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (VehicleWiStatus onlineTruck : onlineTrucks) {
                StopPoint stopPoint = finalStopPointsMap.get(onlineTruck.getTRUCK_NO());

                futures.add(CompletableFuture.runAsync(() -> {
                    try {
                        semaphore.acquire(); // 控制并发任务数
                        guideTruckJob(onlineTruck, stopPoint);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        semaphore.release();
                    }
                }, truckJobExecutor));
            }

            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

            long beginTimeMs4 = System.currentTimeMillis();
            log.info("单帧多车执行总时长: {} ms.", beginTimeMs4 - beginTimeMs3);

            long endTimeMs = System.currentTimeMillis();
            long gapTimeMs = endTimeMs - beginTimeMs;
            if (gapTimeMs < 1000) {
                Thread.sleep(1000 - gapTimeMs);
            }
            if (gapTimeMs > 500) {
                log.warn("单帧执行时长 {}", gapTimeMs);
            }
        } catch (Exception e) {
            log.error("guideJob Exception: ", e);
        }
    }
}