From f58fdd14a440e29136b18ff20e66fcc8371ee273 Mon Sep 17 00:00:00 2001 From: lensfrex Date: Mon, 6 May 2024 22:18:26 +0800 Subject: [PATCH] =?UTF-8?q?ref:=20=E4=BC=98=E5=8C=96collector=E9=83=A8?= =?UTF-8?q?=E5=88=86=E5=A4=A7=E6=89=B9=E9=87=8F=E6=95=B0=E6=8D=AE=E5=85=A5?= =?UTF-8?q?=E5=BA=93db=E6=97=B6=E7=9A=84=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../collector/MetricCollectingService.java | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/rition-center/service/collector/src/main/java/rition/service/collector/MetricCollectingService.java b/rition-center/service/collector/src/main/java/rition/service/collector/MetricCollectingService.java index 996f4c1..9a9e6d5 100644 --- a/rition-center/service/collector/src/main/java/rition/service/collector/MetricCollectingService.java +++ b/rition-center/service/collector/src/main/java/rition/service/collector/MetricCollectingService.java @@ -11,6 +11,8 @@ import rition.service.collector.dao.entity.MetricRecordEntity; import rition.service.collector.dao.mapper.MetricRecordMapper; import java.time.Instant; +import java.util.ArrayList; +import java.util.List; import java.util.Map; /** @@ -32,19 +34,39 @@ public class MetricCollectingService { this.metricRecordMapper = metricRecordMapper; } + // 目前设定为40轮作为一个批次 + private static final int DEFAULT_DB_BATCH_SAVE_TURN = 40; + /** * 接收处理好的监控指标数据,以时间戳为分组入库和发送mq * * @param collectedMetricData 监控指标数据,以时间戳为组 */ public void receiveData(Map collectedMetricData) { + List batchSaveEntityList = new ArrayList<>(DEFAULT_DB_BATCH_SAVE_TURN * 23); + int turn = 0; for (Long timestamp : collectedMetricData.keySet()) { var metricDataDto = collectedMetricData.get(timestamp); var metricDataList = metricDataDto.getDataList(); var metricRecordEntityList = metricDataList.stream().map(this::convert).toList(); kafkaTemplate.send(this.collectedDataTopic, metricDataDto); - metricRecordMapper.insertBatchSomeColumn(metricRecordEntityList); + + // 分批入库 + batchSaveEntityList.addAll(metricRecordEntityList); + if (turn >= DEFAULT_DB_BATCH_SAVE_TURN) { + metricRecordMapper.insertBatchSomeColumn(batchSaveEntityList); + // 下一轮 + batchSaveEntityList.clear(); + turn = 0; + } else { + turn++; + } + } + + // 还有没入库的 + if (!batchSaveEntityList.isEmpty()) { + metricRecordMapper.insertBatchSomeColumn(batchSaveEntityList); } }