diff --git a/CHANGELOG.md b/CHANGELOG.md index 0672dcf..4582ed8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ # Changelog ## [Unreleased] +### Fixed +- Fixed http sink NEP issue when flink job restore from checkpoint. ## [0.23.0] - 2025-11-07 diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java index de37faa..8fca308 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java @@ -117,11 +117,7 @@ protected HttpSinkInternal( public StatefulSinkWriter> createWriter( InitContext context) throws IOException { - ElementConverter elementConverter = getElementConverter(); - if (elementConverter instanceof SchemaLifecycleAwareElementConverter) { - // This cast is needed for Flink 1.15.3 build - ((SchemaLifecycleAwareElementConverter) elementConverter).open(context); - } + ElementConverter elementConverter = initElementConverterOfSchema(context); return new HttpSinkWriter<>( elementConverter, @@ -144,12 +140,22 @@ public StatefulSinkWriter> cr ); } + private ElementConverter initElementConverterOfSchema(InitContext context) { + ElementConverter elementConverter = getElementConverter(); + if (elementConverter instanceof SchemaLifecycleAwareElementConverter) { + ((SchemaLifecycleAwareElementConverter) elementConverter).open(context); + } + return elementConverter; + } + @Override public StatefulSinkWriter> restoreWriter( InitContext context, Collection> recoveredState) throws IOException { + initElementConverterOfSchema(context); + return new HttpSinkWriter<>( getElementConverter(), context,