Skip to content

Commit 49122b9

Browse files
authored
Merge pull request #517 from matheuscscp/fix-rate-limit
Fix revision discarded on event rate limiting key calculation
2 parents 5eff905 + a8af564 commit 49122b9

File tree

3 files changed

+127
-77
lines changed

3 files changed

+127
-77
lines changed

internal/server/event_handlers.go

Lines changed: 2 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@ package server
1919
import (
2020
"context"
2121
"crypto/x509"
22-
"encoding/json"
2322
"errors"
2423
"fmt"
25-
"io"
2624
"net/http"
2725
"regexp"
2826
"strings"
@@ -45,30 +43,13 @@ import (
4543

4644
func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request) {
4745
return func(w http.ResponseWriter, r *http.Request) {
48-
r.Context()
49-
body, err := io.ReadAll(r.Body)
50-
if err != nil {
51-
s.logger.Error(err, "reading the request body failed")
52-
w.WriteHeader(http.StatusBadRequest)
53-
return
54-
}
55-
defer r.Body.Close()
56-
57-
event := &eventv1.Event{}
58-
err = json.Unmarshal(body, event)
59-
if err != nil {
60-
s.logger.Error(err, "decoding the request body failed")
61-
w.WriteHeader(http.StatusBadRequest)
62-
return
63-
}
64-
65-
cleanupMetadata(event)
46+
event := r.Context().Value(eventContextKey{}).(*eventv1.Event)
6647

6748
ctx, cancel := context.WithTimeout(r.Context(), 15*time.Second)
6849
defer cancel()
6950

7051
var allAlerts apiv1beta2.AlertList
71-
err = s.kubeClient.List(ctx, &allAlerts)
52+
err := s.kubeClient.List(ctx, &allAlerts)
7253
if err != nil {
7354
s.logger.Error(err, "listing alerts failed")
7455
w.WriteHeader(http.StatusBadRequest)
@@ -346,28 +327,6 @@ func (s *EventServer) eventMatchesAlert(ctx context.Context, event *eventv1.Even
346327
return false
347328
}
348329

349-
// cleanupMetadata removes metadata entries which are not used for alerting
350-
func cleanupMetadata(event *eventv1.Event) {
351-
group := event.InvolvedObject.GetObjectKind().GroupVersionKind().Group
352-
excludeList := []string{
353-
fmt.Sprintf("%s/%s", group, eventv1.MetaChecksumKey),
354-
fmt.Sprintf("%s/%s", group, eventv1.MetaDigestKey),
355-
}
356-
357-
meta := make(map[string]string)
358-
if event.Metadata != nil && len(event.Metadata) > 0 {
359-
// Filter other meta based on group prefix, while filtering out excludes
360-
for key, val := range event.Metadata {
361-
if strings.HasPrefix(key, group) && !inList(excludeList, key) {
362-
newKey := strings.TrimPrefix(key, fmt.Sprintf("%s/", group))
363-
meta[newKey] = val
364-
}
365-
}
366-
}
367-
368-
event.Metadata = meta
369-
}
370-
371330
func inList(l []string, i string) bool {
372331
for _, v := range l {
373332
if strings.EqualFold(v, i) {

internal/server/event_server.go

Lines changed: 74 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ import (
3838
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
3939
)
4040

41+
type eventContextKey struct{}
42+
4143
// EventServer handles event POST requests
4244
type EventServer struct {
4345
port string
@@ -63,8 +65,16 @@ func (s *EventServer) ListenAndServe(stopCh <-chan struct{}, mdlw middleware.Mid
6365
s.logger.Error(err, "Event server crashed")
6466
os.Exit(1)
6567
}
68+
var handler http.Handler = http.HandlerFunc(s.handleEvent())
69+
for _, middleware := range []func(http.Handler) http.Handler{
70+
limitMiddleware.Handle,
71+
s.logRateLimitMiddleware,
72+
s.cleanupMetadataMiddleware,
73+
} {
74+
handler = middleware(handler)
75+
}
6676
mux := http.NewServeMux()
67-
mux.Handle("/", s.logRateLimitMiddleware(limitMiddleware.Handle(http.HandlerFunc(s.handleEvent()))))
77+
mux.Handle("/", handler)
6878
h := std.Handler("", mdlw, mux)
6979
srv := &http.Server{
7080
Addr: s.port,
@@ -90,6 +100,59 @@ func (s *EventServer) ListenAndServe(stopCh <-chan struct{}, mdlw middleware.Mid
90100
}
91101
}
92102

103+
// cleanupMetadataMiddleware cleans up the metadata using cleanupMetadata() and
104+
// adds the cleaned event in the request context which can then be queried and
105+
// used directly by the other http handlers.
106+
func (s *EventServer) cleanupMetadataMiddleware(h http.Handler) http.Handler {
107+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
108+
body, err := io.ReadAll(r.Body)
109+
if err != nil {
110+
s.logger.Error(err, "reading the request body failed")
111+
w.WriteHeader(http.StatusBadRequest)
112+
return
113+
}
114+
r.Body.Close()
115+
r.Body = io.NopCloser(bytes.NewBuffer(body))
116+
117+
event := &eventv1.Event{}
118+
err = json.Unmarshal(body, event)
119+
if err != nil {
120+
s.logger.Error(err, "decoding the request body failed")
121+
w.WriteHeader(http.StatusBadRequest)
122+
return
123+
}
124+
125+
cleanupMetadata(event)
126+
127+
ctxWithEvent := context.WithValue(r.Context(), eventContextKey{}, event)
128+
reqWithEvent := r.WithContext(ctxWithEvent)
129+
130+
h.ServeHTTP(w, reqWithEvent)
131+
})
132+
}
133+
134+
// cleanupMetadata removes metadata entries which are not used for alerting.
135+
func cleanupMetadata(event *eventv1.Event) {
136+
group := event.InvolvedObject.GetObjectKind().GroupVersionKind().Group
137+
excludeList := []string{
138+
fmt.Sprintf("%s/%s", group, eventv1.MetaChecksumKey),
139+
fmt.Sprintf("%s/%s", group, eventv1.MetaDigestKey),
140+
}
141+
142+
meta := make(map[string]string)
143+
if event.Metadata != nil && len(event.Metadata) > 0 {
144+
// Filter other meta based on group prefix, while filtering out excludes
145+
for key, val := range event.Metadata {
146+
if strings.HasPrefix(key, group) && !inList(excludeList, key) {
147+
newKey := strings.TrimPrefix(key, fmt.Sprintf("%s/", group))
148+
meta[newKey] = val
149+
}
150+
}
151+
}
152+
153+
event.Metadata = meta
154+
}
155+
93156
type statusRecorder struct {
94157
http.ResponseWriter
95158
Status int
@@ -109,23 +172,7 @@ func (s *EventServer) logRateLimitMiddleware(h http.Handler) http.Handler {
109172
h.ServeHTTP(recorder, r)
110173

111174
if recorder.Status == http.StatusTooManyRequests {
112-
body, err := io.ReadAll(r.Body)
113-
if err != nil {
114-
s.logger.Error(err, "reading the request body failed")
115-
w.WriteHeader(http.StatusBadRequest)
116-
return
117-
}
118-
119-
event := &eventv1.Event{}
120-
err = json.Unmarshal(body, event)
121-
if err != nil {
122-
s.logger.Error(err, "decoding the request body failed")
123-
w.WriteHeader(http.StatusBadRequest)
124-
return
125-
}
126-
127-
r.Body = io.NopCloser(bytes.NewBuffer(body))
128-
175+
event := r.Context().Value(eventContextKey{}).(*eventv1.Event)
129176
s.logger.V(1).Info("Discarding event, rate limiting duplicate events",
130177
"reconciler kind", event.InvolvedObject.Kind,
131178
"name", event.InvolvedObject.Name,
@@ -135,24 +182,21 @@ func (s *EventServer) logRateLimitMiddleware(h http.Handler) http.Handler {
135182
}
136183

137184
func eventKeyFunc(r *http.Request) (string, error) {
138-
body, err := io.ReadAll(r.Body)
139-
if err != nil {
140-
return "", err
185+
event := r.Context().Value(eventContextKey{}).(*eventv1.Event)
186+
187+
comps := []string{
188+
"event",
189+
event.InvolvedObject.Name,
190+
event.InvolvedObject.Namespace,
191+
event.InvolvedObject.Kind,
192+
event.Message,
141193
}
142194

143-
event := &eventv1.Event{}
144-
err = json.Unmarshal(body, event)
145-
if err != nil {
146-
return "", err
147-
}
148-
149-
r.Body = io.NopCloser(bytes.NewBuffer(body))
150-
151-
comps := []string{"event", event.InvolvedObject.Name, event.InvolvedObject.Namespace, event.InvolvedObject.Kind, event.Message}
152195
revString, ok := event.Metadata[eventv1.MetaRevisionKey]
153196
if ok {
154197
comps = append(comps, revString)
155198
}
199+
156200
val := strings.Join(comps, "/")
157201
digest := sha256.Sum256([]byte(val))
158202
return fmt.Sprintf("%x", digest), nil

internal/server/event_server_test.go

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package server
1818

1919
import (
2020
"bytes"
21+
"context"
2122
"encoding/json"
2223
"fmt"
2324
"net/http"
@@ -53,6 +54,7 @@ func TestEventKeyFunc(t *testing.T) {
5354
severity string
5455
message string
5556
rateLimit bool
57+
metadata map[string]string
5658
}{
5759
{
5860
involvedObject: corev1.ObjectReference{
@@ -120,21 +122,66 @@ func TestEventKeyFunc(t *testing.T) {
120122
message: "Health check passed",
121123
rateLimit: true,
122124
},
125+
{
126+
involvedObject: corev1.ObjectReference{
127+
APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1",
128+
Kind: "Kustomization",
129+
Name: "4",
130+
Namespace: "4",
131+
},
132+
severity: eventv1.EventSeverityInfo,
133+
message: "Health check passed",
134+
metadata: map[string]string{
135+
fmt.Sprintf("%s/%s", "kustomize.toolkit.fluxcd.io", eventv1.MetaRevisionKey): "rev1",
136+
},
137+
rateLimit: false,
138+
},
139+
{
140+
involvedObject: corev1.ObjectReference{
141+
APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1",
142+
Kind: "Kustomization",
143+
Name: "4",
144+
Namespace: "4",
145+
},
146+
severity: eventv1.EventSeverityInfo,
147+
message: "Health check passed",
148+
metadata: map[string]string{
149+
fmt.Sprintf("%s/%s", "kustomize.toolkit.fluxcd.io", eventv1.MetaRevisionKey): "rev1",
150+
},
151+
rateLimit: true,
152+
},
153+
{
154+
involvedObject: corev1.ObjectReference{
155+
APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1",
156+
Kind: "Kustomization",
157+
Name: "4",
158+
Namespace: "4",
159+
},
160+
severity: eventv1.EventSeverityInfo,
161+
message: "Health check passed",
162+
metadata: map[string]string{
163+
fmt.Sprintf("%s/%s", "kustomize.toolkit.fluxcd.io", eventv1.MetaRevisionKey): "rev2",
164+
},
165+
rateLimit: false,
166+
},
123167
}
124168
for i, tt := range tests {
125169
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
126-
event := eventv1.Event{
170+
event := &eventv1.Event{
127171
InvolvedObject: tt.involvedObject,
128172
Severity: tt.severity,
129173
Message: tt.message,
174+
Metadata: tt.metadata,
130175
}
176+
cleanupMetadata(event)
131177
eventData, err := json.Marshal(event)
132178
g.Expect(err).ShouldNot(gomega.HaveOccurred())
133179

134-
req := httptest.NewRequest("POST", "/", bytes.NewBuffer(eventData))
135-
g.Expect(err).ShouldNot(gomega.HaveOccurred())
136180
res := httptest.NewRecorder()
137-
handler.ServeHTTP(res, req)
181+
req := httptest.NewRequest("POST", "/", bytes.NewBuffer(eventData))
182+
ctxWithEvent := context.WithValue(req.Context(), eventContextKey{}, event)
183+
reqWithEvent := req.WithContext(ctxWithEvent)
184+
handler.ServeHTTP(res, reqWithEvent)
138185

139186
if tt.rateLimit {
140187
g.Expect(res.Code).Should(gomega.Equal(429))

0 commit comments

Comments
 (0)