From a95a5cd3fb5fa744ce9f0c6355010f99a6b3d92f Mon Sep 17 00:00:00 2001 From: Oleg Jukovec Date: Thu, 23 Oct 2025 23:40:57 +0300 Subject: [PATCH] tests: replace benchmarks Existing benchmarks are no longer relevant - no one knows what they're testing. Instead, benchmarks have been added for the following metrics: * Number of allocations. * RPS on synchronous and asynchronous modes. --- tarantool_test.go | 626 ++++++++++++---------------------------------- 1 file changed, 157 insertions(+), 469 deletions(-) diff --git a/tarantool_test.go b/tarantool_test.go index 437f7f96..3bab6964 100644 --- a/tarantool_test.go +++ b/tarantool_test.go @@ -12,9 +12,9 @@ import ( "path/filepath" "reflect" "regexp" - "runtime" "strings" "sync" + "sync/atomic" "testing" "time" @@ -95,7 +95,7 @@ var opts = Opts{ const N = 500 -func BenchmarkClientSerial(b *testing.B) { +func BenchmarkSync_naive(b *testing.B) { var err error conn := test_helpers.ConnectWithValidation(b, dialer, opts) @@ -103,19 +103,29 @@ func BenchmarkClientSerial(b *testing.B) { _, err = conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) if err != nil { - b.Errorf("No connection available") + b.Fatalf("failed to initialize database: %s", err) } b.ResetTimer() - for i := 0; i < b.N; i++ { - _, err = conn.Select(spaceNo, indexNo, 0, 1, IterEq, []interface{}{uint(1111)}) + + for b.Loop() { + req := NewSelectRequest(spaceNo). + Index(indexNo). + Iterator(IterEq). + Key([]interface{}{uint(1111)}) + data, err := conn.Do(req).Get() if err != nil { - b.Errorf("No connection available") + b.Errorf("request error: %s", err) + } + + tuple := data[0].([]any) + if tuple[0].(uint16) != uint16(1111) { + b.Errorf("invalid result") } } } -func BenchmarkClientSerialRequestObject(b *testing.B) { +func BenchmarkSync_naive_with_single_request(b *testing.B) { var err error conn := test_helpers.ConnectWithValidation(b, dialer, opts) @@ -123,157 +133,61 @@ func BenchmarkClientSerialRequestObject(b *testing.B) { _, err = conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) if err != nil { - b.Error(err) + b.Fatalf("failed to initialize database: %s", err) } + req := NewSelectRequest(spaceNo). Index(indexNo). - Offset(0). - Limit(1). Iterator(IterEq). - Key([]interface{}{uint(1111)}) + Key(UintKey{I: 1111}) b.ResetTimer() - for i := 0; i < b.N; i++ { - _, err := conn.Do(req).Get() + for b.Loop() { + data, err := conn.Do(req).Get() if err != nil { - b.Error(err) + b.Errorf("request error: %s", err) } - } -} - -func BenchmarkClientSerialRequestObjectWithContext(b *testing.B) { - var err error - - conn := test_helpers.ConnectWithValidation(b, dialer, opts) - defer conn.Close() - - _, err = conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) - if err != nil { - b.Error(err) - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - b.ResetTimer() - for i := 0; i < b.N; i++ { - req := NewSelectRequest(spaceNo). - Index(indexNo). - Limit(1). - Iterator(IterEq). - Key([]interface{}{uint(1111)}). - Context(ctx) - _, err := conn.Do(req).Get() - if err != nil { - b.Error(err) + tuple := data[0].([]any) + if tuple[0].(uint16) != uint16(1111) { + b.Errorf("invalid result") } } } -func BenchmarkClientSerialTyped(b *testing.B) { - var err error - - conn := test_helpers.ConnectWithValidation(b, dialer, opts) - defer conn.Close() - - _, err = conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) - if err != nil { - b.Fatal("No connection available") - } - - var r []Tuple - b.ResetTimer() - for i := 0; i < b.N; i++ { - err = conn.SelectTyped(spaceNo, indexNo, 0, 1, IterEq, IntKey{1111}, &r) - if err != nil { - b.Errorf("No connection available") - } - } +type benchTuple struct { + id uint } -func BenchmarkClientSerialSQL(b *testing.B) { - test_helpers.SkipIfSQLUnsupported(b) - - conn := test_helpers.ConnectWithValidation(b, dialer, opts) - defer conn.Close() - - _, err := conn.Replace("SQL_TEST", []interface{}{uint(1111), "hello", "world"}) +func (t *benchTuple) DecodeMsgpack(dec *msgpack.Decoder) error { + l, err := dec.DecodeArrayLen() if err != nil { - b.Errorf("Failed to replace: %s", err) + return fmt.Errorf("failed to decode tuples array: %w", err) } - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, err := conn.Execute("SELECT NAME0,NAME1,NAME2 FROM SQL_TEST WHERE NAME0=?", - []interface{}{uint(1111)}) - if err != nil { - b.Errorf("Select failed: %s", err.Error()) - break - } - } -} - -func BenchmarkClientSerialSQLPrepared(b *testing.B) { - test_helpers.SkipIfSQLUnsupported(b) - - conn := test_helpers.ConnectWithValidation(b, dialer, opts) - defer conn.Close() - - _, err := conn.Replace("SQL_TEST", []interface{}{uint(1111), "hello", "world"}) - if err != nil { - b.Errorf("Failed to replace: %s", err) + if l != 1 { + return fmt.Errorf("unexpected tuples array with len %d", l) } - stmt, err := conn.NewPrepared("SELECT NAME0,NAME1,NAME2 FROM SQL_TEST WHERE NAME0=?") + l, err = dec.DecodeArrayLen() if err != nil { - b.Fatalf("failed to prepare a SQL statement") + return fmt.Errorf("failed to decode tuple array: %w", err) } - executeReq := NewExecutePreparedRequest(stmt) - unprepareReq := NewUnprepareRequest(stmt) - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, err := conn.Do(executeReq.Args([]interface{}{uint(1111)})).Get() - if err != nil { - b.Errorf("Select failed: %s", err.Error()) - break - } + if l < 1 { + return fmt.Errorf("too small tuple have 0 fields") } - _, err = conn.Do(unprepareReq).Get() - if err != nil { - b.Fatalf("failed to unprepare a SQL statement") - } -} - -func BenchmarkClientFuture(b *testing.B) { - var err error - - conn := test_helpers.ConnectWithValidation(b, dialer, opts) - defer conn.Close() - _, err = conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) + t.id, err = dec.DecodeUint() if err != nil { - b.Error(err) + return fmt.Errorf("failed to decode id: %w", err) } - b.ResetTimer() - for i := 0; i < b.N; i += N { - var fs [N]*Future - for j := 0; j < N; j++ { - fs[j] = conn.SelectAsync(spaceNo, indexNo, 0, 1, IterEq, []interface{}{uint(1111)}) - } - for j := 0; j < N; j++ { - _, err = fs[j].Get() - if err != nil { - b.Error(err) - } - } - - } + return nil } -func BenchmarkClientFutureTyped(b *testing.B) { +func BenchmarkSync_naive_with_custom_type(b *testing.B) { var err error conn := test_helpers.ConnectWithValidation(b, dialer, opts) @@ -281,62 +195,31 @@ func BenchmarkClientFutureTyped(b *testing.B) { _, err = conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) if err != nil { - b.Errorf("No connection available") + b.Fatalf("failed to initialize database: %s", err) } - b.ResetTimer() - for i := 0; i < b.N; i += N { - var fs [N]*Future - for j := 0; j < N; j++ { - fs[j] = conn.SelectAsync(spaceNo, indexNo, 0, 1, IterEq, IntKey{1111}) - } - var r []Tuple - for j := 0; j < N; j++ { - err = fs[j].GetTyped(&r) - if err != nil { - b.Error(err) - } - if len(r) != 1 || r[0].Id != 1111 { - b.Errorf("Doesn't match %v", r) - } - } - } -} + req := NewSelectRequest(spaceNo). + Index(indexNo). + Iterator(IterEq). + Key(UintKey{I: 1111}) -func BenchmarkClientFutureParallel(b *testing.B) { - var err error + var tuple benchTuple - conn := test_helpers.ConnectWithValidation(b, dialer, opts) - defer conn.Close() + b.ResetTimer() - _, err = conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) - if err != nil { - b.Errorf("No connection available") - } + for b.Loop() { + err := conn.Do(req).GetTyped(&tuple) + if err != nil { + b.Errorf("request error: %s", err) + } - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - exit := false - for !exit { - var fs [N]*Future - var j int - for j = 0; j < N && pb.Next(); j++ { - fs[j] = conn.SelectAsync(spaceNo, indexNo, 0, 1, IterEq, []interface{}{uint(1111)}) - } - exit = j < N - for j > 0 { - j-- - _, err := fs[j].Get() - if err != nil { - b.Error(err) - break - } - } + if tuple.id != 1111 { + b.Errorf("invalid result") } - }) + } } -func BenchmarkClientFutureParallelTyped(b *testing.B) { +func BenchmarkSync_multithread(b *testing.B) { var err error conn := test_helpers.ConnectWithValidation(b, dialer, opts) @@ -344,362 +227,167 @@ func BenchmarkClientFutureParallelTyped(b *testing.B) { _, err = conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) if err != nil { - b.Fatal("No connection available") - } - - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - exit := false - for !exit { - var fs [N]*Future - var j int - for j = 0; j < N && pb.Next(); j++ { - fs[j] = conn.SelectAsync(spaceNo, indexNo, 0, 1, IterEq, IntKey{1111}) - } - exit = j < N - var r []Tuple - for j > 0 { - j-- - err := fs[j].GetTyped(&r) - if err != nil { - b.Error(err) - break - } - if len(r) != 1 || r[0].Id != 1111 { - b.Errorf("Doesn't match %v", r) - break - } - } - } - }) -} - -func BenchmarkClientParallel(b *testing.B) { - conn := test_helpers.ConnectWithValidation(b, dialer, opts) - defer conn.Close() - - _, err := conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) - if err != nil { - b.Fatal("No connection available") - } - - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - _, err := conn.Select(spaceNo, indexNo, 0, 1, IterEq, []interface{}{uint(1111)}) - if err != nil { - b.Errorf("No connection available") - break - } - } - }) -} - -func benchmarkClientParallelRequestObject(multiplier int, b *testing.B) { - conn := test_helpers.ConnectWithValidation(b, dialer, opts) - defer conn.Close() - - _, err := conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) - if err != nil { - b.Fatal("No connection available") + b.Fatalf("failed to initialize database: %s", err) } req := NewSelectRequest(spaceNo). Index(indexNo). - Limit(1). Iterator(IterEq). - Key([]interface{}{uint(1111)}) + Key(UintKey{I: 1111}) - b.SetParallelism(multiplier) b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + var tuple benchTuple + for pb.Next() { - _ = conn.Do(req) - _, err := conn.Do(req).Get() + err := conn.Do(req).GetTyped(&tuple) if err != nil { - b.Error(err) + b.Errorf("request error: %s", err) } - } - }) -} - -func benchmarkClientParallelRequestObjectWithContext(multiplier int, b *testing.B) { - conn := test_helpers.ConnectWithValidation(b, dialer, opts) - defer conn.Close() - - _, err := conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) - if err != nil { - b.Fatal("No connection available") - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - req := NewSelectRequest(spaceNo). - Index(indexNo). - Limit(1). - Iterator(IterEq). - Key([]interface{}{uint(1111)}). - Context(ctx) - - b.SetParallelism(multiplier) - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - _ = conn.Do(req) - _, err := conn.Do(req).Get() - if err != nil { - b.Error(err) + if tuple.id != 1111 { + b.Errorf("invalid result") } } }) } -func benchmarkClientParallelRequestObjectMixed(multiplier int, b *testing.B) { +func BenchmarkAsync_multithread_parallelism(b *testing.B) { + var err error + conn := test_helpers.ConnectWithValidation(b, dialer, opts) defer conn.Close() - _, err := conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) + _, err = conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) if err != nil { - b.Fatal("No connection available") + b.Fatalf("failed to initialize database: %s", err) } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - req := NewSelectRequest(spaceNo). Index(indexNo). - Limit(1). - Iterator(IterEq). - Key([]interface{}{uint(1111)}) - - reqWithCtx := NewSelectRequest(spaceNo). - Index(indexNo). - Limit(1). Iterator(IterEq). - Key([]interface{}{uint(1111)}). - Context(ctx) + Key(UintKey{I: 1111}) - b.SetParallelism(multiplier) b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - _ = conn.Do(req) - _, err := conn.Do(reqWithCtx).Get() - if err != nil { - b.Error(err) - } - } - }) -} -func BenchmarkClientParallelRequestObject(b *testing.B) { - multipliers := []int{10, 50, 500, 1000} - conn := test_helpers.ConnectWithValidation(b, dialer, opts) - defer conn.Close() + for p := 1; p <= 1024; p *= 2 { + b.Run(fmt.Sprintf("%d", p), func(b *testing.B) { + b.SetParallelism(p) + b.ResetTimer() - _, err := conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) - if err != nil { - b.Fatal("No connection available") - } - - for _, m := range multipliers { - goroutinesNum := runtime.GOMAXPROCS(0) * m - - b.Run(fmt.Sprintf("Plain %d goroutines", goroutinesNum), func(b *testing.B) { - benchmarkClientParallelRequestObject(m, b) - }) + b.RunParallel(func(pb *testing.PB) { + var tuple benchTuple - b.Run(fmt.Sprintf("With Context %d goroutines", goroutinesNum), func(b *testing.B) { - benchmarkClientParallelRequestObjectWithContext(m, b) - }) + for pb.Next() { + err := conn.Do(req).GetTyped(&tuple) + if err != nil { + b.Errorf("request error: %s", err) + } - b.Run(fmt.Sprintf("Mixed %d goroutines", goroutinesNum), func(b *testing.B) { - benchmarkClientParallelRequestObjectMixed(m, b) + if tuple.id != 1111 { + b.Errorf("invalid result") + } + } + }) }) } } -func BenchmarkClientParallelMassive(b *testing.B) { - conn := test_helpers.ConnectWithValidation(b, dialer, opts) - defer conn.Close() +// TestPerformance is a benchmark for the async API that is unable to implement +// with a Go-benchmark. It can be used to test performance with different +// numbers of connections and processing goroutines. +func TestBenchmarkAsync(t *testing.T) { + requests := int64(10_000_000) + connections := 1 // 16 max. - _, err := conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) - if err != nil { - b.Fatal("No connection available") - } - - var wg sync.WaitGroup - limit := make(chan struct{}, 128*1024) - for i := 0; i < 512; i++ { - go func() { - var r []Tuple - for { - if _, ok := <-limit; !ok { - break - } - err = conn.SelectTyped(spaceNo, indexNo, 0, 1, IterEq, IntKey{1111}, &r) - wg.Done() - if err != nil { - b.Errorf("No connection available") - } - } - }() - } - b.ResetTimer() - for i := 0; i < b.N; i++ { - wg.Add(1) - limit <- struct{}{} - } - wg.Wait() - close(limit) -} + ops := opts + // ops.Concurrency = 2 // 4 max. // 2 max. -func BenchmarkClientParallelMassiveUntyped(b *testing.B) { - conn := test_helpers.ConnectWithValidation(b, dialer, opts) - defer conn.Close() + conns := make([]*Connection, 0, connections) + for range connections { + conn := test_helpers.ConnectWithValidation(t, dialer, ops) + defer conn.Close() - _, err := conn.Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) - if err != nil { - b.Errorf("No connection available") + conns = append(conns, conn) } - var wg sync.WaitGroup - limit := make(chan struct{}, 128*1024) - for i := 0; i < 512; i++ { - go func() { - for { - if _, ok := <-limit; !ok { - break - } - _, err = conn.Select(spaceNo, indexNo, 0, 1, IterEq, []interface{}{uint(1111)}) - wg.Done() - if err != nil { - b.Errorf("No connection available") - } - } - }() + _, err := conns[0].Replace(spaceNo, []interface{}{uint(1111), "hello", "world"}) + if err != nil { + t.Fatalf("failed to initialize database: %s", err) } - b.ResetTimer() - for i := 0; i < b.N; i++ { - wg.Add(1) - limit <- struct{}{} - } - wg.Wait() - close(limit) -} + req := NewSelectRequest(spaceNo). + Index(indexNo). + Iterator(IterEq). + Key(UintKey{I: 1111}) -func BenchmarkClientReplaceParallel(b *testing.B) { - conn := test_helpers.ConnectWithValidation(b, dialer, opts) - defer conn.Close() + maxRps := float64(0) + maxConnections := 0 + maxConcurrency := 0 - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - _, err := conn.Replace("test_perf", []interface{}{uint(1), "hello", []interface{}{}}) - if err != nil { - b.Error(err) - } - } - }) -} + for cn := 1; cn <= connections; cn *= 2 { + for cc := 1; cc <= 1; cc *= 2 { // for cc := 1; cc <= 512; cc *=2 { + var wg sync.WaitGroup -func BenchmarkClientLargeSelectParallel(b *testing.B) { - conn := test_helpers.ConnectWithValidation(b, dialer, opts) - defer conn.Close() + curRequests := requests - offset, limit := uint32(0), uint32(1000) - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - _, err := conn.Select("test_perf", "secondary", offset, limit, IterEq, - []interface{}{"test_name"}) - if err != nil { - b.Fatal(err) - } - } - }) -} + start := time.Now() -func BenchmarkClientParallelSQL(b *testing.B) { - test_helpers.SkipIfSQLUnsupported(b) + for i := range cc { + wg.Add(1) - conn := test_helpers.ConnectWithValidation(b, dialer, opts) - defer conn.Close() + ch := make(chan *Future, 1024) - _, err := conn.Replace("SQL_TEST", []interface{}{uint(1111), "hello", "world"}) - if err != nil { - b.Errorf("No connection available") - } + go func(i int) { + defer close(ch) - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - _, err := conn.Execute("SELECT NAME0,NAME1,NAME2 FROM SQL_TEST WHERE NAME0=?", - []interface{}{uint(1111)}) - if err != nil { - b.Errorf("Select failed: %s", err.Error()) - break - } - } - }) -} - -func BenchmarkClientParallelSQLPrepared(b *testing.B) { - test_helpers.SkipIfSQLUnsupported(b) + for atomic.AddInt64(&curRequests, -1) >= 0 { + ch <- conns[i%cn].Do(req) + } + }(i) - conn := test_helpers.ConnectWithValidation(b, dialer, opts) - defer conn.Close() + go func() { + defer wg.Done() - _, err := conn.Replace("SQL_TEST", []interface{}{uint(1111), "hello", "world"}) - if err != nil { - b.Errorf("No connection available") - } + var tuple benchTuple - stmt, err := conn.NewPrepared("SELECT NAME0,NAME1,NAME2 FROM SQL_TEST WHERE NAME0=?") - if err != nil { - b.Fatalf("failed to prepare a SQL statement") - } - executeReq := NewExecutePreparedRequest(stmt) - unprepareReq := NewUnprepareRequest(stmt) + for fut := range ch { + err := fut.GetTyped(&tuple) + if err != nil { + t.Errorf("request error: %s", err) + } - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - _, err := conn.Do(executeReq.Args([]interface{}{uint(1111)})).Get() - if err != nil { - b.Errorf("Select failed: %s", err.Error()) - break + if tuple.id != 1111 { + t.Errorf("invalid result") + } + } + }() } - } - }) - _, err = conn.Do(unprepareReq).Get() - if err != nil { - b.Fatalf("failed to unprepare a SQL statement") - } -} -func BenchmarkSQLSerial(b *testing.B) { - test_helpers.SkipIfSQLUnsupported(b) + wg.Wait() - conn := test_helpers.ConnectWithValidation(b, dialer, opts) - defer conn.Close() + duration := time.Since(start) - _, err := conn.Replace("SQL_TEST", []interface{}{uint(1111), "hello", "world"}) - if err != nil { - b.Errorf("Failed to replace: %s", err) - } + rps := float64(requests) / duration.Seconds() + fmt.Println("requests :", requests) // nolint + fmt.Println("concurrency:", cc) // nolint + fmt.Println("connections:", cn) // nolint + fmt.Printf("duration : %.2f\n", duration.Seconds()) // nolint + fmt.Printf("requests/s : %.2f\n", rps) // nolint + fmt.Println("============") // nolint - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, err := conn.Execute("SELECT NAME0,NAME1,NAME2 FROM SQL_TEST WHERE NAME0=?", - []interface{}{uint(1111)}) - if err != nil { - b.Errorf("Select failed: %s", err.Error()) - break + if maxRps < rps { + maxRps = rps + maxConnections = cn + maxConcurrency = cc + } } } + + fmt.Println("max connections:", maxConnections) // nolint + fmt.Println("max concurrency:", maxConcurrency) // nolint + fmt.Printf("max requests/s : %.2f", maxRps) // nolint } type mockRequest struct { @@ -2562,7 +2250,7 @@ func TestConnectionDoSelectRequest(t *testing.T) { req := NewSelectRequest(spaceNo). Index(indexNo). - Limit(20). + Limit(10). Iterator(IterGe). Key([]interface{}{uint(1010)}) resp, err := conn.Do(req).GetResponse()