diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
index cb06cc45a6d..9cd40f01530 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
@@ -38,6 +38,7 @@
 import org.apache.flink.cdc.connectors.mysql.source.assigners.state.HybridPendingSplitsState;
 import org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsState;
 import org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer;
+import org.apache.flink.cdc.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState;
 import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
 import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
 import org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator;
@@ -251,6 +252,13 @@ public SplitEnumerator<MySqlSplit, PendingSplitsState> restoreEnumerator(
                             enumContext.currentParallelism(),
                             (HybridPendingSplitsState) checkpoint,
                             enumContext);
+        } else if (checkpoint instanceof SnapshotPendingSplitsState) {
+            splitAssigner =
+                    new MySqlSnapshotSplitAssigner(
+                            sourceConfig,
+                            enumContext.currentParallelism(),
+                            (SnapshotPendingSplitsState) checkpoint,
+                            enumContext);
         } else if (checkpoint instanceof BinlogPendingSplitsState) {
             splitAssigner =
                     new MySqlBinlogSplitAssigner(
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java
index 2a875d9f774..82a0a9ff3f7 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java
@@ -18,6 +18,8 @@
 package org.apache.flink.cdc.pipeline.tests.migration;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.cdc.common.test.utils.TestUtils;
 import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
 import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
 import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
@@ -32,6 +34,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.file.Path;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
@@ -276,6 +279,80 @@ void testStartingJobFromSavepointWithSchemaChange(TarballFetcher.CdcVersion migr
         cancelJob(newJobID);
     }
 
+    @ParameterizedTest(name = "{0} -> SNAPSHOT")
+    @EnumSource(names = {"SNAPSHOT"})
+    void testRestartingJobFromSavepointInSnapshotMode(TarballFetcher.CdcVersion migrateFromVersion)
+            throws Exception {
+        TarballFetcher.fetch(jobManager, migrateFromVersion);
+        runInContainerAsRoot(jobManager, "chmod", "0777", "-R", "/tmp/cdc/");
+        Path udfJar = TestUtils.getResource("udf-examples.jar");
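+        // udf-examples.jar provides the throttle() UDF used in the YAML below; each
+        // call sleeps, so the snapshot should still be running when the job is stopped.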
LOG.info("Successfully fetched CDC {}.", migrateFromVersion); + String content = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: %d\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.\\.*\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + " scan.startup.mode: snapshot\n" + + " scan.incremental.snapshot.chunk.size: 2\n" + + "\n" + + "sink:\n" + + " type: values\n" + + "\n" + + "transform:\n" + + " - source-table: %s.\\.*\n" + + " projection: \\*, throttle(id, 10) AS throttled\n" + + "\n" + + "pipeline:\n" + + " parallelism: %d\n" + + " user-defined-function:\n" + + " - name: throttle\n" + + " classpath: org.apache.flink.cdc.udf.examples.java.ThrottlerFunctionClass\n", + INTER_CONTAINER_MYSQL_ALIAS, + MySqlContainer.MYSQL_PORT, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + mysqlInventoryDatabase.getDatabaseName(), + mysqlInventoryDatabase.getDatabaseName(), + parallelism); + JobID jobID = submitPipelineJob(migrateFromVersion, content, udfJar); + Assertions.assertThat(jobID).isNotNull(); + LOG.info("Submitted Job ID is {} ", jobID); + waitUntilJobState(Duration.ofSeconds(30), JobStatus.RUNNING); + + String savepointPath = stopJobWithSavepoint(jobID); + LOG.info("Stopped Job {} and created a savepoint at {}.", jobID, savepointPath); + + JobID newJobID = submitPipelineJob(content, savepointPath, true, udfJar); + LOG.info("Reincarnated Job {} has been submitted successfully.", newJobID); + validateResult( + dbNameFormatter, + "CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512),`throttled` STRING}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING,`throttled` STRING}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai, 123567891234, throttled_101], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai, 123567891234, throttled_102], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai, 123567891234, throttled_103], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234, throttled_104], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, throttled_101], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}, throttled_102], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}, throttled_103], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}, throttled_104], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[105, hammer, 14oz carpenter's hammer, 
0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}, throttled_105], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null, throttled_106], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[107, rocks, box of assorted rocks, 5.3, null, null, null, throttled_107], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1, null, null, null, throttled_108], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null, throttled_109], op=INSERT, meta=()}"); + + LOG.info("Snapshot stage finished successfully."); + } + private void generateIncrementalEventsPhaseOne() { executeMySqlStatements( mysqlInventoryDatabase, diff --git a/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/ThrottlerFunctionClass.java b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/ThrottlerFunctionClass.java new file mode 100644 index 00000000000..aef1ab95e76 --- /dev/null +++ b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/ThrottlerFunctionClass.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.udf.examples.java; + +import org.apache.flink.cdc.common.udf.UserDefinedFunction; + +/** This is an example UDF class for slowing down the pipeline deliberately. */ +public class ThrottlerFunctionClass implements UserDefinedFunction { + public String eval(Object o, int seconds) { + try { + Thread.sleep(seconds * 1000L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return "throttled_" + o; + } +}