From 9d4d2fcede48c5bd69dfa5b0c688e109aa46e5b2 Mon Sep 17 00:00:00 2001 From: BarShauli555 Date: Tue, 18 Mar 2025 10:23:49 +0200 Subject: [PATCH] ghost_deletion_feature --- go/base/context.go | 14 +++++ go/cmd/gh-ost/main.go | 2 + go/logic/applier.go | 15 ++++-- go/logic/migrator.go | 3 +- go/sql/builder.go | 10 ++-- go/sql/builder_test.go | 114 +++++++++++++++++++++++++++++++++++++---- 6 files changed, 139 insertions(+), 19 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 8a09c43a8..78ecb45fb 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -126,6 +126,7 @@ type MigrationContext struct { defaultNumRetries int64 ChunkSize int64 niceRatio float64 + copyWhereClause string MaxLagMillisecondsThrottleThreshold int64 throttleControlReplicaKeys *mysql.InstanceKeyMap ThrottleFlagFile string @@ -737,6 +738,19 @@ func (this *MigrationContext) GetCriticalLoad() LoadMap { return this.criticalLoad.Duplicate() } +func (this *MigrationContext) GetWhereClause() string { + this.throttleMutex.Lock() + defer this.throttleMutex.Unlock() + + return this.copyWhereClause +} + +func (this *MigrationContext) SetWhereClause(WhereClause string) { + this.throttleMutex.Lock() + defer this.throttleMutex.Unlock() + this.copyWhereClause = WhereClause +} + func (this *MigrationContext) GetNiceRatio() float64 { this.throttleMutex.Lock() defer this.throttleMutex.Unlock() diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 225676364..5331e8aa7 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -116,6 +116,7 @@ func main() { throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307") throttleQuery := flag.String("throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. 
Make sure this query is lightweight") throttleHTTP := flag.String("throttle-http", "", "when given, gh-ost checks given URL via HEAD request; any response code other than 200 (OK) causes throttling; make sure it has low latency response") + copyWhereClause := flag.String("where-clause", "1=1", "added where clause for the insert query, filtering table rows") flag.Int64Var(&migrationContext.ThrottleHTTPIntervalMillis, "throttle-http-interval-millis", 100, "Number of milliseconds to wait before triggering another HTTP throttle check") flag.Int64Var(&migrationContext.ThrottleHTTPTimeoutMillis, "throttle-http-timeout-millis", 1000, "Number of milliseconds to use as an HTTP throttle check timeout") ignoreHTTPErrors := flag.Bool("ignore-http-errors", false, "ignore HTTP connection errors during throttle check") @@ -321,6 +322,7 @@ func main() { migrationContext.SetThrottleHTTP(*throttleHTTP) migrationContext.SetIgnoreHTTPErrors(*ignoreHTTPErrors) migrationContext.SetDefaultNumRetries(*defaultRetries) + migrationContext.SetWhereClause(*copyWhereClause) migrationContext.ApplyCredentials() if err := migrationContext.SetupTLS(); err != nil { migrationContext.Log.Fatale(err) diff --git a/go/logic/applier.go b/go/logic/applier.go index 6c8ac71a8..1c9597675 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -11,6 +11,7 @@ import ( "strings" "sync/atomic" "time" + "context" "github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/binlog" @@ -729,6 +730,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(), this.migrationContext.GetIteration() == 0, this.migrationContext.IsTransactionalTable(), // TODO: Don't hardcode this strings.HasPrefix(this.migrationContext.ApplierMySQLVersion, "8."), + this.migrationContext.GetWhereClause(), ) @@ -737,7 +739,13 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected } sqlResult, err := func() 
(gosql.Result, error) { - tx, err := this.db.Begin() + var conn *gosql.Conn + conn, err = this.db.Conn(context.Background()) + if err != nil { + return nil, fmt.Errorf("failed to get connection: %w", err) + } + defer conn.Close() + tx, err := conn.BeginTx(context.Background(), nil) if err != nil { return nil, err } @@ -746,16 +754,17 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s'`, this.migrationContext.ApplierTimeZone) sessionQuery = fmt.Sprintf("%s, %s", sessionQuery, this.generateSqlModeQuery()) - if _, err := tx.Exec(sessionQuery); err != nil { + if _, err := tx.ExecContext(context.Background(), sessionQuery); err != nil { return nil, err } - result, err := tx.Exec(query, explodedArgs...) + result, err := tx.ExecContext(context.Background(), query, explodedArgs...) if err != nil { return nil, err } if err := tx.Commit(); err != nil { return nil, err } + conn.Close() return result, nil }() diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 5e4e4eccf..1c07c129c 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -874,13 +874,14 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) { ) maxLoad := this.migrationContext.GetMaxLoad() criticalLoad := this.migrationContext.GetCriticalLoad() - fmt.Fprintf(w, "# chunk-size: %+v; max-lag-millis: %+vms; dml-batch-size: %+v; max-load: %s; critical-load: %s; nice-ratio: %f\n", + fmt.Fprintf(w, "# chunk-size: %+v; max-lag-millis: %+vms; dml-batch-size: %+v; max-load: %s; critical-load: %s; nice-ratio: %f; where-clause: %s\n", atomic.LoadInt64(&this.migrationContext.ChunkSize), atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold), atomic.LoadInt64(&this.migrationContext.DMLBatchSize), maxLoad.String(), criticalLoad.String(), this.migrationContext.GetNiceRatio(), + this.migrationContext.GetWhereClause(), ) if this.migrationContext.ThrottleFlagFile != "" { setIndicator := "" 
diff --git a/go/sql/builder.go b/go/sql/builder.go index 332aef100..9ff11245a 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -182,7 +182,7 @@ func BuildRangePreparedComparison(columns *ColumnList, args []interface{}, compa return BuildRangeComparison(columns.Names(), values, args, comparisonSign) } -func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool, noWait bool) (result string, explodedArgs []interface{}, err error) { +func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool, noWait bool, whereClause string) (result string, explodedArgs []interface{}, err error) { if len(sharedColumns) == 0 { return "", explodedArgs, fmt.Errorf("Got 0 shared columns in BuildRangeInsertQuery") } @@ -236,19 +236,19 @@ func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName strin %s.%s force index (%s) where - (%s and %s) + (%s and %s and %s) %s )`, databaseName, originalTableName, databaseName, ghostTableName, mappedSharedColumnsListing, sharedColumnsListing, databaseName, originalTableName, uniqueKey, - rangeStartComparison, rangeEndComparison, transactionalClause) + rangeStartComparison, rangeEndComparison, whereClause, transactionalClause) return result, explodedArgs, nil } -func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, 
includeRangeStartValues bool, transactionalTable bool, noWait bool) (result string, explodedArgs []interface{}, err error) { +func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool, noWait bool, whereClause string) (result string, explodedArgs []interface{}, err error) { rangeStartValues := buildColumnsPreparedValues(uniqueKeyColumns) rangeEndValues := buildColumnsPreparedValues(uniqueKeyColumns) - return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable, noWait) + return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable, noWait, whereClause) } func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) { diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index d43f65056..86f77bdfc 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -162,6 +162,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { databaseName := "mydb" originalTableName := "tbl" ghostTableName := "ghost" + whereClause := "1=1" sharedColumns := []string{"id", "name", "position"} { uniqueKey := "PRIMARY" @@ -171,7 +172,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { rangeStartArgs := []interface{}{3} rangeEndArgs := []interface{}{103} - query, 
explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, true, true) + query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, true, true, whereClause) require.NoError(t, err) expected := ` insert /* gh-ost mydb.tbl */ ignore @@ -185,7 +186,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { force index (PRIMARY) where (((id > @v1s) or ((id = @v1s))) - and ((id < @v1e) or ((id = @v1e)))) + and ((id < @v1e) or ((id = @v1e))) and 1=1) for share nowait )` require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) @@ -199,7 +200,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { rangeStartArgs := []interface{}{3, 17} rangeEndArgs := []interface{}{103, 117} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, true, true) + query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, true, true, whereClause) require.NoError(t, err) expected := ` insert /* gh-ost mydb.tbl */ ignore @@ -219,7 +220,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { and ((name < @v1e) or (((name = @v1e)) AND (position < @v2e)) - or ((name = @v1e) and (position = @v2e)))) + or ((name = @v1e) and (position = @v2e))) and 1=1) for share nowait )` require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) @@ -227,10 +228,102 @@ func TestBuildRangeInsertQuery(t *testing.T) { } } +func 
TestBuildRangeInsertQueryWhereClauseFiltering(t *testing.T) { + databaseName := "mydb" + originalTableName := "tbl" + ghostTableName := "ghost" + sharedColumns := []string{"id", "name", "position"} + { + uniqueKey := "PRIMARY" + uniqueKeyColumns := NewColumnList([]string{"id"}) + rangeStartValues := []string{"@v1s"} + rangeEndValues := []string{"@v1e"} + rangeStartArgs := []interface{}{3} + rangeEndArgs := []interface{}{103} + whereClause := "id = 1" + + query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false, false, whereClause) + require.NoError(t, err) + expected := ` + insert /* gh-ost mydb.tbl */ ignore + into + mydb.ghost + (id, name, position) + ( + select id, name, position + from + mydb.tbl + force index (PRIMARY) + where + (((id > @v1s) or ((id = @v1s))) + and ((id < @v1e) or ((id = @v1e))) and id = 1) + )` + require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) + require.Equal(t, []interface{}{3, 3, 103, 103}, explodedArgs) + } + { + uniqueKey := "PRIMARY" + uniqueKeyColumns := NewColumnList([]string{"id"}) + rangeStartValues := []string{"@v1s"} + rangeEndValues := []string{"@v1e"} + rangeStartArgs := []interface{}{3} + rangeEndArgs := []interface{}{103} + whereClause := "id not in (1,2,3)" + + query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false, false, whereClause) + require.NoError(t, err) + expected := ` + insert /* gh-ost mydb.tbl */ ignore + into + mydb.ghost + (id, name, position) + ( + select id, name, position + from + mydb.tbl + force index (PRIMARY) + where + (((id > @v1s) or ((id = @v1s))) + and ((id < @v1e) or ((id = @v1e))) and id not in (1,2,3)) + )` + 
require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) + require.Equal(t, []interface{}{3, 3, 103, 103}, explodedArgs) + } + { + uniqueKey := "PRIMARY" + uniqueKeyColumns := NewColumnList([]string{"id"}) + rangeStartValues := []string{"@v1s"} + rangeEndValues := []string{"@v1e"} + rangeStartArgs := []interface{}{3} + rangeEndArgs := []interface{}{103} + whereClause := "id in (select id from ids)" + + query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false, false, whereClause) + require.NoError(t, err) + expected := ` + insert /* gh-ost mydb.tbl */ ignore + into + mydb.ghost + (id, name, position) + ( + select id, name, position + from + mydb.tbl + force index (PRIMARY) + where + (((id > @v1s) or ((id = @v1s))) + and ((id < @v1e) or ((id = @v1e))) and id in (select id from ids)) + )` + require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) + require.Equal(t, []interface{}{3, 3, 103, 103}, explodedArgs) + } +} + func TestBuildRangeInsertQueryRenameMap(t *testing.T) { databaseName := "mydb" originalTableName := "tbl" ghostTableName := "ghost" + whereClause := "1=1" sharedColumns := []string{"id", "name", "position"} mappedSharedColumns := []string{"id", "name", "location"} { @@ -241,7 +334,7 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { rangeStartArgs := []interface{}{3} rangeEndArgs := []interface{}{103} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, true, true) + query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, 
uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, true, true, whereClause) require.NoError(t, err) expected := ` insert /* gh-ost mydb.tbl */ ignore @@ -256,7 +349,7 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { where (((id > @v1s) or ((id = @v1s))) and - ((id < @v1e) or ((id = @v1e)))) + ((id < @v1e) or ((id = @v1e))) and 1=1) for share nowait )` require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) @@ -270,7 +363,7 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { rangeStartArgs := []interface{}{3, 17} rangeEndArgs := []interface{}{103, 117} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, true, true) + query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, true, true, whereClause) require.NoError(t, err) expected := ` insert /* gh-ost mydb.tbl */ ignore @@ -285,7 +378,7 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { where (((name > @v1s) or (((name = @v1s)) AND (position > @v2s)) or ((name = @v1s) and (position = @v2s))) and ((name < @v1e) or (((name = @v1e)) AND (position < @v2e)) - or ((name = @v1e) and (position = @v2e)))) + or ((name = @v1e) and (position = @v2e))) and 1=1) for share nowait )` @@ -298,6 +391,7 @@ func TestBuildRangeInsertPreparedQuery(t *testing.T) { databaseName := "mydb" originalTableName := "tbl" ghostTableName := "ghost" + whereClause := "1=1" sharedColumns := []string{"id", "name", "position"} { uniqueKey := "name_position_uidx" @@ -305,7 +399,7 @@ func TestBuildRangeInsertPreparedQuery(t *testing.T) { rangeStartArgs := []interface{}{3, 17} rangeEndArgs := []interface{}{103, 
117} - query, explodedArgs, err := BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, true, true, true) + query, explodedArgs, err := BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, true, true, true, whereClause) require.NoError(t, err) expected := ` insert /* gh-ost mydb.tbl */ ignore @@ -317,7 +411,7 @@ func TestBuildRangeInsertPreparedQuery(t *testing.T) { from mydb.tbl force index (name_position_uidx) - where (((name > ?) or (((name = ?)) AND (position > ?)) or ((name = ?) and (position = ?))) and ((name < ?) or (((name = ?)) AND (position < ?)) or ((name = ?) and (position = ?)))) + where (((name > ?) or (((name = ?)) AND (position > ?)) or ((name = ?) and (position = ?))) and ((name < ?) or (((name = ?)) AND (position < ?)) or ((name = ?) and (position = ?))) and 1=1) for share nowait )` require.Equal(t, normalizeQuery(expected), normalizeQuery(query))