From fb434e683a9d6b5cc36600ba281c4bc7de97a709 Mon Sep 17 00:00:00 2001 From: Maksim Davydov Date: Wed, 7 May 2025 00:10:29 +0300 Subject: [PATCH 1/2] IGNITE-25309 Compilation fix after IGNITE-15803, IGNITE-24786, IGNITE-24846 --- .../optimized/OptimizedMarshallerAopTest.java | 99 ------------------- .../AsymmetricKeyEncryptionService.java | 7 +- .../SymmetricKeyEncryptionService.java | 28 +++++- .../stream/camel/IgniteCamelStreamerTest.java | 8 +- .../kafka/KafkaToIgniteMetadataUpdater.java | 40 +++++++- .../HibernateL2CacheMultiJvmTest.java | 4 +- .../stream/mqtt/IgniteMqttStreamerTest.java | 23 ++--- .../spark/JavaEmbeddedIgniteRDDSelfTest.java | 16 ++- ...beddedIgniteRDDWithLocalStoreSelfTest.java | 12 ++- .../JavaStandaloneIgniteRDDSelfTest.java | 19 +++- .../CacheTopologyValidatorPluginProvider.java | 5 +- 11 files changed, 114 insertions(+), 147 deletions(-) delete mode 100644 modules/aop-ext/src/test/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerAopTest.java diff --git a/modules/aop-ext/src/test/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerAopTest.java b/modules/aop-ext/src/test/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerAopTest.java deleted file mode 100644 index 0a0efddcf..000000000 --- a/modules/aop-ext/src/test/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerAopTest.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.marshaller.optimized; - -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.ignite.compute.gridify.Gridify; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.events.Event; -import org.apache.ignite.internal.util.typedef.G; -import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.junit.Test; - -import static org.apache.ignite.events.EventType.EVT_TASK_FINISHED; - -/** - * Test use GridOptimizedMarshaller and AspectJ AOP. - * - * The following configuration needs to be applied to enable AspectJ byte code - * weaving. - * - */ -public class OptimizedMarshallerAopTest extends GridCommonAbstractTest { - /** */ - private static final AtomicInteger cntr = new AtomicInteger(); - - /** - * Constructs a test. - */ - public OptimizedMarshallerAopTest() { - super(false /* start grid. */); - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - IgniteConfiguration cfg = new IgniteConfiguration(); - - cfg.setMarshaller(new OptimizedMarshaller()); - - G.start(cfg); - - assert G.allGrids().size() == 1; - } - - /** - * JUnit. - * - * @throws Exception If failed. - */ - @Test - public void testUp() throws Exception { - G.ignite().events().localListen(new IgnitePredicate() { - @Override public boolean apply(Event evt) { - cntr.incrementAndGet(); - - return true; - } - }, EVT_TASK_FINISHED); - - gridify1(); - - assertEquals("Method gridify() wasn't executed on grid.", 1, cntr.get()); - } - - /** - * Method grid-enabled with {@link org.apache.ignite.compute.gridify.Gridify} annotation. - *

- * Note that default {@code Gridify} configuration is used, so this method - * will be executed on remote node with the same argument. - */ - @Gridify - private void gridify1() { - X.println("Executes on grid"); - } -} diff --git a/modules/aws-ext/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/s3/encrypt/AsymmetricKeyEncryptionService.java b/modules/aws-ext/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/s3/encrypt/AsymmetricKeyEncryptionService.java index f85a9061b..551e0f1ed 100644 --- a/modules/aws-ext/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/s3/encrypt/AsymmetricKeyEncryptionService.java +++ b/modules/aws-ext/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/s3/encrypt/AsymmetricKeyEncryptionService.java @@ -23,9 +23,10 @@ import javax.crypto.Cipher; import javax.crypto.IllegalBlockSizeException; import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.typedef.internal.S; +import static org.apache.ignite.spi.discovery.tcp.ipfinder.s3.encrypt.SymmetricKeyEncryptionService.createCipher; + /** * Provides an implementation of asymmetric encryption to encrypt/decrypt the data. */ @@ -66,8 +67,8 @@ public void setKeyPair(KeyPair keyPair) { if (publicKey == null) throw new IgniteException("Public key was not set / was set to null."); - encCipher = IgniteUtils.createCipher(privateKey, Cipher.ENCRYPT_MODE); - decCipher = IgniteUtils.createCipher(publicKey, Cipher.DECRYPT_MODE); + encCipher = createCipher(privateKey, Cipher.ENCRYPT_MODE); + decCipher = createCipher(publicKey, Cipher.DECRYPT_MODE); } /** {@inheritDoc} */ diff --git a/modules/aws-ext/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/s3/encrypt/SymmetricKeyEncryptionService.java b/modules/aws-ext/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/s3/encrypt/SymmetricKeyEncryptionService.java index 89f040270..1b444d76f 100644 --- a/modules/aws-ext/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/s3/encrypt/SymmetricKeyEncryptionService.java +++ b/modules/aws-ext/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/s3/encrypt/SymmetricKeyEncryptionService.java @@ -17,12 +17,14 @@ package org.apache.ignite.spi.discovery.tcp.ipfinder.s3.encrypt; +import java.security.InvalidKeyException; import java.security.Key; +import java.security.NoSuchAlgorithmException; import javax.crypto.BadPaddingException; import javax.crypto.Cipher; import javax.crypto.IllegalBlockSizeException; +import javax.crypto.NoSuchPaddingException; import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.typedef.internal.S; /** @@ -55,8 +57,8 @@ public SymmetricKeyEncryptionService setSecretKey(Key secretKey) { if (secretKey == null) throw new IgniteException("Secret key was not set / was set to null."); - encCipher = IgniteUtils.createCipher(secretKey, Cipher.ENCRYPT_MODE); - decCipher = IgniteUtils.createCipher(secretKey, Cipher.DECRYPT_MODE); + encCipher = createCipher(secretKey, Cipher.ENCRYPT_MODE); + decCipher = createCipher(secretKey, Cipher.DECRYPT_MODE); } /** {@inheritDoc} */ @@ -95,4 +97,24 @@ public SymmetricKeyEncryptionService setSecretKey(Key secretKey) { @Override public String toString() { return S.toString(SymmetricKeyEncryptionService.class, this, "super", super.toString()); } + + /** + * @param key Cipher Key. + * @param encMode Enc mode see {@link Cipher#ENCRYPT_MODE}, {@link Cipher#DECRYPT_MODE}, etc. + */ + public static Cipher createCipher(Key key, int encMode) { + if (key == null) + throw new IgniteException("Cipher Key cannot be null"); + + try { + Cipher cipher = Cipher.getInstance(key.getAlgorithm()); + + cipher.init(encMode, key); + + return cipher; + } + catch (NoSuchAlgorithmException | NoSuchPaddingException | InvalidKeyException e) { + throw new IgniteException(e); + } + } } diff --git a/modules/camel-ext/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java b/modules/camel-ext/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java index 06e581e51..efe817cff 100644 --- a/modules/camel-ext/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java +++ b/modules/camel-ext/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java @@ -47,9 +47,7 @@ import org.apache.ignite.events.CacheEvent; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.util.lang.GridMapEntry; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteBiPredicate; -import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.stream.StreamMultipleTupleExtractor; import org.apache.ignite.stream.StreamSingleTupleExtractor; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -376,11 +374,7 @@ private static StreamMultipleTupleExtractor multipleT final Map answer = new HashMap<>(); - F.forEach(map.keySet(), new IgniteInClosure() { - @Override public void apply(String s) { - answer.put(Integer.parseInt(s), map.get(s)); - } - }); + map.keySet().forEach(s -> answer.put(Integer.parseInt(s), map.get(s))); return answer; } diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java index 236dc2f71..fc44c17d3 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java @@ -18,10 +18,12 @@ package org.apache.ignite.cdc.kafka; import java.time.Duration; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -31,6 +33,7 @@ import org.apache.ignite.cdc.TypeMapping; import org.apache.ignite.internal.binary.BinaryContext; import org.apache.ignite.internal.binary.BinaryMetadata; +import org.apache.ignite.internal.binary.BinaryUtils; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; @@ -43,6 +46,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.VoidDeserializer; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cdc.AbstractIgniteCdcStreamer.registerBinaryMeta; import static org.apache.ignite.cdc.AbstractIgniteCdcStreamer.registerMapping; @@ -137,7 +141,7 @@ public synchronized void updateMetadata() { // (stored in 'offsets' field). If there are no offsets changes, polling cycle is skipped. Map offsets0 = cnsmr.endOffsets(parts, Duration.ofMillis(kafkaReqTimeout)); - if (!F.isEmpty(offsets0) && F.eqNotOrdered(offsets, offsets0)) { + if (!F.isEmpty(offsets0) && eqNotOrdered(offsets, offsets0)) { if (log.isDebugEnabled()) log.debug("Offsets unchanged, poll skipped"); @@ -202,4 +206,38 @@ else if (data instanceof TypeMapping) @Override public String toString() { return S.toString(KafkaToIgniteMetadataUpdater.class, this); } + + /** + * Compares two maps. Unlike {@code java.util.AbstractMap#equals(...)} method this implementation + * checks not only entry sets, but also the keys. Some optimization checks are also used. + * + * @param m1 First map to check. + * @param m2 Second map to check + * @param Collection elements key. + * @param Collection elements value. + * @return {@code True} is maps are equal, {@code False} otherwise. + */ + public static boolean eqNotOrdered(@Nullable Map m1, @Nullable Map m2) { + if (m1 == m2) + return true; + + if (m1 == null || m2 == null) + return false; + + if (m1.size() != m2.size()) + return false; + + for (Map.Entry e : m1.entrySet()) { + V v1 = e.getValue(); + V v2 = m2.get(e.getKey()); + + if (v1 == v2) + continue; + + if (v1 == null || v2 == null) + return false; + } + + return true; + } } diff --git a/modules/hibernate-ext/hibernate/src/test/java/org/apache/ignite/cache/hibernate/HibernateL2CacheMultiJvmTest.java b/modules/hibernate-ext/hibernate/src/test/java/org/apache/ignite/cache/hibernate/HibernateL2CacheMultiJvmTest.java index 3fb51df90..17d3b3de1 100644 --- a/modules/hibernate-ext/hibernate/src/test/java/org/apache/ignite/cache/hibernate/HibernateL2CacheMultiJvmTest.java +++ b/modules/hibernate-ext/hibernate/src/test/java/org/apache/ignite/cache/hibernate/HibernateL2CacheMultiJvmTest.java @@ -25,7 +25,6 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.LoggerResource; @@ -37,6 +36,7 @@ import org.hibernate.boot.MetadataSources; import org.hibernate.boot.registry.StandardServiceRegistryBuilder; import org.junit.Test; + import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; @@ -60,8 +60,6 @@ public class HibernateL2CacheMultiJvmTest extends GridCommonAbstractTest { cacheConfiguration(Entity3.class.getName()) ); - cfg.setMarshaller(new BinaryMarshaller()); - cfg.setPeerClassLoadingEnabled(false); return cfg; diff --git a/modules/mqtt-ext/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt-ext/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java index 89530f34a..f76c2c1ed 100644 --- a/modules/mqtt-ext/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java +++ b/modules/mqtt-ext/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java @@ -27,6 +27,7 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; import com.github.rholder.retry.StopStrategies; import com.github.rholder.retry.WaitStrategies; import com.google.common.base.Splitter; @@ -42,9 +43,7 @@ import org.apache.ignite.events.CacheEvent; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.util.lang.GridMapEntry; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteBiPredicate; -import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.stream.StreamMultipleTupleExtractor; import org.apache.ignite.stream.StreamSingleTupleExtractor; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -534,18 +533,12 @@ private void sendMessages(final List topics, int fromIdx, int cnt, boole final List sbs = new ArrayList<>(topics.size()); // initialize String Builders for each topic - F.forEach(topics, new IgniteInClosure() { - @Override public void apply(String s) { - sbs.add(new StringBuilder()); - } - }); + topics.forEach(t -> sbs.add(new StringBuilder())); // fill String Builders for each topic - F.forEach(F.range(fromIdx, fromIdx + cnt), new IgniteInClosure() { - @Override public void apply(Integer integer) { - sbs.get(integer % topics.size()).append(integer.toString() + "," + TEST_DATA.get(integer) + "\n"); - } - }); + IntStream.range(fromIdx, fromIdx + cnt) + .forEach(integer -> + sbs.get(integer % topics.size()).append(integer).append(",").append(TEST_DATA.get(integer)).append("\n")); // send each buffer out for (int i = 0; i < topics.size(); i++) { @@ -633,11 +626,7 @@ public static StreamMultipleTupleExtractor multipl final Map answer = new HashMap<>(); - F.forEach(map.keySet(), new IgniteInClosure() { - @Override public void apply(String s) { - answer.put(Integer.parseInt(s), map.get(s)); - } - }); + map.keySet().forEach(s -> answer.put(Integer.parseInt(s), map.get(s))); return answer; } diff --git a/modules/spark-ext/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java b/modules/spark-ext/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java index fbd4363ed..fa3478f47 100644 --- a/modules/spark-ext/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java +++ b/modules/spark-ext/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java @@ -19,11 +19,12 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.spark.SparkConf; @@ -128,8 +129,10 @@ public void testStoreDataToIgnite() throws Exception { try { ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider(), false); + List keyList = IntStream.range(0, KEYS_CNT).boxed().collect(Collectors.toList()); + ic.fromCache(PARTITIONED_CACHE_NAME) - .savePairs(sc.parallelize(F.range(0, KEYS_CNT), GRID_CNT).mapToPair(TO_PAIR_F), true, false); + .savePairs(sc.parallelize(keyList, GRID_CNT).mapToPair(TO_PAIR_F), true, false); Ignite ignite = ic.ignite(); @@ -200,7 +203,10 @@ public void testQueryObjectsFromIgnite() throws Exception { JavaIgniteRDD cache = ic.fromCache(PARTITIONED_CACHE_NAME); int cnt = 1001; - cache.savePairs(sc.parallelize(F.range(0, cnt), GRID_CNT).mapToPair(INT_TO_ENTITY_F), true, false); + + List cntList = IntStream.range(0, cnt).boxed().collect(Collectors.toList()); + + cache.savePairs(sc.parallelize(cntList, GRID_CNT).mapToPair(INT_TO_ENTITY_F), true, false); List res = cache.objectSql("Entity", "name = ? and salary = ?", "name50", 5000) .map(STR_ENTITY_PAIR_TO_ENTITY_F).collect(); @@ -238,7 +244,9 @@ public void testQueryFieldsFromIgnite() throws Exception { JavaIgniteRDD cache = ic.fromCache(PARTITIONED_CACHE_NAME); - cache.savePairs(sc.parallelize(F.range(0, 1001), GRID_CNT).mapToPair(INT_TO_ENTITY_F), true, false); + List cntList = IntStream.range(0, 1001).boxed().collect(Collectors.toList()); + + cache.savePairs(sc.parallelize(cntList, GRID_CNT).mapToPair(INT_TO_ENTITY_F), true, false); Dataset df = cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000); diff --git a/modules/spark-ext/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDWithLocalStoreSelfTest.java b/modules/spark-ext/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDWithLocalStoreSelfTest.java index 2f13d25dd..d24acd645 100644 --- a/modules/spark-ext/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDWithLocalStoreSelfTest.java +++ b/modules/spark-ext/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDWithLocalStoreSelfTest.java @@ -17,8 +17,11 @@ package org.apache.ignite.spark; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import javax.cache.Cache; import javax.cache.configuration.FactoryBuilder; import org.apache.ignite.Ignite; @@ -26,7 +29,6 @@ import org.apache.ignite.cache.store.CacheStoreAdapter; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.spark.SparkConf; @@ -117,14 +119,18 @@ public void testStoreDataToIgniteWithOptionSkipStore() throws Exception { for (int i = 0; i < 1000; i++) storeMap.put(i, i); + List cntList = IntStream.range(1000, 2000).boxed().collect(Collectors.toList()); + ic.fromCache(PARTITIONED_CACHE_NAME) - .savePairs(sc.parallelize(F.range(1000, 2000), GRID_CNT).mapToPair(SIMPLE_FUNCTION), true, false); + .savePairs(sc.parallelize(cntList, GRID_CNT).mapToPair(SIMPLE_FUNCTION), true, false); for (int i = 0; i < 2000; i++) assertEquals(i, storeMap.get(i)); + cntList = IntStream.range(2000, 3000).boxed().collect(Collectors.toList()); + ic.fromCache(PARTITIONED_CACHE_NAME) - .savePairs(sc.parallelize(F.range(2000, 3000), GRID_CNT).mapToPair(SIMPLE_FUNCTION), true, true); + .savePairs(sc.parallelize(cntList, GRID_CNT).mapToPair(SIMPLE_FUNCTION), true, true); for (int i = 2000; i < 3000; i++) assertNull(storeMap.get(i)); diff --git a/modules/spark-ext/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java b/modules/spark-ext/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java index 828daf0af..acfc690ca 100644 --- a/modules/spark-ext/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java +++ b/modules/spark-ext/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java @@ -20,12 +20,13 @@ import java.lang.reflect.Field; import java.math.BigDecimal; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.Ignition; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; @@ -127,8 +128,10 @@ public void testStoreDataToIgnite() throws Exception { try { JavaIgniteContext ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider()); + List keyList = IntStream.range(0, KEYS_CNT).boxed().collect(Collectors.toList()); + ic.fromCache(ENTITY_CACHE_NAME) - .savePairs(sc.parallelize(F.range(0, KEYS_CNT), 2).mapToPair(TO_PAIR_F)); + .savePairs(sc.parallelize(keyList, 2).mapToPair(TO_PAIR_F)); Ignite ignite = Ignition.ignite("grid-0"); @@ -188,7 +191,9 @@ public void testQueryObjectsFromIgnite() throws Exception { JavaIgniteRDD cache = ic.fromCache(ENTITY_CACHE_NAME); - cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F)); + List cntList = IntStream.range(0, 1001).boxed().collect(Collectors.toList()); + + cache.savePairs(sc.parallelize(cntList, 2).mapToPair(INT_TO_ENTITY_F)); List res = cache.objectSql("Entity", "name = ? and salary = ?", "name50", 5000) .map(STR_ENTITY_PAIR_TO_ENTITY_F).collect(); @@ -216,7 +221,9 @@ public void testQueryFieldsFromIgnite() throws Exception { JavaIgniteRDD cache = ic.fromCache(ENTITY_CACHE_NAME); - cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F)); + List cntList = IntStream.range(0, 1001).boxed().collect(Collectors.toList()); + + cache.savePairs(sc.parallelize(cntList, 2).mapToPair(INT_TO_ENTITY_F)); Dataset df = cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000); @@ -264,7 +271,9 @@ public void testAllFieldsTypes() throws Exception { JavaIgniteRDD cache = ic.fromCache(ENTITY_ALL_TYPES_CACHE_NAME); - cache.savePairs(sc.parallelize(F.range(0, cnt), 2).mapToPair(INT_TO_ENTITY_ALL_FIELDS_F)); + List cntList = IntStream.range(0, 1001).boxed().collect(Collectors.toList()); + + cache.savePairs(sc.parallelize(cntList, 2).mapToPair(INT_TO_ENTITY_ALL_FIELDS_F)); EntityTestAllTypeFields e = new EntityTestAllTypeFields(cnt / 2); for (Field f : EntityTestAllTypeFields.class.getDeclaredFields()) { diff --git a/modules/topology-validator-ext/src/main/java/org/apache/ignite/plugin/cache/CacheTopologyValidatorPluginProvider.java b/modules/topology-validator-ext/src/main/java/org/apache/ignite/plugin/cache/CacheTopologyValidatorPluginProvider.java index b34ac2d79..e30925861 100644 --- a/modules/topology-validator-ext/src/main/java/org/apache/ignite/plugin/cache/CacheTopologyValidatorPluginProvider.java +++ b/modules/topology-validator-ext/src/main/java/org/apache/ignite/plugin/cache/CacheTopologyValidatorPluginProvider.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener; import org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher; import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty; +import org.apache.ignite.internal.util.lang.ClusterNodeFunc; import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.LT; @@ -351,7 +352,7 @@ private class TopologyChangedEventListener implements DiscoveryEventListener, Hi " mode. Cache writes were already restricted for all configured caches, but this" + " step is still required in order to be able to unlock cache writes in the future." + " Retry this operation manually, if possible [segmentedNodes=" + - F.viewReadOnly(discoCache.allNodes(), F.node2id()) + "]", e); + F.viewReadOnly(discoCache.allNodes(), ClusterNodeFunc.node2id()) + "]", e); } } }, PUBLIC_POOL); @@ -361,7 +362,7 @@ private class TopologyChangedEventListener implements DiscoveryEventListener, Hi } U.warn(log, "Cluster segmentation was detected. Write to all user caches were blocked" + - " [segmentedNodes=" + F.viewReadOnly(discoCache.allNodes(), F.node2id()) + ']'); + " [segmentedNodes=" + F.viewReadOnly(discoCache.allNodes(), ClusterNodeFunc.node2id()) + ']'); } state = locStateCopy; From ac7abd7111e4ae86e1c06442e65af05f2d2f3251 Mon Sep 17 00:00:00 2001 From: Maksim Davydov Date: Wed, 7 May 2025 00:39:41 +0300 Subject: [PATCH 2/2] IGNITE-25309 #eqNotOrdered() fix --- .../cdc/kafka/KafkaToIgniteMetadataUpdater.java | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java index fc44c17d3..af79a4c9c 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java @@ -213,11 +213,9 @@ else if (data instanceof TypeMapping) * * @param m1 First map to check. * @param m2 Second map to check - * @param Collection elements key. - * @param Collection elements value. * @return {@code True} is maps are equal, {@code False} otherwise. */ - public static boolean eqNotOrdered(@Nullable Map m1, @Nullable Map m2) { + public static boolean eqNotOrdered(@Nullable Map m1, @Nullable Map m2) { if (m1 == m2) return true; @@ -227,14 +225,11 @@ public static boolean eqNotOrdered(@Nullable Map m1, @Nullable Map< if (m1.size() != m2.size()) return false; - for (Map.Entry e : m1.entrySet()) { - V v1 = e.getValue(); - V v2 = m2.get(e.getKey()); + for (Map.Entry e : m1.entrySet()) { + Long v1 = e.getValue(); + Long v2 = m2.get(e.getKey()); - if (v1 == v2) - continue; - - if (v1 == null || v2 == null) + if (!Objects.equals(v1, v2)) return false; }