diff --git a/conn.go b/conn.go index 7f9ece77..a78e856c 100644 --- a/conn.go +++ b/conn.go @@ -401,6 +401,16 @@ func (c *conn) setReadOnlyStaleness(staleness spanner.TimestampBound) (driver.Re return driver.ResultNoRows, nil } +func (c *conn) readOnlyStalenessPointer() *spanner.TimestampBound { + val := propertyReadOnlyStaleness.GetConnectionPropertyValue(c.state) + if val == nil || !val.HasValue() { + return nil + } + staleness, _ := val.GetValue() + timestampBound := staleness.(spanner.TimestampBound) + return ×tampBound +} + func (c *conn) IsolationLevel() sql.IsolationLevel { return propertyIsolationLevel.GetValueOrDefault(c.state) } @@ -987,6 +997,7 @@ func (c *conn) options(reset bool) *ExecOptions { }, }, PartitionedQueryOptions: PartitionedQueryOptions{}, + TimestampBound: c.readOnlyStalenessPointer(), } if c.tempExecOptions != nil { effectiveOptions.merge(c.tempExecOptions) @@ -1274,6 +1285,9 @@ func (c *conn) rollback(ctx context.Context) error { } func queryInSingleUse(ctx context.Context, c *spanner.Client, statement spanner.Statement, tb spanner.TimestampBound, options *ExecOptions) *spanner.RowIterator { + if options.TimestampBound != nil { + tb = *options.TimestampBound + } return c.Single().WithTimestampBound(tb).QueryWithOptions(ctx, statement, options.QueryOptions) } diff --git a/conn_with_mockserver_test.go b/conn_with_mockserver_test.go index 848ee24a..befa2780 100644 --- a/conn_with_mockserver_test.go +++ b/conn_with_mockserver_test.go @@ -20,6 +20,7 @@ import ( "fmt" "reflect" "testing" + "time" "cloud.google.com/go/longrunning/autogen/longrunningpb" "cloud.google.com/go/spanner" @@ -587,6 +588,34 @@ func TestSetLocalReadLockMode(t *testing.T) { } } +func TestTimestampBound(t *testing.T) { + t.Parallel() + + db, server, teardown := setupTestDBConnection(t) + defer teardown() + ctx := context.Background() + + staleness := spanner.MaxStaleness(10 * time.Second) + row := db.QueryRowContext(ctx, testutil.SelectFooFromBar, ExecOptions{TimestampBound: &staleness}) + if row.Err() != nil { + t.Fatal(row.Err()) + } + var val int64 + if err := row.Scan(&val); err != nil { + t.Fatal(err) + } + + requests := server.TestSpanner.DrainRequestsFromServer() + executeRequests := testutil.RequestsOfType(requests, reflect.TypeOf(&spannerpb.ExecuteSqlRequest{})) + if g, w := len(executeRequests), 1; g != w { + t.Fatalf("execute requests count mismatch\n Got: %v\nWant: %v", g, w) + } + request := executeRequests[0].(*spannerpb.ExecuteSqlRequest) + if g, w := request.Transaction.GetSingleUse().GetReadOnly().GetMaxStaleness().GetSeconds(), int64(10); g != w { + t.Fatalf("read staleness mismatch\n Got: %v\nWant: %v", g, w) + } +} + func TestCreateDatabase(t *testing.T) { t.Parallel() diff --git a/driver.go b/driver.go index e76c5a59..3d38ed55 100644 --- a/driver.go +++ b/driver.go @@ -173,6 +173,10 @@ type ExecOptions struct { TransactionOptions spanner.TransactionOptions // QueryOptions are the query options that will be used for the statement. QueryOptions spanner.QueryOptions + // TimestampBound is the timestamp bound that will be used for the statement + // if it is a query outside a transaction. Setting this option will override + // the default TimestampBound that is set on the connection. + TimestampBound *spanner.TimestampBound // PartitionedQueryOptions are used for partitioned queries, and ignored // for all other statements. @@ -234,6 +238,9 @@ func (dest *ExecOptions) merge(src *ExecOptions) { if src.AutocommitDMLMode != Unspecified { dest.AutocommitDMLMode = src.AutocommitDMLMode } + if src.TimestampBound != nil { + dest.TimestampBound = src.TimestampBound + } (&dest.PartitionedQueryOptions).merge(&src.PartitionedQueryOptions) mergeQueryOptions(&dest.QueryOptions, &src.QueryOptions) mergeTransactionOptions(&dest.TransactionOptions, &src.TransactionOptions)