From 12cca55b7d27c1c5fcce955a217ca14506146849 Mon Sep 17 00:00:00 2001 From: zengyi Date: Sat, 24 May 2025 15:48:29 +0800 Subject: [PATCH 1/6] [FLINK-34864][cdc-connector-mysql] Add the IgnoreNoPrimaryKeyTable parameter to skip tables without primary keys in multi-table synchronization --- .../mysql/factory/MySqlDataSourceFactory.java | 4 ++ .../mysql/source/MySqlDataSourceOptions.java | 7 +++ .../debezium/reader/BinlogSplitReader.java | 3 +- .../source/assigners/MySqlChunkSplitter.java | 31 ++++++++-- .../source/config/MySqlSourceConfig.java | 9 ++- .../config/MySqlSourceConfigFactory.java | 13 ++++- .../source/config/MySqlSourceOptions.java | 11 ++++ .../mysql/source/utils/ChunkUtils.java | 27 +++++++-- .../assigners/MySqlChunkSplitterTest.java | 57 +++++++++++++++++++ 9 files changed, 150 insertions(+), 12 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index f522adfdb1a..3c1445cd32c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -71,6 +71,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CONNECT_TIMEOUT; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HEARTBEAT_INTERVAL; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.IGNORE_NO_PRIMARY_KEY_TABLE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.INCLUDE_COMMENTS_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.METADATA_LIST; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES; @@ -168,6 +169,7 @@ public DataSource createDataSource(Context context) { boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT); boolean isAssignUnboundedChunkFirst = config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED); + boolean ignoreNoPrimaryKeyTable = config.get(IGNORE_NO_PRIMARY_KEY_TABLE); validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1); @@ -221,6 +223,7 @@ public DataSource createDataSource(Context context) { .treatTinyInt1AsBoolean(treatTinyInt1AsBoolean) .useLegacyJsonFormat(useLegacyJsonFormat) .assignUnboundedChunkFirst(isAssignUnboundedChunkFirst) + .ignoreNoPrimaryKeyTable(ignoreNoPrimaryKeyTable) .skipSnapshotBackfill(skipSnapshotBackfill); List tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null); @@ -359,6 +362,7 @@ public Set> optionalOptions() { options.add(PARSE_ONLINE_SCHEMA_CHANGES); options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED); options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP); + options.add(IGNORE_NO_PRIMARY_KEY_TABLE); return options; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index 6aff556e7fa..1f96796d0d6 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -330,4 +330,11 @@ public class MySqlDataSourceOptions { .defaultValue(false) .withDescription( "Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in change log reading phase instead of being merged into the snapshot.WARNING: Skipping backfill might lead to data inconsistency because some change log events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially."); + + @Experimental + public static final ConfigOption IGNORE_NO_PRIMARY_KEY_TABLE = + ConfigOptions.key("ignore-no-primary-key-table") + .booleanType() + .defaultValue(false) + .withDescription("Whether to ignore tables without primary key in MySQL."); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index 87a435ff62b..3b3d6b071c1 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -267,7 +267,8 @@ private boolean shouldEmit(SourceRecord sourceRecord) { ChunkUtils.getChunkKeyColumnType( statefulTaskContext.getDatabaseSchema().tableFor(tableId), statefulTaskContext.getSourceConfig().getChunkKeyColumns(), - statefulTaskContext.getSourceConfig().isTreatTinyInt1AsBoolean()); + statefulTaskContext.getSourceConfig().isTreatTinyInt1AsBoolean(), + statefulTaskContext.getSourceConfig()); Struct target = RecordUtils.getStructContainsChunkKey(sourceRecord); Object[] chunkKey = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java index 4821eaba2ea..efc252c57c9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java @@ -114,6 +114,10 @@ public List splitChunks(MySqlPartition partition, TableId ta throws Exception { if (!hasNextChunk()) { analyzeTable(partition, tableId); + // Skip tables without primary key + if (splitColumn == null && sourceConfig.isIgnoreNoPrimaryKeyTable()) { + return Collections.emptyList(); + } Optional> evenlySplitChunks = trySplitAllEvenlySizedChunks(partition, tableId); if (evenlySplitChunks.isPresent()) { @@ -133,6 +137,10 @@ public List splitChunks(MySqlPartition partition, TableId ta "Can not split a new table before the previous table splitting finish."); if (currentSplittingTable == null) { analyzeTable(partition, currentSplittingTableId); + // Skip tables without primary key + if (splitColumn == null && sourceConfig.isIgnoreNoPrimaryKeyTable()) { + return Collections.emptyList(); + } } synchronized (lock) { return Collections.singletonList(splitOneUnevenlySizedChunk(partition, tableId)); @@ -145,12 +153,22 @@ private void analyzeTable(MySqlPartition partition, TableId tableId) { try { currentSplittingTable = mySqlSchema.getTableSchema(partition, jdbcConnection, tableId).getTable(); - splitColumn = - ChunkUtils.getChunkKeyColumn( - currentSplittingTable, sourceConfig.getChunkKeyColumns()); + splitColumn = getChunkKeyColumn(currentSplittingTable); + if (splitColumn == null && sourceConfig.isIgnoreNoPrimaryKeyTable()) { + LOG.warn( + "Table {} doesn't have primary key and ignore-no-primary-key-table is set to true, skipping incremental snapshot.", + tableId); + currentSplittingTableId = null; + nextChunkStart = null; + nextChunkId = null; + return; + } splitType = ChunkUtils.getChunkKeyColumnType( - splitColumn, sourceConfig.isTreatTinyInt1AsBoolean()); + currentSplittingTable, + sourceConfig.getChunkKeyColumns(), + sourceConfig.isTreatTinyInt1AsBoolean(), + sourceConfig); minMaxOfSplitColumn = StatementUtils.queryMinMax(jdbcConnection, tableId, splitColumn.name()); approximateRowCnt = StatementUtils.queryApproximateRowCnt(jdbcConnection, tableId); @@ -479,4 +497,9 @@ public void close() throws Exception { } mySqlSchema.close(); } + + private Column getChunkKeyColumn(Table table) { + return ChunkUtils.getChunkKeyColumn( + table, sourceConfig.getChunkKeyColumns(), sourceConfig); + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index 260a7cd2b5d..305628b66de 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -70,6 +70,7 @@ public class MySqlSourceConfig implements Serializable { private final boolean parseOnLineSchemaChanges; public static boolean useLegacyJsonFormat = true; private final boolean assignUnboundedChunkFirst; + private final boolean ignoreNoPrimaryKeyTable; // -------------------------------------------------------------------------------------------- // Debezium Configurations @@ -108,7 +109,8 @@ public class MySqlSourceConfig implements Serializable { boolean parseOnLineSchemaChanges, boolean treatTinyInt1AsBoolean, boolean useLegacyJsonFormat, - boolean assignUnboundedChunkFirst) { + boolean assignUnboundedChunkFirst, + boolean ignoreNoPrimaryKeyTable) { this.hostname = checkNotNull(hostname); this.port = port; this.username = checkNotNull(username); @@ -152,6 +154,7 @@ public class MySqlSourceConfig implements Serializable { this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean; this.useLegacyJsonFormat = useLegacyJsonFormat; this.assignUnboundedChunkFirst = assignUnboundedChunkFirst; + this.ignoreNoPrimaryKeyTable = ignoreNoPrimaryKeyTable; } public String getHostname() { @@ -285,4 +288,8 @@ public boolean isSkipSnapshotBackfill() { public boolean isTreatTinyInt1AsBoolean() { return treatTinyInt1AsBoolean; } + + public boolean isIgnoreNoPrimaryKeyTable() { + return ignoreNoPrimaryKeyTable; + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index 427115edea7..dc7db6d97b7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -74,6 +74,7 @@ public class MySqlSourceConfigFactory implements Serializable { private boolean treatTinyInt1AsBoolean = true; private boolean useLegacyJsonFormat = true; private boolean assignUnboundedChunkFirst = false; + private boolean ignoreNoPrimaryKeyTable = false; public MySqlSourceConfigFactory hostname(String hostname) { this.hostname = hostname; @@ -280,6 +281,15 @@ public MySqlSourceConfigFactory skipSnapshotBackfill(boolean skipSnapshotBackfil return this; } + /** + * Whether to ignore tables without primary key. When enabled, the connector will skip tables + * that don't have a primary key. + */ + public MySqlSourceConfigFactory ignoreNoPrimaryKeyTable(boolean ignoreNoPrimaryKeyTable) { + this.ignoreNoPrimaryKeyTable = ignoreNoPrimaryKeyTable; + return this; + } + /** * Whether to use legacy json format. The default value is true, which means there is no * whitespace before value and after comma in json format. @@ -421,6 +431,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { parseOnLineSchemaChanges, treatTinyInt1AsBoolean, useLegacyJsonFormat, - assignUnboundedChunkFirst); + assignUnboundedChunkFirst, + ignoreNoPrimaryKeyTable); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java index a00d6d564ad..9229207c72a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java @@ -293,4 +293,15 @@ public class MySqlSourceOptions { .defaultValue(false) .withDescription( "Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false."); + + @Experimental + public static final ConfigOption IGNORE_NO_PRIMARY_KEY_TABLE = + ConfigOptions.key("ignore-no-primary-key-table") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to ignore tables without primary key. When enabled, the connector will skip tables " + + "that don't have a primary key. By default these tables will be processed, but for some " + + "scenarios it may be desirable to ignore them since tables without primary keys " + + "might cause performance issues during incremental snapshot."); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java index 794abd2b5f9..ca360b86b5e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java @@ -23,6 +23,9 @@ import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Preconditions; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import io.debezium.relational.Column; import io.debezium.relational.Table; @@ -41,12 +44,13 @@ /** Utilities to split chunks of table. */ public class ChunkUtils { + private static final Logger LOG = LoggerFactory.getLogger(ChunkUtils.class); private ChunkUtils() {} public static RowType getChunkKeyColumnType( - Table table, Map chunkKeyColumns, boolean tinyInt1isBit) { - return getChunkKeyColumnType(getChunkKeyColumn(table, chunkKeyColumns), tinyInt1isBit); + Table table, Map chunkKeyColumns, boolean tinyInt1isBit, MySqlSourceConfig sourceConfig) { + return getChunkKeyColumnType(getChunkKeyColumn(table, chunkKeyColumns, sourceConfig), tinyInt1isBit); } public static RowType getChunkKeyColumnType(Column chunkKeyColumn, boolean tinyInt1isBit) { @@ -62,12 +66,25 @@ public static RowType getChunkKeyColumnType(Column chunkKeyColumn, boolean tinyI * have primary keys, `chunkKeyColumn` must be set. When the parameter `chunkKeyColumn` is not * set and the table has primary keys, return the first column of primary keys. */ - public static Column getChunkKeyColumn(Table table, Map chunkKeyColumns) { + public static Column getChunkKeyColumn( + Table table, Map chunkKeyColumns, MySqlSourceConfig sourceConfig) { List primaryKeys = table.primaryKeyColumns(); String chunkKeyColumn = findChunkKeyColumn(table.id(), chunkKeyColumns); if (primaryKeys.isEmpty() && chunkKeyColumn == null) { - throw new ValidationException( - "To use incremental snapshot, 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys."); + if (sourceConfig != null && sourceConfig.isIgnoreNoPrimaryKeyTable()) { + LOG.warn( + "Table {} has no primary key and no chunk key column specified. This table will be skipped.", + table.id()); + return null; + } else { + throw new ValidationException( + String.format( + "Table %s has no primary key and no chunk key column specified. " + + "To use incremental snapshot, either: " + + "1. Set 'scan.incremental.snapshot.chunk.key-column' for this table, or " + + "2. Set 'scan.incremental.snapshot.ignore-no-primary-key-table' to true to skip tables without primary keys.", + table.id())); + } } List searchColumns = table.columns(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java index 33f50333af8..9739d61f36d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java @@ -17,11 +17,19 @@ package org.apache.flink.cdc.connectors.mysql.source.assigners; +import org.apache.flink.cdc.connectors.mysql.schema.MySqlSchema; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; +import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit; import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; +import org.apache.flink.table.catalog.ObjectPath; +import io.debezium.connector.mysql.MySqlPartition; +import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.Column; +import io.debezium.relational.Table; import io.debezium.relational.TableId; +import io.debezium.relational.history.TableChanges; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; @@ -85,4 +93,53 @@ void testSplitEvenlySizedChunksNormal() { ChunkRange.of(2147483637, 2147483647), ChunkRange.of(2147483647, null)); } + + @Test + void testIgnoreNoPrimaryKeyTable() throws Exception { + // 创建配置,设置ignoreNoPrimaryKeyTable为true + MySqlSourceConfig sourceConfig = + new MySqlSourceConfigFactory() + .startupOptions(StartupOptions.initial()) + .databaseList("test_db") + .tableList("test_db.test_table") + .hostname("localhost") + .username("test") + .password("test") + .serverTimeZone(ZoneId.of("UTC").toString()) + .ignoreNoPrimaryKeyTable(true) + .createConfig(0); + + // 创建一个简单的MySqlSchema实现 + MySqlSchema schema = + new MySqlSchema(sourceConfig, true) { + @Override + public TableChanges.TableChange getTableSchema( + MySqlPartition partition, JdbcConnection jdbc, TableId tableId) { + // 创建一个没有主键的表 + Table noPkTable = + Table.editor() + .tableId(tableId) + .addColumn( + Column.editor() + .name("id") + .type("BIGINT") + .jdbcType(-5) + .optional(false) + .create()) + .create(); + return new TableChanges.TableChange( + TableChanges.TableChangeType.CREATE, noPkTable); + } + }; + + MySqlChunkSplitter splitter = new MySqlChunkSplitter(schema, sourceConfig); + MySqlPartition partition = new MySqlPartition("mysql_binlog_source"); + + // 测试无主键表 + List splits = + splitter.splitChunks(partition, new TableId("test_db", null, "test_table")); + + // 验证对于没有主键的表,返回空的分片列表 + Assertions.assertThat(splits).isEmpty(); + } } From 75aebed591fbe877518ded78c4df4a0dc285bd59 Mon Sep 17 00:00:00 2001 From: zengyi Date: Wed, 28 May 2025 10:51:04 +0800 Subject: [PATCH 2/6] [FLINK-34864][cdc-connector-mysql] revise details and add content to doc --- .../docs/connectors/flink-sources/mysql-cdc.md | 13 +++++++++++-- .../docs/connectors/flink-sources/mysql-cdc.md | 12 +++++++++++- .../mysql/source/MySqlDataSourceOptions.java | 3 ++- .../debezium/reader/BinlogSplitReader.java | 2 +- .../source/assigners/MySqlChunkSplitter.java | 10 ++++++++-- .../mysql/source/config/MySqlSourceOptions.java | 5 +---- .../mysql/source/utils/ChunkUtils.java | 17 ++++++++++------- .../assigners/MySqlChunkSplitterTest.java | 5 ----- 8 files changed, 44 insertions(+), 23 deletions(-) diff --git a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md index 9ce67d6432e..e2dca2812e8 100644 --- a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md @@ -424,6 +424,13 @@ Flink SQL> SELECT * FROM orders; 警告:跳过 backfill 可能会导致数据不一致,因为快照阶段发生的某些 binlog 事件可能会被重放(仅保证 at-least-once )。 例如,更新快照阶段已更新的值,或删除快照阶段已删除的数据。这些重放的 binlog 事件应进行特殊处理。 + + ignore-no-primary-key-table + optional + false + Boolean + 是否跳过没有主键的表。如果设置为true,连接器将跳过没有主键的表。 + @@ -621,8 +628,8 @@ Flink 定期为 Source 执行 checkpoint,在故障转移的情况下,作业 在执行增量快照读取时,MySQL CDC source 需要一个用于分片的的算法。 MySQL CDC Source 使用主键列将表划分为多个分片(chunk)。 默认情况下,MySQL CDC source 会识别表的主键列,并使用主键中的第一列作为用作分片列。 -如果表中没有主键,用户必须指定 `scan.incremental.snapshot.chunk.key-column`、 -否则增量快照读取将失败,你可以禁用 `scan.incremental.snapshot.enabled` 恢复到旧的快照读取机制。 +如果表中没有主键,用户必须指定 `scan.incremental.snapshot.chunk.key-column`作为分块键,或者 +设置 `ignore-no-primary-key-table` 参数为 true 以跳过没有主键的表。否则增量快照读取将失败,你可以禁用 `scan.incremental.snapshot.enabled` 恢复到旧的快照读取机制。 请注意,使用不在主键中的列作为分块键可能会降低表的查询性能。 对于数值和自动增量拆分列,MySQL CDC Source 按固定步长高效地拆分块。 @@ -820,6 +827,8 @@ $ ./bin/flink run \ * 如果指定的列不存在更新操作,此时可以保证 Exactly once 语义。 * 如果指定的列存在更新操作,此时只能保证 At least once 语义。但可以结合下游,通过指定下游主键,结合幂等性操作来保证数据的正确性。 +从 3.5.0 版本开始,MySQL 变更数据捕获(CDC)提供了一个忽略无主键表的选项。 +当 “ignore-no-primary-key-table”(忽略无主键表)设置为 “true”(真)时,连接器将跳过没有主键的表。 ### 可用的指标 指标系统能够帮助了解分片分发的进展, 下面列举出了支持的 Flink 指标 [Flink metrics](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/): diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md b/docs/content/docs/connectors/flink-sources/mysql-cdc.md index 063a5b1263e..5a3cd1b2c36 100644 --- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md +++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md @@ -451,6 +451,13 @@ During a snapshot operation, the connector will query each included table to pro For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially. + + ignore-no-primary-key-table + optional + false + Boolean + Whether to skip tables without primary keys. If set to true, the connector will skip tables that don't have a primary key. + @@ -656,7 +663,7 @@ Flink performs checkpoints for the source periodically, in case of failover, the When performing incremental snapshot reading, MySQL CDC source need a criterion which used to split the table. MySQL CDC Source use a splitting column to split the table to multiple splits (chunks). By default, MySQL CDC source will identify the primary key column of the table and use the first column in primary key as the splitting column. -If there is no primary key in the table, user must specify `scan.incremental.snapshot.chunk.key-column`, +If there is no primary key in the table, users must specify scan.incremental.snapshot.chunk.key-column as the chunk key, or set ignore-no-primary-key-table to true to skip tables without primary keys, otherwise incremental snapshot reading will fail and you can disable `scan.incremental.snapshot.enabled` to fallback to old snapshot reading mechanism. Please note that using a column not in primary key as a chunk key can result in slower table query performance. @@ -858,6 +865,9 @@ There are two places that need to be taken care of. * If no update operation is performed on the specified column, the exactly-once semantics is ensured. * If the update operation is performed on the specified column, only the at-least-once semantics is ensured. However, you can specify primary keys at downstream and perform the idempotence operation to ensure data correctness. +Starting from version 3.5.0, MySQL CDC provides an option to ignore tables without primary keys. +When `ignore-no-primary-key-table` is set to `true`, the connector will skip tables that don't have a primary key. + ### About converting binary type data to base64 encoded data ```sql diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index 1f96796d0d6..f521b799652 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -336,5 +336,6 @@ public class MySqlDataSourceOptions { ConfigOptions.key("ignore-no-primary-key-table") .booleanType() .defaultValue(false) - .withDescription("Whether to ignore tables without primary key in MySQL."); + .withDescription( + "Whether to ignore tables without primary key in MySQL. When enabled, the connector will skip tables."); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index 3b3d6b071c1..7f596448ab0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -268,7 +268,7 @@ private boolean shouldEmit(SourceRecord sourceRecord) { statefulTaskContext.getDatabaseSchema().tableFor(tableId), statefulTaskContext.getSourceConfig().getChunkKeyColumns(), statefulTaskContext.getSourceConfig().isTreatTinyInt1AsBoolean(), - statefulTaskContext.getSourceConfig()); + statefulTaskContext.getSourceConfig().isIgnoreNoPrimaryKeyTable()); Struct target = RecordUtils.getStructContainsChunkKey(sourceRecord); Object[] chunkKey = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java index efc252c57c9..8bc4bd10490 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java @@ -116,6 +116,9 @@ public List splitChunks(MySqlPartition partition, TableId ta analyzeTable(partition, tableId); // Skip tables without primary key if (splitColumn == null && sourceConfig.isIgnoreNoPrimaryKeyTable()) { + LOG.warn( + "Table {} doesn't have primary key and ignore-no-primary-key-table is set to true, skipping incremental snapshot.", + tableId); return Collections.emptyList(); } Optional> evenlySplitChunks = @@ -139,6 +142,9 @@ public List splitChunks(MySqlPartition partition, TableId ta analyzeTable(partition, currentSplittingTableId); // Skip tables without primary key if (splitColumn == null && sourceConfig.isIgnoreNoPrimaryKeyTable()) { + LOG.warn( + "Table {} doesn't have primary key and ignore-no-primary-key-table is set to true, skipping incremental snapshot.", + currentSplittingTableId); return Collections.emptyList(); } } @@ -168,7 +174,7 @@ private void analyzeTable(MySqlPartition partition, TableId tableId) { currentSplittingTable, sourceConfig.getChunkKeyColumns(), sourceConfig.isTreatTinyInt1AsBoolean(), - sourceConfig); + sourceConfig.isIgnoreNoPrimaryKeyTable()); minMaxOfSplitColumn = StatementUtils.queryMinMax(jdbcConnection, tableId, splitColumn.name()); approximateRowCnt = StatementUtils.queryApproximateRowCnt(jdbcConnection, tableId); @@ -500,6 +506,6 @@ public void close() throws Exception { private Column getChunkKeyColumn(Table table) { return ChunkUtils.getChunkKeyColumn( - table, sourceConfig.getChunkKeyColumns(), sourceConfig); + table, sourceConfig.getChunkKeyColumns(), sourceConfig.isIgnoreNoPrimaryKeyTable()); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java index 9229207c72a..73ae1d887fe 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java @@ -300,8 +300,5 @@ public class MySqlSourceOptions { .booleanType() .defaultValue(false) .withDescription( - "Whether to ignore tables without primary key. When enabled, the connector will skip tables " - + "that don't have a primary key. By default these tables will be processed, but for some " - + "scenarios it may be desirable to ignore them since tables without primary keys " - + "might cause performance issues during incremental snapshot."); + "Whether to ignore tables without primary key in MySQL. When enabled, the connector will skip tables."); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java index ca360b86b5e..43a13f2e6a8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java @@ -23,14 +23,13 @@ import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Preconditions; -import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import io.debezium.relational.Column; import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.relational.Tables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; @@ -49,8 +48,12 @@ public class ChunkUtils { private ChunkUtils() {} public static RowType getChunkKeyColumnType( - Table table, Map chunkKeyColumns, boolean tinyInt1isBit, MySqlSourceConfig sourceConfig) { - return getChunkKeyColumnType(getChunkKeyColumn(table, chunkKeyColumns, sourceConfig), tinyInt1isBit); + Table table, + Map chunkKeyColumns, + boolean tinyInt1isBit, + boolean ignoreNoPrimaryKeyTable) { + return getChunkKeyColumnType( + getChunkKeyColumn(table, chunkKeyColumns, ignoreNoPrimaryKeyTable), tinyInt1isBit); } public static RowType getChunkKeyColumnType(Column chunkKeyColumn, boolean tinyInt1isBit) { @@ -67,11 +70,11 @@ public static RowType getChunkKeyColumnType(Column chunkKeyColumn, boolean tinyI * set and the table has primary keys, return the first column of primary keys. */ public static Column getChunkKeyColumn( - Table table, Map chunkKeyColumns, MySqlSourceConfig sourceConfig) { + Table table, Map chunkKeyColumns, boolean ignoreNoPrimaryKeyTable) { List primaryKeys = table.primaryKeyColumns(); String chunkKeyColumn = findChunkKeyColumn(table.id(), chunkKeyColumns); if (primaryKeys.isEmpty() && chunkKeyColumn == null) { - if (sourceConfig != null && sourceConfig.isIgnoreNoPrimaryKeyTable()) { + if (ignoreNoPrimaryKeyTable) { LOG.warn( "Table {} has no primary key and no chunk key column specified. This table will be skipped.", table.id()); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java index 9739d61f36d..2320fec2817 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java @@ -22,7 +22,6 @@ import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit; import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; -import org.apache.flink.table.catalog.ObjectPath; import io.debezium.connector.mysql.MySqlPartition; import io.debezium.jdbc.JdbcConnection; @@ -96,7 +95,6 @@ void testSplitEvenlySizedChunksNormal() { @Test void testIgnoreNoPrimaryKeyTable() throws Exception { - // 创建配置,设置ignoreNoPrimaryKeyTable为true MySqlSourceConfig sourceConfig = new MySqlSourceConfigFactory() .startupOptions(StartupOptions.initial()) @@ -109,7 +107,6 @@ void testIgnoreNoPrimaryKeyTable() throws Exception { .ignoreNoPrimaryKeyTable(true) .createConfig(0); - // 创建一个简单的MySqlSchema实现 MySqlSchema schema = new MySqlSchema(sourceConfig, true) { @Override @@ -135,11 +132,9 @@ public TableChanges.TableChange getTableSchema( MySqlChunkSplitter splitter = new MySqlChunkSplitter(schema, sourceConfig); MySqlPartition partition = new MySqlPartition("mysql_binlog_source"); - // 测试无主键表 List splits = splitter.splitChunks(partition, new TableId("test_db", null, "test_table")); - // 验证对于没有主键的表,返回空的分片列表 Assertions.assertThat(splits).isEmpty(); } } From c7e38e53e7c2b219b296f26fe86898ac8272eda5 Mon Sep 17 00:00:00 2001 From: zengyi Date: Tue, 3 Jun 2025 10:36:36 +0800 Subject: [PATCH 3/6] [FLINK-34864][cdc-connector-mysql] Fix the bug regarding the automatic table creation of tables without primary keys and the issue of incremental data synchronization --- .../source/reader/MySqlPipelineRecordEmitter.java | 10 ++++++++++ .../mysql/debezium/reader/BinlogSplitReader.java | 6 ++++++ .../connectors/mysql/table/MySqlConnectorITCase.java | 8 ++++++++ 3 files changed, 24 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java index 007333b0745..aa2b2d5e0b6 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java @@ -132,6 +132,11 @@ protected void processElement( private void sendCreateTableEvent( JdbcConnection jdbc, TableId tableId, SourceOutput output) { Schema schema = getSchema(jdbc, tableId); + // Check if table has primary key and ignore-no-primary-key-table is enabled + if (schema.primaryKeys().isEmpty() && sourceConfig.isIgnoreNoPrimaryKeyTable()) { + LOG.warn("Table {} has no primary key and ignore-no-primary-key-table is set to true, skipping table creation.", tableId); + return; + } output.collect( new CreateTableEvent( org.apache.flink.cdc.common.event.TableId.tableId( @@ -263,6 +268,11 @@ private Map generateCreateTableEvent( jdbc, sourceConfig.getDatabaseFilter(), sourceConfig.getTableFilter()); for (TableId tableId : capturedTableIds) { Schema schema = getSchema(jdbc, tableId); + // Skip tables without primary keys if ignore-no-primary-key-table is enabled + if (schema.primaryKeys().isEmpty() && sourceConfig.isIgnoreNoPrimaryKeyTable()) { + LOG.warn("Table {} has no primary key and ignore-no-primary-key-table is set to true, skipping table creation.", tableId); + continue; + } createTableEventCache.put( tableId, new CreateTableEvent( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index 7f596448ab0..182f2216299 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -249,6 +249,12 @@ private Optional parseOnLineSchemaChangeEvent(SourceRecord sourceR private boolean shouldEmit(SourceRecord sourceRecord) { if (RecordUtils.isDataChangeRecord(sourceRecord)) { TableId tableId = RecordUtils.getTableId(sourceRecord); + // Skip events for tables without primary keys if ignore-no-primary-key-table is enabled + if (statefulTaskContext.getSourceConfig().isIgnoreNoPrimaryKeyTable() + && statefulTaskContext.getDatabaseSchema().tableFor(tableId).primaryKeyColumns().isEmpty()) { + LOG.warn("Table {} has no primary key and ignore-no-primary-key-table is set to true, skipping binlog event.", tableId); + return false; + } if (pureBinlogPhaseTables.contains(tableId)) { return true; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java index 7cc5fe5861e..5752d502a3d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java @@ -2445,4 +2445,12 @@ public void testReadChangelogAppendOnly(boolean incrementalSnapshot) throws Exce Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected); result.getJobClient().get().cancel().get(); } + + @ParameterizedTest(name = "incrementalSnapshot = {0}") + @ValueSource(booleans = {true, false}) + void testNoPKTableWithIgnoreNoPrimaryKeyTable(boolean incrementalSnapshot) throws Exception { + setup(incrementalSnapshot); + runConsumingForNoPKTableTest( + ", 'scan.incremental.snapshot.ignore-no-primary-key-table'='true'", incrementalSnapshot); + } } From 67e9891578104f306a14dc3946366d4da85c6901 Mon Sep 17 00:00:00 2001 From: zengyi Date: Tue, 10 Jun 2025 17:44:00 +0800 Subject: [PATCH 4/6] [FLINK-34864][cdc-connector-mysql] Fix the bug regarding the automatic table creation of tables without primary keys and the issue of incremental data synchronization --- .../source/reader/MySqlPipelineRecordEmitter.java | 8 ++++++-- .../mysql/debezium/reader/BinlogSplitReader.java | 12 +++++++++--- .../connectors/mysql/table/MySqlConnectorITCase.java | 3 ++- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java index aa2b2d5e0b6..2d4371da841 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java @@ -134,7 +134,9 @@ private void sendCreateTableEvent( Schema schema = getSchema(jdbc, tableId); // Check if table has primary key and ignore-no-primary-key-table is enabled if (schema.primaryKeys().isEmpty() && sourceConfig.isIgnoreNoPrimaryKeyTable()) { - LOG.warn("Table {} has no primary key and ignore-no-primary-key-table is set to true, skipping table creation.", tableId); + LOG.warn( + "Table {} has no primary key and ignore-no-primary-key-table is set to true, skipping table creation.", + tableId); return; } output.collect( @@ -270,7 +272,9 @@ private Map generateCreateTableEvent( Schema schema = getSchema(jdbc, tableId); // Skip tables without primary keys if ignore-no-primary-key-table is enabled if (schema.primaryKeys().isEmpty() && sourceConfig.isIgnoreNoPrimaryKeyTable()) { - LOG.warn("Table {} has no primary key and ignore-no-primary-key-table is set to true, skipping table creation.", tableId); + LOG.warn( + "Table {} has no primary key and ignore-no-primary-key-table is set to true, skipping table creation.", + tableId); continue; } createTableEventCache.put( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index 182f2216299..7ff67b878fd 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -250,9 +250,15 @@ private boolean shouldEmit(SourceRecord sourceRecord) { if (RecordUtils.isDataChangeRecord(sourceRecord)) { TableId tableId = RecordUtils.getTableId(sourceRecord); // Skip events for tables without primary keys if ignore-no-primary-key-table is enabled - if (statefulTaskContext.getSourceConfig().isIgnoreNoPrimaryKeyTable() - && statefulTaskContext.getDatabaseSchema().tableFor(tableId).primaryKeyColumns().isEmpty()) { - LOG.warn("Table {} has no primary key and ignore-no-primary-key-table is set to true, skipping binlog event.", tableId); + if (statefulTaskContext.getSourceConfig().isIgnoreNoPrimaryKeyTable() + && statefulTaskContext + .getDatabaseSchema() + .tableFor(tableId) + .primaryKeyColumns() + .isEmpty()) { + LOG.warn( + "Table {} has no primary key and ignore-no-primary-key-table is set to true, skipping binlog event.", + tableId); return false; } if (pureBinlogPhaseTables.contains(tableId)) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java index 5752d502a3d..472b355a84c 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java @@ -2451,6 +2451,7 @@ public void testReadChangelogAppendOnly(boolean incrementalSnapshot) throws Exce void testNoPKTableWithIgnoreNoPrimaryKeyTable(boolean incrementalSnapshot) throws Exception { setup(incrementalSnapshot); runConsumingForNoPKTableTest( - ", 'scan.incremental.snapshot.ignore-no-primary-key-table'='true'", incrementalSnapshot); + ", 'scan.incremental.snapshot.ignore-no-primary-key-table'='true'", + incrementalSnapshot); } } From 57aceab6b28eefc50b7626ad2ffd2935fae3b086 Mon Sep 17 00:00:00 2001 From: zengyi Date: Wed, 11 Jun 2025 10:30:24 +0800 Subject: [PATCH 5/6] [FLINK-34864][cdc-connector-mysql] Fix the unit test --- .../source/assigners/MySqlSnapshotSplitAssignerTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java index d9755559357..18d13fe39af 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java @@ -404,7 +404,10 @@ void testTableWithoutPrimaryKey() { new String[] {tableWithoutPrimaryKey}); }) .hasStackTraceContaining( - "To use incremental snapshot, 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys."); + "Table " + customerDatabase.getDatabaseName() + "." + tableWithoutPrimaryKey + " has no primary key and no chunk key column specified. " + + "To use incremental snapshot, either: " + + "1. Set 'scan.incremental.snapshot.chunk.key-column' for this table, or " + + "2. Set 'scan.incremental.snapshot.ignore-no-primary-key-table' to true to skip tables without primary keys."); } @Test From d4bc59a8937512b3ba0185fe31e359797682a509 Mon Sep 17 00:00:00 2001 From: zengyi Date: Wed, 11 Jun 2025 10:34:38 +0800 Subject: [PATCH 6/6] [FLINK-34864][cdc-connector-mysql] Fix the unit test --- .../assigners/MySqlSnapshotSplitAssignerTest.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java index 18d13fe39af..bd166900dff 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java @@ -404,10 +404,14 @@ void testTableWithoutPrimaryKey() { new String[] {tableWithoutPrimaryKey}); }) .hasStackTraceContaining( - "Table " + customerDatabase.getDatabaseName() + "." + tableWithoutPrimaryKey + " has no primary key and no chunk key column specified. " - + "To use incremental snapshot, either: " - + "1. Set 'scan.incremental.snapshot.chunk.key-column' for this table, or " - + "2. Set 'scan.incremental.snapshot.ignore-no-primary-key-table' to true to skip tables without primary keys."); + "Table " + + customerDatabase.getDatabaseName() + + "." + + tableWithoutPrimaryKey + + " has no primary key and no chunk key column specified. " + + "To use incremental snapshot, either: " + + "1. Set 'scan.incremental.snapshot.chunk.key-column' for this table, or " + + "2. Set 'scan.incremental.snapshot.ignore-no-primary-key-table' to true to skip tables without primary keys."); } @Test