Skip to content

Commit dd48d5f

Browse files
committed
Add testRestartingJobFromSavepointInSnapshotMode
Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
1 parent 31e47a9 commit dd48d5f

File tree

3 files changed

+109
-2
lines changed

3 files changed

+109
-2
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@
3636
import org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner;
3737
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.BinlogPendingSplitsState;
3838
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.HybridPendingSplitsState;
39-
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState;
4039
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsState;
4140
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer;
41+
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState;
4242
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
4343
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
4444
import org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator;
@@ -258,7 +258,7 @@ public SplitEnumerator<MySqlSplit, PendingSplitsState> restoreEnumerator(
258258
sourceConfig,
259259
enumContext.currentParallelism(),
260260
(SnapshotPendingSplitsState) checkpoint,
261-
enumContext);
261+
enumContext);
262262
} else if (checkpoint instanceof BinlogPendingSplitsState) {
263263
splitAssigner =
264264
new MySqlBinlogSplitAssigner(

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package org.apache.flink.cdc.pipeline.tests.migration;
1919

2020
import org.apache.flink.api.common.JobID;
21+
import org.apache.flink.api.common.JobStatus;
22+
import org.apache.flink.cdc.common.test.utils.TestUtils;
2123
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
2224
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
2325
import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
@@ -32,6 +34,7 @@
3234
import org.slf4j.Logger;
3335
import org.slf4j.LoggerFactory;
3436

37+
import java.nio.file.Path;
3538
import java.sql.Connection;
3639
import java.sql.DriverManager;
3740
import java.sql.SQLException;
@@ -276,6 +279,78 @@ void testStartingJobFromSavepointWithSchemaChange(TarballFetcher.CdcVersion migr
276279
cancelJob(newJobID);
277280
}
278281

282+
@ParameterizedTest(name = "{0} -> SNAPSHOT")
283+
@EnumSource(names = {"SNAPSHOT"})
284+
void testRestartingJobFromSavepointInSnapshotMode(TarballFetcher.CdcVersion migrateFromVersion)
285+
throws Exception {
286+
TarballFetcher.fetch(jobManager, migrateFromVersion);
287+
runInContainerAsRoot(jobManager, "chmod", "0777", "-R", "/tmp/cdc/");
288+
Path udfJar = TestUtils.getResource("udf-examples.jar");
289+
LOG.info("Successfully fetched CDC {}.", migrateFromVersion);
290+
String content =
291+
String.format(
292+
"source:\n"
293+
+ " type: mysql\n"
294+
+ " hostname: %s\n"
295+
+ " port: %d\n"
296+
+ " username: %s\n"
297+
+ " password: %s\n"
298+
+ " tables: %s.\\.*\n"
299+
+ " server-id: 5400-5404\n"
300+
+ " server-time-zone: UTC\n"
301+
+ " scan.startup.mode: snapshot\n"
302+
+ " scan.incremental.snapshot.chunk.size: 2\n"
303+
+ "\n"
304+
+ "sink:\n"
305+
+ " type: values\n"
306+
+ "\n"
307+
+ "transform:\n"
308+
+ " - source-table: %s.\\.*\n"
309+
+ " projection: \\*, throttle(id, 10) AS throttled\n"
310+
+ "\n"
311+
+ "pipeline:\n"
312+
+ " parallelism: %d\n"
313+
+ " user-defined-function:\n"
314+
+ " - name: throttle\n"
315+
+ " classpath: org.apache.flink.cdc.udf.examples.java.ThrottlerFunctionClass\n",
316+
INTER_CONTAINER_MYSQL_ALIAS,
317+
MySqlContainer.MYSQL_PORT,
318+
MYSQL_TEST_USER,
319+
MYSQL_TEST_PASSWORD,
320+
mysqlInventoryDatabase.getDatabaseName(),
321+
mysqlInventoryDatabase.getDatabaseName(),
322+
parallelism);
323+
JobID jobID = submitPipelineJob(migrateFromVersion, content, udfJar);
324+
Assertions.assertThat(jobID).isNotNull();
325+
LOG.info("Submitted Job ID is {} ", jobID);
326+
waitUntilJobState(Duration.ofSeconds(30), JobStatus.RUNNING);
327+
328+
String savepointPath = stopJobWithSavepoint(jobID);
329+
LOG.info("Stopped Job {} and created a savepoint at {}.", jobID, savepointPath);
330+
331+
JobID newJobID = submitPipelineJob(content, savepointPath, true, udfJar);
332+
LOG.info("Reincarnated Job {} has been submitted successfully.", newJobID);
333+
validateResult(
334+
dbNameFormatter,
335+
"CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512),`throttled` STRING}, primaryKeys=id, options=()}",
336+
"CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING,`throttled` STRING}, primaryKeys=id, options=()}",
337+
"DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai, 123567891234, throttled_101], op=INSERT, meta=()}",
338+
"DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai, 123567891234, throttled_102], op=INSERT, meta=()}",
339+
"DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai, 123567891234, throttled_103], op=INSERT, meta=()}",
340+
"DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234, throttled_104], op=INSERT, meta=()}",
341+
"DataChangeEvent{tableId=%s.products, before=[], after=[101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, throttled_101], op=INSERT, meta=()}",
342+
"DataChangeEvent{tableId=%s.products, before=[], after=[102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}, throttled_102], op=INSERT, meta=()}",
343+
"DataChangeEvent{tableId=%s.products, before=[], after=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}, throttled_103], op=INSERT, meta=()}",
344+
"DataChangeEvent{tableId=%s.products, before=[], after=[104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}, throttled_104], op=INSERT, meta=()}",
345+
"DataChangeEvent{tableId=%s.products, before=[], after=[105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}, throttled_105], op=INSERT, meta=()}",
346+
"DataChangeEvent{tableId=%s.products, before=[], after=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null, throttled_106], op=INSERT, meta=()}",
347+
"DataChangeEvent{tableId=%s.products, before=[], after=[107, rocks, box of assorted rocks, 5.3, null, null, null, throttled_107], op=INSERT, meta=()}",
348+
"DataChangeEvent{tableId=%s.products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1, null, null, null, throttled_108], op=INSERT, meta=()}",
349+
"DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null, throttled_109], op=INSERT, meta=()}");
350+
351+
LOG.info("Snapshot stage finished successfully.");
352+
}
353+
279354
private void generateIncrementalEventsPhaseOne() {
280355
executeMySqlStatements(
281356
mysqlInventoryDatabase,
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.udf.examples.java;
19+
20+
import org.apache.flink.cdc.common.udf.UserDefinedFunction;
21+
22+
/** This is an example UDF class for slowing down the pipeline deliberately. */
23+
public class ThrottlerFunctionClass implements UserDefinedFunction {
24+
public String eval(Object o, int seconds) {
25+
try {
26+
Thread.sleep(seconds * 1000L);
27+
} catch (InterruptedException e) {
28+
throw new RuntimeException(e);
29+
}
30+
return "throttled_" + o;
31+
}
32+
}

0 commit comments

Comments
 (0)