diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 9676a4a95ee..a911d07a09a 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -377,7 +377,7 @@ func runCompactForTenant( cf := baseMetaFetcher.NewMetaFetcher( extprom.WrapRegistererWithPrefix("thanos_", reg), filters) cf.UpdateOnChange(func(blocks []metadata.Meta, err error) { - api.SetLoaded(blocks, err) + api.SetLoadedForTenant(tenant, blocks, err) }) // Still use blockViewerSyncBlockTimeout to retain original behavior before this upstream change: diff --git a/pkg/api/blocks/v1.go b/pkg/api/blocks/v1.go index 91c05d85486..40a6e817c50 100644 --- a/pkg/api/blocks/v1.go +++ b/pkg/api/blocks/v1.go @@ -31,10 +31,16 @@ type BlocksAPI struct { globalBlocksInfo *BlocksInfo loadedBlocksInfo *BlocksInfo + // Per-tenant loaded blocks for multi-tenant mode aggregation. + loadedBlocksByTenant map[string][]metadata.Meta + loadedRefreshedAt time.Time + loadedErr error + globalLock, loadedLock sync.Mutex disableCORS bool bkt objstore.Bucket disableAdminOperations bool + label string } type BlocksInfo struct { @@ -77,9 +83,11 @@ func NewBlocksAPI(logger log.Logger, disableCORS bool, label string, flagsMap ma Blocks: []metadata.Meta{}, Label: label, }, + loadedBlocksByTenant: make(map[string][]metadata.Meta), disableCORS: disableCORS, bkt: bkt, disableAdminOperations: disableAdminOperations, + label: label, } } @@ -137,6 +145,38 @@ func (bapi *BlocksAPI) blocks(r *http.Request) (interface{}, []error, *api.ApiEr bapi.loadedLock.Lock() defer bapi.loadedLock.Unlock() + // If we have per-tenant blocks (multi-tenant mode), handle tenant filtering. + if len(bapi.loadedBlocksByTenant) > 0 { + tenantParam := r.URL.Query().Get("tenant") + + // If a specific tenant is requested, return only that tenant's blocks. + if tenantParam != "" { + tenantBlocks, exists := bapi.loadedBlocksByTenant[tenantParam] + if !exists { + tenantBlocks = []metadata.Meta{} + } + return &BlocksInfo{ + Label: bapi.label, + Blocks: tenantBlocks, + RefreshedAt: bapi.loadedRefreshedAt, + Err: bapi.loadedErr, + }, nil, nil, func() {} + } + + // No tenant specified, so we aggregate all tenants' blocks. + var allBlocks []metadata.Meta + for _, tenantBlocks := range bapi.loadedBlocksByTenant { + allBlocks = append(allBlocks, tenantBlocks...) + } + return &BlocksInfo{ + Label: bapi.label, + Blocks: allBlocks, + RefreshedAt: bapi.loadedRefreshedAt, + Err: bapi.loadedErr, + }, nil, nil, func() {} + } + + // Fall back to single-tenant loadedBlocksInfo for backward compatibility. return bapi.loadedBlocksInfo, nil, nil, func() {} } @@ -168,9 +208,27 @@ func (bapi *BlocksAPI) SetGlobal(blocks []metadata.Meta, err error) { } // SetLoaded updates the local blocks' metadata in the API. +// This is used for single-tenant mode backward compatibility. func (bapi *BlocksAPI) SetLoaded(blocks []metadata.Meta, err error) { bapi.loadedLock.Lock() defer bapi.loadedLock.Unlock() bapi.loadedBlocksInfo.set(blocks, err) } + +// SetLoadedForTenant updates the loaded blocks for a specific tenant. +// This is used in multi-tenant mode to aggregate blocks from all tenants. +func (bapi *BlocksAPI) SetLoadedForTenant(tenant string, blocks []metadata.Meta, err error) { + bapi.loadedLock.Lock() + defer bapi.loadedLock.Unlock() + + if err != nil { + bapi.loadedErr = err + bapi.loadedRefreshedAt = time.Now() + return + } + + bapi.loadedBlocksByTenant[tenant] = blocks + bapi.loadedRefreshedAt = time.Now() + bapi.loadedErr = nil +} diff --git a/pkg/api/blocks/v1_test.go b/pkg/api/blocks/v1_test.go index 49e7ef0681e..a394d36f6b1 100644 --- a/pkg/api/blocks/v1_test.go +++ b/pkg/api/blocks/v1_test.go @@ -20,6 +20,7 @@ import ( "github.com/oklog/ulid" "github.com/prometheus/common/route" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb" "github.com/thanos-io/objstore" "github.com/efficientgo/core/testutil" @@ -126,8 +127,10 @@ func TestMarkBlockEndpoint(t *testing.T) { Blocks: []metadata.Meta{}, Label: "foo", }, - disableCORS: true, - bkt: bkt, + loadedBlocksByTenant: make(map[string][]metadata.Meta), + disableCORS: true, + bkt: bkt, + label: "foo", } var tests = []endpointTestCase{ @@ -186,3 +189,235 @@ func TestMarkBlockEndpoint(t *testing.T) { _, err = os.Stat(file) testutil.Ok(t, err) } + +func TestSetLoadedForTenant(t *testing.T) { + logger := log.NewNopLogger() + bkt := objstore.WithNoopInstr(objstore.NewInMemBucket()) + + api := NewBlocksAPI(logger, true, "test-label", map[string]string{}, bkt) + + // Create some test block metadata + block1 := metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: ulid.MustNew(1, nil), + MinTime: 0, + MaxTime: 1000, + }, + } + block2 := metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: ulid.MustNew(2, nil), + MinTime: 1000, + MaxTime: 2000, + }, + } + block3 := metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: ulid.MustNew(3, nil), + MinTime: 2000, + MaxTime: 3000, + }, + } + + // Test setting blocks for multiple tenants + api.SetLoadedForTenant("tenant-a", []metadata.Meta{block1, block2}, nil) + api.SetLoadedForTenant("tenant-b", []metadata.Meta{block3}, nil) + + // Verify blocks are stored per tenant + testutil.Equals(t, 2, len(api.loadedBlocksByTenant["tenant-a"])) + testutil.Equals(t, 1, len(api.loadedBlocksByTenant["tenant-b"])) + + // Test single-tenant mode (empty string tenant) + api2 := NewBlocksAPI(logger, true, "test-label", map[string]string{}, bkt) + api2.SetLoadedForTenant("", []metadata.Meta{block1, block2, block3}, nil) + testutil.Equals(t, 3, len(api2.loadedBlocksByTenant[""])) +} + +func TestBlocksEndpointMultiTenantAggregation(t *testing.T) { + logger := log.NewNopLogger() + bkt := objstore.WithNoopInstr(objstore.NewInMemBucket()) + + api := NewBlocksAPI(logger, true, "test-label", map[string]string{}, bkt) + + // Create test block metadata for different tenants + block1 := metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: ulid.MustNew(1, nil), + MinTime: 0, + MaxTime: 1000, + }, + } + block2 := metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: ulid.MustNew(2, nil), + MinTime: 1000, + MaxTime: 2000, + }, + } + block3 := metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: ulid.MustNew(3, nil), + MinTime: 2000, + MaxTime: 3000, + }, + } + + // Set blocks for multiple tenants + api.SetLoadedForTenant("tenant-a", []metadata.Meta{block1}, nil) + api.SetLoadedForTenant("tenant-b", []metadata.Meta{block2, block3}, nil) + + // Create a request for the loaded view + req, err := http.NewRequest("GET", "http://example.com?view=loaded", nil) + testutil.Ok(t, err) + + resp, _, apiErr, releaseResources := api.blocks(req) + defer releaseResources() + + testutil.Equals(t, (*baseAPI.ApiError)(nil), apiErr) + + blocksInfo, ok := resp.(*BlocksInfo) + testutil.Assert(t, ok, "response should be *BlocksInfo") + testutil.Equals(t, "test-label", blocksInfo.Label) + testutil.Equals(t, 3, len(blocksInfo.Blocks)) // Should aggregate all blocks from all tenants +} + +func TestBlocksEndpointSingleTenantFallback(t *testing.T) { + logger := log.NewNopLogger() + bkt := objstore.WithNoopInstr(objstore.NewInMemBucket()) + + api := NewBlocksAPI(logger, true, "test-label", map[string]string{}, bkt) + + // Create test block metadata + block1 := metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: ulid.MustNew(1, nil), + MinTime: 0, + MaxTime: 1000, + }, + } + + // Use SetLoaded (single-tenant backward compatibility) + api.SetLoaded([]metadata.Meta{block1}, nil) + + // Create a request for the loaded view + req, err := http.NewRequest("GET", "http://example.com?view=loaded", nil) + testutil.Ok(t, err) + + resp, _, apiErr, releaseResources := api.blocks(req) + defer releaseResources() + + testutil.Equals(t, (*baseAPI.ApiError)(nil), apiErr) + + blocksInfo, ok := resp.(*BlocksInfo) + testutil.Assert(t, ok, "response should be *BlocksInfo") + testutil.Equals(t, 1, len(blocksInfo.Blocks)) // Should return blocks from loadedBlocksInfo +} + +func TestBlocksEndpointGlobalView(t *testing.T) { + logger := log.NewNopLogger() + bkt := objstore.WithNoopInstr(objstore.NewInMemBucket()) + + api := NewBlocksAPI(logger, true, "test-label", map[string]string{}, bkt) + + // Create test block metadata + block1 := metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: ulid.MustNew(1, nil), + MinTime: 0, + MaxTime: 1000, + }, + } + block2 := metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: ulid.MustNew(2, nil), + MinTime: 1000, + MaxTime: 2000, + }, + } + + // Set global blocks + api.SetGlobal([]metadata.Meta{block1, block2}, nil) + + // Create a request for the global view (no view param) + req, err := http.NewRequest("GET", "http://example.com", nil) + testutil.Ok(t, err) + + resp, _, apiErr, releaseResources := api.blocks(req) + defer releaseResources() + + testutil.Equals(t, (*baseAPI.ApiError)(nil), apiErr) + + blocksInfo, ok := resp.(*BlocksInfo) + testutil.Assert(t, ok, "response should be *BlocksInfo") + testutil.Equals(t, 2, len(blocksInfo.Blocks)) +} + +func TestBlocksEndpointTenantFilter(t *testing.T) { + logger := log.NewNopLogger() + bkt := objstore.WithNoopInstr(objstore.NewInMemBucket()) + + api := NewBlocksAPI(logger, true, "test-label", map[string]string{}, bkt) + + // Create test block metadata for different tenants + block1 := metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: ulid.MustNew(1, nil), + MinTime: 0, + MaxTime: 1000, + }, + } + block2 := metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: ulid.MustNew(2, nil), + MinTime: 1000, + MaxTime: 2000, + }, + } + block3 := metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: ulid.MustNew(3, nil), + MinTime: 2000, + MaxTime: 3000, + }, + } + + // Set blocks for multiple tenants + api.SetLoadedForTenant("tenant-a", []metadata.Meta{block1}, nil) + api.SetLoadedForTenant("tenant-b", []metadata.Meta{block2, block3}, nil) + + // Test filtering by tenant-a + req, err := http.NewRequest("GET", "http://example.com?view=loaded&tenant=tenant-a", nil) + testutil.Ok(t, err) + + resp, _, apiErr, releaseResources := api.blocks(req) + testutil.Equals(t, (*baseAPI.ApiError)(nil), apiErr) + + blocksInfo, ok := resp.(*BlocksInfo) + testutil.Assert(t, ok, "response should be *BlocksInfo") + testutil.Equals(t, 1, len(blocksInfo.Blocks)) // Only tenant-a blocks + releaseResources() + + // Test filtering by tenant-b + req, err = http.NewRequest("GET", "http://example.com?view=loaded&tenant=tenant-b", nil) + testutil.Ok(t, err) + + resp, _, apiErr, releaseResources = api.blocks(req) + testutil.Equals(t, (*baseAPI.ApiError)(nil), apiErr) + + blocksInfo, ok = resp.(*BlocksInfo) + testutil.Assert(t, ok, "response should be *BlocksInfo") + testutil.Equals(t, 2, len(blocksInfo.Blocks)) // Only tenant-b blocks + releaseResources() + + // Test filtering by non-existent tenant + req, err = http.NewRequest("GET", "http://example.com?view=loaded&tenant=tenant-c", nil) + testutil.Ok(t, err) + + resp, _, apiErr, releaseResources = api.blocks(req) + testutil.Equals(t, (*baseAPI.ApiError)(nil), apiErr) + + blocksInfo, ok = resp.(*BlocksInfo) + testutil.Assert(t, ok, "response should be *BlocksInfo") + testutil.Equals(t, 0, len(blocksInfo.Blocks)) // No blocks for non-existent tenant + releaseResources() +}