From 045f41cafea4bc53ed408552f669c7fe848b6d17 Mon Sep 17 00:00:00 2001 From: lensfrex Date: Sun, 5 May 2024 22:34:28 +0800 Subject: [PATCH] =?UTF-8?q?impl:=20=E5=9F=BA=E6=9C=AC=E5=AE=8C=E6=88=90?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E6=94=B6=E9=9B=86=E6=A8=A1=E5=9D=97=EF=BC=8C?= =?UTF-8?q?=E4=BB=A5=E5=8F=8A=E6=9C=8D=E5=8A=A1=E5=99=A8=E7=8A=B6=E6=80=81?= =?UTF-8?q?=E6=8E=A2=E9=92=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rition-center/api/pom.xml | 15 +++ .../rition/backend/RitionBackendMain.java | 13 ++- .../api/v1/DataCollectingController.java | 30 ++++- ...uest.java => MetricDataUploadRequest.java} | 4 +- .../service/MetricDataCollectingService.java | 30 +++++ .../api/src/main/resources/application.yml | 6 +- rition-center/common/pom.xml | 38 +++++++ .../rition/common/data/dto/MetricDataDto.java | 48 ++++++++ .../common/data/enums/CommonEntityStatus.java | 17 +++ .../rition/common/data/enums/Constants.java | 7 ++ .../common/src/main/proto/metric_data.proto | 17 +++ rition-center/pom.xml | 9 ++ rition-center/service/collector/pom.xml | 37 ++++++ .../collector/DataCollectorService.java | 4 - .../collector/MetricCollectingService.java | 94 ++++++++++++++++ .../collector/configure/KafkaConfigure.java | 20 ++++ .../configure/MybatisPlusConfigure.java | 8 ++ .../collector/dao/BatchSQLInjector.java | 21 ++++ .../dao/entity/MetricRecordEntity.java | 59 ++++++++++ .../dao/mapper/MetricRecordMapper.java | 12 ++ .../mq/CollectedMetricDataDeserializer.java | 18 +++ .../mq/ProtobufMessageSerializer.java | 13 +++ .../main/resources/application-collector.yml | 19 ++++ .../resources/mapper/MetricRecordMapper.xml | 5 + rition-center/service/pom.xml | 7 ++ rition-probe/client/client.go | 3 - rition-probe/client/http.go | 71 +++++++++++- rition-probe/main.go | 32 ++++++ rition-probe/probe/cpu.go | 4 +- rition-probe/probe/cpu_test.go | 10 ++ rition-probe/probe/host.go | 2 +- rition-probe/probe/network.go | 11 +- rition-probe/probe/network_test.go | 6 + rition-probe/service/probe.go | 106 ++++++++++++++++++ rition-probe/service/service.go | 24 +++- 35 files changed, 790 insertions(+), 30 deletions(-) rename rition-center/api/src/main/java/rition/backend/api/v1/dto/request/{MonitorDataUploadRequest.java => MetricDataUploadRequest.java} (83%) create mode 100644 rition-center/api/src/main/java/rition/backend/service/MetricDataCollectingService.java create mode 100644 rition-center/common/src/main/java/rition/common/data/dto/MetricDataDto.java create mode 100644 rition-center/common/src/main/java/rition/common/data/enums/CommonEntityStatus.java create mode 100644 rition-center/common/src/main/java/rition/common/data/enums/Constants.java create mode 100644 rition-center/common/src/main/proto/metric_data.proto delete mode 100644 rition-center/service/collector/src/main/java/rition/service/collector/DataCollectorService.java create mode 100644 rition-center/service/collector/src/main/java/rition/service/collector/MetricCollectingService.java create mode 100644 rition-center/service/collector/src/main/java/rition/service/collector/configure/KafkaConfigure.java create mode 100644 rition-center/service/collector/src/main/java/rition/service/collector/configure/MybatisPlusConfigure.java create mode 100644 rition-center/service/collector/src/main/java/rition/service/collector/dao/BatchSQLInjector.java create mode 100644 rition-center/service/collector/src/main/java/rition/service/collector/dao/entity/MetricRecordEntity.java create mode 100644 rition-center/service/collector/src/main/java/rition/service/collector/dao/mapper/MetricRecordMapper.java create mode 100644 rition-center/service/collector/src/main/java/rition/service/collector/mq/CollectedMetricDataDeserializer.java create mode 100644 rition-center/service/collector/src/main/java/rition/service/collector/mq/ProtobufMessageSerializer.java create mode 100644 rition-center/service/collector/src/main/resources/application-collector.yml create mode 100644 rition-center/service/collector/src/main/resources/mapper/MetricRecordMapper.xml create mode 100644 rition-probe/probe/cpu_test.go create mode 100644 rition-probe/service/probe.go diff --git a/rition-center/api/pom.xml b/rition-center/api/pom.xml index bd9217e..73f5b73 100644 --- a/rition-center/api/pom.xml +++ b/rition-center/api/pom.xml @@ -28,5 +28,20 @@ common ${revision} + + net.rition + collector + ${revision} + + + net.rition + monitor + ${revision} + + + net.rition + notify + ${revision} + \ No newline at end of file diff --git a/rition-center/api/src/main/java/rition/backend/RitionBackendMain.java b/rition-center/api/src/main/java/rition/backend/RitionBackendMain.java index 9316aac..592d8fc 100644 --- a/rition-center/api/src/main/java/rition/backend/RitionBackendMain.java +++ b/rition-center/api/src/main/java/rition/backend/RitionBackendMain.java @@ -1,15 +1,26 @@ package rition.backend; +import com.github.yitter.contract.IdGeneratorOptions; +import com.github.yitter.idgen.YitIdHelper; +import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.transaction.annotation.EnableTransactionManagement; @EnableAsync @EnableScheduling -@SpringBootApplication +@SpringBootApplication(scanBasePackages = { + "rition", +}) +@MapperScan("rition") +@EnableTransactionManagement public class RitionBackendMain { public static void main(String[] args) { + IdGeneratorOptions options = new IdGeneratorOptions((short) 0); + YitIdHelper.setIdGenerator(options); + SpringApplication.run(RitionBackendMain.class); } } diff --git a/rition-center/api/src/main/java/rition/backend/api/v1/DataCollectingController.java b/rition-center/api/src/main/java/rition/backend/api/v1/DataCollectingController.java index 121b16e..a76362d 100644 --- a/rition-center/api/src/main/java/rition/backend/api/v1/DataCollectingController.java +++ b/rition-center/api/src/main/java/rition/backend/api/v1/DataCollectingController.java @@ -4,21 +4,43 @@ import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; -import rition.backend.annotation.paramter.RequestId; import rition.backend.annotation.WithRequestIdResponse; -import rition.backend.api.v1.dto.request.MonitorDataUploadRequest; +import rition.backend.api.v1.dto.request.MetricDataUploadRequest; import rition.backend.api.v1.dto.response.Response; +import rition.backend.service.MetricDataCollectingService; +import rition.common.data.dto.MetricDataDto; +import java.util.ArrayList; import java.util.List; @RestController @RequestMapping("/metric") public class DataCollectingController { + private final MetricDataCollectingService metricDataCollectingService; + + public DataCollectingController(MetricDataCollectingService metricDataCollectingService) { + this.metricDataCollectingService = metricDataCollectingService; + } + @PostMapping("/put") @WithRequestIdResponse - public Response putData(@RequestBody List request, - @RequestId String requestId) { + public Response putData(@RequestBody List uploadDataList) { + + // dto转换 + MetricDataDto metricDataDto = new MetricDataDto(); + List dataList = uploadDataList.stream() + .map(data -> MetricDataDto.MetricData.builder() + .metric(data.getMetric()) + .value(data.getValue()) + .timestamp(data.getTimestamp()) + .tags(data.getTags()) + .build()) + .toList(); + + metricDataDto.setDataList(dataList); + metricDataCollectingService.receiveData(metricDataDto); + return Response.success(); } } diff --git a/rition-center/api/src/main/java/rition/backend/api/v1/dto/request/MonitorDataUploadRequest.java b/rition-center/api/src/main/java/rition/backend/api/v1/dto/request/MetricDataUploadRequest.java similarity index 83% rename from rition-center/api/src/main/java/rition/backend/api/v1/dto/request/MonitorDataUploadRequest.java rename to rition-center/api/src/main/java/rition/backend/api/v1/dto/request/MetricDataUploadRequest.java index 3122b4b..dade081 100644 --- a/rition-center/api/src/main/java/rition/backend/api/v1/dto/request/MonitorDataUploadRequest.java +++ b/rition-center/api/src/main/java/rition/backend/api/v1/dto/request/MetricDataUploadRequest.java @@ -6,7 +6,7 @@ import lombok.Data; import java.util.Map; @Data -public class MonitorDataUploadRequest { +public class MetricDataUploadRequest { /** * 指标名 */ @@ -25,5 +25,5 @@ public class MonitorDataUploadRequest { /** * 值 */ - private Float value; + private String value; } diff --git a/rition-center/api/src/main/java/rition/backend/service/MetricDataCollectingService.java b/rition-center/api/src/main/java/rition/backend/service/MetricDataCollectingService.java new file mode 100644 index 0000000..d6e24c6 --- /dev/null +++ b/rition-center/api/src/main/java/rition/backend/service/MetricDataCollectingService.java @@ -0,0 +1,30 @@ +package rition.backend.service; + +import org.springframework.stereotype.Service; +import rition.common.data.dto.MetricDataDto; +import rition.common.data.dto.MetricDataProto; +import rition.service.collector.MetricCollectingService; + +/** + * MetricCollectorService 指标数据收集类,负责收集来自云主机探针上报的数据,并将其入库等, + * 这里只是个代理类,实际的实现在collector模块, + * 这样做只是为了方便日后将collector等模块独立拆分出来, + * 这种情况下实际上数据收集的入口应当直接放在collector里,而不是api部分 + */ +@Service +public class MetricDataCollectingService { + + private final MetricCollectingService metricCollectingService; + + public MetricDataCollectingService(MetricCollectingService metricCollectingService) { + this.metricCollectingService = metricCollectingService; + } + + /** + * 接收处理好的监控指标数据 + * @param collectedMetricData 监控指标数据,protobuf对象 + */ + public void receiveData(MetricDataDto collectedMetricData) { + metricCollectingService.receiveData(collectedMetricData); + } +} diff --git a/rition-center/api/src/main/resources/application.yml b/rition-center/api/src/main/resources/application.yml index 0c11785..e7e3cc2 100644 --- a/rition-center/api/src/main/resources/application.yml +++ b/rition-center/api/src/main/resources/application.yml @@ -1,2 +1,6 @@ server: - port: 22019 \ No newline at end of file + port: 22019 + +spring: + profiles: + include: collector \ No newline at end of file diff --git a/rition-center/common/pom.xml b/rition-center/common/pom.xml index 3944548..1b0c570 100644 --- a/rition-center/common/pom.xml +++ b/rition-center/common/pom.xml @@ -15,6 +15,44 @@ 17 17 UTF-8 + + 3.25.3 + + + com.google.protobuf + protobuf-java + ${protobuf.version} + + + + + + + kr.motd.maven + os-maven-plugin + 1.7.1 + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + true + + com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} + + + + + compile + test-compile + + + + + + \ No newline at end of file diff --git a/rition-center/common/src/main/java/rition/common/data/dto/MetricDataDto.java b/rition-center/common/src/main/java/rition/common/data/dto/MetricDataDto.java new file mode 100644 index 0000000..b2c4349 --- /dev/null +++ b/rition-center/common/src/main/java/rition/common/data/dto/MetricDataDto.java @@ -0,0 +1,48 @@ + +package rition.common.data.dto; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; +import java.util.Map; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class MetricDataDto { + + private List dataList; + + /** + * 指标数据 + */ + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + public static class MetricData { + /** + * 指标名 + */ + private String metric; + + /** + * 数据标签 + */ + private Map tags; + + /** + * 时间戳 + */ + private Long timestamp; + + /** + * 值 + */ + private String value; + } +} diff --git a/rition-center/common/src/main/java/rition/common/data/enums/CommonEntityStatus.java b/rition-center/common/src/main/java/rition/common/data/enums/CommonEntityStatus.java new file mode 100644 index 0000000..0dfc333 --- /dev/null +++ b/rition-center/common/src/main/java/rition/common/data/enums/CommonEntityStatus.java @@ -0,0 +1,17 @@ +package rition.common.data.enums; + +import lombok.Getter; + +@Getter +public enum CommonEntityStatus { + STATUS_NORMAL(0), + STATUS_DELETED(1), + STATUS_HIDDEN(2), + ; + + private final int value; + + CommonEntityStatus(int value) { + this.value = value; + } +} diff --git a/rition-center/common/src/main/java/rition/common/data/enums/Constants.java b/rition-center/common/src/main/java/rition/common/data/enums/Constants.java new file mode 100644 index 0000000..8323267 --- /dev/null +++ b/rition-center/common/src/main/java/rition/common/data/enums/Constants.java @@ -0,0 +1,7 @@ +package rition.common.data.enums; + +import java.time.ZoneId; + +public class Constants { + public static final ZoneId DefaultTimeZone = ZoneId.of("Asia/Shanghai"); +} diff --git a/rition-center/common/src/main/proto/metric_data.proto b/rition-center/common/src/main/proto/metric_data.proto new file mode 100644 index 0000000..90d43fd --- /dev/null +++ b/rition-center/common/src/main/proto/metric_data.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +package rition.proto.collecting.v1; + +option java_package = "rition.common.data.dto"; +option java_outer_classname = "MetricDataProto"; + +message MetricData { + string metric = 1; + string value = 2; + uint64 timestamp = 3; + map tags = 4; +} + +message CollectedMetricData { + repeated MetricData data = 4; +} \ No newline at end of file diff --git a/rition-center/pom.xml b/rition-center/pom.xml index e69c91f..2de6815 100644 --- a/rition-center/pom.xml +++ b/rition-center/pom.xml @@ -27,6 +27,8 @@ 17 17 UTF-8 + + 1.0.6 @@ -56,6 +58,13 @@ com.fasterxml.jackson.core jackson-annotations + + + + com.github.yitter + yitter-idgenerator + ${idgen.version} + diff --git a/rition-center/service/collector/pom.xml b/rition-center/service/collector/pom.xml index aba9b9a..2614d3b 100644 --- a/rition-center/service/collector/pom.xml +++ b/rition-center/service/collector/pom.xml @@ -15,6 +15,43 @@ 17 17 UTF-8 + + 3.0.3 + 3.5.16 + 3.5.6 + + + org.springframework.kafka + spring-kafka + + + + org.mybatis.spring.boot + mybatis-spring-boot-starter + ${mybatis-sb-starter.version} + + + org.mybatis.spring.boot + mybatis-spring-boot-starter-test + ${mybatis-sb-starter.version} + test + + + org.mybatis + mybatis + ${mybatis.version} + + + com.baomidou + mybatis-plus-spring-boot3-starter + ${mybatis-plus.version} + + + com.mysql + mysql-connector-j + runtime + + \ No newline at end of file diff --git a/rition-center/service/collector/src/main/java/rition/service/collector/DataCollectorService.java b/rition-center/service/collector/src/main/java/rition/service/collector/DataCollectorService.java deleted file mode 100644 index bc20d67..0000000 --- a/rition-center/service/collector/src/main/java/rition/service/collector/DataCollectorService.java +++ /dev/null @@ -1,4 +0,0 @@ -package rition.service.collector; - -public class DataCollectorService { -} diff --git a/rition-center/service/collector/src/main/java/rition/service/collector/MetricCollectingService.java b/rition-center/service/collector/src/main/java/rition/service/collector/MetricCollectingService.java new file mode 100644 index 0000000..0f8e8d5 --- /dev/null +++ b/rition-center/service/collector/src/main/java/rition/service/collector/MetricCollectingService.java @@ -0,0 +1,94 @@ +package rition.service.collector; + +import com.github.yitter.idgen.YitIdHelper; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; +import rition.common.data.dto.MetricDataDto; +import rition.common.data.enums.CommonEntityStatus; +import rition.service.collector.configure.KafkaConfigure; +import rition.service.collector.dao.entity.MetricRecordEntity; +import rition.service.collector.dao.mapper.MetricRecordMapper; + +import java.time.Instant; +import java.util.ArrayList; + +/** + * DataCollectorService 指标数据收集类,负责收集来自云主机探针上报的数据,并将其入库等 + */ +@Slf4j +@Service +public class MetricCollectingService { + + private final KafkaTemplate kafkaTemplate; + private final KafkaConfigure kafkaConfigure; + + private final String collectedDataTopic; + private final MetricRecordMapper metricRecordMapper; + + public MetricCollectingService(KafkaTemplate kafkaTemplate, + KafkaConfigure kafkaConfigure, MetricRecordMapper metricRecordMapper) { + this.kafkaTemplate = kafkaTemplate; + this.kafkaConfigure = kafkaConfigure; + this.collectedDataTopic = kafkaConfigure.getDataCollecting().getTopic(); + this.metricRecordMapper = metricRecordMapper; + } + + private static final int MAX_BATCH_INSERT_SIZE = 512; + + /** + * 接收处理好的监控指标数据 + * + * @param collectedMetricData 监控指标数据,protobuf对象 + */ + public void receiveData(MetricDataDto collectedMetricData) { + // 对大于MAX_BATCH_INSERT_SIZE大小的数据进行分组的批量插入 + var metricDataList = collectedMetricData.getDataList(); + if (metricDataList.size() <= MAX_BATCH_INSERT_SIZE) { + var metricRecordEntityList = new ArrayList(metricDataList.size()); + for (MetricDataDto.MetricData metricData : metricDataList) { + MetricRecordEntity entity = this.convert(metricData); + + metricRecordEntityList.add(entity); + } + + kafkaTemplate.send(this.collectedDataTopic, collectedMetricData); + metricRecordMapper.insertBatchSomeColumn(metricRecordEntityList); + } else { + var batch = metricDataList.size() / MAX_BATCH_INSERT_SIZE; + if (metricDataList.size() % MAX_BATCH_INSERT_SIZE != 0) { + batch++; + } + + var metricRecordEntityList = new ArrayList(MAX_BATCH_INSERT_SIZE); + for (int i = 0; i < batch; i++) { + metricRecordEntityList.clear(); + var from = i * MAX_BATCH_INSERT_SIZE; + var to = (i + 1) * MAX_BATCH_INSERT_SIZE; + var subList = metricDataList.subList(from, Math.min(metricDataList.size(), to)); + for (MetricDataDto.MetricData metricData : subList) { + MetricRecordEntity entity = this.convert(metricData); + metricRecordEntityList.add(entity); + } + + kafkaTemplate.send(this.collectedDataTopic, MetricDataDto.builder().dataList(subList).build()); + metricRecordMapper.insertBatchSomeColumn(metricRecordEntityList); + } + } + } + + public MetricRecordEntity convert(MetricDataDto.MetricData metricData) { + MetricRecordEntity entity = new MetricRecordEntity(); + entity.setId(YitIdHelper.nextId()); + entity.setInstanceId(metricData.getTags().get("instanceId")); + entity.setItem(metricData.getMetric()); + entity.setValue(metricData.getValue()); + entity.setStatus(CommonEntityStatus.STATUS_NORMAL.getValue()); + + var time = Instant.ofEpochMilli(metricData.getTimestamp()); + entity.setTime(time); + entity.setUpdateTime(time); + + return entity; + } +} diff --git a/rition-center/service/collector/src/main/java/rition/service/collector/configure/KafkaConfigure.java b/rition-center/service/collector/src/main/java/rition/service/collector/configure/KafkaConfigure.java new file mode 100644 index 0000000..8e2f01b --- /dev/null +++ b/rition-center/service/collector/src/main/java/rition/service/collector/configure/KafkaConfigure.java @@ -0,0 +1,20 @@ +package rition.service.collector.configure; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Data +@Configuration +@ConfigurationProperties("rition.kafka") +public class KafkaConfigure { + private DataCollectingKafkaConfig dataCollecting; + + @Data + public static class DataCollectingKafkaConfig { + private String topic; + private String group; + } + + +} diff --git a/rition-center/service/collector/src/main/java/rition/service/collector/configure/MybatisPlusConfigure.java b/rition-center/service/collector/src/main/java/rition/service/collector/configure/MybatisPlusConfigure.java new file mode 100644 index 0000000..e639035 --- /dev/null +++ b/rition-center/service/collector/src/main/java/rition/service/collector/configure/MybatisPlusConfigure.java @@ -0,0 +1,8 @@ +package rition.service.collector.configure; + +import org.springframework.context.annotation.Configuration; + +@Configuration +public class MybatisPlusConfigure { + +} diff --git a/rition-center/service/collector/src/main/java/rition/service/collector/dao/BatchSQLInjector.java b/rition-center/service/collector/src/main/java/rition/service/collector/dao/BatchSQLInjector.java new file mode 100644 index 0000000..55fcb90 --- /dev/null +++ b/rition-center/service/collector/src/main/java/rition/service/collector/dao/BatchSQLInjector.java @@ -0,0 +1,21 @@ +package rition.service.collector.dao; + +import com.baomidou.mybatisplus.core.injector.AbstractMethod; +import com.baomidou.mybatisplus.core.injector.DefaultSqlInjector; +import com.baomidou.mybatisplus.core.metadata.TableInfo; +import com.baomidou.mybatisplus.extension.injector.methods.InsertBatchSomeColumn; +import org.apache.ibatis.session.Configuration; +import org.springframework.stereotype.Component; + +import java.util.List; + +@Component +public class BatchSQLInjector extends DefaultSqlInjector { + @Override + public List getMethodList(Configuration configuration, Class mapperClass, TableInfo tableInfo) { + List methodList = super.getMethodList(configuration, mapperClass, tableInfo); + methodList.add(new InsertBatchSomeColumn()); + + return methodList; + } +} diff --git a/rition-center/service/collector/src/main/java/rition/service/collector/dao/entity/MetricRecordEntity.java b/rition-center/service/collector/src/main/java/rition/service/collector/dao/entity/MetricRecordEntity.java new file mode 100644 index 0000000..a30deaf --- /dev/null +++ b/rition-center/service/collector/src/main/java/rition/service/collector/dao/entity/MetricRecordEntity.java @@ -0,0 +1,59 @@ +package rition.service.collector.dao.entity; + +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.Instant; + +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +@TableName("rition_record") +public class MetricRecordEntity { + + /** + * 数据记录id + */ + private Long id; + + /** + * 云主机实例id + */ + private String instanceId; + + /** + * 数据项 + */ + private String item; + + /** + * 数据值,统一使用字符串存储 + */ + private String value; + + /** + * create_time + */ + private Instant time; + + /** + * update_time + */ + private Instant updateTime; + + /** + * status + */ + private Integer status; + + public static class RecordValueTypes { + public static final int STRING = 0; + public static final int INTEGER = 1; + public static final int FLOAT = 2; + public static final int BOOLEAN = 3; + } +} diff --git a/rition-center/service/collector/src/main/java/rition/service/collector/dao/mapper/MetricRecordMapper.java b/rition-center/service/collector/src/main/java/rition/service/collector/dao/mapper/MetricRecordMapper.java new file mode 100644 index 0000000..06e01f3 --- /dev/null +++ b/rition-center/service/collector/src/main/java/rition/service/collector/dao/mapper/MetricRecordMapper.java @@ -0,0 +1,12 @@ +package rition.service.collector.dao.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import org.apache.ibatis.annotations.Mapper; +import rition.service.collector.dao.entity.MetricRecordEntity; + +import java.util.List; + +@Mapper +public interface MetricRecordMapper extends BaseMapper { + int insertBatchSomeColumn(List entityList); +} diff --git a/rition-center/service/collector/src/main/java/rition/service/collector/mq/CollectedMetricDataDeserializer.java b/rition-center/service/collector/src/main/java/rition/service/collector/mq/CollectedMetricDataDeserializer.java new file mode 100644 index 0000000..5124ded --- /dev/null +++ b/rition-center/service/collector/src/main/java/rition/service/collector/mq/CollectedMetricDataDeserializer.java @@ -0,0 +1,18 @@ +package rition.service.collector.mq; + +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.kafka.common.serialization.Deserializer; +import rition.common.data.dto.MetricDataProto; + +@Deprecated +public class CollectedMetricDataDeserializer implements Deserializer { + + @Override + public MetricDataProto.CollectedMetricData deserialize(String topic, byte[] data) { + try { + return MetricDataProto.CollectedMetricData.parseFrom(data); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException("exception happen while parsing protobuf data: ", e); + } + } +} diff --git a/rition-center/service/collector/src/main/java/rition/service/collector/mq/ProtobufMessageSerializer.java b/rition-center/service/collector/src/main/java/rition/service/collector/mq/ProtobufMessageSerializer.java new file mode 100644 index 0000000..fbf2c9b --- /dev/null +++ b/rition-center/service/collector/src/main/java/rition/service/collector/mq/ProtobufMessageSerializer.java @@ -0,0 +1,13 @@ +package rition.service.collector.mq; + +import com.google.protobuf.GeneratedMessageV3; +import org.apache.kafka.common.serialization.Serializer; + +@Deprecated +public class ProtobufMessageSerializer implements Serializer { + + @Override + public byte[] serialize(String topic, GeneratedMessageV3 data) { + return data.toByteArray(); + } +} diff --git a/rition-center/service/collector/src/main/resources/application-collector.yml b/rition-center/service/collector/src/main/resources/application-collector.yml new file mode 100644 index 0000000..b9740e4 --- /dev/null +++ b/rition-center/service/collector/src/main/resources/application-collector.yml @@ -0,0 +1,19 @@ +spring: + datasource: + driver-class-name: com.mysql.cj.jdbc.Driver + url: jdbc:mysql://127.0.0.1:3306/rition?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true + username: root + password: Test2333! + kafka: + bootstrap-servers: '127.0.0.1:9092' + producer: + retries: 4 + compression-type: zstd + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + acks: 1 +rition: + kafka: + data-collecting: + topic: 'ecs-metric-data-topic' + group: 'ecs-metric-data-group' \ No newline at end of file diff --git a/rition-center/service/collector/src/main/resources/mapper/MetricRecordMapper.xml b/rition-center/service/collector/src/main/resources/mapper/MetricRecordMapper.xml new file mode 100644 index 0000000..e5ae9a9 --- /dev/null +++ b/rition-center/service/collector/src/main/resources/mapper/MetricRecordMapper.xml @@ -0,0 +1,5 @@ + + + + + \ No newline at end of file diff --git a/rition-center/service/pom.xml b/rition-center/service/pom.xml index 299bd35..b62e2a7 100644 --- a/rition-center/service/pom.xml +++ b/rition-center/service/pom.xml @@ -23,4 +23,11 @@ UTF-8 + + + net.rition + common + ${revision} + + \ No newline at end of file diff --git a/rition-probe/client/client.go b/rition-probe/client/client.go index 83e1417..8e907d8 100644 --- a/rition-probe/client/client.go +++ b/rition-probe/client/client.go @@ -1,12 +1,9 @@ package client -import "time" - type Config struct { CenterServer string `json:"centerServer,omitempty"` HostName string `json:"hostName,omitempty"` InstanceId string `json:"instanceId,omitempty"` - ReportInterval time.Duration `json:"reportInterval,omitempty"` } type Client struct { diff --git a/rition-probe/client/http.go b/rition-probe/client/http.go index 9c93cd9..ba9bf62 100644 --- a/rition-probe/client/http.go +++ b/rition-probe/client/http.go @@ -1,16 +1,79 @@ package client import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" "time" ) type ReportDataItem struct { Metric string `json:"metric,omitempty"` - Tags map[string]string `json:"tags,omitempty"` - Timestamp time.Time `json:"timestamp"` Value any `json:"value,omitempty"` + Tags map[string]string `json:"tags,omitempty"` + Timestamp int64 `json:"timestamp"` +} + +type DataItem struct { + Metric string `json:"metric,omitempty"` + Value any `json:"value,omitempty"` + Timestamp int64 `json:"timestamp"` } -func (c *Client) Report() { - +type ReportResponse struct { + RequestId string `json:"request_id,omitempty"` + Message string `json:"message,omitempty"` +} + +func (c *Client) Report(dataItem []DataItem) (*ReportResponse, error) { + tags := map[string]string{ + "instanceId": c.config.InstanceId, + "hostname": c.config.HostName, + } + + // convert data + reportDataItems := make([]ReportDataItem, 0, len(dataItem)) + for _, item := range dataItem { + reportDataItems = append(reportDataItems, ReportDataItem{ + Metric: item.Metric, + Value: item.Value, + Tags: tags, + Timestamp: time.Now().UnixMilli(), + }) + } + + reportJsonBytes, err := json.Marshal(reportDataItems) + if err != nil { + return nil, err + } + + fmt.Println(string(reportJsonBytes)) + + // send report data + resp, err := http.Post(c.config.CenterServer, "application/json", bytes.NewBuffer(reportJsonBytes)) + if err != nil { + return nil, err + } + + // get response + defer func(Body io.ReadCloser) { + _ = Body.Close() + }(resp.Body) + + respBytes, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + fmt.Println(string(respBytes)) + + response := ReportResponse{} + err = json.Unmarshal(respBytes, &response) + if err != nil { + return nil, err + } + + return &response, nil } diff --git a/rition-probe/main.go b/rition-probe/main.go index 7905807..cea7119 100644 --- a/rition-probe/main.go +++ b/rition-probe/main.go @@ -1,5 +1,37 @@ package main +import ( + "flag" + "os" + "rition-probe/client" + "rition-probe/service" + "time" +) + func main() { + // --report '127.0.0.1:22019' --disk_dev nvme0n1 --mount / --net_dev wlp0s20f3 --interval 30 + centerServer := flag.String("report", "", "上报中心") + diskDevice := flag.String("disk_dev", "", "监控的硬盘设备") + mount := flag.String("mount", "/", "监控的分区挂载位置") + netInterface := flag.String("net_dev", "", "监控的网卡设备") + interval := flag.Int("interval", 30, "上报间隔,单位为秒") + flag.Parse() + + hostname, _ := os.Hostname() + + config := service.Config{ + ClientConfig: client.Config{ + CenterServer: *centerServer, + HostName: hostname, + InstanceId: "5c4928f8-f27a-4ac9-85b4-c2dd03e086ad", + }, + MonitorDiskDevice: *diskDevice, + MonitorDiskMountPoint: *mount, + MonitorNetworkInterface: *netInterface, + ReportInterval: time.Duration(*interval) * time.Second, + } + srv := service.NewService(config) + //srv.Run() + srv.Report() } diff --git a/rition-probe/probe/cpu.go b/rition-probe/probe/cpu.go index 865b8c7..d8f0411 100644 --- a/rition-probe/probe/cpu.go +++ b/rition-probe/probe/cpu.go @@ -1,6 +1,8 @@ package probe -import "github.com/shirou/gopsutil/v3/cpu" +import ( + "github.com/shirou/gopsutil/v3/cpu" +) func (p *Probe) CurrentCPUTotalPercent() float64 { cpuPercents, _ := cpu.Percent(0, false) diff --git a/rition-probe/probe/cpu_test.go b/rition-probe/probe/cpu_test.go new file mode 100644 index 0000000..69e1560 --- /dev/null +++ b/rition-probe/probe/cpu_test.go @@ -0,0 +1,10 @@ +package probe + +import ( + "fmt" + "testing" +) + +func TestProbe_CurrentCPUTotalPercent(t *testing.T) { + fmt.Println(probeTest.CurrentCPUTotalPercent()) +} diff --git a/rition-probe/probe/host.go b/rition-probe/probe/host.go index af59ab3..f35c233 100644 --- a/rition-probe/probe/host.go +++ b/rition-probe/probe/host.go @@ -4,7 +4,7 @@ import ( "github.com/shirou/gopsutil/v3/host" ) -func (p *Probe) HostUpTime() uint64 { +func (p *Probe) HostUptime() uint64 { upTime, _ := host.Uptime() return upTime } diff --git a/rition-probe/probe/network.go b/rition-probe/probe/network.go index b391b15..332f751 100644 --- a/rition-probe/probe/network.go +++ b/rition-probe/probe/network.go @@ -9,13 +9,14 @@ func (p *Probe) GetNetworkCounterAll() []net.IOCountersStat { return stats } -func (p *Probe) GetNetworkCounterOne(name string) net.IOCountersStat { - stats, _ := net.IOCounters(true) - for _, stat := range stats { +func (p *Probe) GetNetworkCounterOne(name string) (net.IOCountersStat, []net.ConnectionStat) { + ioCounters, _ := net.IOCounters(true) + tcpConnStat, _ := net.Connections("tcp") + for _, stat := range ioCounters { if stat.Name == name { - return stat + return stat, tcpConnStat } } - return net.IOCountersStat{} + return net.IOCountersStat{}, tcpConnStat } diff --git a/rition-probe/probe/network_test.go b/rition-probe/probe/network_test.go index a0f0190..cb5d818 100644 --- a/rition-probe/probe/network_test.go +++ b/rition-probe/probe/network_test.go @@ -10,4 +10,10 @@ func TestProbe_GetNetworkCounter(t *testing.T) { for _, counter := range counters { fmt.Println(counter) } + + networkIoCount, connStat := probeTest.GetNetworkCounterOne("wlp0s20f3") + fmt.Printf("%+v", networkIoCount) + fmt.Printf("--------------------------") + fmt.Printf("%+v", connStat) + println() } diff --git a/rition-probe/service/probe.go b/rition-probe/service/probe.go new file mode 100644 index 0000000..c0ed4d5 --- /dev/null +++ b/rition-probe/service/probe.go @@ -0,0 +1,106 @@ +package service + +import ( + "fmt" + "rition-probe/client" + "time" +) + +func (s *Service) Run() { + ticker := time.NewTicker(s.config.ReportInterval) + defer ticker.Stop() + + for range ticker.C { + s.Report() + } +} + +func (s *Service) Report() { + dataItems := make([]client.DataItem, 0) + nowTime := time.Now() + now := nowTime.UnixMilli() + + // cpu + _5minCPULoad := s.prevCpuLoadData + if nowTime.Sub(s.prevCpuLoadDataCollectTime).Minutes() >= 5 { + _5minCPULoad = s.probe.CurrentCPUTotalPercent() + } + dataItems = append(dataItems, client.DataItem{ + Metric: "node_load5", + Value: _5minCPULoad, + Timestamp: now, + }) + + sysUptime := s.probe.HostUptime() + dataItems = append(dataItems, client.DataItem{ + Metric: "node_cpu_seconds_total", + Value: sysUptime, + Timestamp: now, + }) + + // mem + memTotal, memUsed, memBuffers, memCached := s.probe.RamUsage() + dataItems = append(dataItems, client.DataItem{ + Metric: "node_memory_MemTotal_bytes", Value: memTotal, Timestamp: now, + }, client.DataItem{ + Metric: "node_memory_MemFree_bytes", Value: memTotal - memUsed, Timestamp: now, + }, client.DataItem{ + Metric: "node_memory_Buffers_bytes", Value: memBuffers, Timestamp: now, + }, client.DataItem{ + Metric: "node_memory_Cached_bytes", Value: memCached, Timestamp: now, + }) + + // disk + usageStat, diskIoCount := s.probe.DiskStatus(s.config.MonitorDiskMountPoint, s.config.MonitorDiskDevice) + dataItems = append(dataItems, client.DataItem{ + Metric: "node_filesystem_avail_bytes", Value: usageStat.Free, Timestamp: now, + }, client.DataItem{ + Metric: "node_filesystem_size_bytes", Value: usageStat.Total, Timestamp: now, + }, client.DataItem{ + Metric: "node_filesystem_free_bytes", Value: usageStat.Free, Timestamp: now, + }, client.DataItem{ + Metric: "node_disk_read_bytes_total", Value: diskIoCount.ReadBytes, Timestamp: now, + }, client.DataItem{ + Metric: "node_disk_written_bytes_total", Value: diskIoCount.WriteBytes, Timestamp: now, + }, client.DataItem{ + Metric: "node_disk_reads_completed_total", Value: diskIoCount.MergedWriteCount, Timestamp: now, + }, client.DataItem{ + Metric: "node_disk_writes_completed_total", Value: diskIoCount.MergedWriteCount, Timestamp: now, + }) + + // network + networkIoCount, conns := s.probe.GetNetworkCounterOne(s.config.MonitorNetworkInterface) + dataItems = append(dataItems, client.DataItem{ + Metric: "node_network_receive_bytes_total", Value: networkIoCount.BytesRecv, Timestamp: now, + }, client.DataItem{ + Metric: "node_network_transmit_bytes_total", Value: networkIoCount.BytesSent, Timestamp: now, + }, client.DataItem{ + Metric: "node_network_receive_packets_total", Value: networkIoCount.PacketsRecv, Timestamp: now, + }, client.DataItem{ + Metric: "node_network_transmit_packets_total", Value: networkIoCount.PacketsSent, Timestamp: now, + }, client.DataItem{ + Metric: "node_network_receive_drop_total", Value: networkIoCount.Dropin, Timestamp: now, + }, client.DataItem{ + Metric: "node_network_transmit_drop_total", Value: networkIoCount.Dropout, Timestamp: now, + }) + + estabCnt, twCnt := 0, 0 + for _, conn := range conns { + switch conn.Status { + case "ESTABLISHED": + estabCnt++ + case "TIME_WAIT": + twCnt++ + } + } + dataItems = append(dataItems, client.DataItem{ + Metric: "node_netstat_Tcp_CurrEstab", Value: estabCnt, Timestamp: now, + }, client.DataItem{ + Metric: "node_netstat_Tcp_tw", Value: twCnt, Timestamp: now, + }) + + _, err := s.client.Report(dataItems) + if err != nil { + fmt.Println("err: ", err.Error()) + } +} diff --git a/rition-probe/service/service.go b/rition-probe/service/service.go index 418ffc2..5239ced 100644 --- a/rition-probe/service/service.go +++ b/rition-probe/service/service.go @@ -1,22 +1,38 @@ package service -import "time" +import ( + "rition-probe/client" + "rition-probe/probe" + "time" +) type Config struct { - CenterServer string `json:"centerServer,omitempty"` - HostName string `json:"hostName,omitempty"` - InstanceId string `json:"instanceId,omitempty"` + ClientConfig client.Config + MonitorDiskDevice string + MonitorDiskMountPoint string + MonitorNetworkInterface string + ReportInterval time.Duration `json:"reportInterval,omitempty"` } type Service struct { config Config + probe *probe.Probe + client *client.Client + + prevCpuLoadDataCollectTime time.Time + prevCpuLoadData float64 } func NewService(config Config) *Service { service := Service{ config: config, + probe: probe.NewProbe(), + client: client.NetClient(config.ClientConfig), } + service.prevCpuLoadData = service.probe.CurrentCPUTotalPercent() + service.prevCpuLoadDataCollectTime = time.Now() + return &service }