parent
							
								
									143859e3eb
								
							
						
					
					
						commit
						045f41cafe
					
				@ -1,15 +1,26 @@ | 
				
			|||||||
package rition.backend; | 
					package rition.backend; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import com.github.yitter.contract.IdGeneratorOptions; | 
				
			||||||
 | 
					import com.github.yitter.idgen.YitIdHelper; | 
				
			||||||
 | 
					import org.mybatis.spring.annotation.MapperScan; | 
				
			||||||
import org.springframework.boot.SpringApplication; | 
					import org.springframework.boot.SpringApplication; | 
				
			||||||
import org.springframework.boot.autoconfigure.SpringBootApplication; | 
					import org.springframework.boot.autoconfigure.SpringBootApplication; | 
				
			||||||
import org.springframework.scheduling.annotation.EnableAsync; | 
					import org.springframework.scheduling.annotation.EnableAsync; | 
				
			||||||
import org.springframework.scheduling.annotation.EnableScheduling; | 
					import org.springframework.scheduling.annotation.EnableScheduling; | 
				
			||||||
 | 
					import org.springframework.transaction.annotation.EnableTransactionManagement; | 
				
			||||||
 | 
					
 | 
				
			||||||
@EnableAsync | 
					@EnableAsync | 
				
			||||||
@EnableScheduling | 
					@EnableScheduling | 
				
			||||||
@SpringBootApplication | 
					@SpringBootApplication(scanBasePackages = { | 
				
			||||||
 | 
					        "rition", | 
				
			||||||
 | 
					}) | 
				
			||||||
 | 
					@MapperScan("rition") | 
				
			||||||
 | 
					@EnableTransactionManagement | 
				
			||||||
public class RitionBackendMain { | 
					public class RitionBackendMain { | 
				
			||||||
    public static void main(String[] args) { | 
					    public static void main(String[] args) { | 
				
			||||||
 | 
					        IdGeneratorOptions options = new IdGeneratorOptions((short) 0); | 
				
			||||||
 | 
					        YitIdHelper.setIdGenerator(options); | 
				
			||||||
 | 
					
 | 
				
			||||||
        SpringApplication.run(RitionBackendMain.class); | 
					        SpringApplication.run(RitionBackendMain.class); | 
				
			||||||
    } | 
					    } | 
				
			||||||
} | 
					} | 
				
			||||||
 | 
				
			|||||||
@ -0,0 +1,30 @@ | 
				
			|||||||
 | 
					package rition.backend.service; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import org.springframework.stereotype.Service; | 
				
			||||||
 | 
					import rition.common.data.dto.MetricDataDto; | 
				
			||||||
 | 
					import rition.common.data.dto.MetricDataProto; | 
				
			||||||
 | 
					import rition.service.collector.MetricCollectingService; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/** | 
				
			||||||
 | 
					 * MetricCollectorService 指标数据收集类,负责收集来自云主机探针上报的数据,并将其入库等, | 
				
			||||||
 | 
					 * 这里只是个代理类,实际的实现在collector模块, | 
				
			||||||
 | 
					 * 这样做只是为了方便日后将collector等模块独立拆分出来, | 
				
			||||||
 | 
					 * 这种情况下实际上数据收集的入口应当直接放在collector里,而不是api部分 | 
				
			||||||
 | 
					 */ | 
				
			||||||
 | 
					@Service | 
				
			||||||
 | 
					public class MetricDataCollectingService { | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    private final MetricCollectingService metricCollectingService; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    public MetricDataCollectingService(MetricCollectingService metricCollectingService) { | 
				
			||||||
 | 
					        this.metricCollectingService = metricCollectingService; | 
				
			||||||
 | 
					    } | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    /** | 
				
			||||||
 | 
					     * 接收处理好的监控指标数据 | 
				
			||||||
 | 
					     * @param collectedMetricData 监控指标数据,protobuf对象 | 
				
			||||||
 | 
					     */ | 
				
			||||||
 | 
					    public void receiveData(MetricDataDto collectedMetricData) { | 
				
			||||||
 | 
					        metricCollectingService.receiveData(collectedMetricData); | 
				
			||||||
 | 
					    } | 
				
			||||||
 | 
					} | 
				
			||||||
@ -1,2 +1,6 @@ | 
				
			|||||||
server: | 
					server: | 
				
			||||||
  port: 22019 | 
					  port: 22019 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					spring: | 
				
			||||||
 | 
					  profiles: | 
				
			||||||
 | 
					    include: collector | 
				
			||||||
@ -0,0 +1,48 @@ | 
				
			|||||||
 | 
					
 | 
				
			||||||
 | 
					package rition.common.data.dto; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import lombok.AllArgsConstructor; | 
				
			||||||
 | 
					import lombok.Builder; | 
				
			||||||
 | 
					import lombok.Data; | 
				
			||||||
 | 
					import lombok.NoArgsConstructor; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import java.util.List; | 
				
			||||||
 | 
					import java.util.Map; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@Data | 
				
			||||||
 | 
					@Builder | 
				
			||||||
 | 
					@NoArgsConstructor | 
				
			||||||
 | 
					@AllArgsConstructor | 
				
			||||||
 | 
					public class MetricDataDto { | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    private List<MetricData> dataList; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    /** | 
				
			||||||
 | 
					     * 指标数据 | 
				
			||||||
 | 
					     */ | 
				
			||||||
 | 
					    @Data | 
				
			||||||
 | 
					    @Builder | 
				
			||||||
 | 
					    @NoArgsConstructor | 
				
			||||||
 | 
					    @AllArgsConstructor | 
				
			||||||
 | 
					    public static class MetricData { | 
				
			||||||
 | 
					        /** | 
				
			||||||
 | 
					         * 指标名 | 
				
			||||||
 | 
					         */ | 
				
			||||||
 | 
					        private String metric; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        /** | 
				
			||||||
 | 
					         * 数据标签 | 
				
			||||||
 | 
					         */ | 
				
			||||||
 | 
					        private Map<String, String> tags; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        /** | 
				
			||||||
 | 
					         * 时间戳 | 
				
			||||||
 | 
					         */ | 
				
			||||||
 | 
					        private Long timestamp; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        /** | 
				
			||||||
 | 
					         * 值 | 
				
			||||||
 | 
					         */ | 
				
			||||||
 | 
					        private String value; | 
				
			||||||
 | 
					    } | 
				
			||||||
 | 
					} | 
				
			||||||
@ -0,0 +1,17 @@ | 
				
			|||||||
 | 
					package rition.common.data.enums; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import lombok.Getter; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@Getter | 
				
			||||||
 | 
					public enum CommonEntityStatus { | 
				
			||||||
 | 
					    STATUS_NORMAL(0), | 
				
			||||||
 | 
					    STATUS_DELETED(1), | 
				
			||||||
 | 
					    STATUS_HIDDEN(2), | 
				
			||||||
 | 
					    ; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    private final int value; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    CommonEntityStatus(int value) { | 
				
			||||||
 | 
					        this.value = value; | 
				
			||||||
 | 
					    } | 
				
			||||||
 | 
					} | 
				
			||||||
@ -0,0 +1,7 @@ | 
				
			|||||||
 | 
					package rition.common.data.enums; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import java.time.ZoneId; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					public class Constants { | 
				
			||||||
 | 
					    public static final ZoneId DefaultTimeZone = ZoneId.of("Asia/Shanghai"); | 
				
			||||||
 | 
					} | 
				
			||||||
@ -0,0 +1,17 @@ | 
				
			|||||||
 | 
					syntax = "proto3"; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package rition.proto.collecting.v1; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					option java_package = "rition.common.data.dto"; | 
				
			||||||
 | 
					option java_outer_classname = "MetricDataProto"; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					message MetricData { | 
				
			||||||
 | 
					  string metric = 1; | 
				
			||||||
 | 
					  string value = 2; | 
				
			||||||
 | 
					  uint64 timestamp = 3; | 
				
			||||||
 | 
					  map<string, string> tags = 4; | 
				
			||||||
 | 
					} | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					message CollectedMetricData { | 
				
			||||||
 | 
					  repeated MetricData data = 4; | 
				
			||||||
 | 
					} | 
				
			||||||
@ -1,4 +0,0 @@ | 
				
			|||||||
package rition.service.collector; | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
public class DataCollectorService { | 
					 | 
				
			||||||
} | 
					 | 
				
			||||||
@ -0,0 +1,94 @@ | 
				
			|||||||
 | 
					package rition.service.collector; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import com.github.yitter.idgen.YitIdHelper; | 
				
			||||||
 | 
					import lombok.extern.slf4j.Slf4j; | 
				
			||||||
 | 
					import org.springframework.kafka.core.KafkaTemplate; | 
				
			||||||
 | 
					import org.springframework.stereotype.Service; | 
				
			||||||
 | 
					import rition.common.data.dto.MetricDataDto; | 
				
			||||||
 | 
					import rition.common.data.enums.CommonEntityStatus; | 
				
			||||||
 | 
					import rition.service.collector.configure.KafkaConfigure; | 
				
			||||||
 | 
					import rition.service.collector.dao.entity.MetricRecordEntity; | 
				
			||||||
 | 
					import rition.service.collector.dao.mapper.MetricRecordMapper; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import java.time.Instant; | 
				
			||||||
 | 
					import java.util.ArrayList; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/** | 
				
			||||||
 | 
					 * DataCollectorService 指标数据收集类,负责收集来自云主机探针上报的数据,并将其入库等 | 
				
			||||||
 | 
					 */ | 
				
			||||||
 | 
					@Slf4j | 
				
			||||||
 | 
					@Service | 
				
			||||||
 | 
					public class MetricCollectingService { | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    private final KafkaTemplate<String, MetricDataDto> kafkaTemplate; | 
				
			||||||
 | 
					    private final KafkaConfigure kafkaConfigure; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    private final String collectedDataTopic; | 
				
			||||||
 | 
					    private final MetricRecordMapper metricRecordMapper; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    public MetricCollectingService(KafkaTemplate<String, MetricDataDto> kafkaTemplate, | 
				
			||||||
 | 
					                                   KafkaConfigure kafkaConfigure, MetricRecordMapper metricRecordMapper) { | 
				
			||||||
 | 
					        this.kafkaTemplate = kafkaTemplate; | 
				
			||||||
 | 
					        this.kafkaConfigure = kafkaConfigure; | 
				
			||||||
 | 
					        this.collectedDataTopic = kafkaConfigure.getDataCollecting().getTopic(); | 
				
			||||||
 | 
					        this.metricRecordMapper = metricRecordMapper; | 
				
			||||||
 | 
					    } | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    private static final int MAX_BATCH_INSERT_SIZE = 512; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    /** | 
				
			||||||
 | 
					     * 接收处理好的监控指标数据 | 
				
			||||||
 | 
					     * | 
				
			||||||
 | 
					     * @param collectedMetricData 监控指标数据,protobuf对象 | 
				
			||||||
 | 
					     */ | 
				
			||||||
 | 
					    public void receiveData(MetricDataDto collectedMetricData) { | 
				
			||||||
 | 
					        // 对大于MAX_BATCH_INSERT_SIZE大小的数据进行分组的批量插入
 | 
				
			||||||
 | 
					        var metricDataList = collectedMetricData.getDataList(); | 
				
			||||||
 | 
					        if (metricDataList.size() <= MAX_BATCH_INSERT_SIZE) { | 
				
			||||||
 | 
					            var metricRecordEntityList = new ArrayList<MetricRecordEntity>(metricDataList.size()); | 
				
			||||||
 | 
					            for (MetricDataDto.MetricData metricData : metricDataList) { | 
				
			||||||
 | 
					                MetricRecordEntity entity = this.convert(metricData); | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                metricRecordEntityList.add(entity); | 
				
			||||||
 | 
					            } | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            kafkaTemplate.send(this.collectedDataTopic, collectedMetricData); | 
				
			||||||
 | 
					            metricRecordMapper.insertBatchSomeColumn(metricRecordEntityList); | 
				
			||||||
 | 
					        } else { | 
				
			||||||
 | 
					            var batch = metricDataList.size() / MAX_BATCH_INSERT_SIZE; | 
				
			||||||
 | 
					            if (metricDataList.size() % MAX_BATCH_INSERT_SIZE != 0) { | 
				
			||||||
 | 
					                batch++; | 
				
			||||||
 | 
					            } | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            var metricRecordEntityList = new ArrayList<MetricRecordEntity>(MAX_BATCH_INSERT_SIZE); | 
				
			||||||
 | 
					            for (int i = 0; i < batch; i++) { | 
				
			||||||
 | 
					                metricRecordEntityList.clear(); | 
				
			||||||
 | 
					                var from = i * MAX_BATCH_INSERT_SIZE; | 
				
			||||||
 | 
					                var to = (i + 1) * MAX_BATCH_INSERT_SIZE; | 
				
			||||||
 | 
					                var subList = metricDataList.subList(from, Math.min(metricDataList.size(), to)); | 
				
			||||||
 | 
					                for (MetricDataDto.MetricData metricData : subList) { | 
				
			||||||
 | 
					                    MetricRecordEntity entity = this.convert(metricData); | 
				
			||||||
 | 
					                    metricRecordEntityList.add(entity); | 
				
			||||||
 | 
					                } | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                kafkaTemplate.send(this.collectedDataTopic, MetricDataDto.builder().dataList(subList).build()); | 
				
			||||||
 | 
					                metricRecordMapper.insertBatchSomeColumn(metricRecordEntityList); | 
				
			||||||
 | 
					            } | 
				
			||||||
 | 
					        } | 
				
			||||||
 | 
					    } | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    public MetricRecordEntity convert(MetricDataDto.MetricData metricData) { | 
				
			||||||
 | 
					        MetricRecordEntity entity = new MetricRecordEntity(); | 
				
			||||||
 | 
					        entity.setId(YitIdHelper.nextId()); | 
				
			||||||
 | 
					        entity.setInstanceId(metricData.getTags().get("instanceId")); | 
				
			||||||
 | 
					        entity.setItem(metricData.getMetric()); | 
				
			||||||
 | 
					        entity.setValue(metricData.getValue()); | 
				
			||||||
 | 
					        entity.setStatus(CommonEntityStatus.STATUS_NORMAL.getValue()); | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        var time = Instant.ofEpochMilli(metricData.getTimestamp()); | 
				
			||||||
 | 
					        entity.setTime(time); | 
				
			||||||
 | 
					        entity.setUpdateTime(time); | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        return entity; | 
				
			||||||
 | 
					    } | 
				
			||||||
 | 
					} | 
				
			||||||
@ -0,0 +1,20 @@ | 
				
			|||||||
 | 
					package rition.service.collector.configure; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import lombok.Data; | 
				
			||||||
 | 
					import org.springframework.boot.context.properties.ConfigurationProperties; | 
				
			||||||
 | 
					import org.springframework.context.annotation.Configuration; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@Data | 
				
			||||||
 | 
					@Configuration | 
				
			||||||
 | 
					@ConfigurationProperties("rition.kafka") | 
				
			||||||
 | 
					public class KafkaConfigure { | 
				
			||||||
 | 
					    private DataCollectingKafkaConfig dataCollecting; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Data | 
				
			||||||
 | 
					    public static class DataCollectingKafkaConfig { | 
				
			||||||
 | 
					        private String topic; | 
				
			||||||
 | 
					        private String group; | 
				
			||||||
 | 
					    } | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					} | 
				
			||||||
@ -0,0 +1,8 @@ | 
				
			|||||||
 | 
					package rition.service.collector.configure; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import org.springframework.context.annotation.Configuration; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@Configuration | 
				
			||||||
 | 
					public class MybatisPlusConfigure { | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					} | 
				
			||||||
@ -0,0 +1,21 @@ | 
				
			|||||||
 | 
					package rition.service.collector.dao; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import com.baomidou.mybatisplus.core.injector.AbstractMethod; | 
				
			||||||
 | 
					import com.baomidou.mybatisplus.core.injector.DefaultSqlInjector; | 
				
			||||||
 | 
					import com.baomidou.mybatisplus.core.metadata.TableInfo; | 
				
			||||||
 | 
					import com.baomidou.mybatisplus.extension.injector.methods.InsertBatchSomeColumn; | 
				
			||||||
 | 
					import org.apache.ibatis.session.Configuration; | 
				
			||||||
 | 
					import org.springframework.stereotype.Component; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import java.util.List; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@Component | 
				
			||||||
 | 
					public class BatchSQLInjector extends DefaultSqlInjector { | 
				
			||||||
 | 
					    @Override | 
				
			||||||
 | 
					    public List<AbstractMethod> getMethodList(Configuration configuration, Class<?> mapperClass, TableInfo tableInfo) { | 
				
			||||||
 | 
					        List<AbstractMethod> methodList = super.getMethodList(configuration, mapperClass, tableInfo); | 
				
			||||||
 | 
					        methodList.add(new InsertBatchSomeColumn()); | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        return methodList; | 
				
			||||||
 | 
					    } | 
				
			||||||
 | 
					} | 
				
			||||||
@ -0,0 +1,59 @@ | 
				
			|||||||
 | 
					package rition.service.collector.dao.entity; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import com.baomidou.mybatisplus.annotation.TableName; | 
				
			||||||
 | 
					import lombok.AllArgsConstructor; | 
				
			||||||
 | 
					import lombok.Builder; | 
				
			||||||
 | 
					import lombok.Data; | 
				
			||||||
 | 
					import lombok.NoArgsConstructor; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import java.time.Instant; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@Data | 
				
			||||||
 | 
					@Builder | 
				
			||||||
 | 
					@AllArgsConstructor | 
				
			||||||
 | 
					@NoArgsConstructor | 
				
			||||||
 | 
					@TableName("rition_record") | 
				
			||||||
 | 
					public class MetricRecordEntity { | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    /** | 
				
			||||||
 | 
					     * 数据记录id | 
				
			||||||
 | 
					     */ | 
				
			||||||
 | 
					    private Long id; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    /** | 
				
			||||||
 | 
					     * 云主机实例id | 
				
			||||||
 | 
					     */ | 
				
			||||||
 | 
					    private String instanceId; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    /** | 
				
			||||||
 | 
					     * 数据项 | 
				
			||||||
 | 
					     */ | 
				
			||||||
 | 
					    private String item; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    /** | 
				
			||||||
 | 
					     * 数据值,统一使用字符串存储 | 
				
			||||||
 | 
					     */ | 
				
			||||||
 | 
					    private String value; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    /** | 
				
			||||||
 | 
					     * create_time | 
				
			||||||
 | 
					     */ | 
				
			||||||
 | 
					    private Instant time; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    /** | 
				
			||||||
 | 
					     * update_time | 
				
			||||||
 | 
					     */ | 
				
			||||||
 | 
					    private Instant updateTime; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    /** | 
				
			||||||
 | 
					     * status | 
				
			||||||
 | 
					     */ | 
				
			||||||
 | 
					    private Integer status; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    public static class RecordValueTypes { | 
				
			||||||
 | 
					        public static final int STRING = 0; | 
				
			||||||
 | 
					        public static final int INTEGER = 1; | 
				
			||||||
 | 
					        public static final int FLOAT = 2; | 
				
			||||||
 | 
					        public static final int BOOLEAN = 3; | 
				
			||||||
 | 
					    } | 
				
			||||||
 | 
					} | 
				
			||||||
@ -0,0 +1,12 @@ | 
				
			|||||||
 | 
					package rition.service.collector.dao.mapper; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import com.baomidou.mybatisplus.core.mapper.BaseMapper; | 
				
			||||||
 | 
					import org.apache.ibatis.annotations.Mapper; | 
				
			||||||
 | 
					import rition.service.collector.dao.entity.MetricRecordEntity; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import java.util.List; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@Mapper | 
				
			||||||
 | 
					public interface MetricRecordMapper extends BaseMapper<MetricRecordEntity> { | 
				
			||||||
 | 
					    int insertBatchSomeColumn(List<MetricRecordEntity> entityList); | 
				
			||||||
 | 
					} | 
				
			||||||
@ -0,0 +1,18 @@ | 
				
			|||||||
 | 
					package rition.service.collector.mq; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import com.google.protobuf.InvalidProtocolBufferException; | 
				
			||||||
 | 
					import org.apache.kafka.common.serialization.Deserializer; | 
				
			||||||
 | 
					import rition.common.data.dto.MetricDataProto; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@Deprecated | 
				
			||||||
 | 
					public class CollectedMetricDataDeserializer implements Deserializer<MetricDataProto.CollectedMetricData> { | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Override | 
				
			||||||
 | 
					    public MetricDataProto.CollectedMetricData deserialize(String topic, byte[] data) { | 
				
			||||||
 | 
					        try { | 
				
			||||||
 | 
					            return MetricDataProto.CollectedMetricData.parseFrom(data); | 
				
			||||||
 | 
					        } catch (InvalidProtocolBufferException e) { | 
				
			||||||
 | 
					            throw new RuntimeException("exception happen while parsing protobuf data: ", e); | 
				
			||||||
 | 
					        } | 
				
			||||||
 | 
					    } | 
				
			||||||
 | 
					} | 
				
			||||||
@ -0,0 +1,13 @@ | 
				
			|||||||
 | 
					package rition.service.collector.mq; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import com.google.protobuf.GeneratedMessageV3; | 
				
			||||||
 | 
					import org.apache.kafka.common.serialization.Serializer; | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@Deprecated | 
				
			||||||
 | 
					public class ProtobufMessageSerializer implements Serializer<GeneratedMessageV3> { | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Override | 
				
			||||||
 | 
					    public byte[] serialize(String topic, GeneratedMessageV3 data) { | 
				
			||||||
 | 
					        return data.toByteArray(); | 
				
			||||||
 | 
					    } | 
				
			||||||
 | 
					} | 
				
			||||||
@ -0,0 +1,19 @@ | 
				
			|||||||
 | 
					spring: | 
				
			||||||
 | 
					  datasource: | 
				
			||||||
 | 
					    driver-class-name: com.mysql.cj.jdbc.Driver | 
				
			||||||
 | 
					    url: jdbc:mysql://127.0.0.1:3306/rition?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true | 
				
			||||||
 | 
					    username: root | 
				
			||||||
 | 
					    password: Test2333! | 
				
			||||||
 | 
					  kafka: | 
				
			||||||
 | 
					    bootstrap-servers: '127.0.0.1:9092' | 
				
			||||||
 | 
					    producer: | 
				
			||||||
 | 
					      retries: 4 | 
				
			||||||
 | 
					      compression-type: zstd | 
				
			||||||
 | 
					      key-serializer: org.apache.kafka.common.serialization.StringSerializer | 
				
			||||||
 | 
					      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer | 
				
			||||||
 | 
					      acks: 1 | 
				
			||||||
 | 
					rition: | 
				
			||||||
 | 
					  kafka: | 
				
			||||||
 | 
					    data-collecting: | 
				
			||||||
 | 
					      topic: 'ecs-metric-data-topic' | 
				
			||||||
 | 
					      group: 'ecs-metric-data-group' | 
				
			||||||
@ -0,0 +1,5 @@ | 
				
			|||||||
 | 
					<?xml version="1.0" encoding="UTF-8" ?> | 
				
			||||||
 | 
					<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > | 
				
			||||||
 | 
					<mapper namespace="rition.service.collector.dao.mapper.MetricRecordMapper"> | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					</mapper> | 
				
			||||||
@ -1,16 +1,79 @@ | 
				
			|||||||
package client | 
					package client | 
				
			||||||
 | 
					
 | 
				
			||||||
import ( | 
					import ( | 
				
			||||||
 | 
						"bytes" | 
				
			||||||
 | 
						"encoding/json" | 
				
			||||||
 | 
						"fmt" | 
				
			||||||
 | 
						"io" | 
				
			||||||
 | 
						"net/http" | 
				
			||||||
	"time" | 
						"time" | 
				
			||||||
) | 
					) | 
				
			||||||
 | 
					
 | 
				
			||||||
type ReportDataItem struct { | 
					type ReportDataItem struct { | 
				
			||||||
	Metric    string            `json:"metric,omitempty"` | 
						Metric    string            `json:"metric,omitempty"` | 
				
			||||||
 | 
						Value     any               `json:"value,omitempty"` | 
				
			||||||
	Tags      map[string]string `json:"tags,omitempty"` | 
						Tags      map[string]string `json:"tags,omitempty"` | 
				
			||||||
	Timestamp time.Time         `json:"timestamp"` | 
						Timestamp int64             `json:"timestamp"` | 
				
			||||||
 | 
					} | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type DataItem struct { | 
				
			||||||
 | 
						Metric    string `json:"metric,omitempty"` | 
				
			||||||
	Value     any    `json:"value,omitempty"` | 
						Value     any    `json:"value,omitempty"` | 
				
			||||||
 | 
						Timestamp int64  `json:"timestamp"` | 
				
			||||||
} | 
					} | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *Client) Report() { | 
					type ReportResponse struct { | 
				
			||||||
 | 
						RequestId string `json:"request_id,omitempty"` | 
				
			||||||
 | 
						Message   string `json:"message,omitempty"` | 
				
			||||||
 | 
					} | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (c *Client) Report(dataItem []DataItem) (*ReportResponse, error) { | 
				
			||||||
 | 
						tags := map[string]string{ | 
				
			||||||
 | 
							"instanceId": c.config.InstanceId, | 
				
			||||||
 | 
							"hostname":   c.config.HostName, | 
				
			||||||
 | 
						} | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// convert data
 | 
				
			||||||
 | 
						reportDataItems := make([]ReportDataItem, 0, len(dataItem)) | 
				
			||||||
 | 
						for _, item := range dataItem { | 
				
			||||||
 | 
							reportDataItems = append(reportDataItems, ReportDataItem{ | 
				
			||||||
 | 
								Metric:    item.Metric, | 
				
			||||||
 | 
								Value:     item.Value, | 
				
			||||||
 | 
								Tags:      tags, | 
				
			||||||
 | 
								Timestamp: time.Now().UnixMilli(), | 
				
			||||||
 | 
							}) | 
				
			||||||
 | 
						} | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						reportJsonBytes, err := json.Marshal(reportDataItems) | 
				
			||||||
 | 
						if err != nil { | 
				
			||||||
 | 
							return nil, err | 
				
			||||||
 | 
						} | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						fmt.Println(string(reportJsonBytes)) | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// send report data
 | 
				
			||||||
 | 
						resp, err := http.Post(c.config.CenterServer, "application/json", bytes.NewBuffer(reportJsonBytes)) | 
				
			||||||
 | 
						if err != nil { | 
				
			||||||
 | 
							return nil, err | 
				
			||||||
 | 
						} | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// get response
 | 
				
			||||||
 | 
						defer func(Body io.ReadCloser) { | 
				
			||||||
 | 
							_ = Body.Close() | 
				
			||||||
 | 
						}(resp.Body) | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						respBytes, err := io.ReadAll(resp.Body) | 
				
			||||||
 | 
						if err != nil { | 
				
			||||||
 | 
							return nil, err | 
				
			||||||
 | 
						} | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						fmt.Println(string(respBytes)) | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						response := ReportResponse{} | 
				
			||||||
 | 
						err = json.Unmarshal(respBytes, &response) | 
				
			||||||
 | 
						if err != nil { | 
				
			||||||
 | 
							return nil, err | 
				
			||||||
 | 
						} | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return &response, nil | 
				
			||||||
} | 
					} | 
				
			||||||
 | 
				
			|||||||
@ -1,5 +1,37 @@ | 
				
			|||||||
package main | 
					package main | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import ( | 
				
			||||||
 | 
						"flag" | 
				
			||||||
 | 
						"os" | 
				
			||||||
 | 
						"rition-probe/client" | 
				
			||||||
 | 
						"rition-probe/service" | 
				
			||||||
 | 
						"time" | 
				
			||||||
 | 
					) | 
				
			||||||
 | 
					
 | 
				
			||||||
func main() { | 
					func main() { | 
				
			||||||
 | 
						// --report '127.0.0.1:22019' --disk_dev nvme0n1 --mount / --net_dev wlp0s20f3 --interval 30
 | 
				
			||||||
 | 
						centerServer := flag.String("report", "", "上报中心") | 
				
			||||||
 | 
						diskDevice := flag.String("disk_dev", "", "监控的硬盘设备") | 
				
			||||||
 | 
						mount := flag.String("mount", "/", "监控的分区挂载位置") | 
				
			||||||
 | 
						netInterface := flag.String("net_dev", "", "监控的网卡设备") | 
				
			||||||
 | 
						interval := flag.Int("interval", 30, "上报间隔,单位为秒") | 
				
			||||||
 | 
						flag.Parse() | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						hostname, _ := os.Hostname() | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						config := service.Config{ | 
				
			||||||
 | 
							ClientConfig: client.Config{ | 
				
			||||||
 | 
								CenterServer: *centerServer, | 
				
			||||||
 | 
								HostName:     hostname, | 
				
			||||||
 | 
								InstanceId:   "5c4928f8-f27a-4ac9-85b4-c2dd03e086ad", | 
				
			||||||
 | 
							}, | 
				
			||||||
 | 
							MonitorDiskDevice:       *diskDevice, | 
				
			||||||
 | 
							MonitorDiskMountPoint:   *mount, | 
				
			||||||
 | 
							MonitorNetworkInterface: *netInterface, | 
				
			||||||
 | 
							ReportInterval:          time.Duration(*interval) * time.Second, | 
				
			||||||
 | 
						} | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						srv := service.NewService(config) | 
				
			||||||
 | 
						//srv.Run()
 | 
				
			||||||
 | 
						srv.Report() | 
				
			||||||
} | 
					} | 
				
			||||||
 | 
				
			|||||||
@ -0,0 +1,10 @@ | 
				
			|||||||
 | 
					package probe | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import ( | 
				
			||||||
 | 
						"fmt" | 
				
			||||||
 | 
						"testing" | 
				
			||||||
 | 
					) | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestProbe_CurrentCPUTotalPercent(t *testing.T) { | 
				
			||||||
 | 
						fmt.Println(probeTest.CurrentCPUTotalPercent()) | 
				
			||||||
 | 
					} | 
				
			||||||
@ -0,0 +1,106 @@ | 
				
			|||||||
 | 
					package service | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import ( | 
				
			||||||
 | 
						"fmt" | 
				
			||||||
 | 
						"rition-probe/client" | 
				
			||||||
 | 
						"time" | 
				
			||||||
 | 
					) | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (s *Service) Run() { | 
				
			||||||
 | 
						ticker := time.NewTicker(s.config.ReportInterval) | 
				
			||||||
 | 
						defer ticker.Stop() | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for range ticker.C { | 
				
			||||||
 | 
							s.Report() | 
				
			||||||
 | 
						} | 
				
			||||||
 | 
					} | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (s *Service) Report() { | 
				
			||||||
 | 
						dataItems := make([]client.DataItem, 0) | 
				
			||||||
 | 
						nowTime := time.Now() | 
				
			||||||
 | 
						now := nowTime.UnixMilli() | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// cpu
 | 
				
			||||||
 | 
						_5minCPULoad := s.prevCpuLoadData | 
				
			||||||
 | 
						if nowTime.Sub(s.prevCpuLoadDataCollectTime).Minutes() >= 5 { | 
				
			||||||
 | 
							_5minCPULoad = s.probe.CurrentCPUTotalPercent() | 
				
			||||||
 | 
						} | 
				
			||||||
 | 
						dataItems = append(dataItems, client.DataItem{ | 
				
			||||||
 | 
							Metric:    "node_load5", | 
				
			||||||
 | 
							Value:     _5minCPULoad, | 
				
			||||||
 | 
							Timestamp: now, | 
				
			||||||
 | 
						}) | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						sysUptime := s.probe.HostUptime() | 
				
			||||||
 | 
						dataItems = append(dataItems, client.DataItem{ | 
				
			||||||
 | 
							Metric:    "node_cpu_seconds_total", | 
				
			||||||
 | 
							Value:     sysUptime, | 
				
			||||||
 | 
							Timestamp: now, | 
				
			||||||
 | 
						}) | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// mem
 | 
				
			||||||
 | 
						memTotal, memUsed, memBuffers, memCached := s.probe.RamUsage() | 
				
			||||||
 | 
						dataItems = append(dataItems, client.DataItem{ | 
				
			||||||
 | 
							Metric: "node_memory_MemTotal_bytes", Value: memTotal, Timestamp: now, | 
				
			||||||
 | 
						}, client.DataItem{ | 
				
			||||||
 | 
							Metric: "node_memory_MemFree_bytes", Value: memTotal - memUsed, Timestamp: now, | 
				
			||||||
 | 
						}, client.DataItem{ | 
				
			||||||
 | 
							Metric: "node_memory_Buffers_bytes", Value: memBuffers, Timestamp: now, | 
				
			||||||
 | 
						}, client.DataItem{ | 
				
			||||||
 | 
							Metric: "node_memory_Cached_bytes", Value: memCached, Timestamp: now, | 
				
			||||||
 | 
						}) | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// disk
 | 
				
			||||||
 | 
						usageStat, diskIoCount := s.probe.DiskStatus(s.config.MonitorDiskMountPoint, s.config.MonitorDiskDevice) | 
				
			||||||
 | 
						dataItems = append(dataItems, client.DataItem{ | 
				
			||||||
 | 
							Metric: "node_filesystem_avail_bytes", Value: usageStat.Free, Timestamp: now, | 
				
			||||||
 | 
						}, client.DataItem{ | 
				
			||||||
 | 
							Metric: "node_filesystem_size_bytes", Value: usageStat.Total, Timestamp: now, | 
				
			||||||
 | 
						}, client.DataItem{ | 
				
			||||||
 | 
							Metric: "node_filesystem_free_bytes", Value: usageStat.Free, Timestamp: now, | 
				
			||||||
 | 
						}, client.DataItem{ | 
				
			||||||
 | 
							Metric: "node_disk_read_bytes_total", Value: diskIoCount.ReadBytes, Timestamp: now, | 
				
			||||||
 | 
						}, client.DataItem{ | 
				
			||||||
 | 
							Metric: "node_disk_written_bytes_total", Value: diskIoCount.WriteBytes, Timestamp: now, | 
				
			||||||
 | 
						}, client.DataItem{ | 
				
			||||||
 | 
							Metric: "node_disk_reads_completed_total", Value: diskIoCount.MergedWriteCount, Timestamp: now, | 
				
			||||||
 | 
						}, client.DataItem{ | 
				
			||||||
 | 
							Metric: "node_disk_writes_completed_total", Value: diskIoCount.MergedWriteCount, Timestamp: now, | 
				
			||||||
 | 
						}) | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// network
 | 
				
			||||||
 | 
						networkIoCount, conns := s.probe.GetNetworkCounterOne(s.config.MonitorNetworkInterface) | 
				
			||||||
 | 
						dataItems = append(dataItems, client.DataItem{ | 
				
			||||||
 | 
							Metric: "node_network_receive_bytes_total", Value: networkIoCount.BytesRecv, Timestamp: now, | 
				
			||||||
 | 
						}, client.DataItem{ | 
				
			||||||
 | 
							Metric: "node_network_transmit_bytes_total", Value: networkIoCount.BytesSent, Timestamp: now, | 
				
			||||||
 | 
						}, client.DataItem{ | 
				
			||||||
 | 
							Metric: "node_network_receive_packets_total", Value: networkIoCount.PacketsRecv, Timestamp: now, | 
				
			||||||
 | 
						}, client.DataItem{ | 
				
			||||||
 | 
							Metric: "node_network_transmit_packets_total", Value: networkIoCount.PacketsSent, Timestamp: now, | 
				
			||||||
 | 
						}, client.DataItem{ | 
				
			||||||
 | 
							Metric: "node_network_receive_drop_total", Value: networkIoCount.Dropin, Timestamp: now, | 
				
			||||||
 | 
						}, client.DataItem{ | 
				
			||||||
 | 
							Metric: "node_network_transmit_drop_total", Value: networkIoCount.Dropout, Timestamp: now, | 
				
			||||||
 | 
						}) | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						estabCnt, twCnt := 0, 0 | 
				
			||||||
 | 
						for _, conn := range conns { | 
				
			||||||
 | 
							switch conn.Status { | 
				
			||||||
 | 
							case "ESTABLISHED": | 
				
			||||||
 | 
								estabCnt++ | 
				
			||||||
 | 
							case "TIME_WAIT": | 
				
			||||||
 | 
								twCnt++ | 
				
			||||||
 | 
							} | 
				
			||||||
 | 
						} | 
				
			||||||
 | 
						dataItems = append(dataItems, client.DataItem{ | 
				
			||||||
 | 
							Metric: "node_netstat_Tcp_CurrEstab", Value: estabCnt, Timestamp: now, | 
				
			||||||
 | 
						}, client.DataItem{ | 
				
			||||||
 | 
							Metric: "node_netstat_Tcp_tw", Value: twCnt, Timestamp: now, | 
				
			||||||
 | 
						}) | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						_, err := s.client.Report(dataItems) | 
				
			||||||
 | 
						if err != nil { | 
				
			||||||
 | 
							fmt.Println("err: ", err.Error()) | 
				
			||||||
 | 
						} | 
				
			||||||
 | 
					} | 
				
			||||||
@ -1,22 +1,38 @@ | 
				
			|||||||
package service | 
					package service | 
				
			||||||
 | 
					
 | 
				
			||||||
import "time" | 
					import ( | 
				
			||||||
 | 
						"rition-probe/client" | 
				
			||||||
 | 
						"rition-probe/probe" | 
				
			||||||
 | 
						"time" | 
				
			||||||
 | 
					) | 
				
			||||||
 | 
					
 | 
				
			||||||
type Config struct { | 
					type Config struct { | 
				
			||||||
	CenterServer   string        `json:"centerServer,omitempty"` | 
						ClientConfig            client.Config | 
				
			||||||
	HostName       string        `json:"hostName,omitempty"` | 
						MonitorDiskDevice       string | 
				
			||||||
	InstanceId     string        `json:"instanceId,omitempty"` | 
						MonitorDiskMountPoint   string | 
				
			||||||
 | 
						MonitorNetworkInterface string | 
				
			||||||
 | 
					
 | 
				
			||||||
	ReportInterval time.Duration `json:"reportInterval,omitempty"` | 
						ReportInterval time.Duration `json:"reportInterval,omitempty"` | 
				
			||||||
} | 
					} | 
				
			||||||
 | 
					
 | 
				
			||||||
type Service struct { | 
					type Service struct { | 
				
			||||||
	config Config | 
						config Config | 
				
			||||||
 | 
						probe  *probe.Probe | 
				
			||||||
 | 
						client *client.Client | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						prevCpuLoadDataCollectTime time.Time | 
				
			||||||
 | 
						prevCpuLoadData            float64 | 
				
			||||||
} | 
					} | 
				
			||||||
 | 
					
 | 
				
			||||||
func NewService(config Config) *Service { | 
					func NewService(config Config) *Service { | 
				
			||||||
	service := Service{ | 
						service := Service{ | 
				
			||||||
		config: config, | 
							config: config, | 
				
			||||||
 | 
							probe:  probe.NewProbe(), | 
				
			||||||
 | 
							client: client.NetClient(config.ClientConfig), | 
				
			||||||
	} | 
						} | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						service.prevCpuLoadData = service.probe.CurrentCPUTotalPercent() | 
				
			||||||
 | 
						service.prevCpuLoadDataCollectTime = time.Now() | 
				
			||||||
 | 
					
 | 
				
			||||||
	return &service | 
						return &service | 
				
			||||||
} | 
					} | 
				
			||||||
 | 
				
			|||||||
					Loading…
					
					
				
		Reference in new issue