diff --git a/bulker/ingest/config.go b/bulker/ingest/config.go index 118626bdf..8a29316e7 100644 --- a/bulker/ingest/config.go +++ b/bulker/ingest/config.go @@ -63,6 +63,13 @@ type Config struct { ShutdownTimeoutSec int `mapstructure:"SHUTDOWN_TIMEOUT_SEC" default:"10"` //Extra delay may be needed. E.g. for metric scrapper to scrape final metrics. So http server will stay active for an extra period. ShutdownExtraDelay int `mapstructure:"SHUTDOWN_EXTRA_DELAY_SEC" default:"5"` + + // # IP HEADERS + // Comma-separated list of HTTP headers to check for client IP address, in order of priority. + // Default: "X-Real-Ip,X-Forwarded-For" (current behavior) + // For Cloudflare: "CF-Connecting-IP,X-Real-Ip,X-Forwarded-For" + TrustedIPHeaders string `mapstructure:"TRUSTED_IP_HEADERS" default:"X-Real-Ip,X-Forwarded-For"` + TrustedIPHeadersList []string `mapstructure:"-"` } func init() { @@ -75,5 +82,6 @@ func (ac *Config) PostInit(settings *appbase.AppSettings) error { return err } ac.GlobalHashSecrets = strings.Split(ac.GlobalHashSecret, ",") + ac.TrustedIPHeadersList = strings.Split(ac.TrustedIPHeaders, ",") return nil } diff --git a/bulker/ingest/router.go b/bulker/ingest/router.go index 573499ea3..7d9aabbf7 100644 --- a/bulker/ingest/router.go +++ b/bulker/ingest/router.go @@ -43,6 +43,9 @@ var eventTypesSet = types.NewSet("page", "identify", "track", "group", "alias", var messageIdUnsupportedChars = regexp.MustCompile(`[^a-zA-Z0-9._-]`) +// trustedIPHeadersList is set during router initialization from config +var trustedIPHeadersList []string + type eventPatchFunc func(c *gin.Context, messageId string, event types.Json, tp string, ingestType IngestType, analyticContext types.Json, defaultEventName string) error type Router struct { @@ -146,6 +149,9 @@ func NewRouter(appContext *Context, partitionSelector kafkabase.PartitionSelecto dataHosts: dataHosts, partitionSelector: partitionSelector, } + // Initialize trusted IP headers from config + trustedIPHeadersList = appContext.config.TrustedIPHeadersList + base.Infof("Trusted IP headers: %v", trustedIPHeadersList) engine := router.Engine() // get global Monitor object m := ginmetrics.GetMonitor() @@ -319,7 +325,7 @@ func patchEvent(c *gin.Context, messageId string, ev types.Json, tp string, inge ev.SetIfAbsent("event", eventName) } } - ip := strings.TrimSpace(strings.Split(utils.NvlString(c.GetHeader("X-Real-Ip"), c.GetHeader("X-Forwarded-For"), c.ClientIP()), ",")[0]) + ip := getClientIP(c) ipPolicy := c.GetHeader("X-IP-Policy") switch ipPolicy { case "stripLastOctet": @@ -409,6 +415,17 @@ func ipStripLastOctet(ip string) string { return ip } +// getClientIP extracts client IP from request headers based on configured trusted headers list +func getClientIP(c *gin.Context) string { + for _, header := range trustedIPHeadersList { + if val := c.GetHeader(header); val != "" { + // Handle comma-separated values (e.g., X-Forwarded-For can contain multiple IPs) + return strings.TrimSpace(strings.Split(val, ",")[0]) + } + } + return c.ClientIP() +} + type SyncDestinationsResponse struct { Destinations []*SyncDestinationsData `json:"destinations,omitempty"` OK bool `json:"ok"`