From 5136fc2e97461ba80fddfcb0bbf16b3f3f5d79e1 Mon Sep 17 00:00:00 2001 From: donkey <328021040@qq.com> Date: Wed, 26 Nov 2025 00:47:56 +0800 Subject: [PATCH 1/2] [Bug Fix] http sink NEP checkpoint restore issue --- CHANGELOG.md | 2 ++ .../connectors/http/internal/sink/HttpSinkInternal.java | 5 +++++ 2 files changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0672dcf8..4582ed8d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ # Changelog ## [Unreleased] +### Fixed +- Fixed http sink NEP issue when flink job restore from checkpoint. ## [0.23.0] - 2025-11-07 diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java index de37faac..174bbfa3 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java @@ -150,6 +150,11 @@ public StatefulSinkWriter> re Collection> recoveredState) throws IOException { + ElementConverter elementConverter = getElementConverter(); + if (elementConverter instanceof SchemaLifecycleAwareElementConverter) { + ((SchemaLifecycleAwareElementConverter) elementConverter).open(context); + } + return new HttpSinkWriter<>( getElementConverter(), context, From 63e7754c7f4d6293fe601bc6279f6f443b003ddc Mon Sep 17 00:00:00 2001 From: donkey <328021040@qq.com> Date: Fri, 5 Dec 2025 23:38:59 +0800 Subject: [PATCH 2/2] [Bug Fix] extract common method for schema open --- .../http/internal/sink/HttpSinkInternal.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java index 174bbfa3..8fca3085 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java @@ -117,11 +117,7 @@ protected HttpSinkInternal( public StatefulSinkWriter> createWriter( InitContext context) throws IOException { - ElementConverter elementConverter = getElementConverter(); - if (elementConverter instanceof SchemaLifecycleAwareElementConverter) { - // This cast is needed for Flink 1.15.3 build - ((SchemaLifecycleAwareElementConverter) elementConverter).open(context); - } + ElementConverter elementConverter = initElementConverterOfSchema(context); return new HttpSinkWriter<>( elementConverter, @@ -144,16 +140,21 @@ public StatefulSinkWriter> cr ); } + private ElementConverter initElementConverterOfSchema(InitContext context) { + ElementConverter elementConverter = getElementConverter(); + if (elementConverter instanceof SchemaLifecycleAwareElementConverter) { + ((SchemaLifecycleAwareElementConverter) elementConverter).open(context); + } + return elementConverter; + } + @Override public StatefulSinkWriter> restoreWriter( InitContext context, Collection> recoveredState) throws IOException { - ElementConverter elementConverter = getElementConverter(); - if (elementConverter instanceof SchemaLifecycleAwareElementConverter) { - ((SchemaLifecycleAwareElementConverter) elementConverter).open(context); - } + initElementConverterOfSchema(context); return new HttpSinkWriter<>( getElementConverter(),