ref: 数据收集部分改写为异步接口模式

main
lensfrex 7 months ago
parent f58fdd14a4
commit c2a0574aa3
Signed by: lensfrex
GPG Key ID: B1E395B3C6CA0356
  1. 6
      rition-center/api/src/main/java/rition/backend/api/v1/DataCollectingController.java
  2. 26
      rition-center/api/src/main/java/rition/backend/configure/RedisTemplateConfigure.java
  3. 43
      rition-center/api/src/main/java/rition/backend/service/MetricDataCollectingService.java
  4. 10
      rition-center/api/src/main/resources/application.yml
  5. 67
      rition-center/common/src/main/java/rition/common/data/dto/log/RequestProcessTraceRecord.java
  6. 4
      rition-center/pom.xml

@ -5,6 +5,7 @@ import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import rition.backend.annotation.WithRequestIdResponse;
import rition.backend.annotation.paramter.RequestId;
import rition.backend.api.v1.dto.request.MetricDataUploadRequest;
import rition.backend.api.v1.dto.response.Response;
import rition.backend.service.MetricDataCollectingService;
@ -27,7 +28,8 @@ public class DataCollectingController {
@PostMapping("/put")
@WithRequestIdResponse
public Response<Object> putData(@RequestBody List<MetricDataUploadRequest> uploadDataList) {
public Response<Object> putData(@RequestBody List<MetricDataUploadRequest> uploadDataList,
@RequestId String requestId) {
// dto转换,按时间组装数据
Map<Long, MetricDataDto> metricDataTimestampMap = new HashMap<>(uploadDataList.size() / 20);
for (MetricDataUploadRequest uploadData : uploadDataList) {
@ -46,7 +48,7 @@ public class DataCollectingController {
);
}
metricDataCollectingService.receiveData(metricDataTimestampMap);
metricDataCollectingService.receiveData(requestId, metricDataTimestampMap);
return Response.success();
}

@ -0,0 +1,26 @@
package rition.backend.configure;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
@Configuration
public class RedisTemplateConfigure {
@Bean
public RedisTemplate redisTemplate(LettuceConnectionFactory connectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(connectionFactory);
redisTemplate.setKeySerializer(RedisSerializer.string());
redisTemplate.setValueSerializer(RedisSerializer.json());
redisTemplate.setHashKeySerializer(RedisSerializer.string());
redisTemplate.setDefaultSerializer(RedisSerializer.json());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}

@ -1,9 +1,15 @@
package rition.backend.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import rition.common.data.dto.MetricDataDto;
import rition.common.data.dto.log.RequestProcessTraceRecord;
import rition.service.collector.MetricCollectingService;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Map;
/**
@ -12,20 +18,49 @@ import java.util.Map;
* 这样做只是为了方便日后将collector等模块独立拆分出来
* 这种情况下实际上数据收集的入口应当直接放在collector里而不是api部分
*/
@Slf4j
@Service
public class MetricDataCollectingService {
private final MetricCollectingService metricCollectingService;
private final RedisTemplate<String, Object> redisTemplate;
public MetricDataCollectingService(MetricCollectingService metricCollectingService) {
public MetricDataCollectingService(MetricCollectingService metricCollectingService,
RedisTemplate<String, Object> redisTemplate) {
this.metricCollectingService = metricCollectingService;
this.redisTemplate = redisTemplate;
}
private static final String PROCESS_TRACE_REDIS_KEY_FORMAT = "rition:request:trace:%s";
/**
* 接收处理好的监控指标数据
* 接收处理好的监控指标数据异步处理请求发送后处理状态使用requestId追踪处理结果放置于redis
*
* @param requestId 此次的请求id用于追踪处理结果
* @param collectedMetricData 监控指标数据protobuf对象
*/
public void receiveData(Map<Long, MetricDataDto> collectedMetricData) {
metricCollectingService.receiveData(collectedMetricData);
@Async
public void receiveData(String requestId, Map<Long, MetricDataDto> collectedMetricData) {
RequestProcessTraceRecord record;
try {
var start = System.currentTimeMillis();
metricCollectingService.receiveData(collectedMetricData);
var cost = System.currentTimeMillis() - start;
record = RequestProcessTraceRecord.success(requestId, "ok", cost, null);
} catch (Exception e) {
record = RequestProcessTraceRecord.builder()
.exception(e)
.requestId(requestId)
.message("process failed: " + e)
.status(RequestProcessTraceRecord.OperationStatus.FAIL)
.build();
}
try {
redisTemplate.opsForValue().set(PROCESS_TRACE_REDIS_KEY_FORMAT.formatted(requestId), record);
} catch (Exception e2) {
log.error("请求结果保存出现异常: ", e2);
log.error("请求结果:{}", record);
}
}
}

@ -1,7 +1,13 @@
server:
port: 22019
port: 8000
servlet:
context-path: /api
spring:
profiles:
include: collector
include: collector
data:
redis:
host: ${REDIS_HOST:127.0.0.1}
port: ${REDIS_PORT:6379}
password: ${REDIS_PASSWORD:Test2333!}

@ -0,0 +1,67 @@
package rition.common.data.dto.log;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.Duration;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
public class RequestProcessTraceRecord {
private String requestId;
private OperationStatus status;
private String message;
private Exception exception;
private long cost;
private Object extra;
public static RequestProcessTraceRecord success(String requestId, String message, Object extraData) {
return RequestProcessTraceRecord.builder()
.requestId(requestId)
.message(message)
.extra(extraData)
.status(OperationStatus.OK)
.build();
}
public static RequestProcessTraceRecord success(String requestId,
String message,
long cost,
Object extraData) {
return RequestProcessTraceRecord.builder()
.requestId(requestId)
.message(message)
.extra(extraData)
.cost(cost)
.status(OperationStatus.OK)
.build();
}
public enum OperationStatus {
/**
* 成功
*/
OK,
/**
* 失败
*/
FAIL,
/**
* 部分完成
*/
PARTIALLY_FINISHED
}
}

@ -62,6 +62,10 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
<!-- id生成 -->
<dependency>

Loading…
Cancel
Save