Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions pkg/models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ const (
// 32 used to be the highest value allowed by strconv. The new value is 36,
// although changes to this will result in RMW errors.
versionBase = 32

// Set a max limit for the SELECT query result
MaxResultLimit = 10000
)

// Set a max limit for looping on results
// NOTE: var so it can be overridden in unit tests
var MaxResultLimit = 10000

// PgUUID converts an ID to a pgtype.UUID.
// If the ID this is called on is nil, nil will be returned
func (id *ID) PgUUID() (*pgtype.UUID, error) {
Expand Down
13 changes: 9 additions & 4 deletions pkg/rid/store/cockroach/identification_service_area.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func (r *repo) fetchISAs(ctx context.Context, query string, args ...interface{})
var cids []int64

var writer pgtype.Text
var count int
for rows.Next() {
i := new(ridmodels.IdentificationServiceArea)

Expand All @@ -50,6 +51,10 @@ func (r *repo) fetchISAs(ctx context.Context, query string, args ...interface{})
if err != nil {
return nil, stacktrace.Propagate(err, "Error scanning ISA row")
}
count++
if count > dssmodels.MaxResultLimit {
return nil, stacktrace.NewError("Result set exceeded max limit of %d", dssmodels.MaxResultLimit)
}
i.Writer = writer.String
i.SetCells(cids)
i.Version = dssmodels.VersionFromTime(updateTime)
Expand Down Expand Up @@ -196,7 +201,7 @@ func (r *repo) SearchISAs(ctx context.Context, cells s2.CellUnion, earliest *tim
COALESCE(starts_at <= $2, true)
AND
cells && $3
LIMIT $4`, isaFields)
`, isaFields)
)

if len(cells) == 0 {
Expand All @@ -207,7 +212,7 @@ func (r *repo) SearchISAs(ctx context.Context, cells s2.CellUnion, earliest *tim
return nil, stacktrace.NewError("Earliest start time is missing")
}

return r.fetchISAs(ctx, isasInCellsQuery, earliest, latest, dssql.CellUnionToCellIds(cells), dssmodels.MaxResultLimit)
return r.fetchISAs(ctx, isasInCellsQuery, earliest, latest, dssql.CellUnionToCellIds(cells))
}

// ListExpiredISAs lists all expired ISAs based on writer.
Expand All @@ -229,8 +234,8 @@ func (r *repo) ListExpiredISAs(ctx context.Context, writer string) ([]*ridmodels
ends_at + INTERVAL '%d' MINUTE <= CURRENT_TIMESTAMP
AND
(writer = %s)
LIMIT $1`, isaFields, expiredDurationInMin, writerQuery)
`, isaFields, expiredDurationInMin, writerQuery)
)

return r.fetchISAs(ctx, isasInCellsQuery, dssmodels.MaxResultLimit)
return r.fetchISAs(ctx, isasInCellsQuery)
}
14 changes: 9 additions & 5 deletions pkg/rid/store/cockroach/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func (r *repo) process(ctx context.Context, query string, args ...interface{}) (
var cids []int64

var writer pgtype.Text
var count int
for rows.Next() {
s := new(ridmodels.Subscription)

Expand All @@ -51,8 +52,11 @@ func (r *repo) process(ctx context.Context, query string, args ...interface{}) (
if err != nil {
return nil, stacktrace.Propagate(err, "Error scanning Subscription row")
}
count++
if count > dssmodels.MaxResultLimit {
return nil, stacktrace.NewError("Result set exceeded max limit of %d", dssmodels.MaxResultLimit)
}
s.Writer = writer.String

s.SetCells(cids)
s.Version = dssmodels.VersionFromTime(updateTime)
payload = append(payload, s)
Expand Down Expand Up @@ -237,14 +241,14 @@ func (r *repo) SearchSubscriptions(ctx context.Context, cells s2.CellUnion) ([]*
cells && $1
AND
ends_at >= $2
LIMIT $3`, subscriptionFields)
`, subscriptionFields)
)

if len(cells) == 0 {
return nil, stacktrace.NewErrorWithCode(dsserr.BadRequest, "no location provided")
}

return r.process(ctx, query, dssql.CellUnionToCellIds(cells), r.clock.Now(), dssmodels.MaxResultLimit)
return r.process(ctx, query, dssql.CellUnionToCellIds(cells), r.clock.Now())
}

// SearchSubscriptionsByOwner returns all subscriptions in "cells".
Expand All @@ -261,14 +265,14 @@ func (r *repo) SearchSubscriptionsByOwner(ctx context.Context, cells s2.CellUnio
subscriptions.owner = $2
AND
ends_at >= $3
LIMIT $4`, subscriptionFields)
`, subscriptionFields)
)

if len(cells) == 0 {
return nil, stacktrace.NewErrorWithCode(dsserr.BadRequest, "no location provided")
}

return r.process(ctx, query, dssql.CellUnionToCellIds(cells), owner, r.clock.Now(), dssmodels.MaxResultLimit)
return r.process(ctx, query, dssql.CellUnionToCellIds(cells), owner, r.clock.Now())
}

// ListExpiredSubscriptions lists all expired Subscriptions based on writer.
Expand Down
8 changes: 6 additions & 2 deletions pkg/scd/store/cockroach/constraints.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (c *repo) fetchConstraints(ctx context.Context, q dsssql.Queryable, query s

var payload []*scdmodels.Constraint
var cids []int64
var count int
for rows.Next() {
var (
c = new(scdmodels.Constraint)
Expand All @@ -81,6 +82,10 @@ func (c *repo) fetchConstraints(ctx context.Context, q dsssql.Queryable, query s
if err != nil {
return nil, stacktrace.Propagate(err, "Error scanning Constraint row")
}
count++
if count > dssmodels.MaxResultLimit {
return nil, stacktrace.NewError("Result set exceeded max limit of %d", dssmodels.MaxResultLimit)
}
c.Cells = geo.CellUnionFromInt64(cids)
c.OVN = scdmodels.NewOVNFromTime(updatedAt, c.ID.String())
payload = append(payload, c)
Expand Down Expand Up @@ -216,7 +221,6 @@ func (c *repo) SearchConstraints(ctx context.Context, v4d *dssmodels.Volume4D) (
COALESCE(starts_at <= $3, true)
AND
COALESCE(ends_at >= $2, true)
LIMIT $4
`, constraintFieldsWithoutPrefix)
)

Expand All @@ -232,7 +236,7 @@ func (c *repo) SearchConstraints(ctx context.Context, v4d *dssmodels.Volume4D) (
}

constraints, err := c.fetchConstraints(
ctx, c.q, query, dsssql.CellUnionToCellIds(cells), v4d.StartTime, v4d.EndTime, dssmodels.MaxResultLimit)
ctx, c.q, query, dsssql.CellUnionToCellIds(cells), v4d.StartTime, v4d.EndTime)
if err != nil {
return nil, stacktrace.Propagate(err, "Error fetching Constraints")
}
Expand Down
11 changes: 7 additions & 4 deletions pkg/scd/store/cockroach/operational_intents.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (s *repo) fetchOperationalIntents(ctx context.Context, q dsssql.Queryable,
cids []int64
ussRequestedOVN pgtype.Text
pastOVNs []string
count int
)
ussAvailabilities := map[dssmodels.Manager]scdmodels.UssAvailabilityState{}
for rows.Next() {
Expand All @@ -92,6 +93,10 @@ func (s *repo) fetchOperationalIntents(ctx context.Context, q dsssql.Queryable,
if err != nil {
return nil, stacktrace.Propagate(err, "Error scanning Operation row")
}
count++
if count > dssmodels.MaxResultLimit {
return nil, stacktrace.NewError("Result set exceeded max limit of %d", dssmodels.MaxResultLimit)
}

// If the managing USS has requested a specific OVN on this operational intent, it will be persisted in DB.
// If not, a default DSS-generated OVN based on the last update time is used.
Expand Down Expand Up @@ -292,7 +297,7 @@ func (s *repo) searchOperationalIntents(ctx context.Context, q dsssql.Queryable,
COALESCE(scd_operations.ends_at >= $4, true)
AND
COALESCE(scd_operations.starts_at <= $5, true)
LIMIT $6`, operationFieldsWithPrefix)
`, operationFieldsWithPrefix)
)

if v4d.SpatialVolume == nil || v4d.SpatialVolume.Footprint == nil {
Expand All @@ -313,7 +318,6 @@ func (s *repo) searchOperationalIntents(ctx context.Context, q dsssql.Queryable,
v4d.SpatialVolume.AltitudeHi,
v4d.StartTime,
v4d.EndTime,
dssmodels.MaxResultLimit,
)
if err != nil {
return nil, stacktrace.Propagate(err, "Error fetching Operations")
Expand Down Expand Up @@ -371,12 +375,11 @@ func (s *repo) ListExpiredOperationalIntents(ctx context.Context, threshold time
scd_operations.ends_at IS NOT NULL AND scd_operations.ends_at <= $1
OR
scd_operations.ends_at IS NULL AND scd_operations.updated_at <= $1 -- use last update time as reference if there is no end time
LIMIT $2`, operationFieldsWithPrefix)
`, operationFieldsWithPrefix)

result, err := s.fetchOperationalIntents(
ctx, s.q, expiredOpIntentsQuery,
threshold,
dssmodels.MaxResultLimit,
)
if err != nil {
return nil, stacktrace.Propagate(err, "Error fetching Operations")
Expand Down
15 changes: 8 additions & 7 deletions pkg/scd/store/cockroach/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func (c *repo) fetchSubscriptions(ctx context.Context, q dsssql.Queryable, query

var payload []*scdmodels.Subscription
var cids []int64
var count int
for rows.Next() {
var (
s = new(scdmodels.Subscription)
Expand All @@ -120,10 +121,11 @@ func (c *repo) fetchSubscriptions(ctx context.Context, q dsssql.Queryable, query
if err != nil {
return nil, stacktrace.Propagate(err, "Error scanning Subscription row")
}
s.Version = scdmodels.NewOVNFromTime(updatedAt, s.ID.String())
if err != nil {
return nil, stacktrace.Propagate(err, "Error generating Subscription version")
count++
if count > dssmodels.MaxResultLimit {
return nil, stacktrace.NewError("Result set exceeded max limit of %d", dssmodels.MaxResultLimit)
}
s.Version = scdmodels.NewOVNFromTime(updatedAt, s.ID.String())
s.SetCells(cids)
payload = append(payload, s)
}
Expand Down Expand Up @@ -309,7 +311,7 @@ func (c *repo) SearchSubscriptions(ctx context.Context, v4d *dssmodels.Volume4D)
COALESCE(starts_at <= $3, true)
AND
COALESCE(ends_at >= $2, true)
LIMIT $4`, subscriptionFieldsWithPrefix)
`, subscriptionFieldsWithPrefix)
)

// TODO: Lazily calculate & cache spatial covering so that it is only ever
Expand All @@ -324,7 +326,7 @@ func (c *repo) SearchSubscriptions(ctx context.Context, v4d *dssmodels.Volume4D)
}

subscriptions, err := c.fetchSubscriptions(
ctx, c.q, query, dsssql.CellUnionToCellIds(cells), v4d.StartTime, v4d.EndTime, dssmodels.MaxResultLimit)
ctx, c.q, query, dsssql.CellUnionToCellIds(cells), v4d.StartTime, v4d.EndTime)
if err != nil {
return nil, stacktrace.Propagate(err, "Unable to fetch Subscriptions")
}
Expand Down Expand Up @@ -410,12 +412,11 @@ func (c *repo) ListExpiredSubscriptions(ctx context.Context, threshold time.Time
scd_subscriptions.ends_at IS NOT NULL AND scd_subscriptions.ends_at <= $1
OR
scd_subscriptions.ends_at IS NULL AND scd_subscriptions.updated_at <= $1 -- use last update time as reference if there is no end time
LIMIT $2`, subscriptionFieldsWithPrefix)
`, subscriptionFieldsWithPrefix)

subscriptions, err := c.fetchSubscriptions(
ctx, c.q, expiredSubsQuery,
threshold,
dssmodels.MaxResultLimit,
)
if err != nil {
return nil, stacktrace.Propagate(err, "Unable to fetch Subscriptions")
Expand Down
61 changes: 61 additions & 0 deletions pkg/scd/store/cockroach/subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

"github.com/google/uuid"
"github.com/interuss/dss/pkg/models"
scdmodels "github.com/interuss/dss/pkg/scd/models"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -115,3 +116,63 @@ func TestListExpiredSubscriptions(t *testing.T) {
})
}
}

func TestListExpiredSubscriptionsMaxLimit(t *testing.T) {
// Set lower limit for testing
models.MaxResultLimit = 10
var (
ctx = context.Background()
store, tearDownStore = setUpStore(ctx, t)
)
require.NotNil(t, store)
defer tearDownStore()

r, err := store.Interact(ctx)
require.NoError(t, err)

for range models.MaxResultLimit {
id := uuid.New()
subID := models.ID(id.String())
sub := &scdmodels.Subscription{
ID: subID,
NotificationIndex: 1,
Manager: "unittest",
StartTime: &start1,
EndTime: &end1,
USSBaseURL: "https://dummy.uss",
NotifyForOperationalIntents: true,
NotifyForConstraints: false,
ImplicitSubscription: true,
Cells: cells,
}
_, err = r.UpsertSubscription(ctx, sub)
require.NoError(t, err)
}

timeRef := time.Date(2024, time.December, 15, 15, 0, 0, 0, time.UTC)
ttl := time.Hour * 24 * 30
threshold := timeRef.Add(-ttl)
_, err = r.ListExpiredSubscriptions(ctx, threshold)
require.NoError(t, err)

// Insert one more to exceed the limit
id := uuid.New()
subID := models.ID(id.String())
sub := &scdmodels.Subscription{
ID: subID,
NotificationIndex: 1,
Manager: "unittest",
StartTime: &start1,
EndTime: &end1,
USSBaseURL: "https://dummy.uss",
NotifyForOperationalIntents: true,
NotifyForConstraints: false,
ImplicitSubscription: true,
Cells: cells,
}
_, err = r.UpsertSubscription(ctx, sub)
require.NoError(t, err)
_, err = r.ListExpiredSubscriptions(ctx, threshold)
require.Error(t, err)
require.ErrorContainsf(t, err, "Result set exceeded max limit of", "%d", models.MaxResultLimit)
}