From bfda26aa7cd5f194185a2a72ed7a911c10eb543f Mon Sep 17 00:00:00 2001 From: Lionel Fleury Date: Tue, 23 Dec 2025 11:11:51 +0200 Subject: [PATCH 1/3] fix(datastore): fail if too many results Fixes #1120 --- Makefile | 4 +- pkg/scd/store/cockroach/subscriptions.go | 14 ++--- pkg/scd/store/cockroach/subscriptions_test.go | 51 +++++++++++++++++++ 3 files changed, 60 insertions(+), 9 deletions(-) diff --git a/Makefile b/Makefile index fdff146fc..d5a7d5ff2 100644 --- a/Makefile +++ b/Makefile @@ -114,7 +114,7 @@ test-go-units: .PHONY: test-go-units-crdb test-go-units-crdb: cleanup-test-go-units-crdb - @docker run -d --name dss-crdb-for-testing -p 26257:26257 -p 8080:8080 cockroachdb/cockroach:v24.1.3 start-single-node --insecure > /dev/null + @docker run -d --name dss-crdb-for-testing -p 26257:26257 -p 8080:8080 --rm cockroachdb/cockroach:v24.1.3 start-single-node --insecure > /dev/null @until [ -n "`docker logs dss-crdb-for-testing | grep 'nodeID'`" ]; do echo "Waiting for CRDB to be ready"; sleep 3; done; go run ./cmds/db-manager/main.go migrate --schemas_dir ./build/db_schemas/rid --db_version latest --cockroach_host localhost go run ./cmds/db-manager/main.go migrate --schemas_dir ./build/db_schemas/scd --db_version latest --cockroach_host localhost @@ -124,12 +124,10 @@ test-go-units-crdb: cleanup-test-go-units-crdb go test -cover -count=1 -v ./pkg/scd/store/cockroach --cockroach_host localhost --cockroach_port 26257 --cockroach_ssl_mode disable --cockroach_user root --cockroach_db_name scd -test.gocoverdir=$(COVERDATA_DIR) go test -cover -count=1 -v ./pkg/aux_/store/datastore --cockroach_host localhost --cockroach_port 26257 --cockroach_ssl_mode disable --cockroach_user root --cockroach_db_name aux -test.gocoverdir=$(COVERDATA_DIR) @docker stop dss-crdb-for-testing > /dev/null - @docker rm dss-crdb-for-testing > /dev/null .PHONY: cleanup-test-go-units-crdb cleanup-test-go-units-crdb: @docker stop dss-crdb-for-testing > /dev/null 2>&1 || true - @docker rm dss-crdb-for-testing > /dev/null 2>&1 || true .PHONY: build-dss build-dss: diff --git a/pkg/scd/store/cockroach/subscriptions.go b/pkg/scd/store/cockroach/subscriptions.go index 53c3d710e..b37f0322b 100644 --- a/pkg/scd/store/cockroach/subscriptions.go +++ b/pkg/scd/store/cockroach/subscriptions.go @@ -102,6 +102,7 @@ func (c *repo) fetchSubscriptions(ctx context.Context, q dsssql.Queryable, query s = new(scdmodels.Subscription) updatedAt time.Time version int + count int ) err = rows.Scan( &s.ID, @@ -116,14 +117,15 @@ func (c *repo) fetchSubscriptions(ctx context.Context, q dsssql.Queryable, query &s.EndTime, &cids, &updatedAt, + &count, ) if err != nil { return nil, stacktrace.Propagate(err, "Error scanning Subscription row") } - s.Version = scdmodels.NewOVNFromTime(updatedAt, s.ID.String()) - if err != nil { - return nil, stacktrace.Propagate(err, "Error generating Subscription version") + if count >= dssmodels.MaxResultLimit { + return nil, stacktrace.NewError("Query returned %d subscriptions which exceeds the maximum allowed %d", count, dssmodels.MaxResultLimit) } + s.Version = scdmodels.NewOVNFromTime(updatedAt, s.ID.String()) s.SetCells(cids) payload = append(payload, s) } @@ -205,7 +207,7 @@ func (c *repo) pushSubscription(ctx context.Context, q dsssql.Queryable, s *scdm cells = $11, updated_at = transaction_timestamp() RETURNING - %s`, + %s,1 -- placeholder for count`, subscriptionFieldsWithoutPrefix, subscriptionFieldsWithPrefix, ) @@ -300,7 +302,7 @@ func (c *repo) SearchSubscriptions(ctx context.Context, v4d *dssmodels.Volume4D) var ( query = fmt.Sprintf(` SELECT - %s + %s,COUNT(*) OVER() FROM scd_subscriptions WHERE @@ -403,7 +405,7 @@ func (c *repo) LockSubscriptionsOnCells(ctx context.Context, cells s2.CellUnion) func (c *repo) ListExpiredSubscriptions(ctx context.Context, threshold time.Time) ([]*scdmodels.Subscription, error) { expiredSubsQuery := fmt.Sprintf(` SELECT - %s + %s,COUNT(*) OVER() FROM scd_subscriptions WHERE diff --git a/pkg/scd/store/cockroach/subscriptions_test.go b/pkg/scd/store/cockroach/subscriptions_test.go index cd4a11d97..870ab288d 100644 --- a/pkg/scd/store/cockroach/subscriptions_test.go +++ b/pkg/scd/store/cockroach/subscriptions_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/google/uuid" "github.com/interuss/dss/pkg/models" scdmodels "github.com/interuss/dss/pkg/scd/models" "github.com/stretchr/testify/require" @@ -115,3 +116,53 @@ func TestListExpiredSubscriptions(t *testing.T) { }) } } + +func TestListExpiredSubscriptionsMaxLimit(t *testing.T) { + var ( + ctx = context.Background() + store, tearDownStore = setUpStore(ctx, t) + ) + require.NotNil(t, store) + defer tearDownStore() + + r, err := store.Interact(ctx) + require.NoError(t, err) + + for range models.MaxResultLimit + 1 { + id := uuid.New() + subID := models.ID(id.String()) + sub := &scdmodels.Subscription{ + ID: subID, + NotificationIndex: 1, + Manager: "unittest", + StartTime: &start1, + EndTime: &end1, + USSBaseURL: "https://dummy.uss", + NotifyForOperationalIntents: true, + NotifyForConstraints: false, + ImplicitSubscription: true, + Cells: cells, + } + _, err = r.UpsertSubscription(ctx, sub) + } + + testCases := []struct { + name string + timeRef time.Time + ttl time.Duration + // expired []models.ID + }{{ + name: "all expired", + timeRef: time.Date(2024, time.December, 15, 15, 0, 0, 0, time.UTC), + ttl: time.Hour * 24 * 30, + // expired: []models.ID{sub1ID, sub2ID, sub3ID}, + }} + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + threshold := testCase.timeRef.Add(-testCase.ttl) + _, err := r.ListExpiredSubscriptions(ctx, threshold) + require.Error(t, err) + require.ErrorContainsf(t, err, "Query returned", "%d subscriptions which exceeds the maximum allowed %d", models.MaxResultLimit+1, models.MaxResultLimit) + }) + } +} From 13aff5a9d70a862c54cf107a0fa02e11b9cb36d7 Mon Sep 17 00:00:00 2001 From: Lionel Fleury Date: Tue, 23 Dec 2025 13:05:15 +0200 Subject: [PATCH 2/3] adapt --- Makefile | 4 +- .../cockroach/identification_service_area.go | 53 +++++++++++-------- pkg/rid/store/cockroach/subscriptions.go | 39 +++++++++----- pkg/scd/store/cockroach/constraints.go | 20 +++---- .../store/cockroach/operational_intents.go | 19 ++++--- pkg/scd/store/cockroach/subscriptions.go | 15 ++---- pkg/scd/store/cockroach/subscriptions_test.go | 4 +- 7 files changed, 89 insertions(+), 65 deletions(-) diff --git a/Makefile b/Makefile index d5a7d5ff2..fdff146fc 100644 --- a/Makefile +++ b/Makefile @@ -114,7 +114,7 @@ test-go-units: .PHONY: test-go-units-crdb test-go-units-crdb: cleanup-test-go-units-crdb - @docker run -d --name dss-crdb-for-testing -p 26257:26257 -p 8080:8080 --rm cockroachdb/cockroach:v24.1.3 start-single-node --insecure > /dev/null + @docker run -d --name dss-crdb-for-testing -p 26257:26257 -p 8080:8080 cockroachdb/cockroach:v24.1.3 start-single-node --insecure > /dev/null @until [ -n "`docker logs dss-crdb-for-testing | grep 'nodeID'`" ]; do echo "Waiting for CRDB to be ready"; sleep 3; done; go run ./cmds/db-manager/main.go migrate --schemas_dir ./build/db_schemas/rid --db_version latest --cockroach_host localhost go run ./cmds/db-manager/main.go migrate --schemas_dir ./build/db_schemas/scd --db_version latest --cockroach_host localhost @@ -124,10 +124,12 @@ test-go-units-crdb: cleanup-test-go-units-crdb go test -cover -count=1 -v ./pkg/scd/store/cockroach --cockroach_host localhost --cockroach_port 26257 --cockroach_ssl_mode disable --cockroach_user root --cockroach_db_name scd -test.gocoverdir=$(COVERDATA_DIR) go test -cover -count=1 -v ./pkg/aux_/store/datastore --cockroach_host localhost --cockroach_port 26257 --cockroach_ssl_mode disable --cockroach_user root --cockroach_db_name aux -test.gocoverdir=$(COVERDATA_DIR) @docker stop dss-crdb-for-testing > /dev/null + @docker rm dss-crdb-for-testing > /dev/null .PHONY: cleanup-test-go-units-crdb cleanup-test-go-units-crdb: @docker stop dss-crdb-for-testing > /dev/null 2>&1 || true + @docker rm dss-crdb-for-testing > /dev/null 2>&1 || true .PHONY: build-dss build-dss: diff --git a/pkg/rid/store/cockroach/identification_service_area.go b/pkg/rid/store/cockroach/identification_service_area.go index 75f423d17..bd4178ee8 100644 --- a/pkg/rid/store/cockroach/identification_service_area.go +++ b/pkg/rid/store/cockroach/identification_service_area.go @@ -35,7 +35,10 @@ func (r *repo) fetchISAs(ctx context.Context, query string, args ...interface{}) for rows.Next() { i := new(ridmodels.IdentificationServiceArea) - var updateTime time.Time + var ( + updateTime time.Time + count int + ) err := rows.Scan( &i.ID, @@ -46,10 +49,14 @@ func (r *repo) fetchISAs(ctx context.Context, query string, args ...interface{}) &i.EndTime, &writer, &updateTime, + &count, ) if err != nil { return nil, stacktrace.Propagate(err, "Error scanning ISA row") } + if count > dssmodels.MaxResultLimit { + return nil, stacktrace.NewError("Query returned %d ISAs which exceeds the maximum allowed %d", count, dssmodels.MaxResultLimit) + } i.Writer = writer.String i.SetCells(cids) i.Version = dssmodels.VersionFromTime(updateTime) @@ -80,7 +87,9 @@ func (r *repo) fetchISA(ctx context.Context, query string, args ...interface{}) // Returns nil, nil if not found func (r *repo) GetISA(ctx context.Context, id dssmodels.ID, forUpdate bool) (*ridmodels.IdentificationServiceArea, error) { var query = fmt.Sprintf(` - SELECT %s FROM + SELECT + %s,1 -- placeholder for count + FROM identification_service_areas WHERE id = $1 @@ -108,7 +117,8 @@ func (r *repo) InsertISA(ctx context.Context, isa *ridmodels.IdentificationServi VALUES ($1, $2, $3, $4, $5, $6, $7, transaction_timestamp()) RETURNING - %s`, isaFields, isaFields) + %s,1 -- placeholder for count + `, isaFields, isaFields) ) cids := make([]int64, len(isa.Cells)) @@ -143,7 +153,8 @@ func (r *repo) UpdateISA(ctx context.Context, isa *ridmodels.IdentificationServi SET (%s) = ($1, $2, $3, $4, $5, $7, transaction_timestamp()) WHERE id = $1 AND updated_at = $6 RETURNING - %s`, updateISAFields, isaFields) + %s,1 -- placeholder for count + `, updateISAFields, isaFields) ) cids, err := dssql.CellUnionToCellIdsWithValidation(isa.Cells) @@ -169,7 +180,9 @@ func (r *repo) DeleteISA(ctx context.Context, isa *ridmodels.IdentificationServi id = $1 AND updated_at = $2 - RETURNING %s`, isaFields) + RETURNING + %s,1 -- placeholder for count + `, isaFields) ) id, err := isa.ID.PgUUID() if err != nil { @@ -187,7 +200,7 @@ func (r *repo) SearchISAs(ctx context.Context, cells s2.CellUnion, earliest *tim // Make them real values (not pointers), on the model layer. isasInCellsQuery = fmt.Sprintf(` SELECT - %s + %s, COUNT(*) OVER() -- placeholder for count FROM identification_service_areas WHERE @@ -196,7 +209,7 @@ func (r *repo) SearchISAs(ctx context.Context, cells s2.CellUnion, earliest *tim COALESCE(starts_at <= $2, true) AND cells && $3 - LIMIT $4`, isaFields) + `, isaFields) ) if len(cells) == 0 { @@ -207,7 +220,7 @@ func (r *repo) SearchISAs(ctx context.Context, cells s2.CellUnion, earliest *tim return nil, stacktrace.NewError("Earliest start time is missing") } - return r.fetchISAs(ctx, isasInCellsQuery, earliest, latest, dssql.CellUnionToCellIds(cells), dssmodels.MaxResultLimit) + return r.fetchISAs(ctx, isasInCellsQuery, earliest, latest, dssql.CellUnionToCellIds(cells)) } // ListExpiredISAs lists all expired ISAs based on writer. @@ -219,18 +232,16 @@ func (r *repo) ListExpiredISAs(ctx context.Context, writer string) ([]*ridmodels writerQuery = "'' OR writer = NULL" } - var ( - isasInCellsQuery = fmt.Sprintf(` - SELECT - %s - FROM - identification_service_areas - WHERE - ends_at + INTERVAL '%d' MINUTE <= CURRENT_TIMESTAMP - AND - (writer = %s) - LIMIT $1`, isaFields, expiredDurationInMin, writerQuery) - ) + var isasInCellsQuery = fmt.Sprintf(` + SELECT + %s, COUNT(*) OVER() -- placeholder for count + FROM + identification_service_areas + WHERE + ends_at + INTERVAL '%d' MINUTE <= CURRENT_TIMESTAMP + AND + (writer = %s) + `, isaFields, expiredDurationInMin, writerQuery) - return r.fetchISAs(ctx, isasInCellsQuery, dssmodels.MaxResultLimit) + return r.fetchISAs(ctx, isasInCellsQuery) } diff --git a/pkg/rid/store/cockroach/subscriptions.go b/pkg/rid/store/cockroach/subscriptions.go index ab148966c..71ce5ff5e 100644 --- a/pkg/rid/store/cockroach/subscriptions.go +++ b/pkg/rid/store/cockroach/subscriptions.go @@ -35,7 +35,10 @@ func (r *repo) process(ctx context.Context, query string, args ...interface{}) ( for rows.Next() { s := new(ridmodels.Subscription) - var updateTime time.Time + var ( + updateTime time.Time + count int + ) err := rows.Scan( &s.ID, @@ -47,12 +50,15 @@ func (r *repo) process(ctx context.Context, query string, args ...interface{}) ( &s.EndTime, &writer, &updateTime, + &count, ) if err != nil { return nil, stacktrace.Propagate(err, "Error scanning Subscription row") } + if count > dssmodels.MaxResultLimit { + return nil, stacktrace.NewError("Query returned %d subscriptions which exceeds the maximum allowed %d", count, dssmodels.MaxResultLimit) + } s.Writer = writer.String - s.SetCells(cids) s.Version = dssmodels.VersionFromTime(updateTime) payload = append(payload, s) @@ -113,7 +119,8 @@ func (r *repo) MaxSubscriptionCountInCellsByOwner(ctx context.Context, cells s2. func (r *repo) GetSubscription(ctx context.Context, id dssmodels.ID) (*ridmodels.Subscription, error) { // TODO(steeling) we should enforce startTime and endTime to not be null at the DB level. var query = fmt.Sprintf(` - SELECT %s FROM subscriptions + SELECT %s, COUNT(*) OVER() -- placeholder for count + FROM subscriptions WHERE id = $1`, subscriptionFields) uid, err := id.PgUUID() if err != nil { @@ -132,7 +139,8 @@ func (r *repo) UpdateSubscription(ctx context.Context, s *ridmodels.Subscription SET (%s) = ($1, $2, $3, $4, $5, $6, $7, transaction_timestamp()) WHERE id = $1 AND updated_at = $8 RETURNING - %s`, updateSubscriptionFields, subscriptionFields) + %s,1 -- placeholder for count + `, updateSubscriptionFields, subscriptionFields) ) cids, err := dssql.CellUnionToCellIdsWithValidation(s.Cells) @@ -167,7 +175,8 @@ func (r *repo) InsertSubscription(ctx context.Context, s *ridmodels.Subscription VALUES ($1, $2, $3, $4, $5, $6, $7, $8, transaction_timestamp()) RETURNING - %s`, subscriptionFields, subscriptionFields) + %s,1 -- placeholder for count + `, subscriptionFields, subscriptionFields) ) cids, err := dssql.CellUnionToCellIdsWithValidation(s.Cells) @@ -202,7 +211,8 @@ func (r *repo) DeleteSubscription(ctx context.Context, s *ridmodels.Subscription WHERE id = $1 AND updated_at = $2 - RETURNING %s`, subscriptionFields) + RETURNING %s,1 -- placeholder for count + `, subscriptionFields) ) id, err := s.ID.PgUUID() if err != nil { @@ -219,7 +229,8 @@ func (r *repo) UpdateNotificationIdxsInCells(ctx context.Context, cells s2.CellU WHERE cells && $1 AND ends_at >= $2 - RETURNING %s`, subscriptionFields) + RETURNING %s,1 -- placeholder for count + `, subscriptionFields) return r.process( ctx, updateQuery, dssql.CellUnionToCellIds(cells), r.clock.Now()) @@ -230,21 +241,21 @@ func (r *repo) SearchSubscriptions(ctx context.Context, cells s2.CellUnion) ([]* var ( query = fmt.Sprintf(` SELECT - %s + %s, COUNT(*) OVER() -- placeholder for count FROM subscriptions WHERE cells && $1 AND ends_at >= $2 - LIMIT $3`, subscriptionFields) + `, subscriptionFields) ) if len(cells) == 0 { return nil, stacktrace.NewErrorWithCode(dsserr.BadRequest, "no location provided") } - return r.process(ctx, query, dssql.CellUnionToCellIds(cells), r.clock.Now(), dssmodels.MaxResultLimit) + return r.process(ctx, query, dssql.CellUnionToCellIds(cells), r.clock.Now()) } // SearchSubscriptionsByOwner returns all subscriptions in "cells". @@ -252,7 +263,7 @@ func (r *repo) SearchSubscriptionsByOwner(ctx context.Context, cells s2.CellUnio var ( query = fmt.Sprintf(` SELECT - %s + %s, COUNT(*) OVER() -- placeholder for count FROM subscriptions WHERE @@ -261,14 +272,14 @@ func (r *repo) SearchSubscriptionsByOwner(ctx context.Context, cells s2.CellUnio subscriptions.owner = $2 AND ends_at >= $3 - LIMIT $4`, subscriptionFields) + `, subscriptionFields) ) if len(cells) == 0 { return nil, stacktrace.NewErrorWithCode(dsserr.BadRequest, "no location provided") } - return r.process(ctx, query, dssql.CellUnionToCellIds(cells), owner, r.clock.Now(), dssmodels.MaxResultLimit) + return r.process(ctx, query, dssql.CellUnionToCellIds(cells), owner, r.clock.Now()) } // ListExpiredSubscriptions lists all expired Subscriptions based on writer. @@ -283,7 +294,7 @@ func (r *repo) ListExpiredSubscriptions(ctx context.Context, writer string) ([]* var ( query = fmt.Sprintf(` SELECT - %s + %s, COUNT(*) OVER() -- placeholder for count FROM subscriptions WHERE diff --git a/pkg/scd/store/cockroach/constraints.go b/pkg/scd/store/cockroach/constraints.go index 5fcc68aa2..878131db3 100644 --- a/pkg/scd/store/cockroach/constraints.go +++ b/pkg/scd/store/cockroach/constraints.go @@ -65,6 +65,7 @@ func (c *repo) fetchConstraints(ctx context.Context, q dsssql.Queryable, query s var ( c = new(scdmodels.Constraint) updatedAt time.Time + count int ) err := rows.Scan( &c.ID, @@ -77,10 +78,14 @@ func (c *repo) fetchConstraints(ctx context.Context, q dsssql.Queryable, query s &c.EndTime, &cids, &updatedAt, + &count, ) if err != nil { return nil, stacktrace.Propagate(err, "Error scanning Constraint row") } + if count > dssmodels.MaxResultLimit { + return nil, stacktrace.NewError("Query returned %d Constraints which exceeds the maximum allowed %d", count, dssmodels.MaxResultLimit) + } c.Cells = geo.CellUnionFromInt64(cids) c.OVN = scdmodels.NewOVNFromTime(updatedAt, c.ID.String()) payload = append(payload, c) @@ -110,7 +115,7 @@ func (c *repo) GetConstraint(ctx context.Context, id dssmodels.ID) (*scdmodels.C var ( query = fmt.Sprintf(` SELECT - %s + %s, COUNT(*) OVER() -- placeholder for count FROM scd_constraints WHERE @@ -142,11 +147,9 @@ func (c *repo) UpsertConstraint(ctx context.Context, s *scdmodels.Constraint) (* ends_at = $8, cells = $9, updated_at = transaction_timestamp() - RETURNING %s - `, - constraintFieldsWithoutPrefix, - constraintFieldsWithPrefix, - ) + RETURNING + %s,1 -- placeholder for count + `, constraintFieldsWithoutPrefix, constraintFieldsWithPrefix) ) cids, err := dsssql.CellUnionToCellIdsWithValidation(s.Cells) @@ -207,7 +210,7 @@ func (c *repo) SearchConstraints(ctx context.Context, v4d *dssmodels.Volume4D) ( var ( query = fmt.Sprintf(` SELECT - %s + %s, COUNT(*) OVER() -- placeholder for count FROM scd_constraints WHERE @@ -216,7 +219,6 @@ func (c *repo) SearchConstraints(ctx context.Context, v4d *dssmodels.Volume4D) ( COALESCE(starts_at <= $3, true) AND COALESCE(ends_at >= $2, true) - LIMIT $4 `, constraintFieldsWithoutPrefix) ) @@ -232,7 +234,7 @@ func (c *repo) SearchConstraints(ctx context.Context, v4d *dssmodels.Volume4D) ( } constraints, err := c.fetchConstraints( - ctx, c.q, query, dsssql.CellUnionToCellIds(cells), v4d.StartTime, v4d.EndTime, dssmodels.MaxResultLimit) + ctx, c.q, query, dsssql.CellUnionToCellIds(cells), v4d.StartTime, v4d.EndTime) if err != nil { return nil, stacktrace.Propagate(err, "Error fetching Constraints") } diff --git a/pkg/scd/store/cockroach/operational_intents.go b/pkg/scd/store/cockroach/operational_intents.go index 9f73a3b4e..9f42d614e 100644 --- a/pkg/scd/store/cockroach/operational_intents.go +++ b/pkg/scd/store/cockroach/operational_intents.go @@ -72,6 +72,7 @@ func (s *repo) fetchOperationalIntents(ctx context.Context, q dsssql.Queryable, var ( o = &scdmodels.OperationalIntent{} updatedAt time.Time + count int ) err := rows.Scan( &o.ID, @@ -88,10 +89,14 @@ func (s *repo) fetchOperationalIntents(ctx context.Context, q dsssql.Queryable, &cids, &ussRequestedOVN, &pastOVNs, + &count, ) if err != nil { return nil, stacktrace.Propagate(err, "Error scanning Operation row") } + if count > dssmodels.MaxResultLimit { + return nil, stacktrace.NewError("Query returned %d Operations which exceeds the maximum allowed %d", count, dssmodels.MaxResultLimit) + } // If the managing USS has requested a specific OVN on this operational intent, it will be persisted in DB. // If not, a default DSS-generated OVN based on the last update time is used. @@ -149,7 +154,9 @@ func (s *repo) fetchOperationalIntent(ctx context.Context, q dsssql.Queryable, q func (s *repo) fetchOperationByID(ctx context.Context, q dsssql.Queryable, id dssmodels.ID) (*scdmodels.OperationalIntent, error) { query := fmt.Sprintf(` - SELECT %s FROM + SELECT + %s, COUNT(*) OVER() -- placeholder for count + FROM scd_operations WHERE id = $1`, operationFieldsWithoutPrefix) @@ -216,7 +223,7 @@ func (s *repo) UpsertOperationalIntent(ctx context.Context, operation *scdmodels uss_requested_ovn = $12, past_ovns = $13 RETURNING - %s`, + %s,1 -- placeholder for count`, operationFieldsWithoutPrefix, operationFieldsWithPrefix, ) @@ -292,7 +299,7 @@ func (s *repo) searchOperationalIntents(ctx context.Context, q dsssql.Queryable, COALESCE(scd_operations.ends_at >= $4, true) AND COALESCE(scd_operations.starts_at <= $5, true) - LIMIT $6`, operationFieldsWithPrefix) + `, operationFieldsWithPrefix) ) if v4d.SpatialVolume == nil || v4d.SpatialVolume.Footprint == nil { @@ -313,7 +320,6 @@ func (s *repo) searchOperationalIntents(ctx context.Context, q dsssql.Queryable, v4d.SpatialVolume.AltitudeHi, v4d.StartTime, v4d.EndTime, - dssmodels.MaxResultLimit, ) if err != nil { return nil, stacktrace.Propagate(err, "Error fetching Operations") @@ -364,19 +370,18 @@ func (s *repo) GetDependentOperationalIntents(ctx context.Context, subscriptionI func (s *repo) ListExpiredOperationalIntents(ctx context.Context, threshold time.Time) ([]*scdmodels.OperationalIntent, error) { expiredOpIntentsQuery := fmt.Sprintf(` SELECT - %s + %s, COUNT(*) OVER() -- placeholder for count FROM scd_operations WHERE scd_operations.ends_at IS NOT NULL AND scd_operations.ends_at <= $1 OR scd_operations.ends_at IS NULL AND scd_operations.updated_at <= $1 -- use last update time as reference if there is no end time - LIMIT $2`, operationFieldsWithPrefix) + `, operationFieldsWithPrefix) result, err := s.fetchOperationalIntents( ctx, s.q, expiredOpIntentsQuery, threshold, - dssmodels.MaxResultLimit, ) if err != nil { return nil, stacktrace.Propagate(err, "Error fetching Operations") diff --git a/pkg/scd/store/cockroach/subscriptions.go b/pkg/scd/store/cockroach/subscriptions.go index b37f0322b..189b2bbab 100644 --- a/pkg/scd/store/cockroach/subscriptions.go +++ b/pkg/scd/store/cockroach/subscriptions.go @@ -122,7 +122,7 @@ func (c *repo) fetchSubscriptions(ctx context.Context, q dsssql.Queryable, query if err != nil { return nil, stacktrace.Propagate(err, "Error scanning Subscription row") } - if count >= dssmodels.MaxResultLimit { + if count > dssmodels.MaxResultLimit { return nil, stacktrace.NewError("Query returned %d subscriptions which exceeds the maximum allowed %d", count, dssmodels.MaxResultLimit) } s.Version = scdmodels.NewOVNFromTime(updatedAt, s.ID.String()) @@ -311,7 +311,7 @@ func (c *repo) SearchSubscriptions(ctx context.Context, v4d *dssmodels.Volume4D) COALESCE(starts_at <= $3, true) AND COALESCE(ends_at >= $2, true) - LIMIT $4`, subscriptionFieldsWithPrefix) + `, subscriptionFieldsWithPrefix) ) // TODO: Lazily calculate & cache spatial covering so that it is only ever @@ -326,7 +326,7 @@ func (c *repo) SearchSubscriptions(ctx context.Context, v4d *dssmodels.Volume4D) } subscriptions, err := c.fetchSubscriptions( - ctx, c.q, query, dsssql.CellUnionToCellIds(cells), v4d.StartTime, v4d.EndTime, dssmodels.MaxResultLimit) + ctx, c.q, query, dsssql.CellUnionToCellIds(cells), v4d.StartTime, v4d.EndTime) if err != nil { return nil, stacktrace.Propagate(err, "Unable to fetch Subscriptions") } @@ -412,17 +412,12 @@ func (c *repo) ListExpiredSubscriptions(ctx context.Context, threshold time.Time scd_subscriptions.ends_at IS NOT NULL AND scd_subscriptions.ends_at <= $1 OR scd_subscriptions.ends_at IS NULL AND scd_subscriptions.updated_at <= $1 -- use last update time as reference if there is no end time - LIMIT $2`, subscriptionFieldsWithPrefix) + `, subscriptionFieldsWithPrefix) - subscriptions, err := c.fetchSubscriptions( - ctx, c.q, expiredSubsQuery, - threshold, - dssmodels.MaxResultLimit, - ) + subscriptions, err := c.fetchSubscriptions(ctx, c.q, expiredSubsQuery, threshold) if err != nil { return nil, stacktrace.Propagate(err, "Unable to fetch Subscriptions") } return subscriptions, nil - } diff --git a/pkg/scd/store/cockroach/subscriptions_test.go b/pkg/scd/store/cockroach/subscriptions_test.go index 870ab288d..8e5b471ac 100644 --- a/pkg/scd/store/cockroach/subscriptions_test.go +++ b/pkg/scd/store/cockroach/subscriptions_test.go @@ -150,12 +150,10 @@ func TestListExpiredSubscriptionsMaxLimit(t *testing.T) { name string timeRef time.Time ttl time.Duration - // expired []models.ID }{{ - name: "all expired", + name: "too many expired", timeRef: time.Date(2024, time.December, 15, 15, 0, 0, 0, time.UTC), ttl: time.Hour * 24 * 30, - // expired: []models.ID{sub1ID, sub2ID, sub3ID}, }} for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { From d6452d25ad43651094ffa8edb2faa5a0815bff09 Mon Sep 17 00:00:00 2001 From: Lionel Fleury Date: Tue, 23 Dec 2025 13:50:19 +0200 Subject: [PATCH 3/3] test --- pkg/scd/store/cockroach/subscriptions_test.go | 44 ++++++++++++------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/pkg/scd/store/cockroach/subscriptions_test.go b/pkg/scd/store/cockroach/subscriptions_test.go index 8e5b471ac..a2bdb208b 100644 --- a/pkg/scd/store/cockroach/subscriptions_test.go +++ b/pkg/scd/store/cockroach/subscriptions_test.go @@ -128,7 +128,7 @@ func TestListExpiredSubscriptionsMaxLimit(t *testing.T) { r, err := store.Interact(ctx) require.NoError(t, err) - for range models.MaxResultLimit + 1 { + for range models.MaxResultLimit { id := uuid.New() subID := models.ID(id.String()) sub := &scdmodels.Subscription{ @@ -144,23 +144,33 @@ func TestListExpiredSubscriptionsMaxLimit(t *testing.T) { Cells: cells, } _, err = r.UpsertSubscription(ctx, sub) + require.NoError(t, err) } - testCases := []struct { - name string - timeRef time.Time - ttl time.Duration - }{{ - name: "too many expired", - timeRef: time.Date(2024, time.December, 15, 15, 0, 0, 0, time.UTC), - ttl: time.Hour * 24 * 30, - }} - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - threshold := testCase.timeRef.Add(-testCase.ttl) - _, err := r.ListExpiredSubscriptions(ctx, threshold) - require.Error(t, err) - require.ErrorContainsf(t, err, "Query returned", "%d subscriptions which exceeds the maximum allowed %d", models.MaxResultLimit+1, models.MaxResultLimit) - }) + timeRef := time.Date(2024, time.December, 15, 15, 0, 0, 0, time.UTC) + ttl := time.Hour * 24 * 30 + threshold := timeRef.Add(-ttl) + _, err = r.ListExpiredSubscriptions(ctx, threshold) + require.NoError(t, err) + + // Insert one more to exceed the limit + id := uuid.New() + subID := models.ID(id.String()) + sub := &scdmodels.Subscription{ + ID: subID, + NotificationIndex: 1, + Manager: "unittest", + StartTime: &start1, + EndTime: &end1, + USSBaseURL: "https://dummy.uss", + NotifyForOperationalIntents: true, + NotifyForConstraints: false, + ImplicitSubscription: true, + Cells: cells, } + _, err = r.UpsertSubscription(ctx, sub) + require.NoError(t, err) + _, err = r.ListExpiredSubscriptions(ctx, threshold) + require.Error(t, err) + require.ErrorContainsf(t, err, "Query returned", "%d subscriptions which exceeds the maximum allowed %d", models.MaxResultLimit+1, models.MaxResultLimit) }