From 2dad504a48d7600993346c501273a33d1214f325 Mon Sep 17 00:00:00 2001 From: Vedran Ljubovic Date: Tue, 6 Jun 2023 11:42:37 +0200 Subject: [PATCH 1/2] IGNITE-19459 Fix dropping messages when extractor isn't initialized (#1) --- .../stream/kafka/connect/IgniteSinkTask.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java index 395d2a869..66524c2e0 100644 --- a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java +++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java @@ -68,18 +68,6 @@ public class IgniteSinkTask extends SinkTask { cacheName = props.get(IgniteSinkConstants.CACHE_NAME); igniteConfigFile = props.get(IgniteSinkConstants.CACHE_CFG_PATH); - if (props.containsKey(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE)) - StreamerContext.getStreamer().allowOverwrite( - Boolean.parseBoolean(props.get(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE))); - - if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE)) - StreamerContext.getStreamer().perNodeBufferSize( - Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE))); - - if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS)) - StreamerContext.getStreamer().perNodeParallelOperations( - Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS))); - if (props.containsKey(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS)) { String transformerCls = props.get(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS); if (transformerCls != null && !transformerCls.isEmpty()) { @@ -96,6 +84,18 @@ public class IgniteSinkTask extends SinkTask { } } + if (props.containsKey(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE)) + StreamerContext.getStreamer().allowOverwrite( + Boolean.parseBoolean(props.get(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE))); + + if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE)) + StreamerContext.getStreamer().perNodeBufferSize( + Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE))); + + if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS)) + StreamerContext.getStreamer().perNodeParallelOperations( + Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS))); + stopped = false; } From fdc7df5edec87dbd56202310bd5719041652689c Mon Sep 17 00:00:00 2001 From: Vedran Ljubovic Date: Tue, 6 Jun 2023 12:16:03 +0200 Subject: [PATCH 2/2] IGNITE-19458 Allow multiple caches in Kafka Connect IgniteSink --- .../stream/kafka/connect/IgniteSinkTask.java | 47 +++++++------------ 1 file changed, 17 insertions(+), 30 deletions(-) diff --git a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java index 395d2a869..304e864c2 100644 --- a/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java +++ b/modules/kafka-ext/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java @@ -45,10 +45,13 @@ public class IgniteSinkTask extends SinkTask { private static String igniteConfigFile; /** Cache name. */ - private static String cacheName; + private String cacheName; /** Entry transformer. */ - private static StreamSingleTupleExtractor extractor; + private StreamSingleTupleExtractor extractor; + + /** Data streamer instance. */ + private IgniteDataStreamer streamer; /** {@inheritDoc} */ @Override public String version() { @@ -61,23 +64,21 @@ public class IgniteSinkTask extends SinkTask { * @param props Task properties. */ @Override public void start(Map props) { - // Each task has the same parameters -- avoid setting more than once. - if (cacheName != null) - return; - cacheName = props.get(IgniteSinkConstants.CACHE_NAME); igniteConfigFile = props.get(IgniteSinkConstants.CACHE_CFG_PATH); + streamer = IgniteContext.getIgnite().dataStreamer(cacheName); + if (props.containsKey(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE)) - StreamerContext.getStreamer().allowOverwrite( + streamer.allowOverwrite( Boolean.parseBoolean(props.get(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE))); if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE)) - StreamerContext.getStreamer().perNodeBufferSize( + streamer.perNodeBufferSize( Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE))); if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS)) - StreamerContext.getStreamer().perNodeParallelOperations( + streamer.perNodeParallelOperations( Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS))); if (props.containsKey(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS)) { @@ -111,11 +112,11 @@ public class IgniteSinkTask extends SinkTask { // Data is flushed asynchronously when CACHE_PER_NODE_DATA_SIZE is reached. if (extractor != null) { Map.Entry entry = extractor.extract(record); - StreamerContext.getStreamer().addData(entry.getKey(), entry.getValue()); + streamer.addData(entry.getKey(), entry.getValue()); } else { if (record.key() != null) { - StreamerContext.getStreamer().addData(record.key(), record.value()); + streamer.addData(record.key(), record.value()); } else { log.error("Failed to stream a record with null key!"); @@ -139,7 +140,7 @@ public class IgniteSinkTask extends SinkTask { if (stopped) return; - StreamerContext.getStreamer().flush(); + streamer.flush(); } /** @@ -151,7 +152,7 @@ public class IgniteSinkTask extends SinkTask { stopped = true; - StreamerContext.getIgnite().close(); + IgniteContext.getIgnite().close(); } /** @@ -161,25 +162,20 @@ public class IgniteSinkTask extends SinkTask { */ protected static void setStopped(boolean stopped) { IgniteSinkTask.stopped = stopped; - - extractor = null; } /** - * Streamer context initializing grid and data streamer instances on demand. + * Ignite context initializing grid instance on demand. */ - public static class StreamerContext { + public static class IgniteContext { /** Constructor. */ - private StreamerContext() { + private IgniteContext() { } /** Instance holder. */ private static class Holder { /** */ private static final Ignite IGNITE = Ignition.start(igniteConfigFile); - - /** */ - private static final IgniteDataStreamer STREAMER = IGNITE.dataStreamer(cacheName); } /** @@ -190,14 +186,5 @@ private static class Holder { public static Ignite getIgnite() { return Holder.IGNITE; } - - /** - * Obtains data streamer instance. - * - * @return Data streamer instance. - */ - public static IgniteDataStreamer getStreamer() { - return Holder.STREAMER; - } } }