Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions build/db_schemas/scd/upto-v3.3.0-add_locks.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Adds the scd_locks table. It holds a single row (id = 0) that the SCD store
-- locks with SELECT ... FOR UPDATE when the global lock option is enabled.
CREATE TABLE IF NOT EXISTS scd_locks (
  id INT64 PRIMARY KEY
);

-- Seed the lock row. ON CONFLICT keeps this statement re-runnable, in line with
-- the IF NOT EXISTS guard above: a second run must not fail on the primary key.
INSERT INTO scd_locks (id) VALUES (0) ON CONFLICT (id) DO NOTHING;

-- Record the new schema version.
UPDATE schema_versions
SET schema_version = 'v3.3.0'
WHERE onerow_enforcer = TRUE;
7 changes: 7 additions & 0 deletions build/db_schemas/yugabyte/scd/upto-v1.1.0-add_locks.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Adds the scd_locks table. It holds a single row (id = 0) that the SCD store
-- locks with SELECT ... FOR UPDATE when the global lock option is enabled.
CREATE TABLE IF NOT EXISTS scd_locks (
  id BIGINT PRIMARY KEY
);

-- Seed the lock row. ON CONFLICT keeps this statement re-runnable, in line with
-- the IF NOT EXISTS guard above: a second run must not fail on the primary key.
INSERT INTO scd_locks (id) VALUES (0) ON CONFLICT (id) DO NOTHING;

-- Record the new schema version.
UPDATE schema_versions set schema_version = 'v1.1.0' WHERE onerow_enforcer = TRUE;
5 changes: 4 additions & 1 deletion cmds/core-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ var (
jwksKeyIDs = flag.String("jwks_key_ids", "", "IDs of a set of key in a JWKS, separated by commas")
keyRefreshTimeout = flag.Duration("key_refresh_timeout", 1*time.Minute, "Timeout for refreshing keys for JWT verification")
jwtAudiences = flag.String("accepted_jwt_audiences", "", "comma-separated acceptable JWT `aud` claims")

scdGlobalLock = flag.Bool("enable_scd_global_lock", false, "Experimental: Use a global lock when working with SCD subscriptions. Reduce global throughput but improve throughput with lot of subscriptions in the same areas.")
)

const (
Expand Down Expand Up @@ -200,7 +202,7 @@ func createSCDServer(ctx context.Context, logger *zap.Logger) (*scd.Server, erro
return nil, stacktrace.Propagate(err, "Failed to connect to strategic conflict detection database; verify your database configuration is current with https://github.com/interuss/dss/tree/master/build#upgrading-database-schemas")
}

scdStore, err := scdc.NewStore(ctx, datastore)
scdStore, err := scdc.NewStore(ctx, datastore, *scdGlobalLock)
if err != nil {
// TODO: More robustly detect failure to create SCD server is due to a problem that may be temporary
if strings.Contains(err.Error(), "connect: connection refused") || strings.Contains(err.Error(), "database \"scd\" does not exist") {
Expand Down Expand Up @@ -233,6 +235,7 @@ func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality st
logger.Info("version", zap.Any("version", version.Current()))
logger.Info("build", zap.Any("description", build.Describe()))
logger.Info("config", zap.Bool("scd", *enableSCD))
logger.Info("config", zap.Bool("scdGlobalLock", *scdGlobalLock))

if len(*jwtAudiences) == 0 {
// TODO: Make this flag required once all parties can set audiences
Expand Down
2 changes: 1 addition & 1 deletion cmds/db-manager/cleanup/evict.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func getSCDStore(ctx context.Context) (*scdc.Store, error) {
return nil, fmt.Errorf("failed to connect to SCD database with %+v: %w", logParams, err)
}

scdStore, err := scdc.NewStore(ctx, datastore)
scdStore, err := scdc.NewStore(ctx, datastore, false)
if err != nil {
return nil, fmt.Errorf("failed to create strategic conflict detection store with %+v: %w", connectParameters, err)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
locals {
rid_db_schema = var.desired_rid_db_version == "latest" ? (var.datastore_type == "cockroachdb" ? "4.0.0" : "1.0.1") : var.desired_rid_db_version
scd_db_schema = var.desired_scd_db_version == "latest" ? (var.datastore_type == "cockroachdb" ? "3.2.0" : "1.0.1") : var.desired_scd_db_version
scd_db_schema = var.desired_scd_db_version == "latest" ? (var.datastore_type == "cockroachdb" ? "3.3.0" : "1.1.0") : var.desired_scd_db_version
aux_db_schema = var.desired_aux_db_version == "latest" ? "1.0.0" : var.desired_aux_db_version
}
4 changes: 2 additions & 2 deletions deploy/services/helm-charts/dss/templates/schema-manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
{{- $jobVersion := .Release.Revision -}} {{/* Jobs template definition is immutable, using the revision in the name forces the job to be recreated at each helm upgrade. */}}

{{- $waitForDatastore := include "init-container-wait-for-http" (dict "serviceName" "cockroachdb" "url" (printf "http://%s:8080/health" $datastoreHost)) -}}
{{- $schemas := dict "rid" "4.0.0" "scd" "3.2.0" "aux_" "1.0.0" }}
{{- $schemas := dict "rid" "4.0.0" "scd" "3.3.0" "aux_" "1.0.0" }}

{{- if .Values.yugabyte.enabled }}
{{- $waitForDatastore = include "init-container-wait-for-http" (dict "serviceName" "yb-tserver" "url" (printf "http://%s:9000/status" $datastoreHost)) -}}
{{- $schemas = dict "rid" "1.0.1" "scd" "1.0.1" "aux_" "1.0.0" }}
{{- $schemas = dict "rid" "1.0.1" "scd" "1.1.0" "aux_" "1.0.0" }}
{{- end -}}

{{- range $service, $schemaVersion := $schemas }}
Expand Down
2 changes: 1 addition & 1 deletion deploy/services/tanka/examples/minikube/main.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ local metadata = metadataBase {
enable: true,
image: 'docker.io/interuss-local/dss:latest',
desired_rid_db_version: '1.0.1',
desired_scd_db_version: '1.0.1',
desired_scd_db_version: '1.1.0',
desired_aux_db_version: '1.0.0',
},
evict+: {
Expand Down
2 changes: 1 addition & 1 deletion deploy/services/tanka/examples/minimum/main.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ local metadata = metadataBase {
enable: false, // <-- this boolean value is VAR_ENABLE_SCHEMA_MANAGER
image: 'VAR_DOCKER_IMAGE_NAME',
desired_rid_db_version: '4.0.0',
desired_scd_db_version: '3.2.0',
desired_scd_db_version: '3.3.0',
desired_aux_db_version: '1.0.0',
},
prometheus+: {
Expand Down
2 changes: 1 addition & 1 deletion deploy/services/tanka/examples/schema_manager/main.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ local metadata = metadataBase {
enable: true, // <-- this boolean value is VAR_ENABLE_SCHEMA_MANAGER
image: 'VAR_DOCKER_IMAGE_NAME',
desired_rid_db_version: '4.0.0',
desired_scd_db_version: '3.2.0',
desired_scd_db_version: '3.3.0',
desired_aux_db_version: '1.0.0',
},
};
Expand Down
2 changes: 1 addition & 1 deletion deploy/services/tanka/metadata_base.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
enable: false, // NB: Automatically enabled if should_init is set to true.
image: error 'must specify image',
desired_rid_db_version: '4.0.0',
desired_scd_db_version: '3.2.0',
desired_scd_db_version: '3.3.0',
desired_aux_db_version: '1.0.0',
},
evict: {
Expand Down
Binary file added docs/assets/perfs_scd_lock_notoverlapping.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/assets/perfs_scd_lock_overlapping.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions docs/operations/.nav.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ nav:
- "Pooling (CockroachDB)": pooling-crdb.md
- "Monitoring": monitoring.md
- "Migrations": migrations.md
- "Performances": performances.md
- "Troubleshooting": troubleshooting.md
4 changes: 4 additions & 0 deletions docs/operations/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ See [Leaving a pool](pooling.md#leaving-a-pool)

See [Monitoring](monitoring.md)

## Performances

See [Performances](performances.md)

## Troubleshooting

See [Troubleshooting](troubleshooting.md)
28 changes: 28 additions & 0 deletions docs/operations/performances.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Performances

## The SCD global lock option

!!! danger
    All DSS instances in a DSS pool must use the same value for this option. Mixing instances with and without the option will result in dramatically lower performance.

It has been reported in issue [#1311](https://github.com/interuss/dss/issues/1311) that creating a lot of overlapping operational intents may increase the datastore load in a way that creates timeouts.

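By default, when working on operational intents, the code locks every subscription whose cells overlap the affected area (plus any subscriptions referenced explicitly). When many subscriptions cover the same area, acquiring all of these locks can overload the datastore.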

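One solution is to switch to a global lock by starting `core-service` with the `enable_scd_global_lock` flag (experimental, disabled by default): operational intent operations then all take the same single lock, regardless of the subscriptions involved.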

This will result in lower general throughput for operational intents that don't overlap, as only one of them can be processed at a time, but better performance in the issue's case as lock acquisition is simpler.

Whether you should enable this option depends on your DSS usage and on what you want to maximize:

* If you have non-overlapping traffic and want maximum global throughput, don't enable this flag
* If you have overlapping traffic and don't need high global throughput, enable this flag

The following graphs show example throughput without the flag (on the left) and with the flag (on the right). These tests were run on a local machine; on a real deployment you can expect lower absolute performance (due to various sources of latency), but similar relative numbers.

All graphs have been generated with the [loadtest present in the monitoring repository](https://github.com/interuss/monitoring/blob/main/monitoring/loadtest/README.md) using `SCD.py`.

![](../assets/perfs_scd_lock_overlapping.png)
*Overlapping requests. Notice the huge spikes on the left, as the datastore struggles to acquire locks.*

![](../assets/perfs_scd_lock_notoverlapping.png)
*Non-overlapping requests. Notice the reduction of performance on the right, with a single lock.*
27 changes: 16 additions & 11 deletions pkg/scd/store/datastore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,25 @@ var (
// repo is an implementation of repos.Repo using
// a CockroachDB/Yugabyte transaction.
type repo struct {
q dsssql.Queryable
clock clockwork.Clock
q dsssql.Queryable
clock clockwork.Clock
// globalLock makes LockSubscriptionsOnCells lock the single scd_locks row
// (id = 0) instead of the matching scd_subscriptions rows.
globalLock bool
}

// Store is an implementation of an scd.Store using
// a CockroachDB or Yugabyte database.
type Store struct {
db *datastore.Datastore
clock clockwork.Clock
db *datastore.Datastore
clock clockwork.Clock
// globalLock is the value passed to NewStore; Interact and Transact copy it
// into every repo they create.
globalLock bool
}

// NewStore returns a Store instance connected to a cockroach or Yugabyte instance via db.
func NewStore(ctx context.Context, db *datastore.Datastore) (*Store, error) {
func NewStore(ctx context.Context, db *datastore.Datastore, globalLock bool) (*Store, error) {
store := &Store{
db: db,
clock: DefaultClock,
db: db,
clock: DefaultClock,
globalLock: globalLock,
}

if err := store.CheckCurrentMajorSchemaVersion(ctx); err != nil {
Expand Down Expand Up @@ -81,8 +84,9 @@ func (s *Store) CheckCurrentMajorSchemaVersion(ctx context.Context) error {
// Interact implements store.Interactor interface.
// The returned repo issues its queries on s.db.Pool and carries the store's
// clock and globalLock setting. The context argument is ignored and the
// returned error is always nil.
func (s *Store) Interact(_ context.Context) (repos.Repository, error) {
return &repo{
q: s.db.Pool,
clock: s.clock,
q: s.db.Pool,
clock: s.clock,
globalLock: s.globalLock,
}, nil
}

Expand All @@ -91,8 +95,9 @@ func (s *Store) Transact(ctx context.Context, f func(context.Context, repos.Repo
ctx = crdb.WithMaxRetries(ctx, flags.ConnectParameters().MaxRetries)
return crdbpgx.ExecuteTx(ctx, s.db.Pool, pgx.TxOptions{IsoLevel: pgx.Serializable}, func(tx pgx.Tx) error {
return f(ctx, &repo{
q: tx,
clock: s.clock,
q: tx,
clock: s.clock,
globalLock: s.globalLock,
})
})
}
Expand Down
47 changes: 29 additions & 18 deletions pkg/scd/store/datastore/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,26 +380,37 @@ func (c *repo) IncrementNotificationIndices(ctx context.Context, subscriptionIds

func (c *repo) LockSubscriptionsOnCells(ctx context.Context, cells s2.CellUnion, subscriptionIds []dssmodels.ID) error {

const query = `
SELECT
id
FROM
scd_subscriptions
WHERE
cells && $1
OR
id = ANY($2)
FOR UPDATE
`
if c.globalLock {

ids := make([]string, len(subscriptionIds))
for i, id := range subscriptionIds {
ids[i] = id.String()
}
const query = `SELECT id FROM scd_locks WHERE id = 0 FOR UPDATE`

_, err := c.q.Exec(ctx, query, dsssql.CellUnionToCellIds(cells), ids)
if err != nil {
return stacktrace.Propagate(err, "Error in query: %s", query)
_, err := c.q.Exec(ctx, query)
if err != nil {
return stacktrace.Propagate(err, "Error in query: %s", query)
}

} else {
const query = `
SELECT
id
FROM
scd_subscriptions
WHERE
cells && $1
OR
id = ANY($2)
FOR UPDATE
`

ids := make([]string, len(subscriptionIds))
for i, id := range subscriptionIds {
ids[i] = id.String()
}

_, err := c.q.Exec(ctx, query, dsssql.CellUnionToCellIds(cells), ids)
if err != nil {
return stacktrace.Propagate(err, "Error in query: %s", query)
}
}

return nil
Expand Down
Loading