Commit 03a8518

Author: Michal Tichák (committed)
[core] fixing and refactoring monitoring
Fixed a race condition that left golang metrics reporting stuck, by adding a running check to the monitoring.Send function. Includes other refactoring and documentation.
1 parent c46119c commit 03a8518

File tree: 5 files changed, +101 -57 lines

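The likely failure mode behind the race: Send used to write into the buffered metricsChannel unconditionally, so whenever the event loop was not draining it (before Run or after Stop) the buffer could fill and the golang-metrics goroutine would block forever on Send. The fix, visible in the monitoring.go diff below, guards the send with a running check, roughly:

func Send(metric Metric) {
	// Only hand the metric to the event loop when it is actually running;
	// otherwise drop it, so callers can never block on a channel nobody reads.
	if IsRunning() {
		metricsChannel <- metric
	}
}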
common/ecsmetrics/metrics.go

Lines changed: 7 additions & 3 deletions

@@ -5,6 +5,7 @@ import (
 	"time"
 
 	"github.com/AliceO2Group/Control/common/logger"
+	"github.com/AliceO2Group/Control/common/logger/infologger"
 	"github.com/AliceO2Group/Control/common/monitoring"
 	"github.com/sirupsen/logrus"
 )
@@ -30,7 +31,6 @@ func gather() monitoring.Metric {
 		{Name: "/memory/classes/heap/unused:bytes"},
 	}
 
-	// Collect metrics data
 	internalmetrics.Read(samples)
 
 	metric := NewMetric("golangruntimemetrics")
@@ -42,24 +42,28 @@ func gather() monitoring.Metric {
 		case internalmetrics.KindFloat64:
 			metric.AddValue(sample.Name, sample.Value.Float64())
 		case internalmetrics.KindFloat64Histogram:
-			log.Warning("Error: Histogram is not supported yet for metric [%s]", sample.Name)
+			log.WithField("level", infologger.IL_Devel).Warningf("Error: Histogram is not supported yet for metric [%s]", sample.Name)
 			continue
 		default:
-			log.Warning("Unsupported kind %v for metric %s\n", sample.Value.Kind(), sample.Name)
+			log.WithField("level", infologger.IL_Devel).Warningf("Unsupported kind %v for metric %s\n", sample.Value.Kind(), sample.Name)
 			continue
 		}
 	}
 	return metric
 }
 
 func StartGolangMetrics(period time.Duration) {
+	log.WithField("level", infologger.IL_Devel).Info("Starting golang metrics reporting")
 	go func() {
+		log.Debug("Starting golang metrics goroutine")
 		for {
 			select {
 			case <-endRequestChannel:
+				log.Debug("ending golang metrics")
 				endRequestChannel <- struct{}{}
 				return
 			default:
+				log.Debug("sending golang metrics")
 				monitoring.Send(gather())
 				time.Sleep(period)
 			}

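As context for how these pieces fit together, here is a minimal usage sketch of the runtime-metrics reporter together with the monitoring endpoint. The 30-second period and the 8088/ecsmetrics values are illustrative (the latter taken from the new defaults in core/config.go), not prescribed by the commit:

package main

import (
	"time"

	"github.com/AliceO2Group/Control/common/ecsmetrics"
	"github.com/AliceO2Group/Control/common/monitoring"
)

func main() {
	// Start the scrape endpoint first; with this commit, monitoring.Send
	// silently drops metrics while IsRunning() is false.
	go monitoring.Run(8088, "/ecsmetrics", 10000)

	// Report Go runtime metrics every 30 seconds.
	ecsmetrics.StartGolangMetrics(30 * time.Second)

	// Stand-in for the real core lifetime.
	time.Sleep(2 * time.Minute)

	ecsmetrics.StopGolangMetrics()
	monitoring.Stop()
}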
common/monitoring/monitoring.go

Lines changed: 37 additions & 19 deletions

@@ -22,64 +22,77 @@ var (
 	// channel used to send metrics into the event loop
 	metricsChannel chan Metric
 
-	// channel for sending notifications to event loop that new http Request to report metrics arrived
-	metricsRequestChannel chan struct{}
+	// channel for sending requests to reset actual metrics slice and send it back to caller via metricsExportedToRequest
+	metricsRequestedChannel chan struct{}
 
 	// channel used to send metrics to be reported by http request from event loop
-	metricsToRequest chan []Metric
+	metricsExportedToRequest chan []Metric
 
-	Log = logger.New(logrus.StandardLogger(), "metrics")
+	log = logger.New(logrus.StandardLogger(), "metrics")
 )
 
 func initChannels(messageBufferSize int) {
 	endChannel = make(chan struct{})
-	metricsRequestChannel = make(chan struct{})
+	metricsRequestedChannel = make(chan struct{})
+	// 100 was chosen arbitrarily as a number that seemed sensible to be high enough to provide nice buffer if
+	// multiple goroutines want to send metrics without blocking each other
 	metricsChannel = make(chan Metric, 100)
-	metricsToRequest = make(chan []Metric)
+	metricsExportedToRequest = make(chan []Metric)
 	metricsLimit = messageBufferSize
 }
 
 func closeChannels() {
 	close(endChannel)
-	close(metricsRequestChannel)
+	close(metricsRequestedChannel)
 	close(metricsChannel)
-	close(metricsToRequest)
+	close(metricsExportedToRequest)
 }
 
+// this eventLoop is the main part that processes all metrics send to the package
+// 3 events can happen:
+// 1. metricsChannel receives message from Send() method. We just add the new metric to metrics slice
+// 2. metricsRequestChannel receives request to dump and request existing metrics. We send shallow copy of existing
+//    metrics to requestor (via metricsExportedToRequest channel) while resetting current metrics slice
+// 3. receive request to stop monitoring via endChannel. We send confirmation through endChannel to notify caller
+//    that eventLoop stopped
 func eventLoop() {
 	for {
 		select {
-		case <-metricsRequestChannel:
+		case <-metricsRequestedChannel:
 			shallowCopyMetrics := metrics
 			metrics = make([]Metric, 0)
-			metricsToRequest <- shallowCopyMetrics
+			metricsExportedToRequest <- shallowCopyMetrics
 
 		case metric := <-metricsChannel:
 			if len(metrics) < metricsLimit {
 				metrics = append(metrics, metric)
 			} else {
-				Log.Warn("too many metrics waiting to be scraped. Are you sure that metrics scraping is running?")
+				log.Warn("too many metrics waiting to be scraped. Are you sure that metrics scraping is running?")
 			}
 
 		case <-endChannel:
-			endChannel <- struct{}{}
+			defer func() {
+				endChannel <- struct{}{}
+			}()
 			return
 		}
 	}
 }
 
 func exportMetricsAndReset(w http.ResponseWriter, r *http.Request) {
 	w.Header().Set("Content-Type", "application/json")
-	metricsRequestChannel <- struct{}{}
-	metricsToConvert := <-metricsToRequest
+	metricsRequestedChannel <- struct{}{}
+	metricsToConvert := <-metricsExportedToRequest
 	if metricsToConvert == nil {
 		metricsToConvert = make([]Metric, 0)
 	}
 	json.NewEncoder(w).Encode(metricsToConvert)
 }
 
 func Send(metric Metric) {
-	metricsChannel <- metric
+	if IsRunning() {
+		metricsChannel <- metric
+	}
 }
 
 func handleFunc(endpointName string) {
@@ -96,22 +109,22 @@ func handleFunc(endpointName string) {
 // \param messageBufferSize size of buffer for messages where messages are kept between scraping request.
 //
 // If we attempt send more messages than the size of the buffer, these overflowing messages will be ignored and warning will be logged.
-func Start(port uint16, endpointName string, messageBufferSize int) error {
-	if server != nil {
+func Run(port uint16, endpointName string, messageBufferSize int) error {
+	if IsRunning() {
 		return nil
 	}
 
 	initChannels(messageBufferSize)
 
 	go eventLoop()
 
-	server := &http.Server{Addr: fmt.Sprintf(":%d", port)}
+	server = &http.Server{Addr: fmt.Sprintf(":%d", port)}
 	handleFunc(endpointName)
 	return server.ListenAndServe()
 }
 
 func Stop() {
-	if server == nil {
+	if !IsRunning() {
 		return
 	}
 
@@ -122,4 +135,9 @@ func Stop() {
 	endChannel <- struct{}{}
 	<-endChannel
 	server = nil
+	metrics = nil
+}
+
+func IsRunning() bool {
+	return server != nil
 }

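For reference, a scrape of the endpoint served by exportMetricsAndReset returns the buffered metrics as a JSON array and resets the buffer, as exercised by TestHttpRun below. A minimal external client might look like the following sketch (port and endpoint name are illustrative):

package main

import (
	"encoding/json"
	"fmt"
	"net/http"

	"github.com/AliceO2Group/Control/common/monitoring"
)

func main() {
	// Assumes something in this or another process called monitoring.Run(9876, "/metrics", 10).
	response, err := http.Get("http://localhost:9876/metrics")
	if err != nil {
		panic(err)
	}
	defer response.Body.Close()

	var scraped []monitoring.Metric
	if err := json.NewDecoder(response.Body).Decode(&scraped); err != nil {
		panic(err)
	}

	// Each scrape drains the accumulated metrics, so an immediate second GET
	// would return an empty array.
	fmt.Printf("scraped %d metrics\n", len(scraped))
}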
common/monitoring/monitoring_test.go

Lines changed: 54 additions & 32 deletions

@@ -8,36 +8,61 @@ import (
 	"time"
 )
 
+// blocks until either IsRunning() returns true or timeout is triggered
+func isRunningWithTimeout(t *testing.T, timeout time.Duration) {
+	timeoutChan := time.After(timeout)
+	for !IsRunning() {
+		select {
+		case <-timeoutChan:
+			t.Errorf("Monitoring is not running even after %v", timeout)
+			return
+
+		default:
+			time.Sleep(10 * time.Millisecond)
+		}
+	}
+}
+
+// block until either length of metrics is the same as \requiredMessages or timeout is triggered
+func hasNumberOfMetrics(t *testing.T, timeout time.Duration, requiredMessages int) {
+	timeoutChan := time.After(timeout)
+	for len(metrics) != requiredMessages {
+		select {
+		case <-timeoutChan:
+			t.Errorf("Timeout %v triggered when waiting for %v messages, got %v", timeout, requiredMessages, len(metrics))
+			return
+
+		default:
+			time.Sleep(10 * time.Millisecond)
+		}
+	}
+}
+
 func TestSimpleStartStop(t *testing.T) {
-	go Start(1234, "/random", 100)
-	time.Sleep(time.Millisecond * 100)
+	go Run(1234, "/random", 100)
+	isRunningWithTimeout(t, time.Second)
 	Stop()
 }
 
 func TestStartMultipleStop(t *testing.T) {
-	go Start(1234, "/random", 100)
-	time.Sleep(time.Millisecond * 100)
+	go Run(1234, "/random", 100)
+	isRunningWithTimeout(t, time.Second)
 	Stop()
 	Stop()
 }
 
 func cleaningUpAfterTest() {
-	endChannel <- struct{}{}
-	<-endChannel
-	closeChannels()
-	metrics = make([]Metric, 0)
+	Stop()
 }
 
 func initTest() {
-	initChannels(100)
-	// we need metrics channel to block so we don't end to quickly
-	metricsChannel = make(chan Metric, 0)
-	go eventLoop()
+	go Run(12345, "notimportant", 100)
 }
 
 // decorator function that properly inits and cleans after higher level test of Monitoring package
 func testFunction(t *testing.T, testToRun func(*testing.T)) {
 	initTest()
+	isRunningWithTimeout(t, time.Second)
 	testToRun(t)
 	cleaningUpAfterTest()
 }
@@ -46,9 +71,7 @@ func TestSendingSingleMetric(t *testing.T) {
 	testFunction(t, func(t *testing.T) {
 		metric := Metric{Name: "test"}
 		Send(metric)
-		if len(metrics) != 1 {
-			t.Error("wrong number of metrics, should be 1")
-		}
+		hasNumberOfMetrics(t, time.Second, 1)
 
 		if metrics[0].Name != "test" {
 			t.Errorf("Got wrong name %s in stored metric", metrics[0].Name)
@@ -60,16 +83,17 @@ func TestExportingMetrics(t *testing.T) {
 	testFunction(t, func(t *testing.T) {
 		metric := Metric{Name: "test"}
 		Send(metric)
+		hasNumberOfMetrics(t, time.Second, 1)
 
-		metricsRequestChannel <- struct{}{}
-		metrics := <-metricsToRequest
+		metricsRequestedChannel <- struct{}{}
+		metricsToExport := <-metricsExportedToRequest
 
-		if len(metrics) != 1 {
-			t.Errorf("Got wrong amount of metrics %d, expected 1", len(metrics))
+		if len(metricsToExport) != 1 {
+			t.Errorf("Got wrong amount of metrics %d, expected 1", len(metricsToExport))
 		}
 
-		if metrics[0].Name != "test" {
-			t.Errorf("Got wrong name of metric %s, expected test", metrics[0].Name)
+		if metricsToExport[0].Name != "test" {
+			t.Errorf("Got wrong name of metric %s, expected test", metricsToExport[0].Name)
 		}
 	})
 }
@@ -81,11 +105,9 @@ func TestBufferLimit(t *testing.T) {
 		metric.Timestamp = 10
 		metric.AddTag("tag1", 42)
 		metric.AddValue("value1", 11)
-		Send(metric)
 
-		if len(metrics) != 1 {
-			t.Errorf("Metrics length is %d, but should be 1 after sending first metric", len(metrics))
-		}
+		Send(metric)
+		hasNumberOfMetrics(t, time.Second, 1)
 
 		Send(metric)
 		time.Sleep(100 * time.Millisecond)
@@ -97,20 +119,20 @@
 }
 
 func TestHttpRun(t *testing.T) {
-	go Start(12345, "/metrics", 10)
+	go Run(9876, "/metrics", 10)
 	defer Stop()
 
-	time.Sleep(time.Second)
+	isRunningWithTimeout(t, time.Second)
 
 	metric := Metric{Name: "test"}
 	metric.Timestamp = 10
 	metric.AddTag("tag1", 42)
 	metric.AddValue("value1", 11)
 	Send(metric)
 
-	response, err := http.Get("http://localhost:12345/metrics")
+	response, err := http.Get("http://localhost:9876/metrics")
 	if err != nil {
-		t.Fatalf("Failed to GET metrics at port 12345: %v", err)
+		t.Fatalf("Failed to GET metrics at port 9876: %v", err)
 	}
 	decoder := json.NewDecoder(response.Body)
 	var receivedMetrics []Metric
@@ -157,7 +179,7 @@
 // PASS
 // ok  	github.com/AliceO2Group/Control/common/monitoring	44.686s
 func BenchmarkSendingMetrics(b *testing.B) {
-	Start(12345, "/metrics", 100)
+	Run(12345, "/metrics", 100)
 
 	// this goroutine keeps clearing results so RAM does not exhausted
 	go func() {
@@ -168,8 +190,8 @@ func BenchmarkSendingMetrics(b *testing.B) {
 			break
 		default:
 			if len(metrics) >= 10000000 {
-				metricsRequestChannel <- struct{}{}
-				<-metricsToRequest
+				metricsRequestedChannel <- struct{}{}
+				<-metricsExportedToRequest
 			}
 		}
 		time.Sleep(100 * time.Millisecond)

core/config.go

Lines changed: 1 addition & 1 deletion

@@ -128,7 +128,7 @@ func setDefaults() error {
 	viper.SetDefault("kafkaEndpoints", []string{"localhost:9092"})
 	viper.SetDefault("enableKafka", true)
 	viper.SetDefault("logAllIL", false)
-	viper.SetDefault("metricsEndpoint", "8086/metrics")
+	viper.SetDefault("metricsEndpoint", "8088/ecsmetrics")
 	viper.SetDefault("metricsBufferSize", 10000)
 	return nil
 }

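The default now packs both the port and the endpoint name into a single "port/endpoint" string, consumed by parseMetricsEndpoint in core/core.go below. That function's body is not part of this diff; the following is only an illustrative sketch of how such a value can be split, with hypothetical names:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// splitMetricsEndpoint is a hypothetical helper, not the real parseMetricsEndpoint.
func splitMetricsEndpoint(metricsEndpoint string) (uint16, string, error) {
	parts := strings.SplitN(metricsEndpoint, "/", 2)
	if len(parts) != 2 {
		return 0, "", fmt.Errorf("expected <port>/<endpoint>, got %q", metricsEndpoint)
	}
	port, err := strconv.ParseUint(parts[0], 10, 16)
	if err != nil {
		return 0, "", fmt.Errorf("invalid port %q: %w", parts[0], err)
	}
	return uint16(port), parts[1], nil
}

func main() {
	port, endpoint, err := splitMetricsEndpoint("8088/ecsmetrics")
	fmt.Println(port, endpoint, err) // 8088 ecsmetrics <nil>
}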
core/core.go

Lines changed: 2 additions & 2 deletions

@@ -73,7 +73,6 @@ func parseMetricsEndpoint(metricsEndpoint string) (error, uint16, string) {
 }
 
 func runMetrics() {
-	log.Info("Starting run metrics")
 	metricsEndpoint := viper.GetString("metricsEndpoint")
 	err, port, endpoint := parseMetricsEndpoint(metricsEndpoint)
 	if err != nil {
@@ -82,7 +81,8 @@ func runMetrics() {
 	}
 
 	go func() {
-		if err := monitoring.Start(port, fmt.Sprintf("/%s", endpoint), viper.GetInt("metricsBufferSize")); err != nil && err != http.ErrServerClosed {
+		log.Infof("Starting to listen on endpoint %s:%d for metrics", endpoint, port)
+		if err := monitoring.Run(port, fmt.Sprintf("/%s", endpoint), viper.GetInt("metricsBufferSize")); err != nil && err != http.ErrServerClosed {
 			ecsmetrics.StopGolangMetrics()
 			log.Errorf("failed to run metrics on port %d and endpoint: %s")
 		}
