Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
import javax.crypto.Cipher;
import javax.crypto.IllegalBlockSizeException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.S;

import static org.apache.ignite.spi.discovery.tcp.ipfinder.s3.encrypt.SymmetricKeyEncryptionService.createCipher;

/**
* Provides an implementation of asymmetric encryption to encrypt/decrypt the data.
*/
Expand Down Expand Up @@ -66,8 +67,8 @@ public void setKeyPair(KeyPair keyPair) {
if (publicKey == null)
throw new IgniteException("Public key was not set / was set to null.");

encCipher = IgniteUtils.createCipher(privateKey, Cipher.ENCRYPT_MODE);
decCipher = IgniteUtils.createCipher(publicKey, Cipher.DECRYPT_MODE);
encCipher = createCipher(privateKey, Cipher.ENCRYPT_MODE);
decCipher = createCipher(publicKey, Cipher.DECRYPT_MODE);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

package org.apache.ignite.spi.discovery.tcp.ipfinder.s3.encrypt;

import java.security.InvalidKeyException;
import java.security.Key;
import java.security.NoSuchAlgorithmException;
import javax.crypto.BadPaddingException;
import javax.crypto.Cipher;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.NoSuchPaddingException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.S;

/**
Expand Down Expand Up @@ -55,8 +57,8 @@ public SymmetricKeyEncryptionService setSecretKey(Key secretKey) {
if (secretKey == null)
throw new IgniteException("Secret key was not set / was set to null.");

encCipher = IgniteUtils.createCipher(secretKey, Cipher.ENCRYPT_MODE);
decCipher = IgniteUtils.createCipher(secretKey, Cipher.DECRYPT_MODE);
encCipher = createCipher(secretKey, Cipher.ENCRYPT_MODE);
decCipher = createCipher(secretKey, Cipher.DECRYPT_MODE);
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -95,4 +97,24 @@ public SymmetricKeyEncryptionService setSecretKey(Key secretKey) {
@Override public String toString() {
return S.toString(SymmetricKeyEncryptionService.class, this, "super", super.toString());
}

/**
* @param key Cipher Key.
* @param encMode Enc mode see {@link Cipher#ENCRYPT_MODE}, {@link Cipher#DECRYPT_MODE}, etc.
*/
public static Cipher createCipher(Key key, int encMode) {
if (key == null)
throw new IgniteException("Cipher Key cannot be null");

try {
Cipher cipher = Cipher.getInstance(key.getAlgorithm());

cipher.init(encMode, key);

return cipher;
}
catch (NoSuchAlgorithmException | NoSuchPaddingException | InvalidKeyException e) {
throw new IgniteException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.util.lang.GridMapEntry;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.stream.StreamMultipleTupleExtractor;
import org.apache.ignite.stream.StreamSingleTupleExtractor;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
Expand Down Expand Up @@ -376,11 +374,7 @@ private static StreamMultipleTupleExtractor<Exchange, Integer, String> multipleT

final Map<Integer, String> answer = new HashMap<>();

F.forEach(map.keySet(), new IgniteInClosure<String>() {
@Override public void apply(String s) {
answer.put(Integer.parseInt(s), map.get(s));
}
});
map.keySet().forEach(s -> answer.put(Integer.parseInt(s), map.get(s)));

return answer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package org.apache.ignite.cdc.kafka;

import java.time.Duration;
import java.util.Collection;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused import

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -31,6 +33,7 @@
import org.apache.ignite.cdc.TypeMapping;
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryMetadata;
import org.apache.ignite.internal.binary.BinaryUtils;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused import

import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
Expand All @@ -43,6 +46,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.VoidDeserializer;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.cdc.AbstractIgniteCdcStreamer.registerBinaryMeta;
import static org.apache.ignite.cdc.AbstractIgniteCdcStreamer.registerMapping;
Expand Down Expand Up @@ -137,7 +141,7 @@ public synchronized void updateMetadata() {
// (stored in 'offsets' field). If there are no offsets changes, polling cycle is skipped.
Map<TopicPartition, Long> offsets0 = cnsmr.endOffsets(parts, Duration.ofMillis(kafkaReqTimeout));

if (!F.isEmpty(offsets0) && F.eqNotOrdered(offsets, offsets0)) {
if (!F.isEmpty(offsets0) && eqNotOrdered(offsets, offsets0)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be replaced with offsets0.equals(offsets)?

if (log.isDebugEnabled())
log.debug("Offsets unchanged, poll skipped");

Expand Down Expand Up @@ -202,4 +206,33 @@ else if (data instanceof TypeMapping)
@Override public String toString() {
return S.toString(KafkaToIgniteMetadataUpdater.class, this);
}

/**
* Compares two maps. Unlike {@code java.util.AbstractMap#equals(...)} method this implementation
* checks not only entry sets, but also the keys. Some optimization checks are also used.
*
* @param m1 First map to check.
* @param m2 Second map to check
* @return {@code True} is maps are equal, {@code False} otherwise.
*/
public static boolean eqNotOrdered(@Nullable Map<TopicPartition, Long> m1, @Nullable Map<TopicPartition, Long> m2) {
if (m1 == m2)
return true;

if (m1 == null || m2 == null)
return false;

if (m1.size() != m2.size())
return false;

for (Map.Entry<TopicPartition, Long> e : m1.entrySet()) {
Long v1 = e.getValue();
Long v2 = m2.get(e.getKey());

if (!Objects.equals(v1, v2))
return false;
}

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
Expand All @@ -37,6 +36,7 @@
import org.hibernate.boot.MetadataSources;
import org.hibernate.boot.registry.StandardServiceRegistryBuilder;
import org.junit.Test;

import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
Expand All @@ -60,8 +60,6 @@ public class HibernateL2CacheMultiJvmTest extends GridCommonAbstractTest {
cacheConfiguration(Entity3.class.getName())
);

cfg.setMarshaller(new BinaryMarshaller());

cfg.setPeerClassLoadingEnabled(false);

return cfg;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.base.Splitter;
Expand All @@ -42,9 +43,7 @@
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.util.lang.GridMapEntry;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.stream.StreamMultipleTupleExtractor;
import org.apache.ignite.stream.StreamSingleTupleExtractor;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
Expand Down Expand Up @@ -534,18 +533,12 @@ private void sendMessages(final List<String> topics, int fromIdx, int cnt, boole
final List<StringBuilder> sbs = new ArrayList<>(topics.size());

// initialize String Builders for each topic
F.forEach(topics, new IgniteInClosure<String>() {
@Override public void apply(String s) {
sbs.add(new StringBuilder());
}
});
topics.forEach(t -> sbs.add(new StringBuilder()));

// fill String Builders for each topic
F.forEach(F.range(fromIdx, fromIdx + cnt), new IgniteInClosure<Integer>() {
@Override public void apply(Integer integer) {
sbs.get(integer % topics.size()).append(integer.toString() + "," + TEST_DATA.get(integer) + "\n");
}
});
IntStream.range(fromIdx, fromIdx + cnt)
.forEach(integer ->
sbs.get(integer % topics.size()).append(integer).append(",").append(TEST_DATA.get(integer)).append("\n"));

// send each buffer out
for (int i = 0; i < topics.size(); i++) {
Expand Down Expand Up @@ -633,11 +626,7 @@ public static StreamMultipleTupleExtractor<MqttMessage, Integer, String> multipl

final Map<Integer, String> answer = new HashMap<>();

F.forEach(map.keySet(), new IgniteInClosure<String>() {
@Override public void apply(String s) {
answer.put(Integer.parseInt(s), map.get(s));
}
});
map.keySet().forEach(s -> answer.put(Integer.parseInt(s), map.get(s)));

return answer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.spark.SparkConf;
Expand Down Expand Up @@ -128,8 +129,10 @@ public void testStoreDataToIgnite() throws Exception {
try {
ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider(), false);

List<Integer> keyList = IntStream.range(0, KEYS_CNT).boxed().collect(Collectors.toList());

ic.fromCache(PARTITIONED_CACHE_NAME)
.savePairs(sc.parallelize(F.range(0, KEYS_CNT), GRID_CNT).mapToPair(TO_PAIR_F), true, false);
.savePairs(sc.parallelize(keyList, GRID_CNT).mapToPair(TO_PAIR_F), true, false);

Ignite ignite = ic.ignite();

Expand Down Expand Up @@ -200,7 +203,10 @@ public void testQueryObjectsFromIgnite() throws Exception {
JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);

int cnt = 1001;
cache.savePairs(sc.parallelize(F.range(0, cnt), GRID_CNT).mapToPair(INT_TO_ENTITY_F), true, false);

List<Integer> cntList = IntStream.range(0, cnt).boxed().collect(Collectors.toList());

cache.savePairs(sc.parallelize(cntList, GRID_CNT).mapToPair(INT_TO_ENTITY_F), true, false);

List<Entity> res = cache.objectSql("Entity", "name = ? and salary = ?", "name50", 5000)
.map(STR_ENTITY_PAIR_TO_ENTITY_F).collect();
Expand Down Expand Up @@ -238,7 +244,9 @@ public void testQueryFieldsFromIgnite() throws Exception {

JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);

cache.savePairs(sc.parallelize(F.range(0, 1001), GRID_CNT).mapToPair(INT_TO_ENTITY_F), true, false);
List<Integer> cntList = IntStream.range(0, 1001).boxed().collect(Collectors.toList());

cache.savePairs(sc.parallelize(cntList, GRID_CNT).mapToPair(INT_TO_ENTITY_F), true, false);

Dataset<Row> df =
cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000);
Expand Down
Loading