Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions doc/command-line-flags.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ Why is this behavior configurable? Different workloads have different characteri

Noteworthy is that setting `--dml-batch-size` to a higher value _does not_ mean `gh-ost` blocks or waits on writes. The batch size is an upper limit on transaction size, not a minimal one. If `gh-ost` doesn't have "enough" events in the pipe, it does not wait on the binary log, it just writes what it already has. This conveniently suggests that if write load is light enough for `gh-ost` to only see a few events in the binary log at a given time, then it is also light enough for `gh-ost` to apply a fraction of the batch size.


### ignore-over-iteration-range-max-binlog

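Defaults to false. When enabled, a binlog event whose unique key value is greater than `MigrationIterationRangeMaxValues` and less than or equal to `MigrationRangeMaxValues` is ignored, because that row has not been copied yet and will be synced by a later row-copy chunk. An event whose unique key value is less than or equal to `MigrationIterationRangeMaxValues` is applied. An event whose unique key value is greater than `MigrationRangeMaxValues` extends `MigrationRangeMaxValues` and is ignored as long as that boundary has not been locked; once the boundary is locked, such events are applied. Currently, when enabled, this only takes effect for a single-column unique index of int type.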


### exact-rowcount

A `gh-ost` execution needs to copy whatever rows you have in your existing table onto the ghost table. This can be, and often will be, a large number. Exactly what is that number?
Expand Down
4 changes: 4 additions & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,11 @@ type MigrationContext struct {
controlReplicasLagResult mysql.ReplicationLagResult
TotalRowsCopied int64
TotalDMLEventsApplied int64
TotalDMLEventsIgnored int64
DMLBatchSize int64
IgnoreOverIterationRangeMaxBinlog bool
IsMigrationRangeMaxValuesLocked bool
MigrationRangeMaxValuesInitial *sql.ColumnValues
isThrottled bool
throttleReason string
throttleReasonHint ThrottleReasonHint
Expand Down
1 change: 1 addition & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func main() {
flag.BoolVar(&migrationContext.PanicOnWarnings, "panic-on-warnings", false, "Panic when SQL warnings are encountered when copying a batch indicating data loss")
cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout) or attempting instant DDL")
niceRatio := flag.Float64("nice-ratio", 0, "force being 'nice', imply sleep time per chunk time; range: [0.0..100.0]. Example values: 0 is aggressive. 1: for every 1ms spent copying rows, sleep additional 1ms (effectively doubling runtime); 0.7: for every 10ms spend in a rowcopy chunk, spend 7ms sleeping immediately after")
flag.BoolVar(&migrationContext.IgnoreOverIterationRangeMaxBinlog, "ignore-over-iteration-range-max-binlog", false, "When binlog unique key value is over MigrationIterationRangeMaxValues, and less than MigrationRangeMaxValues, the binlog will be ignored. Because the data will be synced by copy chunk")

maxLagMillis := flag.Int64("max-lag-millis", 1500, "replication lag at which to throttle operation")
replicationLagQuery := flag.String("replication-lag-query", "", "Deprecated. gh-ost uses an internal, subsecond resolution query")
Expand Down
197 changes: 183 additions & 14 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,20 @@ const (
)

type dmlBuildResult struct {
query string
args []interface{}
rowsDelta int64
err error
query string
args []interface{}
uniqueKeyArgs []interface{}
rowsDelta int64
err error
}

func newDmlBuildResult(query string, args []interface{}, rowsDelta int64, err error) *dmlBuildResult {
func newDmlBuildResult(query string, args []interface{}, uniqueKeyArgs []interface{}, rowsDelta int64, err error) *dmlBuildResult {
return &dmlBuildResult{
query: query,
args: args,
rowsDelta: rowsDelta,
err: err,
query: query,
args: args,
uniqueKeyArgs: uniqueKeyArgs,
rowsDelta: rowsDelta,
err: err,
}
}

Expand Down Expand Up @@ -131,6 +133,7 @@ func (this *Applier) prepareQueries() (err error) {
this.migrationContext.OriginalTableColumns,
this.migrationContext.SharedColumns,
this.migrationContext.MappedSharedColumns,
&this.migrationContext.UniqueKey.Columns,
); err != nil {
return err
}
Expand Down Expand Up @@ -640,7 +643,15 @@ func (this *Applier) readMigrationMaxValues(tx *gosql.Tx, uniqueKey *sql.UniqueK
return err
}
}
this.migrationContext.Log.Infof("Migration max values: [%s]", this.migrationContext.MigrationRangeMaxValues)

// Save a snapshot copy of the initial MigrationRangeMaxValues
if this.migrationContext.MigrationRangeMaxValues == nil {
this.migrationContext.MigrationRangeMaxValuesInitial = nil
} else {
abstractValues := make([]interface{}, len(this.migrationContext.MigrationRangeMaxValues.AbstractValues()))
copy(abstractValues, this.migrationContext.MigrationRangeMaxValues.AbstractValues())
this.migrationContext.MigrationRangeMaxValuesInitial = sql.ToColumnValues(abstractValues)
}

return rows.Err()
}
Expand Down Expand Up @@ -683,6 +694,63 @@ func (this *Applier) ReadMigrationRangeValues() error {
return tx.Commit()
}

// ResetMigrationRangeMaxValues replaces MigrationRangeMaxValues with the given unique key values.
// The values are copied first, so later changes to the caller's slice do not affect the stored
// boundary.
func (this *Applier) ResetMigrationRangeMaxValues(uniqueKeyAbstractValues []interface{}) {
	snapshot := append(
		make([]interface{}, 0, len(uniqueKeyAbstractValues)),
		uniqueKeyAbstractValues...,
	)
	newMaxValues := sql.ToColumnValues(snapshot)

	this.migrationContext.MigrationRangeMaxValues = newMaxValues
	this.migrationContext.Log.Debugf("Reset migration max values: [%s]", this.migrationContext.MigrationRangeMaxValues)
}

// LockMigrationRangeMaxValues marks MigrationRangeMaxValues as locked and logs the value it was
// locked at. It does nothing when the boundary is already locked, so the log line is emitted
// only on the first call.
func (this *Applier) LockMigrationRangeMaxValues() {
	if !this.migrationContext.IsMigrationRangeMaxValuesLocked {
		this.migrationContext.IsMigrationRangeMaxValuesLocked = true
		this.migrationContext.Log.Infof("Lock migration max values: [%s]", this.migrationContext.MigrationRangeMaxValues)
	}
}

// AttemptToLockMigrationRangeMaxValues locks MigrationRangeMaxValues once row-copy has reached
// the initial copy target, so that a continuously growing upper boundary cannot keep the copy
// running forever.
//
// The boundary is locked right away whenever copy progress cannot be measured:
//   - the unique key is not a single column, or the column has no CompareValueFunc;
//   - no initial maximum was recorded (MigrationRangeMaxValuesInitial is nil or empty);
//   - the comparison itself returns an error (the error is logged).
//
// Otherwise the boundary is locked as soon as
// MigrationIterationRangeMinValues >= MigrationRangeMaxValuesInitial. When no iteration range
// has been set yet, nothing is decided and the boundary stays as it is. Calling this on an
// already locked boundary is a no-op.
func (this *Applier) AttemptToLockMigrationRangeMaxValues() {
	if this.migrationContext.IsMigrationRangeMaxValuesLocked {
		return
	}

	// Currently only supports single-column unique index of int type
	uniqueKeyCols := this.migrationContext.UniqueKey.Columns.Columns()
	if len(uniqueKeyCols) != 1 || uniqueKeyCols[0].CompareValueFunc == nil {
		this.LockMigrationRangeMaxValues()
		return
	}
	compare := uniqueKeyCols[0].CompareValueFunc

	// Compare MigrationIterationRangeMinValues with MigrationRangeMaxValuesInitial to determine copy progress
	iterationMinValues := this.migrationContext.MigrationIterationRangeMinValues
	if iterationMinValues == nil {
		return
	}

	// Without a usable value on either side there is nothing to compare against. Fall back to
	// locking, like every other "cannot measure progress" path, instead of dereferencing nil
	// or indexing an empty slice.
	initialMaxValues := this.migrationContext.MigrationRangeMaxValuesInitial
	if initialMaxValues == nil ||
		len(initialMaxValues.AbstractValues()) == 0 ||
		len(iterationMinValues.AbstractValues()) == 0 {
		this.LockMigrationRangeMaxValues()
		return
	}

	cmp, err := compare(
		iterationMinValues.AbstractValues()[0],
		initialMaxValues.AbstractValues()[0],
	)
	if err != nil {
		// If comparison fails, fallback to locking MigrationRangeMaxValues
		this.migrationContext.Log.Errore(err)
		this.LockMigrationRangeMaxValues()
		return
	}
	if cmp >= 0 {
		// The initial copy target has been reached: stop expanding the boundary.
		this.LockMigrationRangeMaxValues()
	}
}

// CalculateNextIterationRangeEndValues reads the next-iteration-range-end unique key values,
// which will be used for copying the next chunk of rows. It returns "false" if there is
// no further chunk to work through, i.e. we're past the last chunk and are done with
Expand All @@ -692,6 +760,7 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
if this.migrationContext.MigrationIterationRangeMinValues == nil {
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues
}
this.AttemptToLockMigrationRangeMaxValues()
for i := 0; i < 2; i++ {
buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset
if i == 1 {
Expand Down Expand Up @@ -733,6 +802,8 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
}
}
this.migrationContext.Log.Debugf("Iteration complete: no further range to iterate")
// Ensure MigrationRangeMaxValues is locked after iteration is complete
this.LockMigrationRangeMaxValues()
return hasFurtherRange, nil
}

Expand Down Expand Up @@ -1315,12 +1386,12 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlB
case binlog.DeleteDML:
{
query, uniqueKeyArgs, err := this.dmlDeleteQueryBuilder.BuildQuery(dmlEvent.WhereColumnValues.AbstractValues())
return []*dmlBuildResult{newDmlBuildResult(query, uniqueKeyArgs, -1, err)}
return []*dmlBuildResult{newDmlBuildResult(query, uniqueKeyArgs, uniqueKeyArgs, -1, err)}
}
case binlog.InsertDML:
{
query, sharedArgs, err := this.dmlInsertQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues())
return []*dmlBuildResult{newDmlBuildResult(query, sharedArgs, 1, err)}
query, sharedArgs, uniqueKeyArgs, err := this.dmlInsertQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues())
return []*dmlBuildResult{newDmlBuildResult(query, sharedArgs, uniqueKeyArgs, 1, err)}
}
case binlog.UpdateDML:
{
Expand All @@ -1336,12 +1407,94 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlB
args := sqlutils.Args()
args = append(args, sharedArgs...)
args = append(args, uniqueKeyArgs...)
return []*dmlBuildResult{newDmlBuildResult(query, args, 0, err)}
return []*dmlBuildResult{newDmlBuildResult(query, args, uniqueKeyArgs, 0, err)}
}
}
return []*dmlBuildResult{newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML))}
}

// IsIgnoreOverMaxChunkRangeEvent reports whether a binlog DML event can be skipped because the
// row it refers to lies beyond the range copied so far and will be synced by a later copy chunk.
//
// With v being the event's unique key value:
//
//	v <= MigrationIterationRangeMaxValues                             -> apply  (false)
//	MigrationIterationRangeMaxValues < v <= MigrationRangeMaxValues   -> ignore (true)
//	v > MigrationRangeMaxValues, boundary not locked                  -> ignore (true), boundary expanded to v
//	v > MigrationRangeMaxValues, boundary locked                      -> apply  (false)
//
// While MigrationIterationRangeMaxValues is still nil, MigrationRangeMinValues is used as the
// lower boundary instead.
//
// The event is always applied (false, nil) when the feature flag is off, when the unique key is
// not a single column with a CompareValueFunc, when uniqueKeyArgs is empty, or when a boundary
// needed for the comparison is nil. Errors returned by CompareValueFunc are passed through.
//
// Side effect: in the "not locked" case above, MigrationRangeMaxValues is reset to uniqueKeyArgs.
func (this *Applier) IsIgnoreOverMaxChunkRangeEvent(uniqueKeyArgs []interface{}) (bool, error) {
	if !this.migrationContext.IgnoreOverIterationRangeMaxBinlog {
		return false, nil
	}

	// Currently only supports single-column unique index of int type
	uniqueKeyCols := this.migrationContext.UniqueKey.Columns.Columns()
	if len(uniqueKeyCols) != 1 {
		return false, nil
	}
	compare := uniqueKeyCols[0].CompareValueFunc
	if compare == nil {
		return false, nil
	}
	// Nothing to compare: applying the event is the safe choice.
	if len(uniqueKeyArgs) == 0 {
		return false, nil
	}
	uniqueKeyValue := uniqueKeyArgs[0]

	// Lower boundary: how far row-copy has progressed.
	copiedUpTo := this.migrationContext.MigrationIterationRangeMaxValues
	if copiedUpTo == nil {
		// It means that the migration has not started yet, use MigrationRangeMinValues instead
		copiedUpTo = this.migrationContext.MigrationRangeMinValues
	}
	if copiedUpTo == nil {
		return false, nil
	}
	cmp, err := compare(uniqueKeyValue, copiedUpTo.StringColumn(0))
	if err != nil {
		return false, err
	}
	if cmp <= 0 {
		// Since rowcopy is left-open-right-closed, a value equal to the boundary belongs to
		// the range already being copied and cannot be ignored.
		return false, nil
	}

	// Upper boundary: the end of the range row-copy is going to cover.
	maxValues := this.migrationContext.MigrationRangeMaxValues
	if maxValues == nil {
		return false, nil
	}
	// Pass the first column's value, same as for the lower boundary, rather than the whole
	// column-values container.
	cmp, err = compare(uniqueKeyValue, maxValues.StringColumn(0))
	if err != nil {
		return false, err
	}
	if cmp <= 0 {
		// Since rowcopy is left-open-right-closed, a value equal to the MigrationRangeMaxValues
		// boundary will still be copied and can be ignored.
		return true, nil
	}

	// The value is beyond MigrationRangeMaxValues. A locked boundary will never be copied past,
	// so the event must be applied.
	if this.migrationContext.IsMigrationRangeMaxValuesLocked {
		return false, nil
	}
	// Otherwise expand the boundary so that row-copy reaches this row, and ignore the event.
	this.ResetMigrationRangeMaxValues(uniqueKeyArgs)
	return true, nil
}

// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error {
var totalDelta int64
Expand Down Expand Up @@ -1369,17 +1522,33 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
return err
}

var ignoredEventSize int64
buildResults := make([]*dmlBuildResult, 0, len(dmlEvents))
nArgs := 0
for _, dmlEvent := range dmlEvents {
for _, buildResult := range this.buildDMLEventQuery(dmlEvent) {
if buildResult.err != nil {
return rollback(buildResult.err)
}
if ignore, err := this.IsIgnoreOverMaxChunkRangeEvent(buildResult.uniqueKeyArgs); err != nil {
return rollback(err)
} else if ignore {
ignoredEventSize++
continue
}
nArgs += len(buildResult.args)
buildResults = append(buildResults, buildResult)
}
}
atomic.AddInt64(&this.migrationContext.TotalDMLEventsIgnored, ignoredEventSize)

// If there are no statements to execute, return directly
if len(buildResults) == 0 {
if err := tx.Commit(); err != nil {
return err
}
return nil
}

// We batch together the DML queries into multi-statements to minimize network trips.
// We have to use the raw driver connection to access the rows affected
Expand Down
59 changes: 59 additions & 0 deletions go/logic/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import (
"context"
gosql "database/sql"
"fmt"

Check failure on line 11 in go/logic/applier_test.go

View workflow job for this annotation

GitHub Actions / lint

other declaration of fmt

Check failure on line 11 in go/logic/applier_test.go

View workflow job for this annotation

GitHub Actions / build

other declaration of fmt
"math/big"
"strings"
"testing"

Expand All @@ -17,7 +19,7 @@
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/mysql"

"fmt"

Check failure on line 22 in go/logic/applier_test.go

View workflow job for this annotation

GitHub Actions / lint

"fmt" imported and not used (typecheck)

Check failure on line 22 in go/logic/applier_test.go

View workflow job for this annotation

GitHub Actions / lint

fmt redeclared in this block

Check failure on line 22 in go/logic/applier_test.go

View workflow job for this annotation

GitHub Actions / build

"fmt" imported and not used

Check failure on line 22 in go/logic/applier_test.go

View workflow job for this annotation

GitHub Actions / build

fmt redeclared in this block

"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/binlog"
Expand Down Expand Up @@ -184,6 +186,63 @@
})
}

// TestIsIgnoreOverMaxChunkRangeEvent checks the ignore/apply decision for unique key values
// around the MigrationIterationRangeMaxValues and MigrationRangeMaxValues boundaries, both
// while MigrationRangeMaxValues may still be expanded and after it has been locked.
func TestIsIgnoreOverMaxChunkRangeEvent(t *testing.T) {
	// parseInt converts a value to a big.Int through its decimal string form. The success flag
	// of SetString is checked, so an unparsable value produces an error instead of a nil
	// *big.Int reaching Cmp.
	parseInt := func(v interface{}) (*big.Int, error) {
		parsed, ok := new(big.Int).SetString(fmt.Sprintf("%+v", v), 10)
		if !ok {
			return nil, fmt.Errorf("CompareValueFunc err, %+v convert int is nil", v)
		}
		return parsed, nil
	}

	migrationContext := base.NewMigrationContext()
	migrationContext.IgnoreOverIterationRangeMaxBinlog = true
	uniqueColumns := sql.NewColumnList([]string{"id"})
	uniqueColumns.SetColumnCompareValueFunc("id", func(a interface{}, b interface{}) (int, error) {
		_a, err := parseInt(a)
		if err != nil {
			return 0, err
		}
		_b, err := parseInt(b)
		if err != nil {
			return 0, err
		}
		return _a.Cmp(_b), nil
	})

	migrationContext.UniqueKey = &sql.UniqueKey{
		Name:    "PRIMARY KEY",
		Columns: *uniqueColumns,
	}
	migrationContext.MigrationRangeMinValues = sql.ToColumnValues([]interface{}{10})
	migrationContext.MigrationRangeMaxValues = sql.ToColumnValues([]interface{}{123456})
	migrationContext.MigrationIterationRangeMaxValues = sql.ToColumnValues([]interface{}{11111})

	applier := NewApplier(migrationContext)

	// The cases share the applier and run in order: "larger than MigrationRangeMaxValues"
	// expands the boundary to 123457, and the last case locks it before checking.
	testCases := []struct {
		name           string
		uniqueKeyValue int
		lockMaxValues  bool
		expectIgnore   bool
	}{
		{name: "less than MigrationRangeMinValues", uniqueKeyValue: 5, expectIgnore: false},
		{name: "equal to MigrationIterationRangeMaxValues", uniqueKeyValue: 11111, expectIgnore: false},
		{name: "ignore event", uniqueKeyValue: 88888, expectIgnore: true},
		{name: "equal to MigrationRangeMaxValues", uniqueKeyValue: 123456, expectIgnore: true},
		{name: "larger than MigrationRangeMaxValues", uniqueKeyValue: 123457, expectIgnore: true},
		{name: "larger than locked MigrationRangeMaxValues", uniqueKeyValue: 123458, lockMaxValues: true, expectIgnore: false},
	}

	for _, testCase := range testCases {
		t.Run(testCase.name, func(t *testing.T) {
			if testCase.lockMaxValues {
				applier.LockMigrationRangeMaxValues()
			}
			ignore, err := applier.IsIgnoreOverMaxChunkRangeEvent([]interface{}{testCase.uniqueKeyValue})
			require.NoError(t, err)
			require.Equal(t, testCase.expectIgnore, ignore)
		})
	}
}

func TestApplierInstantDDL(t *testing.T) {
migrationContext := base.NewMigrationContext()
migrationContext.DatabaseName = "test"
Expand Down
Loading
Loading