Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func runCompactForTenant(
cf := baseMetaFetcher.NewMetaFetcher(
extprom.WrapRegistererWithPrefix("thanos_", reg), filters)
cf.UpdateOnChange(func(blocks []metadata.Meta, err error) {
api.SetLoaded(blocks, err)
api.SetLoadedForTenant(tenant, blocks, err)
})

// Still use blockViewerSyncBlockTimeout to retain original behavior before this upstream change:
Expand Down
58 changes: 58 additions & 0 deletions pkg/api/blocks/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,16 @@ type BlocksAPI struct {
globalBlocksInfo *BlocksInfo
loadedBlocksInfo *BlocksInfo

// Per-tenant loaded blocks for multi-tenant mode aggregation.
loadedBlocksByTenant map[string][]metadata.Meta
loadedRefreshedAt time.Time
loadedErr error

globalLock, loadedLock sync.Mutex
disableCORS bool
bkt objstore.Bucket
disableAdminOperations bool
label string
}

type BlocksInfo struct {
Expand Down Expand Up @@ -77,9 +83,11 @@ func NewBlocksAPI(logger log.Logger, disableCORS bool, label string, flagsMap ma
Blocks: []metadata.Meta{},
Label: label,
},
loadedBlocksByTenant: make(map[string][]metadata.Meta),
disableCORS: disableCORS,
bkt: bkt,
disableAdminOperations: disableAdminOperations,
label: label,
}
}

Expand Down Expand Up @@ -137,6 +145,38 @@ func (bapi *BlocksAPI) blocks(r *http.Request) (interface{}, []error, *api.ApiEr
bapi.loadedLock.Lock()
defer bapi.loadedLock.Unlock()

// If we have per-tenant blocks (multi-tenant mode), handle tenant filtering.
if len(bapi.loadedBlocksByTenant) > 0 {
tenantParam := r.URL.Query().Get("tenant")

// If a specific tenant is requested, return only that tenant's blocks.
if tenantParam != "" {
tenantBlocks, exists := bapi.loadedBlocksByTenant[tenantParam]
if !exists {
tenantBlocks = []metadata.Meta{}
}
return &BlocksInfo{
Label: bapi.label,
Blocks: tenantBlocks,
RefreshedAt: bapi.loadedRefreshedAt,
Err: bapi.loadedErr,
}, nil, nil, func() {}
}

// No tenant specified, so we aggregate all tenants' blocks.
var allBlocks []metadata.Meta
for _, tenantBlocks := range bapi.loadedBlocksByTenant {
allBlocks = append(allBlocks, tenantBlocks...)
}
return &BlocksInfo{
Label: bapi.label,
Blocks: allBlocks,
RefreshedAt: bapi.loadedRefreshedAt,
Err: bapi.loadedErr,
}, nil, nil, func() {}
}

// Fall back to single-tenant loadedBlocksInfo for backward compatibility.
return bapi.loadedBlocksInfo, nil, nil, func() {}
}

Expand Down Expand Up @@ -168,9 +208,27 @@ func (bapi *BlocksAPI) SetGlobal(blocks []metadata.Meta, err error) {
}

// SetLoaded updates the local blocks' metadata in the API.
// This is used for single-tenant mode backward compatibility.
func (bapi *BlocksAPI) SetLoaded(blocks []metadata.Meta, err error) {
bapi.loadedLock.Lock()
defer bapi.loadedLock.Unlock()

bapi.loadedBlocksInfo.set(blocks, err)
}

// SetLoadedForTenant updates the loaded blocks for a specific tenant.
// This is used in multi-tenant mode to aggregate blocks from all tenants.
func (bapi *BlocksAPI) SetLoadedForTenant(tenant string, blocks []metadata.Meta, err error) {
bapi.loadedLock.Lock()
defer bapi.loadedLock.Unlock()

if err != nil {
bapi.loadedErr = err
bapi.loadedRefreshedAt = time.Now()
return
}

bapi.loadedBlocksByTenant[tenant] = blocks
bapi.loadedRefreshedAt = time.Now()
bapi.loadedErr = nil
}
239 changes: 237 additions & 2 deletions pkg/api/blocks/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/oklog/ulid"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/objstore"

"github.com/efficientgo/core/testutil"
Expand Down Expand Up @@ -126,8 +127,10 @@ func TestMarkBlockEndpoint(t *testing.T) {
Blocks: []metadata.Meta{},
Label: "foo",
},
disableCORS: true,
bkt: bkt,
loadedBlocksByTenant: make(map[string][]metadata.Meta),
disableCORS: true,
bkt: bkt,
label: "foo",
}

var tests = []endpointTestCase{
Expand Down Expand Up @@ -186,3 +189,235 @@ func TestMarkBlockEndpoint(t *testing.T) {
_, err = os.Stat(file)
testutil.Ok(t, err)
}

func TestSetLoadedForTenant(t *testing.T) {
logger := log.NewNopLogger()
bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())

api := NewBlocksAPI(logger, true, "test-label", map[string]string{}, bkt)

// Create some test block metadata
block1 := metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(1, nil),
MinTime: 0,
MaxTime: 1000,
},
}
block2 := metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(2, nil),
MinTime: 1000,
MaxTime: 2000,
},
}
block3 := metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(3, nil),
MinTime: 2000,
MaxTime: 3000,
},
}

// Test setting blocks for multiple tenants
api.SetLoadedForTenant("tenant-a", []metadata.Meta{block1, block2}, nil)
api.SetLoadedForTenant("tenant-b", []metadata.Meta{block3}, nil)

// Verify blocks are stored per tenant
testutil.Equals(t, 2, len(api.loadedBlocksByTenant["tenant-a"]))
testutil.Equals(t, 1, len(api.loadedBlocksByTenant["tenant-b"]))

// Test single-tenant mode (empty string tenant)
api2 := NewBlocksAPI(logger, true, "test-label", map[string]string{}, bkt)
api2.SetLoadedForTenant("", []metadata.Meta{block1, block2, block3}, nil)
testutil.Equals(t, 3, len(api2.loadedBlocksByTenant[""]))
}

func TestBlocksEndpointMultiTenantAggregation(t *testing.T) {
logger := log.NewNopLogger()
bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())

api := NewBlocksAPI(logger, true, "test-label", map[string]string{}, bkt)

// Create test block metadata for different tenants
block1 := metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(1, nil),
MinTime: 0,
MaxTime: 1000,
},
}
block2 := metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(2, nil),
MinTime: 1000,
MaxTime: 2000,
},
}
block3 := metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(3, nil),
MinTime: 2000,
MaxTime: 3000,
},
}

// Set blocks for multiple tenants
api.SetLoadedForTenant("tenant-a", []metadata.Meta{block1}, nil)
api.SetLoadedForTenant("tenant-b", []metadata.Meta{block2, block3}, nil)

// Create a request for the loaded view
req, err := http.NewRequest("GET", "http://example.com?view=loaded", nil)
testutil.Ok(t, err)

resp, _, apiErr, releaseResources := api.blocks(req)
defer releaseResources()

testutil.Equals(t, (*baseAPI.ApiError)(nil), apiErr)

blocksInfo, ok := resp.(*BlocksInfo)
testutil.Assert(t, ok, "response should be *BlocksInfo")
testutil.Equals(t, "test-label", blocksInfo.Label)
testutil.Equals(t, 3, len(blocksInfo.Blocks)) // Should aggregate all blocks from all tenants
}

func TestBlocksEndpointSingleTenantFallback(t *testing.T) {
logger := log.NewNopLogger()
bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())

api := NewBlocksAPI(logger, true, "test-label", map[string]string{}, bkt)

// Create test block metadata
block1 := metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(1, nil),
MinTime: 0,
MaxTime: 1000,
},
}

// Use SetLoaded (single-tenant backward compatibility)
api.SetLoaded([]metadata.Meta{block1}, nil)

// Create a request for the loaded view
req, err := http.NewRequest("GET", "http://example.com?view=loaded", nil)
testutil.Ok(t, err)

resp, _, apiErr, releaseResources := api.blocks(req)
defer releaseResources()

testutil.Equals(t, (*baseAPI.ApiError)(nil), apiErr)

blocksInfo, ok := resp.(*BlocksInfo)
testutil.Assert(t, ok, "response should be *BlocksInfo")
testutil.Equals(t, 1, len(blocksInfo.Blocks)) // Should return blocks from loadedBlocksInfo
}

func TestBlocksEndpointGlobalView(t *testing.T) {
logger := log.NewNopLogger()
bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())

api := NewBlocksAPI(logger, true, "test-label", map[string]string{}, bkt)

// Create test block metadata
block1 := metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(1, nil),
MinTime: 0,
MaxTime: 1000,
},
}
block2 := metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(2, nil),
MinTime: 1000,
MaxTime: 2000,
},
}

// Set global blocks
api.SetGlobal([]metadata.Meta{block1, block2}, nil)

// Create a request for the global view (no view param)
req, err := http.NewRequest("GET", "http://example.com", nil)
testutil.Ok(t, err)

resp, _, apiErr, releaseResources := api.blocks(req)
defer releaseResources()

testutil.Equals(t, (*baseAPI.ApiError)(nil), apiErr)

blocksInfo, ok := resp.(*BlocksInfo)
testutil.Assert(t, ok, "response should be *BlocksInfo")
testutil.Equals(t, 2, len(blocksInfo.Blocks))
}

func TestBlocksEndpointTenantFilter(t *testing.T) {
logger := log.NewNopLogger()
bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())

api := NewBlocksAPI(logger, true, "test-label", map[string]string{}, bkt)

// Create test block metadata for different tenants
block1 := metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(1, nil),
MinTime: 0,
MaxTime: 1000,
},
}
block2 := metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(2, nil),
MinTime: 1000,
MaxTime: 2000,
},
}
block3 := metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(3, nil),
MinTime: 2000,
MaxTime: 3000,
},
}

// Set blocks for multiple tenants
api.SetLoadedForTenant("tenant-a", []metadata.Meta{block1}, nil)
api.SetLoadedForTenant("tenant-b", []metadata.Meta{block2, block3}, nil)

// Test filtering by tenant-a
req, err := http.NewRequest("GET", "http://example.com?view=loaded&tenant=tenant-a", nil)
testutil.Ok(t, err)

resp, _, apiErr, releaseResources := api.blocks(req)
testutil.Equals(t, (*baseAPI.ApiError)(nil), apiErr)

blocksInfo, ok := resp.(*BlocksInfo)
testutil.Assert(t, ok, "response should be *BlocksInfo")
testutil.Equals(t, 1, len(blocksInfo.Blocks)) // Only tenant-a blocks
releaseResources()

// Test filtering by tenant-b
req, err = http.NewRequest("GET", "http://example.com?view=loaded&tenant=tenant-b", nil)
testutil.Ok(t, err)

resp, _, apiErr, releaseResources = api.blocks(req)
testutil.Equals(t, (*baseAPI.ApiError)(nil), apiErr)

blocksInfo, ok = resp.(*BlocksInfo)
testutil.Assert(t, ok, "response should be *BlocksInfo")
testutil.Equals(t, 2, len(blocksInfo.Blocks)) // Only tenant-b blocks
releaseResources()

// Test filtering by non-existent tenant
req, err = http.NewRequest("GET", "http://example.com?view=loaded&tenant=tenant-c", nil)
testutil.Ok(t, err)

resp, _, apiErr, releaseResources = api.blocks(req)
testutil.Equals(t, (*baseAPI.ApiError)(nil), apiErr)

blocksInfo, ok = resp.(*BlocksInfo)
testutil.Assert(t, ok, "response should be *BlocksInfo")
testutil.Equals(t, 0, len(blocksInfo.Blocks)) // No blocks for non-existent tenant
releaseResources()
}
Loading