From 5c6f496156938e02a09035482188f892ff95fcc0 Mon Sep 17 00:00:00 2001 From: Vedran Ljubovic Date: Tue, 6 Jun 2023 11:27:59 +0200 Subject: [PATCH] IGNITE-19459 Fix dropping messages when extractor isn't initialized --- .../stream/kafka/connect/IgniteSinkTask.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java index 395d2a869..66524c2e0 100644 --- a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java +++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java @@ -68,18 +68,6 @@ public class IgniteSinkTask extends SinkTask { cacheName = props.get(IgniteSinkConstants.CACHE_NAME); igniteConfigFile = props.get(IgniteSinkConstants.CACHE_CFG_PATH); - if (props.containsKey(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE)) - StreamerContext.getStreamer().allowOverwrite( - Boolean.parseBoolean(props.get(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE))); - - if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE)) - StreamerContext.getStreamer().perNodeBufferSize( - Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE))); - - if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS)) - StreamerContext.getStreamer().perNodeParallelOperations( - Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS))); - if (props.containsKey(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS)) { String transformerCls = props.get(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS); if (transformerCls != null && !transformerCls.isEmpty()) { @@ -96,6 +84,18 @@ public class IgniteSinkTask extends SinkTask { } } + if (props.containsKey(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE)) + StreamerContext.getStreamer().allowOverwrite( + Boolean.parseBoolean(props.get(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE))); + + if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE)) + StreamerContext.getStreamer().perNodeBufferSize( + Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE))); + + if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS)) + StreamerContext.getStreamer().perNodeParallelOperations( + Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS))); + stopped = false; }