diff --git a/rition-center/api/pom.xml b/rition-center/api/pom.xml index 73f5b73..1fb24f8 100644 --- a/rition-center/api/pom.xml +++ b/rition-center/api/pom.xml @@ -43,5 +43,10 @@ notify ${revision} + + + org.springframework.boot + spring-boot-starter-data-redis + \ No newline at end of file diff --git a/rition-center/api/src/main/java/rition/backend/api/interceptor/ResponseIdInterceptor.java b/rition-center/api/src/main/java/rition/backend/api/interceptor/ResponseIdInterceptor.java index 16051ed..cf8c9f1 100644 --- a/rition-center/api/src/main/java/rition/backend/api/interceptor/ResponseIdInterceptor.java +++ b/rition-center/api/src/main/java/rition/backend/api/interceptor/ResponseIdInterceptor.java @@ -41,7 +41,7 @@ public class ResponseIdInterceptor implements HandlerInterceptor { if (StringUtils.isEmpty(existsRequestId)) { var requestId = UUID.randomUUID().toString(); response.setHeader("X-Request-ID", requestId); - request.setAttribute("X-Request-ID", requestId); + request.setAttribute("requestId", requestId); } } diff --git a/rition-center/api/src/main/java/rition/backend/api/resolver/RequestIdArgumentResolver.java b/rition-center/api/src/main/java/rition/backend/api/resolver/RequestIdArgumentResolver.java index 283ef6e..d2ca9e7 100644 --- a/rition-center/api/src/main/java/rition/backend/api/resolver/RequestIdArgumentResolver.java +++ b/rition-center/api/src/main/java/rition/backend/api/resolver/RequestIdArgumentResolver.java @@ -37,7 +37,7 @@ public class RequestIdArgumentResolver implements HandlerMethodArgumentResolver // 如果Attribute上下文和ResponseHeader都有RequestId,则现在生成一个新的 // 相应的方法已经使用@WithRequestIdResponse注解在拦截器中设置, // 则其实可以断言Attribute上下文和ResponseHeader都有RequestId - String requestId = (String) request.getAttribute("X-Request-ID"); + String requestId = (String) request.getAttribute("requestId"); if (requestId == null) { requestId = response.getHeader("X-Request-ID"); if (requestId == null) { @@ -45,7 +45,7 @@ public class RequestIdArgumentResolver implements HandlerMethodArgumentResolver response.setHeader("X-Request-ID", requestId); } - request.setAttribute("X-Request-ID", requestId); + request.setAttribute("requestId", requestId); return requestId; } else { if (response.getHeader("X-Request-ID") == null) { diff --git a/rition-center/api/src/main/java/rition/backend/api/v1/DataCollectingController.java b/rition-center/api/src/main/java/rition/backend/api/v1/DataCollectingController.java index a76362d..5698815 100644 --- a/rition-center/api/src/main/java/rition/backend/api/v1/DataCollectingController.java +++ b/rition-center/api/src/main/java/rition/backend/api/v1/DataCollectingController.java @@ -11,7 +11,9 @@ import rition.backend.service.MetricDataCollectingService; import rition.common.data.dto.MetricDataDto; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; @RestController @RequestMapping("/metric") @@ -26,20 +28,25 @@ public class DataCollectingController { @PostMapping("/put") @WithRequestIdResponse public Response putData(@RequestBody List uploadDataList) { - - // dto转换 - MetricDataDto metricDataDto = new MetricDataDto(); - List dataList = uploadDataList.stream() - .map(data -> MetricDataDto.MetricData.builder() - .metric(data.getMetric()) - .value(data.getValue()) - .timestamp(data.getTimestamp()) - .tags(data.getTags()) - .build()) - .toList(); - - metricDataDto.setDataList(dataList); - metricDataCollectingService.receiveData(metricDataDto); + // dto转换,按时间组装数据 + Map metricDataTimestampMap = new HashMap<>(uploadDataList.size() / 20); + for (MetricDataUploadRequest uploadData : uploadDataList) { + MetricDataDto dataList = metricDataTimestampMap.get(uploadData.getTimestamp()); + if (dataList == null) { + dataList = MetricDataDto.builder().dataList(new ArrayList<>(20)).build(); + metricDataTimestampMap.put(uploadData.getTimestamp(), dataList); + } + + dataList.getDataList().add(MetricDataDto.MetricData.builder() + .metric(uploadData.getMetric()) + .value(uploadData.getValue()) + .timestamp(uploadData.getTimestamp()) + .tags(uploadData.getTags()) + .build() + ); + } + + metricDataCollectingService.receiveData(metricDataTimestampMap); return Response.success(); } diff --git a/rition-center/api/src/main/java/rition/backend/service/MetricDataCollectingService.java b/rition-center/api/src/main/java/rition/backend/service/MetricDataCollectingService.java index d6e24c6..accf019 100644 --- a/rition-center/api/src/main/java/rition/backend/service/MetricDataCollectingService.java +++ b/rition-center/api/src/main/java/rition/backend/service/MetricDataCollectingService.java @@ -2,9 +2,10 @@ package rition.backend.service; import org.springframework.stereotype.Service; import rition.common.data.dto.MetricDataDto; -import rition.common.data.dto.MetricDataProto; import rition.service.collector.MetricCollectingService; +import java.util.Map; + /** * MetricCollectorService 指标数据收集类,负责收集来自云主机探针上报的数据,并将其入库等, * 这里只是个代理类,实际的实现在collector模块, @@ -24,7 +25,7 @@ public class MetricDataCollectingService { * 接收处理好的监控指标数据 * @param collectedMetricData 监控指标数据,protobuf对象 */ - public void receiveData(MetricDataDto collectedMetricData) { + public void receiveData(Map collectedMetricData) { metricCollectingService.receiveData(collectedMetricData); } } diff --git a/rition-center/api/src/main/resources/application.yml b/rition-center/api/src/main/resources/application.yml index e7e3cc2..6036c89 100644 --- a/rition-center/api/src/main/resources/application.yml +++ b/rition-center/api/src/main/resources/application.yml @@ -1,6 +1,7 @@ server: port: 22019 - + servlet: + context-path: /api spring: profiles: include: collector \ No newline at end of file diff --git a/rition-center/common/pom.xml b/rition-center/common/pom.xml index 1b0c570..9cb68e7 100644 --- a/rition-center/common/pom.xml +++ b/rition-center/common/pom.xml @@ -20,39 +20,15 @@ - - com.google.protobuf - protobuf-java - ${protobuf.version} - + - - kr.motd.maven - os-maven-plugin - 1.7.1 - + - - org.xolstice.maven.plugins - protobuf-maven-plugin - 0.6.1 - true - - com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} - - - - - compile - test-compile - - - - + \ No newline at end of file diff --git a/rition-center/pom.xml b/rition-center/pom.xml index 2de6815..914b8f1 100644 --- a/rition-center/pom.xml +++ b/rition-center/pom.xml @@ -53,6 +53,10 @@ spring-boot-starter-test test + + org.springframework.boot + spring-boot-starter-aop + com.fasterxml.jackson.core diff --git a/rition-center/service/collector/src/main/java/rition/service/collector/MetricCollectingService.java b/rition-center/service/collector/src/main/java/rition/service/collector/MetricCollectingService.java index 0f8e8d5..996f4c1 100644 --- a/rition-center/service/collector/src/main/java/rition/service/collector/MetricCollectingService.java +++ b/rition-center/service/collector/src/main/java/rition/service/collector/MetricCollectingService.java @@ -11,7 +11,7 @@ import rition.service.collector.dao.entity.MetricRecordEntity; import rition.service.collector.dao.mapper.MetricRecordMapper; import java.time.Instant; -import java.util.ArrayList; +import java.util.Map; /** * DataCollectorService 指标数据收集类,负责收集来自云主机探针上报的数据,并将其入库等 @@ -21,7 +21,6 @@ import java.util.ArrayList; public class MetricCollectingService { private final KafkaTemplate kafkaTemplate; - private final KafkaConfigure kafkaConfigure; private final String collectedDataTopic; private final MetricRecordMapper metricRecordMapper; @@ -29,55 +28,27 @@ public class MetricCollectingService { public MetricCollectingService(KafkaTemplate kafkaTemplate, KafkaConfigure kafkaConfigure, MetricRecordMapper metricRecordMapper) { this.kafkaTemplate = kafkaTemplate; - this.kafkaConfigure = kafkaConfigure; this.collectedDataTopic = kafkaConfigure.getDataCollecting().getTopic(); this.metricRecordMapper = metricRecordMapper; } - private static final int MAX_BATCH_INSERT_SIZE = 512; - /** - * 接收处理好的监控指标数据 + * 接收处理好的监控指标数据,以时间戳为分组入库和发送mq * - * @param collectedMetricData 监控指标数据,protobuf对象 + * @param collectedMetricData 监控指标数据,以时间戳为组 */ - public void receiveData(MetricDataDto collectedMetricData) { - // 对大于MAX_BATCH_INSERT_SIZE大小的数据进行分组的批量插入 - var metricDataList = collectedMetricData.getDataList(); - if (metricDataList.size() <= MAX_BATCH_INSERT_SIZE) { - var metricRecordEntityList = new ArrayList(metricDataList.size()); - for (MetricDataDto.MetricData metricData : metricDataList) { - MetricRecordEntity entity = this.convert(metricData); - - metricRecordEntityList.add(entity); - } + public void receiveData(Map collectedMetricData) { + for (Long timestamp : collectedMetricData.keySet()) { + var metricDataDto = collectedMetricData.get(timestamp); + var metricDataList = metricDataDto.getDataList(); + var metricRecordEntityList = metricDataList.stream().map(this::convert).toList(); - kafkaTemplate.send(this.collectedDataTopic, collectedMetricData); + kafkaTemplate.send(this.collectedDataTopic, metricDataDto); metricRecordMapper.insertBatchSomeColumn(metricRecordEntityList); - } else { - var batch = metricDataList.size() / MAX_BATCH_INSERT_SIZE; - if (metricDataList.size() % MAX_BATCH_INSERT_SIZE != 0) { - batch++; - } - - var metricRecordEntityList = new ArrayList(MAX_BATCH_INSERT_SIZE); - for (int i = 0; i < batch; i++) { - metricRecordEntityList.clear(); - var from = i * MAX_BATCH_INSERT_SIZE; - var to = (i + 1) * MAX_BATCH_INSERT_SIZE; - var subList = metricDataList.subList(from, Math.min(metricDataList.size(), to)); - for (MetricDataDto.MetricData metricData : subList) { - MetricRecordEntity entity = this.convert(metricData); - metricRecordEntityList.add(entity); - } - - kafkaTemplate.send(this.collectedDataTopic, MetricDataDto.builder().dataList(subList).build()); - metricRecordMapper.insertBatchSomeColumn(metricRecordEntityList); - } } } - public MetricRecordEntity convert(MetricDataDto.MetricData metricData) { + private MetricRecordEntity convert(MetricDataDto.MetricData metricData) { MetricRecordEntity entity = new MetricRecordEntity(); entity.setId(YitIdHelper.nextId()); entity.setInstanceId(metricData.getTags().get("instanceId")); @@ -85,7 +56,7 @@ public class MetricCollectingService { entity.setValue(metricData.getValue()); entity.setStatus(CommonEntityStatus.STATUS_NORMAL.getValue()); - var time = Instant.ofEpochMilli(metricData.getTimestamp()); + var time = Instant.ofEpochSecond(metricData.getTimestamp()); entity.setTime(time); entity.setUpdateTime(time); diff --git a/rition-center/service/collector/src/main/java/rition/service/collector/mq/CollectedMetricDataDeserializer.java b/rition-center/service/collector/src/main/java/rition/service/collector/mq/CollectedMetricDataDeserializer.java deleted file mode 100644 index 5124ded..0000000 --- a/rition-center/service/collector/src/main/java/rition/service/collector/mq/CollectedMetricDataDeserializer.java +++ /dev/null @@ -1,18 +0,0 @@ -package rition.service.collector.mq; - -import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.kafka.common.serialization.Deserializer; -import rition.common.data.dto.MetricDataProto; - -@Deprecated -public class CollectedMetricDataDeserializer implements Deserializer { - - @Override - public MetricDataProto.CollectedMetricData deserialize(String topic, byte[] data) { - try { - return MetricDataProto.CollectedMetricData.parseFrom(data); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException("exception happen while parsing protobuf data: ", e); - } - } -} diff --git a/rition-center/service/collector/src/main/java/rition/service/collector/mq/ProtobufMessageSerializer.java b/rition-center/service/collector/src/main/java/rition/service/collector/mq/ProtobufMessageSerializer.java deleted file mode 100644 index fbf2c9b..0000000 --- a/rition-center/service/collector/src/main/java/rition/service/collector/mq/ProtobufMessageSerializer.java +++ /dev/null @@ -1,13 +0,0 @@ -package rition.service.collector.mq; - -import com.google.protobuf.GeneratedMessageV3; -import org.apache.kafka.common.serialization.Serializer; - -@Deprecated -public class ProtobufMessageSerializer implements Serializer { - - @Override - public byte[] serialize(String topic, GeneratedMessageV3 data) { - return data.toByteArray(); - } -} diff --git a/rition-center/service/collector/src/main/resources/application-collector.yml b/rition-center/service/collector/src/main/resources/application-collector.yml index b9740e4..20ff0ed 100644 --- a/rition-center/service/collector/src/main/resources/application-collector.yml +++ b/rition-center/service/collector/src/main/resources/application-collector.yml @@ -1,7 +1,7 @@ spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver - url: jdbc:mysql://127.0.0.1:3306/rition?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true + url: jdbc:mysql://127.0.0.1:3306/rition?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true username: root password: Test2333! kafka: @@ -12,6 +12,10 @@ spring: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer acks: 1 + properties: + linger: + ms: 200 + rition: kafka: data-collecting: diff --git a/rition-probe/main.go b/rition-probe/main.go index cea7119..efb5a9b 100644 --- a/rition-probe/main.go +++ b/rition-probe/main.go @@ -32,6 +32,6 @@ func main() { } srv := service.NewService(config) - //srv.Run() - srv.Report() + srv.Run() + //srv.Report() }