|
|
|
@ -4,8 +4,9 @@ import com.github.yitter.idgen.YitIdHelper; |
|
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
|
import org.springframework.kafka.core.KafkaTemplate; |
|
|
|
|
import org.springframework.stereotype.Service; |
|
|
|
|
import rition.common.data.dto.MetricDataDto; |
|
|
|
|
import rition.common.data.dto.service.MetricDataDto; |
|
|
|
|
import rition.common.data.entity.MetricRecordEntity; |
|
|
|
|
import rition.common.data.enums.Constants; |
|
|
|
|
import rition.service.collector.configure.CollectorServiceKafkaConfigure; |
|
|
|
|
import rition.common.data.dao.mapper.MetricRecordMapper; |
|
|
|
|
|
|
|
|
@ -45,27 +46,31 @@ public class MetricCollectingService { |
|
|
|
|
* @param collectedMetricData 监控指标数据,以时间戳为组 |
|
|
|
|
*/ |
|
|
|
|
public void receiveData(Map<Long, Map<String, MetricDataDto>> collectedMetricData) { |
|
|
|
|
List<MetricRecordEntity> batchSaveEntityList = new ArrayList<>(DEFAULT_DB_BATCH_SIZE); |
|
|
|
|
List<MetricRecordEntity> batchSaveEntityList = new ArrayList<>(DEFAULT_DB_BATCH_SIZE * METRIC_NUMS); |
|
|
|
|
int turn = 0; |
|
|
|
|
for (Long timestamp : collectedMetricData.keySet()) { |
|
|
|
|
Map<String, MetricDataDto> instanceMetricData = collectedMetricData.get(timestamp); |
|
|
|
|
for (String instanceId : instanceMetricData.keySet()) { |
|
|
|
|
MetricDataDto metricDataDto = instanceMetricData.get(instanceId); |
|
|
|
|
|
|
|
|
|
MetricRecordEntity entity = new MetricRecordEntity(); |
|
|
|
|
entity.setId(YitIdHelper.nextId()); |
|
|
|
|
entity.setInstanceId(instanceId); |
|
|
|
|
entity.setMetricData(metricDataDto.getData()); |
|
|
|
|
var time = Instant.ofEpochSecond(metricDataDto.getTimestamp()); |
|
|
|
|
entity.setTime(time); |
|
|
|
|
entity.setUpdateTime(time); |
|
|
|
|
entity.setStatus(0); |
|
|
|
|
List<MetricRecordEntity> metricRecordEntityList = new ArrayList<>(METRIC_NUMS); |
|
|
|
|
for (String metric : metricDataDto.getData().keySet()) { |
|
|
|
|
MetricRecordEntity entity = new MetricRecordEntity(); |
|
|
|
|
entity.setId(YitIdHelper.nextId()); |
|
|
|
|
entity.setInstanceId(instanceId); |
|
|
|
|
entity.setMetric(metric); |
|
|
|
|
entity.setValue(metricDataDto.getData().get(metric)); |
|
|
|
|
|
|
|
|
|
// kafka发布
|
|
|
|
|
kafkaTemplate.send(this.collectedDataTopic, metricDataDto); |
|
|
|
|
var time = Instant.ofEpochSecond(metricDataDto.getTimestamp()); |
|
|
|
|
entity.setTime(time); |
|
|
|
|
entity.setUpdateTime(time); |
|
|
|
|
entity.setStatus(Constants.EntityCommonStatus.NORMAL); |
|
|
|
|
|
|
|
|
|
metricRecordEntityList.add(entity); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// 分批入库
|
|
|
|
|
batchSaveEntityList.add(entity); |
|
|
|
|
batchSaveEntityList.addAll(metricRecordEntityList); |
|
|
|
|
if (turn >= DEFAULT_DB_BATCH_SIZE) { |
|
|
|
|
metricRecordMapper.insertBatchSomeColumn(batchSaveEntityList); |
|
|
|
|
// 下一轮
|
|
|
|
@ -74,6 +79,9 @@ public class MetricCollectingService { |
|
|
|
|
} else { |
|
|
|
|
turn++; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// kafka发布
|
|
|
|
|
kafkaTemplate.send(this.collectedDataTopic, metricDataDto); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|