diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java index 395d2a869..66524c2e0 100644 --- a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java +++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java @@ -68,18 +68,6 @@ public class IgniteSinkTask extends SinkTask { cacheName = props.get(IgniteSinkConstants.CACHE_NAME); igniteConfigFile = props.get(IgniteSinkConstants.CACHE_CFG_PATH); - if (props.containsKey(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE)) - StreamerContext.getStreamer().allowOverwrite( - Boolean.parseBoolean(props.get(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE))); - - if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE)) - StreamerContext.getStreamer().perNodeBufferSize( - Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE))); - - if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS)) - StreamerContext.getStreamer().perNodeParallelOperations( - Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS))); - if (props.containsKey(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS)) { String transformerCls = props.get(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS); if (transformerCls != null && !transformerCls.isEmpty()) { @@ -96,6 +84,18 @@ public class IgniteSinkTask extends SinkTask { } } + if (props.containsKey(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE)) + StreamerContext.getStreamer().allowOverwrite( + Boolean.parseBoolean(props.get(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE))); + + if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE)) + StreamerContext.getStreamer().perNodeBufferSize( + Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE))); + + if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS)) + StreamerContext.getStreamer().perNodeParallelOperations( + Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS))); + stopped = false; }