diff --git a/rition-center/service/collector/src/main/java/rition/service/collector/MetricCollectingService.java b/rition-center/service/collector/src/main/java/rition/service/collector/MetricCollectingService.java index 996f4c1..9a9e6d5 100644 --- a/rition-center/service/collector/src/main/java/rition/service/collector/MetricCollectingService.java +++ b/rition-center/service/collector/src/main/java/rition/service/collector/MetricCollectingService.java @@ -11,6 +11,8 @@ import rition.service.collector.dao.entity.MetricRecordEntity; import rition.service.collector.dao.mapper.MetricRecordMapper; import java.time.Instant; +import java.util.ArrayList; +import java.util.List; import java.util.Map; /** @@ -32,19 +34,39 @@ public class MetricCollectingService { this.metricRecordMapper = metricRecordMapper; } + // 目前设定为40轮作为一个批次 + private static final int DEFAULT_DB_BATCH_SAVE_TURN = 40; + /** * 接收处理好的监控指标数据,以时间戳为分组入库和发送mq * * @param collectedMetricData 监控指标数据,以时间戳为组 */ public void receiveData(Map collectedMetricData) { + List batchSaveEntityList = new ArrayList<>(DEFAULT_DB_BATCH_SAVE_TURN * 23); + int turn = 0; for (Long timestamp : collectedMetricData.keySet()) { var metricDataDto = collectedMetricData.get(timestamp); var metricDataList = metricDataDto.getDataList(); var metricRecordEntityList = metricDataList.stream().map(this::convert).toList(); kafkaTemplate.send(this.collectedDataTopic, metricDataDto); - metricRecordMapper.insertBatchSomeColumn(metricRecordEntityList); + + // 分批入库 + batchSaveEntityList.addAll(metricRecordEntityList); + if (turn >= DEFAULT_DB_BATCH_SAVE_TURN) { + metricRecordMapper.insertBatchSomeColumn(batchSaveEntityList); + // 下一轮 + batchSaveEntityList.clear(); + turn = 0; + } else { + turn++; + } + } + + // 还有没入库的 + if (!batchSaveEntityList.isEmpty()) { + metricRecordMapper.insertBatchSomeColumn(batchSaveEntityList); } }