45 changes: 45 additions & 0 deletions pkg/ccr/base/spec.go
@@ -1595,6 +1595,51 @@ func (s *Spec) GetCreateTableSql(tableName string) (string, error) {

return createSql, nil
}
func (s *Spec) ModifyPartitionProperty(destTableName string, batchModifyPartitionsInfo *record.BatchModifyPartitionsInfo) error {
if batchModifyPartitionsInfo == nil || len(batchModifyPartitionsInfo.Infos) == 0 {
log.Warnf("empty partition infos, skip modify partition property")
return nil
}

dbName := utils.FormatKeywordName(s.Database)
destTableName = utils.FormatKeywordName(destTableName)

var lastErr error
successCount := 0
for _, partitionInfo := range batchModifyPartitionsInfo.Infos {
if partitionInfo.DataProperty == nil || partitionInfo.DataProperty.StorageMedium == "" {
log.Warnf("partition %d has no storage medium, skip modify partition property", partitionInfo.PartitionId)
continue
}

sql := fmt.Sprintf("ALTER TABLE %s.%s MODIFY PARTITION %s SET (\"storage_medium\" = \"%s\")",
dbName, destTableName, utils.FormatKeywordName(partitionInfo.PartitionName), partitionInfo.DataProperty.StorageMedium)

log.Infof("modify partition property sql: %s", sql)
if err := s.Exec(sql); err != nil {
errMsg := err.Error()
// Skip if partition not found (partition may have been dropped)
if strings.Contains(errMsg, "does not exist") || strings.Contains(errMsg, "not found") {
log.Warnf("partition %s not found, skip: %v", partitionInfo.PartitionName, err)
continue
}
// For other errors, record and continue to try remaining partitions
log.Warnf("modify partition %s property failed: %v", partitionInfo.PartitionName, err)
lastErr = err
} else {
successCount++
}
}

// Return error if any partition modification failed (except partition not found)
if lastErr != nil {
return xerror.Wrapf(lastErr, xerror.Normal,
"modify partition storage medium failed, success: %d, total: %d",
successCount, len(batchModifyPartitionsInfo.Infos))
}

return nil
}
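
For reference, a minimal standalone sketch of the statement shape this method emits, assuming utils.FormatKeywordName wraps identifiers in backticks (the database, table, and partition names below are made up):

package main

import "fmt"

// Sketch of the ALTER statement built by ModifyPartitionProperty above,
// assuming FormatKeywordName wraps identifiers in backticks.
func main() {
	db, tbl, part, medium := "`ccr_db`", "`ccr_tbl`", "`p202401`", "HDD"
	sql := fmt.Sprintf("ALTER TABLE %s.%s MODIFY PARTITION %s SET (\"storage_medium\" = \"%s\")",
		db, tbl, part, medium)
	fmt.Println(sql)
	// ALTER TABLE `ccr_db`.`ccr_tbl` MODIFY PARTITION `p202401` SET ("storage_medium" = "HDD")
}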

// Determine whether the error is network related, e.g. connection refused or connection reset, as exposed by the net packages.
func isNetworkRelated(err error) bool {
1 change: 1 addition & 0 deletions pkg/ccr/base/specer.go
@@ -66,6 +66,7 @@ type Specer interface {
AddPartition(destTableName string, addPartition *record.AddPartition) error
DropPartition(destTableName string, dropPartition *record.DropPartition) error
RenamePartition(destTableName, oldPartition, newPartition string) error
ModifyPartitionProperty(destTableName string, batchModifyPartitionsInfo *record.BatchModifyPartitionsInfo) error

LightningIndexChange(tableAlias string, changes *record.ModifyTableAddOrDropInvertedIndices) error
BuildIndex(tableAlias string, buildIndex *record.IndexChangeJob) error
101 changes: 95 additions & 6 deletions pkg/ccr/handle/create_table.go
@@ -1,6 +1,8 @@
package handle

import (
"fmt"
"regexp"
"strings"

"github.com/selectdb/ccr_syncer/pkg/ccr"
@@ -18,6 +20,70 @@ type CreateTableHandle struct {
IdempotentJobHandle[*record.CreateTable]
}

// Check if error message indicates storage medium or capacity related issues
func isStorageMediumError(errMsg string) bool {
// Doris returns "Failed to find enough backend" for storage/capacity issues
return strings.Contains(strings.ToLower(errMsg), "failed to find enough backend")
}
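
A quick sanity check of the matcher; the sample message below is only shaped like Doris' "not enough backend" error and is not copied from a real cluster:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Same predicate as isStorageMediumError above.
	isStorageMediumError := func(errMsg string) bool {
		return strings.Contains(strings.ToLower(errMsg), "failed to find enough backend")
	}
	msg := "errCode = 2, detailMessage = Failed to find enough backend for table t1"
	fmt.Println(isStorageMediumError(msg)) // true
}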

// Set specific property in CREATE TABLE SQL
func setPropertyInCreateTableSql(createSql string, key string, value string) string {
// Add property to PROPERTIES clause
pattern := `(?i)(PROPERTIES\s*\(\s*)`
replacement := fmt.Sprintf(`${1}"%s" = "%s", `, key, value)
createSql = regexp.MustCompile(pattern).ReplaceAllString(createSql, replacement)

// Clean up trailing comma if PROPERTIES was empty
return ccr.FilterTailingCommaFromCreateTableSql(createSql)
}
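
To make the regex behaviour concrete, here is a self-contained sketch of the injection step on its own (the sample CREATE TABLE statement is made up, and the trailing-comma cleanup is omitted since the PROPERTIES clause here is non-empty):

package main

import (
	"fmt"
	"regexp"
)

func main() {
	createSql := `CREATE TABLE t1 (k1 INT) DISTRIBUTED BY HASH(k1) PROPERTIES ("replication_num" = "3")`
	// Same pattern and replacement scheme as setPropertyInCreateTableSql.
	pattern := regexp.MustCompile(`(?i)(PROPERTIES\s*\(\s*)`)
	out := pattern.ReplaceAllString(createSql, `${1}"storage_medium" = "hdd", `)
	fmt.Println(out)
	// CREATE TABLE t1 (k1 INT) DISTRIBUTED BY HASH(k1) PROPERTIES ("storage_medium" = "hdd", "replication_num" = "3")
}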

// Set specific storage_medium in CREATE TABLE SQL
func setStorageMediumInCreateTableSql(createSql string, medium string) string {
// Remove existing storage_medium first
createSql = ccr.FilterStorageMediumFromCreateTableSql(createSql)
return setPropertyInCreateTableSql(createSql, "storage_medium", medium)
}

// Set specific medium_allocation_mode in CREATE TABLE SQL
func setMediumAllocationModeInCreateTableSql(createSql string, mode string) string {
// Remove existing medium_allocation_mode first
createSql = ccr.FilterMediumAllocationModeFromCreateTableSql(createSql)
return setPropertyInCreateTableSql(createSql, "medium_allocation_mode", mode)
}

// Process CREATE TABLE SQL according to storage medium policy
func processCreateTableSqlByMediumPolicy(j *ccr.Job, createTable *record.CreateTable) {
storageMedium := j.StorageMedium
mediumAllocationMode := j.MediumAllocationMode

// Process storage_medium
switch storageMedium {
case ccr.StorageMediumSameWithUpstream:
// Keep upstream storage_medium unchanged
log.Infof("using same_with_upstream storage medium, keeping original storage_medium")

case ccr.StorageMediumHDD:
log.Infof("using hdd storage medium, setting storage_medium to hdd")
createTable.Sql = setStorageMediumInCreateTableSql(createTable.Sql, "hdd")

case ccr.StorageMediumSSD:
log.Infof("using ssd storage medium, setting storage_medium to ssd")
createTable.Sql = setStorageMediumInCreateTableSql(createTable.Sql, "ssd")

default:
log.Warnf("unknown storage medium: %s, falling back to filter storage_medium", storageMedium)
if ccr.FeatureFilterStorageMedium {
createTable.Sql = ccr.FilterStorageMediumFromCreateTableSql(createTable.Sql)
}
}

// Process medium_allocation_mode from CCR job parameter
if mediumAllocationMode != "" {
log.Infof("setting medium_allocation_mode to %s", mediumAllocationMode)
createTable.Sql = setMediumAllocationModeInCreateTableSql(createTable.Sql, mediumAllocationMode)
}
}
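
For the hdd policy, the end-to-end rewrite looks roughly like the sketch below; the strip step is a naive stand-in for ccr.FilterStorageMediumFromCreateTableSql, used here only to keep the example self-contained:

package main

import (
	"fmt"
	"regexp"
)

func main() {
	upstream := `CREATE TABLE t1 (k1 INT) DISTRIBUTED BY HASH(k1) PROPERTIES ("storage_medium" = "ssd", "replication_num" = "3")`

	// 1. Drop the upstream storage_medium entry (naive stand-in for the real filter).
	strip := regexp.MustCompile(`(?i)"storage_medium"\s*=\s*"[^"]*",?\s*`)
	sql := strip.ReplaceAllString(upstream, "")

	// 2. Inject the configured medium, as setStorageMediumInCreateTableSql does.
	inject := regexp.MustCompile(`(?i)(PROPERTIES\s*\(\s*)`)
	sql = inject.ReplaceAllString(sql, `${1}"storage_medium" = "hdd", `)

	fmt.Println(sql)
	// CREATE TABLE t1 (k1 INT) DISTRIBUTED BY HASH(k1) PROPERTIES ("storage_medium" = "hdd", "replication_num" = "3")
}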

func (h *CreateTableHandle) Handle(j *ccr.Job, commitSeq int64, createTable *record.CreateTable) error {
if j.SyncType != ccr.DBSync {
return xerror.Errorf(xerror.Normal, "invalid sync type: %v", j.SyncType)
@@ -68,33 +134,56 @@ func (h *CreateTableHandle) Handle(j *ccr.Job, commitSeq int64, createTable *record.CreateTable) error {
}
}

if ccr.FeatureFilterStorageMedium {
createTable.Sql = ccr.FilterStorageMediumFromCreateTableSql(createTable.Sql)
}
// Process SQL according to storage medium policy
processCreateTableSqlByMediumPolicy(j, createTable)
createTable.Sql = ccr.FilterDynamicPartitionStoragePolicyFromCreateTableSql(createTable.Sql)

// Execute create table
if err := j.IDest.CreateTableOrView(createTable, j.Src.Database); err != nil {
errMsg := err.Error()

// Skip unsupported features
if strings.Contains(errMsg, "Can not found function") {
log.Warnf("skip creating table/view because the UDF function is not supported yet: %s", errMsg)
return nil
} else if strings.Contains(errMsg, "Can not find resource") {
}
if strings.Contains(errMsg, "Can not find resource") {
log.Warnf("skip creating table/view for the resource is not supported yet: %s", errMsg)
return nil
} else if createTable.IsCreateView() && strings.Contains(errMsg, "Unknown column") {
}

// Trigger partial snapshot for recoverable errors
if createTable.IsCreateView() && strings.Contains(errMsg, "Unknown column") {
log.Warnf("create view but the column is not found, trigger partial snapshot, commit seq: %d, msg: %s",
commitSeq, errMsg)
replace := false // new view no need to replace
isView := true
return j.NewPartialSnapshot(createTable.TableId, createTable.TableName, nil, replace, isView)
}
if len(createTable.TableName) > 0 && ccr.IsSessionVariableRequired(errMsg) { // ignore doris 2.0.3
if len(createTable.TableName) > 0 && ccr.IsSessionVariableRequired(errMsg) {
log.Infof("a session variable is required to create table %s, force partial snapshot, commit seq: %d, msg: %s",
createTable.TableName, commitSeq, errMsg)
replace := false // new table no need to replace
isView := false
return j.NewPartialSnapshot(createTable.TableId, createTable.TableName, nil, replace, isView)
}

// Storage medium related error: pause job and require manual intervention
if isStorageMediumError(errMsg) {
log.Errorf("create table %s failed due to storage medium issue, job will be paused. "+
"Current storage_medium=%s. Please check target cluster resources or update storage_medium via API. Error: %s",
createTable.TableName, j.StorageMedium, errMsg)
return xerror.Panicf(xerror.Normal,
"Create table failed: storage medium issue for table %s. "+
"Current storage_medium=%s. Possible causes:\n"+
"1. Storage medium (%s) not available on target cluster\n"+
"2. Insufficient disk capacity\n"+
"3. Replication number exceeds available BE nodes\n"+
"Please check target cluster configuration or update storage_medium via /update_storage_medium API. "+
"Original error: %s",
createTable.TableName, j.StorageMedium, j.StorageMedium, errMsg)
}

return xerror.Wrapf(err, xerror.Normal, "create table %d", createTable.TableId)
}

105 changes: 105 additions & 0 deletions pkg/ccr/handle/modify_partitions.go
@@ -0,0 +1,105 @@
package handle

import (
"strings"

"github.com/selectdb/ccr_syncer/pkg/ccr"
"github.com/selectdb/ccr_syncer/pkg/ccr/record"
festruct "github.com/selectdb/ccr_syncer/pkg/rpc/kitex_gen/frontendservice"
"github.com/selectdb/ccr_syncer/pkg/xerror"
log "github.com/sirupsen/logrus"
)

func init() {
ccr.RegisterJobHandle[*record.BatchModifyPartitionsInfo](festruct.TBinlogType_MODIFY_PARTITIONS, &ModifyPartitionsHandle{})
}

type ModifyPartitionsHandle struct {
// The modify partitions binlog is idempotent
IdempotentJobHandle[*record.BatchModifyPartitionsInfo]
}

// Filter partitions that have storage medium changes and are not temporary partitions
func filterStorageMediumChanges(infos []*record.ModifyPartitionInfo) []*record.ModifyPartitionInfo {
filtered := make([]*record.ModifyPartitionInfo, 0)
for _, info := range infos {
// Skip temporary partitions (they are not synced)
if info.IsTempPartition {
log.Debugf("skip temporary partition %d", info.PartitionId)
continue
}
// Only process partitions with storage medium specified
if info.DataProperty != nil && info.DataProperty.StorageMedium != "" {
filtered = append(filtered, info)
}
}
return filtered
}
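
A self-contained sketch of the filtering rule, using stand-in structs that mirror only the record fields referenced above (the real record.ModifyPartitionInfo has more fields):

package main

import "fmt"

type dataProperty struct{ StorageMedium string }

type partitionInfo struct {
	PartitionId     int64
	IsTempPartition bool
	DataProperty    *dataProperty
}

// Same rule as filterStorageMediumChanges: drop temporary partitions and
// partitions without an explicit storage medium.
func filterMediumChanges(infos []*partitionInfo) []*partitionInfo {
	filtered := make([]*partitionInfo, 0)
	for _, info := range infos {
		if info.IsTempPartition {
			continue
		}
		if info.DataProperty != nil && info.DataProperty.StorageMedium != "" {
			filtered = append(filtered, info)
		}
	}
	return filtered
}

func main() {
	infos := []*partitionInfo{
		{PartitionId: 1, DataProperty: &dataProperty{StorageMedium: "HDD"}},
		{PartitionId: 2, IsTempPartition: true, DataProperty: &dataProperty{StorageMedium: "SSD"}},
		{PartitionId: 3, DataProperty: &dataProperty{}},
	}
	fmt.Println(len(filterMediumChanges(infos))) // 1 (only partition 1 qualifies)
}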

func (h *ModifyPartitionsHandle) Handle(j *ccr.Job, commitSeq int64, batchModifyPartitionsInfo *record.BatchModifyPartitionsInfo) error {
// Skip if using fixed storage medium (hdd/ssd)
// Only process when using same_with_upstream storage medium
if j.StorageMedium == ccr.StorageMediumHDD ||
j.StorageMedium == ccr.StorageMediumSSD {
log.Infof("skip modify partitions for storage_medium is fixed to %s", j.StorageMedium)
return nil
}

// Safety check: ensure we have partition infos to process
if batchModifyPartitionsInfo == nil || len(batchModifyPartitionsInfo.Infos) == 0 {
log.Warnf("batch modify partitions info is empty or nil, skip")
return nil
}

// Filter to only process storage medium changes
filteredInfos := filterStorageMediumChanges(batchModifyPartitionsInfo.Infos)
if len(filteredInfos) == 0 {
log.Infof("no storage medium changes in modify partitions binlog, skip")
return nil
}
log.Infof("processing %d partition storage medium changes out of %d total modifications",
len(filteredInfos), len(batchModifyPartitionsInfo.Infos))

// Update to use filtered infos
batchModifyPartitionsInfo.Infos = filteredInfos

// Get table ID from the first partition info (all partitions should belong to the same table)
tableId := batchModifyPartitionsInfo.GetTableId()
if tableId <= 0 {
log.Warnf("invalid table ID: %d, skip modify partitions", tableId)
return nil
}

// Check if it's a materialized view table
if isAsyncMv, err := j.IsMaterializedViewTable(tableId); err != nil {
return err
} else if isAsyncMv {
log.Infof("skip modify partitions for materialized view table %d", tableId)
return nil
}

// Get destination table name
destTableName, err := j.GetDestNameBySrcId(tableId)
if err != nil {
errMsg := err.Error()
// If table not found in mapping, it may not be synced yet or already dropped
if strings.Contains(errMsg, "not found") || strings.Contains(errMsg, "does not exist") {
log.Warnf("table %d not found in dest, skip modify partitions: %v", tableId, err)
return nil
}
// Other errors (network, etc.): return error to retry
return xerror.Wrapf(err, xerror.Normal, "failed to get dest table name for table %d", tableId)
}

// Execute modify partition property
// Note: ModifyPartition only updates FE metadata, actual BE storage migration is async
// So this SQL won't fail due to backend resource issues
if err := j.Dest.ModifyPartitionProperty(destTableName, batchModifyPartitionsInfo); err != nil {
// Return error to let job framework retry (network issues, etc.)
return xerror.Wrapf(err, xerror.Normal, "modify partition storage medium failed for table %s", destTableName)
}

log.Infof("successfully modified storage medium for %d partitions in table %s",
len(filteredInfos), destTableName)
return nil
}