2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -12,6 +12,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Fixed

- [#8378](https://github.com/thanos-io/thanos/pull/8378): Store: fix the reuse of dirty posting slices

### Added

### Changed
24 changes: 19 additions & 5 deletions pkg/compact/retention.go
@@ -27,7 +27,10 @@ const (
// tenantRetentionRegex is the regex pattern for parsing tenant retention.
// valid format is `<tenant>:(<yyyy-mm-dd>|<duration>d)(:all)?` where <duration> > 0.
// Default behavior is to delete only level 1 blocks, use :all to delete all blocks.
tenantRetentionRegex = `^([\w-]+):((\d{4}-\d{2}-\d{2})|(\d+d))(:all)?$`
// Use `*` as tenant name to apply policy to all tenants (as a default/fallback).
// Specific tenant policies take precedence over the wildcard policy.
tenantRetentionRegex = `^([\w-]+|\*):((\d{4}-\d{2}-\d{2})|(\d+d))(:all)?$`
wildCardTenant = "*"

Level1 = 1 // compaction level 1 indicating a new block
Level2 = 2 // compaction level 2 indicating a compacted block
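
As a quick, standalone illustration (not part of this change), the snippet below exercises the extended pattern against a few flag values; the tenant names are made up:

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// The extended pattern from the diff: a tenant name (or "*") followed by
	// either a cutoff date or a day-based duration, with an optional ":all" suffix.
	re := regexp.MustCompile(`^([\w-]+|\*):((\d{4}-\d{2}-\d{2})|(\d+d))(:all)?$`)

	for _, flag := range []string{
		"*:30d",                  // wildcard tenant, 30-day retention
		"*:2024-01-01:all",       // wildcard tenant, cutoff date, all compaction levels
		"tenant-special:30d:all", // specific tenant; overrides the wildcard when both are set
		"tenant-a:30",            // missing the trailing "d" -> does not match
	} {
		fmt.Printf("%-24s matches: %v\n", flag, re.MatchString(flag))
	}
}
```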
@@ -120,6 +123,8 @@ func ParesRetentionPolicyByTenant(logger log.Logger, retentionTenants []string)
}

// ApplyRetentionPolicyByTenant removes blocks depending on the specified retentionByTenant based on blocks MaxTime.
// The wildcard policy ("*") applies to all tenants as a default/fallback.
// Specific tenant policies take precedence over the wildcard policy.
func ApplyRetentionPolicyByTenant(
ctx context.Context,
logger log.Logger,
@@ -133,19 +138,28 @@ func ApplyRetentionPolicyByTenant(
}
level.Info(logger).Log("msg", "start tenant retention", "total", len(metas))
deleted, skipped, notExpired := 0, 0, 0
// Check if wildcard policy exists
wildcardPolicy, hasWildcard := retentionByTenant[wildCardTenant]
for id, m := range metas {
policy, ok := retentionByTenant[m.Thanos.GetTenant()]
tenant := m.Thanos.GetTenant()
// First try to find tenant-specific policy
policy, ok := retentionByTenant[tenant]
if !ok {
skipped++
continue
// Fall back to the wildcard policy if no tenant-specific policy is found
if hasWildcard {
policy = wildcardPolicy
} else {
skipped++
continue
}
}
maxTime := time.Unix(m.MaxTime/1000, 0)
// Default behavior: only delete level 1 blocks unless IsAll is true
if !policy.IsAll && m.Compaction.Level != Level1 {
continue
}
if policy.isExpired(maxTime) {
level.Info(logger).Log("msg", "deleting blocks applying retention policy", "id", id, "maxTime", maxTime.String())
level.Info(logger).Log("msg", "deleting blocks applying retention policy", "id", id, "tenant", tenant, "maxTime", maxTime.String())
if err := block.Delete(ctx, logger, bkt, id); err != nil {
level.Error(logger).Log("msg", "failed to delete block", "id", id, "err", err)
continue // continue to next block to clean up backlogs
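The lookup order in `ApplyRetentionPolicyByTenant` — tenant-specific policy first, wildcard policy as fallback, otherwise skip the block — boils down to the following standalone sketch (simplified types, not the actual Thanos ones):

```go
package main

import "fmt"

const wildCardTenant = "*"

// retentionPolicy is a simplified stand-in for compact.RetentionPolicy.
type retentionPolicy struct {
	retentionDays int
	isAll         bool
}

// lookupPolicy returns the policy to apply for a tenant: a tenant-specific
// entry wins, the wildcard entry is the fallback, and false means "skip".
func lookupPolicy(policies map[string]retentionPolicy, tenant string) (retentionPolicy, bool) {
	if p, ok := policies[tenant]; ok {
		return p, true
	}
	if p, ok := policies[wildCardTenant]; ok {
		return p, true
	}
	return retentionPolicy{}, false
}

func main() {
	policies := map[string]retentionPolicy{
		"*":              {retentionDays: 90},
		"tenant-special": {retentionDays: 30, isAll: true},
	}

	fmt.Println(lookupPolicy(policies, "tenant-special")) // {30 true} true  -> specific policy wins
	fmt.Println(lookupPolicy(policies, "tenant-a"))       // {90 false} true -> wildcard fallback
	fmt.Println(lookupPolicy(map[string]retentionPolicy{}, "tenant-a")) // {0 false} false -> skipped
}
```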
207 changes: 207 additions & 0 deletions pkg/compact/retention_test.go
@@ -374,6 +374,47 @@ func TestParseRetentionPolicyByTenant(t *testing.T) {
nil,
true,
},
{
"wildcard tenant with duration",
[]string{"*:30d"},
map[string]compact.RetentionPolicy{
"*": {
CutoffDate: time.Time{},
RetentionDuration: 30 * 24 * time.Hour,
IsAll: false,
},
},
false,
},
{
"wildcard tenant with cutoff date and all flag",
[]string{"*:2024-01-01:all"},
map[string]compact.RetentionPolicy{
"*": {
CutoffDate: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),
RetentionDuration: time.Duration(0),
IsAll: true,
},
},
false,
},
{
"wildcard with specific tenant override",
[]string{"*:90d", "tenant-special:30d:all"},
map[string]compact.RetentionPolicy{
"*": {
CutoffDate: time.Time{},
RetentionDuration: 90 * 24 * time.Hour,
IsAll: false,
},
"tenant-special": {
CutoffDate: time.Time{},
RetentionDuration: 30 * 24 * time.Hour,
IsAll: true,
},
},
false,
},
} {
t.Run(tt.name, func(t *testing.T) {
got, err := compact.ParesRetentionPolicyByTenant(log.NewNopLogger(), tt.retentionTenants)
@@ -573,6 +614,172 @@ func TestApplyRetentionPolicyByTenant(t *testing.T) {
},
false,
},
{
"wildcard tenant applies to all tenants",
[]testBlock{
{
"01CPHBEX20729MJQZXE3W0BW48",
"tenant-a",
time.Now().Add(-3 * 24 * time.Hour),
time.Now().Add(-2 * 24 * time.Hour),
compact.Level1,
},
{
"01CPHBEX20729MJQZXE3W0BW49",
"tenant-b",
time.Now().Add(-2 * 24 * time.Hour),
time.Now().Add(-24 * time.Hour),
compact.Level1,
},
{
"01CPHBEX20729MJQZXE3W0BW50",
"tenant-c",
time.Now().Add(-24 * time.Hour),
time.Now().Add(-23 * time.Hour),
compact.Level1,
},
{
"01CPHBEX20729MJQZXE3W0BW51",
"tenant-d",
time.Now().Add(-5 * time.Hour),
time.Now().Add(-4 * time.Hour),
compact.Level1,
},
},
map[string]compact.RetentionPolicy{
"*": {
CutoffDate: time.Time{},
RetentionDuration: 10 * time.Hour,
IsAll: false,
},
},
[]string{
"01CPHBEX20729MJQZXE3W0BW51/",
},
false,
},
{
"wildcard tenant with all flag applies to all levels",
[]testBlock{
{
"01CPHBEX20729MJQZXE3W0BW48",
"tenant-a",
time.Now().Add(-3 * 24 * time.Hour),
time.Now().Add(-2 * 24 * time.Hour),
compact.Level1,
},
{
"01CPHBEX20729MJQZXE3W0BW49",
"tenant-b",
time.Now().Add(-2 * 24 * time.Hour),
time.Now().Add(-24 * time.Hour),
compact.Level2,
},
{
"01CPHBEX20729MJQZXE3W0BW50",
"tenant-c",
time.Now().Add(-5 * time.Hour),
time.Now().Add(-4 * time.Hour),
compact.Level1,
},
},
map[string]compact.RetentionPolicy{
"*": {
CutoffDate: time.Time{},
RetentionDuration: 10 * time.Hour,
IsAll: true,
},
},
[]string{
"01CPHBEX20729MJQZXE3W0BW50/",
},
false,
},
{
"wildcard with specific tenant override - wildcard longer retention, specific shorter",
[]testBlock{
{
"01CPHBEX20729MJQZXE3W0BW48",
"tenant-a",
time.Now().Add(-50 * 24 * time.Hour),
time.Now().Add(-49 * 24 * time.Hour),
compact.Level1,
},
{
"01CPHBEX20729MJQZXE3W0BW49",
"tenant-cleanup",
time.Now().Add(-15 * 24 * time.Hour),
time.Now().Add(-14 * 24 * time.Hour),
compact.Level1,
},
{
"01CPHBEX20729MJQZXE3W0BW50",
"tenant-b",
time.Now().Add(-20 * 24 * time.Hour),
time.Now().Add(-19 * 24 * time.Hour),
compact.Level1,
},
{
"01CPHBEX20729MJQZXE3W0BW51",
"tenant-cleanup",
time.Now().Add(-5 * time.Hour),
time.Now().Add(-4 * time.Hour),
compact.Level1,
},
},
map[string]compact.RetentionPolicy{
"*": {
CutoffDate: time.Time{},
RetentionDuration: 30 * 24 * time.Hour, // 30 days for most tenants
IsAll: false,
},
"tenant-cleanup": {
CutoffDate: time.Time{},
RetentionDuration: 10 * 24 * time.Hour, // 10 days for cleanup tenant
IsAll: false,
},
},
[]string{
"01CPHBEX20729MJQZXE3W0BW50/",
"01CPHBEX20729MJQZXE3W0BW51/",
},
false,
},
{
"wildcard precedence - specific policy takes priority over wildcard",
[]testBlock{
{
"01CPHBEX20729MJQZXE3W0BW48",
"tenant-override",
time.Now().Add(-15 * 24 * time.Hour),
time.Now().Add(-14 * 24 * time.Hour),
compact.Level1,
},
{
"01CPHBEX20729MJQZXE3W0BW49",
"tenant-normal",
time.Now().Add(-15 * 24 * time.Hour),
time.Now().Add(-14 * 24 * time.Hour),
compact.Level1,
},
},
map[string]compact.RetentionPolicy{
"*": {
CutoffDate: time.Time{},
RetentionDuration: 10 * 24 * time.Hour, // 10 days wildcard
IsAll: false,
},
"tenant-override": {
CutoffDate: time.Time{},
RetentionDuration: 20 * 24 * time.Hour, // 20 days specific override
IsAll: false,
},
},
[]string{
"01CPHBEX20729MJQZXE3W0BW48/", // kept due to 20-day specific policy
},
false,
},
} {
t.Run(tt.name, func(t *testing.T) {
bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())
82 changes: 59 additions & 23 deletions pkg/queryfrontend/query_logger.go
@@ -36,6 +36,8 @@ type UserInfo struct {
Tenant string
ForwardedFor string
UserAgent string
Groups string
Email string
}

// ResponseStats holds statistics extracted from query response.
@@ -91,6 +93,10 @@ func ExtractUserInfoFromHeaders(headers []*RequestHeader) UserInfo {
if userInfo.Source == "" {
userInfo.Source = headerValue
}
case "x-auth-request-groups":
userInfo.Groups = headerValue
case "x-auth-request-email":
userInfo.Email = headerValue
}
}

@@ -102,29 +108,6 @@ func ExtractUserInfoFromHeaders(headers []*RequestHeader) UserInfo {
return userInfo
}

// ExtractEmailFromResponse extracts the email from response headers (works for both range and instant queries).
func ExtractEmailFromResponse(resp queryrange.Response) string {
if resp == nil {
return ""
}

// Check both response types using OR condition
var headers []*queryrange.PrometheusResponseHeader
if promResp, ok := resp.(*queryrange.PrometheusResponse); ok {
headers = promResp.GetHeaders()
} else if promResp, ok := resp.(*queryrange.PrometheusInstantQueryResponse); ok {
headers = promResp.GetHeaders()
}

for _, header := range headers {
if strings.ToLower(header.Name) == "x-auth-request-email" && len(header.Values) > 0 {
return header.Values[0]
}
}

return ""
}

// ConvertStoreMatchers converts internal store matchers to logging format.
func ConvertStoreMatchers(storeMatchers [][]*labels.Matcher) []StoreMatcherSet {
if len(storeMatchers) == 0 {
@@ -174,6 +157,59 @@ func GetResponseStats(resp queryrange.Response) ResponseStats {
return stats
}

// ExtractMetricNames extracts all unique __name__ labels from query response (works for both range and instant queries).
func ExtractMetricNames(resp queryrange.Response) []string {
if resp == nil {
return nil
}

metricNamesMap := make(map[string]struct{})

// Handle range query response (resultType: matrix)
if r, ok := resp.(*queryrange.PrometheusResponse); ok {
for _, stream := range r.Data.Result {
for _, label := range stream.Labels {
if label.Name == "__name__" {
metricNamesMap[label.Value] = struct{}{}
break
}
}
}
} else if r, ok := resp.(*queryrange.PrometheusInstantQueryResponse); ok {
// Handle instant query response - check all result types
if vector := r.Data.Result.GetVector(); vector != nil {
// resultType: vector
for _, sample := range vector.Samples {
for _, label := range sample.Labels {
if label.Name == "__name__" {
metricNamesMap[label.Value] = struct{}{}
break
}
}
}
} else if matrix := r.Data.Result.GetMatrix(); matrix != nil {
// resultType: matrix (subqueries in instant queries)
for _, stream := range matrix.SampleStreams {
for _, label := range stream.Labels {
if label.Name == "__name__" {
metricNamesMap[label.Value] = struct{}{}
break
}
}
}
}
// Scalar and StringSample don't have __name__ labels
}

// Convert map to slice
metricNames := make([]string, 0, len(metricNamesMap))
for name := range metricNamesMap {
metricNames = append(metricNames, name)
}

return metricNames
}

// WriteJSONLogToFile writes query logs to file in JSON format.
func WriteJSONLogToFile(logger log.Logger, writer interface{}, queryLog interface{}, queryType string) error {
if writer == nil {
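The new `ExtractMetricNames` helper deduplicates `__name__` values with a set-style map and stops at the first `__name__` label per series. A simplified, self-contained sketch of that approach (local `label` type, not the actual queryrange response structs):

```go
package main

import "fmt"

// label is a simplified stand-in for a label pair as it appears in a query
// response; the real code walks the queryrange response types instead.
type label struct {
	Name  string
	Value string
}

// uniqueMetricNames collects distinct __name__ values using a set-style map,
// mirroring the dedup approach used by ExtractMetricNames in the diff.
func uniqueMetricNames(series [][]label) []string {
	seen := make(map[string]struct{})
	for _, labels := range series {
		for _, l := range labels {
			if l.Name == "__name__" {
				seen[l.Value] = struct{}{}
				break // at most one __name__ per series
			}
		}
	}
	names := make([]string, 0, len(seen))
	for name := range seen {
		names = append(names, name)
	}
	return names
}

func main() {
	series := [][]label{
		{{"__name__", "http_requests_total"}, {"job", "api"}},
		{{"__name__", "http_requests_total"}, {"job", "web"}},
		{{"__name__", "up"}, {"instance", "a"}},
	}
	// Prints the two unique names; map iteration order is not guaranteed.
	fmt.Println(uniqueMetricNames(series))
}
```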