|
|
|
@@ -11,6 +11,8 @@ import rition.service.collector.dao.entity.MetricRecordEntity;
import rition.service.collector.dao.mapper.MetricRecordMapper;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
@@ -32,19 +34,39 @@ public class MetricCollectingService {
        this.metricRecordMapper = metricRecordMapper;
    }

    // Currently, 40 turns (timestamp groups) are accumulated per database batch
    private static final int DEFAULT_DB_BATCH_SAVE_TURN = 40;

    /**
     * Receives processed monitoring metric data grouped by timestamp, persists it
     * to the database in batches and publishes it to the message queue.
     *
     * @param collectedMetricData monitoring metric data keyed by timestamp
     */
    public void receiveData(Map<Long, MetricDataDto> collectedMetricData) {
        // initial capacity: rough estimate of how many records accumulate in one batch
        List<MetricRecordEntity> batchSaveEntityList = new ArrayList<>(DEFAULT_DB_BATCH_SAVE_TURN * 23);
        int turn = 0;
        for (var metricDataDto : collectedMetricData.values()) {
            var metricDataList = metricDataDto.getDataList();
            var metricRecordEntityList = metricDataList.stream().map(this::convert).toList();

            // publish the group to the message queue
            kafkaTemplate.send(this.collectedDataTopic, metricDataDto);

            // accumulate and persist in batches instead of inserting every group individually
            batchSaveEntityList.addAll(metricRecordEntityList);
            turn++;
            if (turn >= DEFAULT_DB_BATCH_SAVE_TURN) {
                metricRecordMapper.insertBatchSomeColumn(batchSaveEntityList);
                // start the next batch
                batchSaveEntityList.clear();
                turn = 0;
            }
        }

        // flush whatever has not been persisted yet
        if (!batchSaveEntityList.isEmpty()) {
            metricRecordMapper.insertBatchSomeColumn(batchSaveEntityList);
        }
    }
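The accumulate-and-flush pattern above is easy to get subtly wrong, especially the final partial batch, so here is a minimal self-contained sketch of the same idea behind a small generic buffer. The BatchBuffer class, its Consumer-based flush action, and every name in it are illustrative assumptions, not part of this codebase.

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;

/** Illustrative only: accumulates items and flushes them every N additions, mirroring the batching above. */
public class BatchBuffer<T> {

    private final int flushEveryTurns;
    private final Consumer<List<T>> flushAction;
    private final List<T> buffer = new ArrayList<>();
    private int turn = 0;

    public BatchBuffer(int flushEveryTurns, Consumer<List<T>> flushAction) {
        this.flushEveryTurns = flushEveryTurns;
        this.flushAction = flushAction;
    }

    /** Adds one group of items; flushes automatically once enough turns have accumulated. */
    public void add(Collection<T> items) {
        buffer.addAll(items);
        turn++;
        if (turn >= flushEveryTurns) {
            flush();
        }
    }

    /** Flushes whatever is buffered; call once more at the end to cover the last partial batch. */
    public void flush() {
        if (!buffer.isEmpty()) {
            flushAction.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
        turn = 0;
    }
}

With a helper like this, receiveData would reduce to one add(...) call per timestamp group plus a final flush(), with metricRecordMapper::insertBatchSomeColumn supplied as the flush action.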
|
|
|
|
|
|
|
|
|