diff --git a/go/logic/applier.go b/go/logic/applier.go index 15443590d..e58835d6c 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -542,6 +542,29 @@ func (this *Applier) UnlockTables() error { return nil } +func (this *Applier) RenameTablesMySQL8() error { + query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.CutOverLockTimeoutSeconds) + if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil { + return err + } + + query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`, + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetOldTableName()), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetGhostTableName()), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + ) + log.Infof("Issuing and expecting this to succeed: %s", query) + if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil { + return err + } + return nil +} + // SwapTablesQuickAndBumpy issues a two-step swap table operation: // - rename original table to _old // - rename ghost table to original @@ -977,62 +1000,6 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (result return append(results, newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML))) } -// ApplyDMLEventQuery writes an entry to the ghost table, in response to an intercepted -// original-table binlog event -func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error { - for _, buildResult := range this.buildDMLEventQuery(dmlEvent) { - if buildResult.err != nil { - return buildResult.err - } - // TODO The below is in preparation for transactional writes on the ghost tables. - // Such writes would be, for example: - // - prepended with sql_mode setup - // - prepended with time zone setup - // - prepended with SET SQL_LOG_BIN=0 - // - prepended with SET FK_CHECKS=0 - // etc. - // - // a known problem: https://github.com/golang/go/issues/9373 -- bitint unsigned values, not supported in database/sql - // is solved by silently converting unsigned bigints to string values. - // - - err := func() error { - tx, err := this.db.Begin() - if err != nil { - return err - } - rollback := func(err error) error { - tx.Rollback() - return err - } - sessionQuery := fmt.Sprintf("SET SESSION time_zone = '+00:00'") - if !this.migrationContext.SkipStrictMode { - sessionQuery += ", sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')" - } - if _, err := tx.Exec(sessionQuery); err != nil { - return rollback(err) - } - if _, err := tx.Exec(buildResult.query, buildResult.args...); err != nil { - return rollback(err) - } - if err := tx.Commit(); err != nil { - return err - } - return nil - }() - - if err != nil { - err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), buildResult.query, buildResult.args) - return log.Errore(err) - } - // no error - atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, 1) - if this.migrationContext.CountTableRows { - atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, buildResult.rowsDelta) - } - } - return nil -} // ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error { diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 86de5ac15..fe2a30ffa 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -518,19 +518,23 @@ func (this *Migrator) cutOver() (err error) { } } } - if this.migrationContext.CutOverType == base.CutOverAtomic { + var cutOverFunc func() error + if !mysql.IsSmallerMinorVersion(this.migrationContext.ApplierMySQLVersion, "8.0.13") { + // This is MySQL 8.0 or above. We can utilize a new ALTER TABLE featiure that supports + // RENAME while the table is locked. + cutOverFunc = this.cutOverMySQL8013 + } else if this.migrationContext.CutOverType == base.CutOverAtomic { // Atomic solution: we use low timeout and multiple attempts. But for // each failed attempt, we throttle until replication lag is back to normal - err := this.atomicCutOver() - this.handleCutOverResult(err) - return err - } - if this.migrationContext.CutOverType == base.CutOverTwoStep { - err := this.cutOverTwoStep() - this.handleCutOverResult(err) - return err + cutOverFunc = this.atomicCutOver + } else if this.migrationContext.CutOverType == base.CutOverTwoStep { + cutOverFunc = this.cutOverTwoStep + } else { + return log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType) } - return log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType) + err = cutOverFunc() + this.handleCutOverResult(err) + return err } // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, @@ -573,6 +577,34 @@ func (this *Migrator) waitForEventsUpToLock() (err error) { return nil } +// cutOverMySQL8013 utilizes a new deveopment starting MySQL 8.0.13 where RENAME TABLE is +// possible where a table is LOCKED under WRITE LOCK. +// This feature was developed specifically at the request of the `gh-ost` maintainers. +func (this *Migrator) cutOverMySQL8013() (err error) { + atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1) + defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0) + atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0) + + if err := this.retryOperation(this.applier.LockOriginalTable); err != nil { + return err + } + + if err := this.retryOperation(this.waitForEventsUpToLock); err != nil { + return err + } + if err := this.retryOperation(this.applier.RenameTablesMySQL8); err != nil { + return err + } + if err := this.retryOperation(this.applier.UnlockTables); err != nil { + return err + } + + lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime) + renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime) + log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName)) + return nil +} + // cutOverTwoStep will lock down the original table, execute // what's left of last DML entries, and **non-atomically** swap original->old, then new->original. // There is a point in time where the "original" table does not exist and queries are non-blocked diff --git a/go/logic/server.go b/go/logic/server.go index 774c4ab21..4c1f5817e 100644 --- a/go/logic/server.go +++ b/go/logic/server.go @@ -144,7 +144,7 @@ func (this *Server) applyServerCommand(command string, writer *bufio.Writer) (pr switch command { case "help": { - fmt.Fprintln(writer, `available commands: + fmt.Fprint(writer, `available commands: status # Print a detailed status message sup # Print a short status message coordinates # Print the currently inspected coordinates diff --git a/go/mysql/utils.go b/go/mysql/utils.go index 17bb5fc32..0b0c64aa0 100644 --- a/go/mysql/utils.go +++ b/go/mysql/utils.go @@ -8,6 +8,7 @@ package mysql import ( gosql "database/sql" "fmt" + "strconv" "strings" "sync" "time" @@ -203,3 +204,47 @@ func GetTableColumns(db *gosql.DB, databaseName, tableName string) (*sql.ColumnL } return sql.NewColumnList(columnNames), sql.NewColumnList(virtualColumnNames), nil } + +func versionTokens(version string, digits int) []int { + v := strings.Split(version, "-")[0] + tokens := strings.Split(v, ".") + intTokens := make([]int, digits) + for i := range tokens { + if i >= digits { + break + } + intTokens[i], _ = strconv.Atoi(tokens[i]) + } + return intTokens +} + +func isSmallerVersion(version string, otherVersion string, digits int) bool { + v := versionTokens(version, digits) + o := versionTokens(otherVersion, digits) + for i := 0; i < len(v); i++ { + if v[i] < o[i] { + return true + } + if v[i] > o[i] { + return false + } + if i == digits { + break + } + } + return false +} + +// IsSmallerMajorVersion tests two versions against another and returns true if +// the former is a smaller "major" varsion than the latter. +// e.g. 5.5.36 is NOT a smaller major version as comapred to 5.5.40, but IS as compared to 5.6.9 +func IsSmallerMajorVersion(version string, otherVersion string) bool { + return isSmallerVersion(version, otherVersion, 2) +} + +// IsSmallerMinorVersion tests two versions against another and returns true if +// the former is a smaller "minor" varsion than the latter. +// e.g. 5.5.36 is a smaller major version as comapred to 5.5.40, as well as compared to 5.6.7 +func IsSmallerMinorVersion(version string, otherVersion string) bool { + return isSmallerVersion(version, otherVersion, 3) +} diff --git a/go/mysql/utils_test.go b/go/mysql/utils_test.go new file mode 100644 index 000000000..f74fc0dcf --- /dev/null +++ b/go/mysql/utils_test.go @@ -0,0 +1,56 @@ +/* + Copyright 2016 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package mysql + +import ( + "reflect" + "testing" + + "github.com/outbrain/golib/log" + test "github.com/outbrain/golib/tests" +) + +func init() { + log.SetLevel(log.ERROR) +} + +func TestVersionTokens(t *testing.T) { + test.S(t).ExpectTrue(reflect.DeepEqual(versionTokens("5.7.24-log", 3), []int{5, 7, 24})) + test.S(t).ExpectTrue(reflect.DeepEqual(versionTokens("8.0.13", 3), []int{8, 0, 13})) + test.S(t).ExpectTrue(reflect.DeepEqual(versionTokens("5.5", 2), []int{5, 5})) + test.S(t).ExpectTrue(reflect.DeepEqual(versionTokens("5.5", 3), []int{5, 5, 0})) + test.S(t).ExpectTrue(reflect.DeepEqual(versionTokens("5.5-log", 3), []int{5, 5, 0})) +} + +func TestIsSmallerMajorVersion(t *testing.T) { + i55 := "5.5" + i5516 := "5.5.16" + i5517 := "5.5.17" + i56 := "5.6" + i8013 := "8.0.13" + + test.S(t).ExpectFalse(IsSmallerMajorVersion(i55, i5517)) + test.S(t).ExpectFalse(IsSmallerMajorVersion(i5516, i5517)) + test.S(t).ExpectFalse(IsSmallerMajorVersion(i56, i5517)) + test.S(t).ExpectTrue(IsSmallerMajorVersion(i55, i56)) + test.S(t).ExpectTrue(IsSmallerMajorVersion(i56, i8013)) + test.S(t).ExpectFalse(IsSmallerMajorVersion(i8013, i56)) +} + +func TestIsSmallerMinorVersion(t *testing.T) { + i55 := "5.5" + i5516 := "5.5.16" + i5517 := "5.5.17" + i56 := "5.6" + i8013 := "8.0.13" + + test.S(t).ExpectTrue(IsSmallerMinorVersion(i55, i5517)) + test.S(t).ExpectTrue(IsSmallerMinorVersion(i5516, i5517)) + test.S(t).ExpectFalse(IsSmallerMinorVersion(i56, i5517)) + test.S(t).ExpectTrue(IsSmallerMinorVersion(i55, i56)) + test.S(t).ExpectTrue(IsSmallerMinorVersion(i56, i8013)) + test.S(t).ExpectFalse(IsSmallerMinorVersion(i8013, i56)) +}