工作概述
一、FMS
- 梅山换电站
内部模拟站控
二、api-server
onlineTrucks.stream().parallel().forEach(onlineTruck -> {
StopPoint stopPoint = finalStopPointsMap.get(onlineTruck.getTRUCK_NO());
guideTruckJob(onlineTruck, stopPoint);
});
基本原理:
- parallelStream() 把任务拆分成多个子任务(split),通过 分而治之 的方式处理。
- 使用 ForkJoinPool.commonPool 共享线程池,默认并行度为 CPU核心数 - 1)。
- 将每个子任务提交到线程池并行执行。
- 最后将每个子任务的结果合并(join)。
缺点:
- 线程数固定:默认线程数可能不足或过多,且难以按任务调优,每秒要稳定处理几十甚至上百辆车,可能线程数不够,会处理不完 或 拖延到下一帧。
- 难以监控和调试:排查某个车处理慢了、卡住了、没执行,parallelStream 会让整个过程比较黑箱化;用线程池可以做细粒度监控、限速、统计等。
- 公共线程池不稳定:和系统中其他并发任务共享资源,造成处理不及时,延迟抖动
异步任务调度 + 并发控制 + 批处理机制
1.生产者-消费者模型(Producer-Consumer Pattern)
- messageService.getOnlineTrucks() 是数据“生产”;
- guideTruckJob(…) 是对每辆车的业务处理,“消费”;
- 中间用线程池 + 并发控制机制完成调度。
2.线程池线程管理(Thread Pool)
- 用 ExecutorService 控制线程资源复用;
- 避免每次都创建线程,降低系统开销。
3.任务并发限流(Concurrency Throttling)
- Semaphore 控制“并发窗口”,避免同时跑太多任务导致资源爆炸;
- 类似 限流器 作用。
4.异步任务编排(Async Orchestration)
- CompletableFuture 提供非阻塞异步执行;
- CompletableFuture.allOf(…).join() 聚合结果,统一等待结束。
- 调度循环 + 周期控制(Looping Scheduler)
- 整体是一个 while(true) 轮询框架;
- 每帧执行后用 sleep 控制调度频率(典型的周期性调度器写法)。
import java.util.concurrent.*;
private static final int THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
private static final ExecutorService truckJobExecutor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
private static final int MAX_CONCURRENT_JOBS = THREAD_POOL_SIZE; // 限制同时运行的任务数
private static final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_JOBS);
public void guideJob() {
while (true) {
try {
long beginTimeMs = System.currentTimeMillis();
List<VehicleWiStatus> onlineTrucks = messageService.getOnlineTrucks();
Map<String, StopPoint> finalStopPointsMap = messageService.fetchStopPoints();
long beginTimeMs3 = System.currentTimeMillis();
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (VehicleWiStatus onlineTruck : onlineTrucks) {
StopPoint stopPoint = finalStopPointsMap.get(onlineTruck.getTRUCK_NO());
futures.add(CompletableFuture.runAsync(() -> {
try {
semaphore.acquire(); // 控制并发任务数
guideTruckJob(onlineTruck, stopPoint);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release();
}
}, truckJobExecutor));
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
long beginTimeMs4 = System.currentTimeMillis();
log.info("单帧多车执行总时长: {} ms.", beginTimeMs4 - beginTimeMs3);
long endTimeMs = System.currentTimeMillis();
long gapTimeMs = endTimeMs - beginTimeMs;
if (gapTimeMs < 1000) {
Thread.sleep(1000 - gapTimeMs);
}
if (gapTimeMs > 500) {
log.warn("单帧执行时长 {}", gapTimeMs);
}
} catch (Exception e) {
log.error("guideJob Exception: ", e);
}
}
}