ref: adjust the collector's data-processing logic to match the documented requirements

ref: remove the Kafka protobuf support (not planned for use at the moment)
ref: add a Kafka linger batch-commit option to the collector configuration
todo: rewrite data collection as an async interface; optimize performance when bulk-inserting large batches into the DB
main
lensfrex 4 months ago
parent 045f41cafe
commit 72aafb1c79
Signed by: lensfrex
GPG Key ID: B1E395B3C6CA0356
1. rition-center/api/pom.xml (5 lines changed)
2. rition-center/api/src/main/java/rition/backend/api/interceptor/ResponseIdInterceptor.java (2 lines changed)
3. rition-center/api/src/main/java/rition/backend/api/resolver/RequestIdArgumentResolver.java (4 lines changed)
4. rition-center/api/src/main/java/rition/backend/api/v1/DataCollectingController.java (35 lines changed)
5. rition-center/api/src/main/java/rition/backend/service/MetricDataCollectingService.java (5 lines changed)
6. rition-center/api/src/main/resources/application.yml (3 lines changed)
7. rition-center/common/pom.xml (30 lines changed)
8. rition-center/pom.xml (4 lines changed)
9. rition-center/service/collector/src/main/java/rition/service/collector/MetricCollectingService.java (51 lines changed)
10. rition-center/service/collector/src/main/java/rition/service/collector/mq/CollectedMetricDataDeserializer.java (18 lines changed)
11. rition-center/service/collector/src/main/java/rition/service/collector/mq/ProtobufMessageSerializer.java (13 lines changed)
12. rition-center/service/collector/src/main/resources/application-collector.yml (6 lines changed)
13. rition-probe/main.go (4 lines changed)

@@ -43,5 +43,10 @@
             <artifactId>notify</artifactId>
             <version>${revision}</version>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-redis</artifactId>
+        </dependency>
     </dependencies>
 </project>

@@ -41,7 +41,7 @@ public class ResponseIdInterceptor implements HandlerInterceptor {
         if (StringUtils.isEmpty(existsRequestId)) {
             var requestId = UUID.randomUUID().toString();
             response.setHeader("X-Request-ID", requestId);
-            request.setAttribute("X-Request-ID", requestId);
+            request.setAttribute("requestId", requestId);
         }
     }

@@ -37,7 +37,7 @@ public class RequestIdArgumentResolver implements HandlerMethodArgumentResolver
         // If neither the Attribute context nor the response header carries a request ID yet, generate a new one here.
         // If the corresponding method is annotated with @WithRequestIdResponse, the interceptor has already set it,
         // so we can effectively assert that both the Attribute context and the response header contain a request ID.
-        String requestId = (String) request.getAttribute("X-Request-ID");
+        String requestId = (String) request.getAttribute("requestId");
         if (requestId == null) {
             requestId = response.getHeader("X-Request-ID");
             if (requestId == null) {
@@ -45,7 +45,7 @@ public class RequestIdArgumentResolver implements HandlerMethodArgumentResolver
                 response.setHeader("X-Request-ID", requestId);
             }
-            request.setAttribute("X-Request-ID", requestId);
+            request.setAttribute("requestId", requestId);
             return requestId;
         } else {
             if (response.getHeader("X-Request-ID") == null) {

@@ -11,7 +11,9 @@ import rition.backend.service.MetricDataCollectingService;
 import rition.common.data.dto.MetricDataDto;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 @RestController
 @RequestMapping("/metric")
@@ -26,20 +28,25 @@ public class DataCollectingController {
     @PostMapping("/put")
     @WithRequestIdResponse
     public Response<Object> putData(@RequestBody List<MetricDataUploadRequest> uploadDataList) {
-        // DTO conversion
-        MetricDataDto metricDataDto = new MetricDataDto();
-        List<MetricDataDto.MetricData> dataList = uploadDataList.stream()
-                .map(data -> MetricDataDto.MetricData.builder()
-                        .metric(data.getMetric())
-                        .value(data.getValue())
-                        .timestamp(data.getTimestamp())
-                        .tags(data.getTags())
-                        .build())
-                .toList();
-        metricDataDto.setDataList(dataList);
-        metricDataCollectingService.receiveData(metricDataDto);
+        // DTO conversion; group the uploaded data by timestamp
+        Map<Long, MetricDataDto> metricDataTimestampMap = new HashMap<>(uploadDataList.size() / 20);
+        for (MetricDataUploadRequest uploadData : uploadDataList) {
+            MetricDataDto dataList = metricDataTimestampMap.get(uploadData.getTimestamp());
+            if (dataList == null) {
+                dataList = MetricDataDto.builder().dataList(new ArrayList<>(20)).build();
+                metricDataTimestampMap.put(uploadData.getTimestamp(), dataList);
+            }
+            dataList.getDataList().add(MetricDataDto.MetricData.builder()
+                    .metric(uploadData.getMetric())
+                    .value(uploadData.getValue())
+                    .timestamp(uploadData.getTimestamp())
+                    .tags(uploadData.getTags())
+                    .build()
+            );
+        }
+        metricDataCollectingService.receiveData(metricDataTimestampMap);
         return Response.success();
     }
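Side note on the hunk above: the HashMap's initial capacity of uploadDataList.size() / 20 is only a heuristic, assuming roughly 20 metrics share each timestamp. For reference, a minimal sketch of the same timestamp grouping via Collectors.groupingBy, assuming the repo's MetricDataUploadRequest and MetricDataDto types (the helper name groupByTimestamp is hypothetical, not part of this commit):

    // Requires java.util.List, java.util.Map and java.util.stream.Collectors.
    static Map<Long, MetricDataDto> groupByTimestamp(List<MetricDataUploadRequest> uploads) {
        return uploads.stream().collect(Collectors.groupingBy(
                MetricDataUploadRequest::getTimestamp,
                Collectors.collectingAndThen(
                        Collectors.mapping(upload -> MetricDataDto.MetricData.builder()
                                        .metric(upload.getMetric())
                                        .value(upload.getValue())
                                        .timestamp(upload.getTimestamp())
                                        .tags(upload.getTags())
                                        .build(),
                                Collectors.toList()),
                        list -> MetricDataDto.builder().dataList(list).build())));
    }

The hand-rolled loop in the commit avoids the intermediate collector allocations, which may matter for the bulk-insert performance noted in the commit's todo.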

@@ -2,9 +2,10 @@ package rition.backend.service;
 
 import org.springframework.stereotype.Service;
 import rition.common.data.dto.MetricDataDto;
-import rition.common.data.dto.MetricDataProto;
 import rition.service.collector.MetricCollectingService;
 
+import java.util.Map;
+
 /**
  * MetricCollectorService: collects the metric data reported by the cloud-host probes and persists it.
  * This is only a proxy class; the actual implementation lives in the collector module.
@@ -24,7 +25,7 @@
     /**
      * Receives the processed monitoring metric data.
      * @param collectedMetricData the monitoring metric data
     */
-    public void receiveData(MetricDataDto collectedMetricData) {
+    public void receiveData(Map<Long, MetricDataDto> collectedMetricData) {
         metricCollectingService.receiveData(collectedMetricData);
     }
 }

@@ -1,6 +1,7 @@
 server:
   port: 22019
+  servlet:
+    context-path: /api
 
 spring:
   profiles:
     include: collector

@@ -20,39 +20,15 @@
     </properties>
 
     <dependencies>
-        <dependency>
-            <groupId>com.google.protobuf</groupId>
-            <artifactId>protobuf-java</artifactId>
-            <version>${protobuf.version}</version>
-        </dependency>
     </dependencies>
 
-    <build>
-        <extensions>
-            <extension>
-                <groupId>kr.motd.maven</groupId>
-                <artifactId>os-maven-plugin</artifactId>
-                <version>1.7.1</version>
-            </extension>
-        </extensions>
-        <plugins>
-            <plugin>
-                <groupId>org.xolstice.maven.plugins</groupId>
-                <artifactId>protobuf-maven-plugin</artifactId>
-                <version>0.6.1</version>
-                <extensions>true</extensions>
-                <configuration>
-                    <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
-                </configuration>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>compile</goal>
-                            <goal>test-compile</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
 </project>

@@ -53,6 +53,10 @@
             <artifactId>spring-boot-starter-test</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-aop</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>

@@ -11,7 +11,7 @@ import rition.service.collector.dao.entity.MetricRecordEntity;
 import rition.service.collector.dao.mapper.MetricRecordMapper;
 
 import java.time.Instant;
-import java.util.ArrayList;
+import java.util.Map;
 
 /**
  * DataCollectorService: collects the metric data reported by the cloud-host probes and persists it.
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 public class MetricCollectingService {
 
     private final KafkaTemplate<String, MetricDataDto> kafkaTemplate;
-    private final KafkaConfigure kafkaConfigure;
     private final String collectedDataTopic;
     private final MetricRecordMapper metricRecordMapper;
@@ -29,55 +28,27 @@ public class MetricCollectingService {
     public MetricCollectingService(KafkaTemplate<String, MetricDataDto> kafkaTemplate,
                                    KafkaConfigure kafkaConfigure, MetricRecordMapper metricRecordMapper) {
         this.kafkaTemplate = kafkaTemplate;
-        this.kafkaConfigure = kafkaConfigure;
         this.collectedDataTopic = kafkaConfigure.getDataCollecting().getTopic();
         this.metricRecordMapper = metricRecordMapper;
     }
 
-    private static final int MAX_BATCH_INSERT_SIZE = 512;
-
     /**
-     * Receives the processed monitoring metric data.
+     * Receives the processed monitoring metric data; persists it and publishes it to the MQ grouped by timestamp.
      *
-     * @param collectedMetricData the monitoring metric data (protobuf object)
+     * @param collectedMetricData the monitoring metric data, grouped by timestamp
     */
-    public void receiveData(MetricDataDto collectedMetricData) {
-        // chunked batch insert for payloads larger than MAX_BATCH_INSERT_SIZE
-        var metricDataList = collectedMetricData.getDataList();
-        if (metricDataList.size() <= MAX_BATCH_INSERT_SIZE) {
-            var metricRecordEntityList = new ArrayList<MetricRecordEntity>(metricDataList.size());
-            for (MetricDataDto.MetricData metricData : metricDataList) {
-                MetricRecordEntity entity = this.convert(metricData);
-                metricRecordEntityList.add(entity);
-            }
+    public void receiveData(Map<Long, MetricDataDto> collectedMetricData) {
+        for (Long timestamp : collectedMetricData.keySet()) {
+            var metricDataDto = collectedMetricData.get(timestamp);
+            var metricDataList = metricDataDto.getDataList();
+            var metricRecordEntityList = metricDataList.stream().map(this::convert).toList();
-            kafkaTemplate.send(this.collectedDataTopic, collectedMetricData);
+            kafkaTemplate.send(this.collectedDataTopic, metricDataDto);
             metricRecordMapper.insertBatchSomeColumn(metricRecordEntityList);
-        } else {
-            var batch = metricDataList.size() / MAX_BATCH_INSERT_SIZE;
-            if (metricDataList.size() % MAX_BATCH_INSERT_SIZE != 0) {
-                batch++;
-            }
-            var metricRecordEntityList = new ArrayList<MetricRecordEntity>(MAX_BATCH_INSERT_SIZE);
-            for (int i = 0; i < batch; i++) {
-                metricRecordEntityList.clear();
-                var from = i * MAX_BATCH_INSERT_SIZE;
-                var to = (i + 1) * MAX_BATCH_INSERT_SIZE;
-                var subList = metricDataList.subList(from, Math.min(metricDataList.size(), to));
-                for (MetricDataDto.MetricData metricData : subList) {
-                    MetricRecordEntity entity = this.convert(metricData);
-                    metricRecordEntityList.add(entity);
-                }
-                kafkaTemplate.send(this.collectedDataTopic, MetricDataDto.builder().dataList(subList).build());
-                metricRecordMapper.insertBatchSomeColumn(metricRecordEntityList);
-            }
-        }
     }
 
-    public MetricRecordEntity convert(MetricDataDto.MetricData metricData) {
+    private MetricRecordEntity convert(MetricDataDto.MetricData metricData) {
         MetricRecordEntity entity = new MetricRecordEntity();
         entity.setId(YitIdHelper.nextId());
         entity.setInstanceId(metricData.getTags().get("instanceId"));
@@ -85,7 +56,7 @@ public class MetricCollectingService {
         entity.setValue(metricData.getValue());
         entity.setStatus(CommonEntityStatus.STATUS_NORMAL.getValue());
 
-        var time = Instant.ofEpochMilli(metricData.getTimestamp());
+        var time = Instant.ofEpochSecond(metricData.getTimestamp());
         entity.setTime(time);
         entity.setUpdateTime(time);
@@ -1,18 +0,0 @@
-package rition.service.collector.mq;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.kafka.common.serialization.Deserializer;
-
-import rition.common.data.dto.MetricDataProto;
-
-@Deprecated
-public class CollectedMetricDataDeserializer implements Deserializer<MetricDataProto.CollectedMetricData> {
-
-    @Override
-    public MetricDataProto.CollectedMetricData deserialize(String topic, byte[] data) {
-        try {
-            return MetricDataProto.CollectedMetricData.parseFrom(data);
-        } catch (InvalidProtocolBufferException e) {
-            throw new RuntimeException("exception happen while parsing protobuf data: ", e);
-        }
-    }
-}

@@ -1,13 +0,0 @@
-package rition.service.collector.mq;
-
-import com.google.protobuf.GeneratedMessageV3;
-import org.apache.kafka.common.serialization.Serializer;
-
-@Deprecated
-public class ProtobufMessageSerializer implements Serializer<GeneratedMessageV3> {
-
-    @Override
-    public byte[] serialize(String topic, GeneratedMessageV3 data) {
-        return data.toByteArray();
-    }
-}
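With both protobuf (de)serializers deleted, the topic now carries JSON: the producer side is configured with spring-kafka's JsonSerializer in application-collector.yml below. A consumer-side counterpart might look like the following sketch; this is an assumption, since the commit only shows the producer config:

    // Hypothetical consumer properties mirroring the producer's JsonSerializer.
    // Requires org.apache.kafka.clients.consumer.ConsumerConfig,
    // org.apache.kafka.common.serialization.StringDeserializer and
    // org.springframework.kafka.support.serializer.JsonDeserializer.
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, MetricDataDto.class);    // the repo's DTO
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "rition.common.data.dto"); // assumed package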

@@ -1,7 +1,7 @@
 spring:
   datasource:
     driver-class-name: com.mysql.cj.jdbc.Driver
-    url: jdbc:mysql://127.0.0.1:3306/rition?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true
+    url: jdbc:mysql://127.0.0.1:3306/rition?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true
     username: root
     password: Test2333!
   kafka:
@@ -12,6 +12,10 @@ spring:
       key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
       acks: 1
+      properties:
+        linger:
+          ms: 200
 
 rition:
   kafka:
     data-collecting:
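The linger.ms: 200 added above tells the producer to hold records for up to 200 ms so several sends can leave as one batch (a full batch still ships early), trading a little latency for throughput. The same setting expressed directly against the kafka-clients producer API, as a sketch:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    class LingerConfigSketch {
        static Properties producerProps() {
            Properties props = new Properties();
            props.put(ProducerConfig.ACKS_CONFIG, "1");       // matches acks: 1 above
            props.put(ProducerConfig.LINGER_MS_CONFIG, 200);  // wait up to 200 ms to fill a batch
            return props;
        }
    }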

@@ -32,6 +32,6 @@ func main() {
 	}
 
 	srv := service.NewService(config)
-	//srv.Run()
-	srv.Report()
+	srv.Run()
+	//srv.Report()
 }
