Skip to content
This repository was archived by the owner on Nov 21, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added aggregator/commons-math3-3.6.1-javadoc.jar
Binary file not shown.
Binary file added aggregator/commons-math3-3.6.1-sources.jar
Binary file not shown.
Binary file added aggregator/commons-math3-3.6.1-test-sources.jar
Binary file not shown.
Binary file added aggregator/commons-math3-3.6.1-tests.jar
Binary file not shown.
Binary file added aggregator/commons-math3-3.6.1-tools.jar
Binary file not shown.
Binary file added aggregator/commons-math3-3.6.1.jar
Binary file not shown.
Binary file added aggregator/javacsv.jar
Binary file not shown.
10 changes: 10 additions & 0 deletions aggregator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@
<artifactId>flowgate-worker</artifactId>
<version>${flowgate.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
<version>3.6.1</version>
</dependency>
<dependency>
<groupId>net.sourceforge.javacsv</groupId>
<artifactId>javacsv</artifactId>
<version>2.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ public ServiceKeyConfig() {
}

public String getServiceKey() {
return serviceKey;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please check those code before create the pr.

return serviceKey;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public void execute(JobExecutionContext context) throws JobExecutionException {
}
long execount = Long.valueOf(execountString);
//will execute weekly?

if (execount++ % 168 == 0) {
try {
EventMessage eventMessage = EventMessageUtil.createEventMessage(EventType.Aggregator,
Expand Down Expand Up @@ -76,7 +77,17 @@ public void execute(JobExecutionContext context) throws JobExecutionException {
}catch(IOException e) {
logger.error("Failed to Send sync summary data command", e);
}
}else {
}else if(execount % 24 == 0){
try {
EventMessage eventMessage = EventMessageUtil.createEventMessage(EventType.Aggregator,
EventMessageUtil.SYNC_FITTING, "");
String jobmessage = EventMessageUtil.convertEventMessageAsString(eventMessage);
publisher.publish(EventMessageUtil.AggregatorTopic, jobmessage);
logger.info("Send sync fitting command");
}catch(IOException e) {
logger.error("Failed to Send sync fitting command", e);
}
}else {
//will execute hourly?
try {
EventMessage eventMessage = EventMessageUtil.createEventMessage(EventType.Aggregator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,15 @@
*/
package com.vmware.flowgate.aggregator.scheduler.job;

import java.io.FileNotFoundException;
import com.vmware.flowgate.aggregator.tool.*;
import com.vmware.flowgate.aggregator.tool.basic.*;
import java.io.IOException;
import com.csvreader.CsvReader;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -21,7 +27,11 @@
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import org.apache.commons.math3.fitting.PolynomialCurveFitter;
import org.apache.commons.math3.fitting.WeightedObservedPoints;
import org.apache.commons.math3.util.Pair;
import com.fasterxml.jackson.core.JsonProcessingException;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.vmware.flowgate.aggregator.config.ServiceKeyConfig;
Expand All @@ -31,6 +41,7 @@
import com.vmware.flowgate.common.MetricName;
import com.vmware.flowgate.common.model.Asset;
import com.vmware.flowgate.common.model.FacilitySoftwareConfig;
import com.vmware.flowgate.common.model.MetricData;
import com.vmware.flowgate.common.model.FacilitySoftwareConfig.SoftwareType;
import com.vmware.flowgate.common.model.SDDCSoftwareConfig;
import com.vmware.flowgate.common.model.ServerMapping;
Expand All @@ -40,6 +51,9 @@
import com.vmware.flowgate.common.model.redis.message.EventUser;
import com.vmware.flowgate.common.model.redis.message.impl.EventMessageUtil;

import io.netty.handler.codec.string.StringDecoder;


@Service
public class AggregatorService implements AsyncService {

Expand All @@ -65,10 +79,9 @@ public void executeAsync(EventMessage message) {
logger.warn("Drop none aggregator message " + message.getType());
return;
}
logger.info(message.getContent());
logger.info("message: " + message.getContent());
Set<EventUser> users = message.getTarget().getUsers();
for (EventUser command : users) {
logger.info(command.getId());
switch (command.getId()) {
case EventMessageUtil.FullMappingCommand:
mergeServerMapping();
Expand All @@ -92,6 +105,10 @@ public void executeAsync(EventMessage message) {
case EventMessageUtil.CleanRealtimeData:
cleanRealtimeData();
break;
case EventMessageUtil.SYNC_FITTING:
syncFitting();
recommend();
break;
case EventMessageUtil.AggregateAndCleanPowerIQPDU:
aggregateAndCleanPDUFromPowerIQ();
break;
Expand All @@ -103,7 +120,163 @@ public void executeAsync(EventMessage message) {
}
}
}

public List<MetricData> collectData(int saperate_num, long StartTime, String assetId) {
long Two_Hours = 305000 * 24;
List<MetricData> ret = new ArrayList<>(0);
long duration = Two_Hours * 60 / saperate_num;
logger.info("StartTime: " + String.valueOf(StartTime / Two_Hours * 2));
long duration_right = StartTime;
long duration_left = StartTime - Two_Hours * 24;
long duration_start;
for (int i = 0; i < saperate_num - 1; i++) {
duration_start = StartTime - duration * (i + 1);
if (i == 0)
duration_right = StartTime;
else
duration_right = duration_left;
if (i == saperate_num - 2)
duration_left = StartTime - Two_Hours * 60;
else
duration_left = duration_start - duration/2;

long cursor = duration_start;
MetricData[] metric_datas = null;

while (cursor < duration_right) {
if (cursor + Two_Hours > duration_right)
{
try {
metric_datas = restClient.getServerRealtimeDataByServerID(assetId, duration_right, Two_Hours).getBody();
} catch (Exception e) {
metric_datas = null;
}
}
else {
try {
metric_datas = restClient.getServerRealtimeDataByServerID(assetId, cursor + Two_Hours, Two_Hours).getBody();
} catch (Exception e) {
metric_datas = null;
}
}
if (metric_datas == null)
break;
else {
ret.addAll(Arrays.asList(metric_datas));
cursor += Two_Hours;
}
}
cursor = duration_start;
long loc = duration_left;
while (cursor - Two_Hours>= duration_left) {
try {
metric_datas = restClient.getServerRealtimeDataByServerID(assetId, cursor, Two_Hours).getBody();
} catch (Exception e) {
metric_datas = null;
}
if (metric_datas == null) {
loc = duration_left;
break;
}
else {
ret.addAll(Arrays.asList(metric_datas));
cursor -= Two_Hours;
loc = cursor;
}
}
duration_left = loc;
}
return ret;
}

public void syncFitting() {

List<MetricData> MetricDatas;
List<List<Double>> results = new ArrayList<>();
SyncFittingTool tool = new SyncFittingTool();

long Five_Miniutes = 305000;
restClient.setServiceKey(serviceKeyConfig.getServiceKey());
MetricData[] raw_MetricDatas = null;
Asset[] servers = restClient.getMappedAsset(AssetCategory.Server).getBody();
for (int i = 0; i < servers.length; i++)
{
String assetId = servers[i].getId();
MetricDatas = collectData(10, System.currentTimeMillis(), assetId);
List<Double> fitting_result = tool.doFitting(MetricDatas);
results.add(fitting_result);
servers[i].setFittingResults(fitting_result);
Asset asset = new Asset();
asset.setFittingResults(fitting_result);
//asset.setAssetName("fitting_results");
restClient.saveAssets(asset);
}

}

public void recommend() {

Asset[] servers = restClient.getMappedAsset(AssetCategory.Server).getBody();

//TODO: use vc api to create bags.
Bag[] bags = new Bag[];
bags[i] = new Bag(memory, cpu, fitting_result, i);
//

long Two_Hours = 305000 * 24;
long now_time = System.currentTimeMillis();
ArrayList<Item> items = new ArrayList<>();
for (int i = 0; i < servers.length; i++) {
List<Double> fitting_result = servers[i].getFittingResults();
String assetId = servers[i].getId();
MetricData[] metric_datas = null;
long start = now_time;
while (metric_datas == null && start > now_time - Two_Hours * 60) {
metric_datas = restClient.getServerRealtimeDataByServerID(assetId, start, Two_Hours).getBody();
start = start - Two_Hours;
}
if (metric_datas == null)
continue;
List< Double> raw_CPU_list = new ArrayList<>();
List<Double> raw_cpuMhz_list = new ArrayList<>();
List< Double> raw_memory_list = new ArrayList<>();
List<Double> raw_memoryKb_list = new ArrayList<>();
Double cpu_sum = 0.0;
Double cpu_mhz_sum = 0.0;
Double memory_sum = 0.0;
Double memory_kb_sum = 0.0;

for (int j = 0; j < metric_datas.length; j++) {
if (metric_datas[j].getMetricName() == "CpuUsage") {
raw_CPU_list.add(metric_datas[j].getValueNum());
cpu_sum += metric_datas[j].getValueNum();
}
else if (metric_datas[j].getMetricName() == "CpuUsedInMhz") {
raw_cpuMhz_list.add(metric_datas[j].getValueNum());
cpu_mhz_sum += metric_datas[j].getValueNum();
}
else if (metric_datas[j].getMetricName() == "MemoryUsage") {
raw_memory_list.add(metric_datas[j].getValueNum());
memory_sum += metric_datas[j].getValueNum();
}
else if (metric_datas[j].getMetricName() == "ActiveMemory") {
raw_memoryKb_list.add(metric_datas[j].getValueNum());
memory_kb_sum += metric_datas[j].getValueNum();
}

}
Double avg_cpu = cpu_sum / raw_CPU_list.size();
Double avg_cpu_mhz = cpu_mhz_sum / raw_cpuMhz_list.size();
Double avg_memory = memory_sum / raw_memory_list.size();
Double avg_memory_kb = memory_kb_sum / raw_memoryKb_list.size();
items.add(new Item(assetId, avg_memory_kb/avg_memory, avg_cpu_mhz, avg_cpu_mhz/avg_cpu));


}
NeighborhoodSearch ns = new NeighborhoodSearch(bags, items);
ns.search();
}

public void syncSummaryData() {
restClient.setServiceKey(serviceKeyConfig.getServiceKey());
restClient.getSystemSummary(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public void execute(JobExecutionContext context) throws JobExecutionException {
//every 12 hour we will trigger a sync host metadata job.
//every 1 day we will trigger a sync CustomerAttrsData job.
//every 10 days we will trigger a sync CustomAttributes job.
//every 1 day we will trigger a fitting job.

restClient.setServiceKey(serviceKeyConfig.getServiceKey());
SDDCSoftwareConfig[] vcServers = restClient.getVCServers().getBody();
Expand Down Expand Up @@ -110,7 +111,7 @@ public void execute(JobExecutionContext context) throws JobExecutionException {
generateSDDCMessageListByType(EventMessageUtil.VCENTER_SyncCustomerAttrs,
vcServersActiveArray));
}

publisher.publish(EventMessageUtil.VCTopic,
EventMessageUtil.generateSDDCNotifyMessage(EventType.VCenter));
} catch (IOException e) {
Expand Down
Loading