Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ protected <K, V> boolean isUseNew(
if (oldEntry.isStartVersion()) // Entry absent (new entry).
return true;

// Old entry has expired.
if (oldEntry.value(ctx) == null && newEntry.value(ctx) != null)
return true;

if (oldEntry.dataCenterId() == newEntry.dataCenterId()) {
int cmp = newEntry.version().compareTo(oldEntry.version());

Expand Down Expand Up @@ -179,6 +183,8 @@ private <K, V> void debugResolve(
Object oldVal = conflictResolveFieldEnabled ? oldEntry.value(ctx) : null;
Object newVal = conflictResolveFieldEnabled ? newEntry.value(ctx) : null;

boolean oldExpired = oldEntry.value(ctx) == null && newEntry.value(ctx) != null;

if (oldVal != null)
oldVal = debugValue(oldVal);

Expand All @@ -190,6 +196,7 @@ private <K, V> void debugResolve(
", oldVer=" + oldEntry.version() +
", newVer=" + newEntry.version() +
", oldExpire=[" + oldEntry.ttl() + "," + oldEntry.expireTime() + ']' +
", isOldExpired=" + oldExpired +
", newExpire=[" + newEntry.ttl() + "," + newEntry.expireTime() + ']' +
", old=" + oldVal +
", new=" + newVal +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
Expand Down Expand Up @@ -69,7 +70,11 @@
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.spi.metric.LongMetric;
import org.apache.ignite.spi.metric.ObjectMetric;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -115,6 +120,9 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
@Parameterized.Parameter(3)
public int backups;

/** Listening test logger. */
private final ListeningTestLogger listeningLog = new ListeningTestLogger(log);

/** @return Test parameters. */
@Parameterized.Parameters(name = "clientType={0}, atomicity={1}, mode={2}, backupCnt={3}")
public static Collection<?> parameters() {
Expand Down Expand Up @@ -213,13 +221,17 @@ private enum WaitDataMode {
.setWalForceArchiveTimeout(5_000);

cfg.setConsistentId(igniteInstanceName);

cfg.setGridLogger(listeningLog);
}

return cfg;
}

/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
listeningLog.clearListeners();

cleanPersistenceDir();

cdcs.clear();
Expand Down Expand Up @@ -562,6 +574,49 @@ public void testWithExpiryPolicy() throws Exception {
}
}

/** */
@Test
public void testConflictResolveWithExpiryPolicy() throws Exception {
LogListener listener = LogListener.matches("Conflict can't be resolved, update ignored").build();

listeningLog.registerListener(listener);

int millis = 10000;

Factory<? extends ExpiryPolicy> factory = () -> new CreatedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, millis));

IgniteCache<Integer, ConflictResolvableTestData> srcCache = createCache(srcCluster[0], ACTIVE_ACTIVE_CACHE, factory);

IgniteCache<Integer, ConflictResolvableTestData> destCache = createCache(destCluster[0], ACTIVE_ACTIVE_CACHE, factory);

List<IgniteInternalFuture<?>> futs = startActiveActiveCdc();

try {
Configurator.setLevel(CacheVersionConflictResolverImpl.class.getName(), Level.DEBUG);

log.warning(">>>>>> Put new entries in the source cluster");
IntStream.range(0, KEYS_CNT).forEach(i -> srcCache.put(i, ConflictResolvableTestData.create()));

log.warning(">>>>>> Waiting for last entry in the destination cluster");
assertTrue(waitForCondition(() -> destCache.containsKey(KEYS_CNT - 1), getTestTimeout()));

doSleep(millis);

log.warning(">>>>>> Put updated entries in the destination cluster");
IntStream.range(0, KEYS_CNT).forEach(i -> destCache.put(i, ConflictResolvableTestData.create()));

doSleep(millis);

assertFalse("Unresolved conflicts found", listener.check());
}
finally {
Configurator.setLevel(CacheVersionConflictResolverImpl.class.getName(), Level.INFO);

for (IgniteInternalFuture<?> fut : futs)
fut.cancel();
}
}

/** */
public Runnable generateData(String cacheName, IgniteEx ign, IntStream keys) {
return () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.binary.BinaryObject;
Expand All @@ -47,6 +51,7 @@
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand Down Expand Up @@ -288,18 +293,59 @@ public void testResolveDebug() throws Exception {
assertFalse(lsnr.check());
}

/** Test conflict resolution during old entry expiration. */
@Test
public void testConflictWithExpiryPolicy() throws Exception {
String key = key("UpdateClusterUpdateReorder", otherClusterId);

LogListener lsnr = LogListener.matches("isOldExpired=true").build();

listeningLog.registerListener(lsnr);

Configurator.setLevel(CacheVersionConflictResolverImpl.class.getName(), Level.DEBUG);

try {
CreatedExpiryPolicy plc = new CreatedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, 3000));

put(key, plc);

doSleep(3000);

// Put just after expiration.
putConflict(key, 2, true, plc);

putConflict(key, 3, true, plc);

put(key, plc);

assertTrue(lsnr.check());
}
finally {
Configurator.setLevel(CacheVersionConflictResolverImpl.class.getName(), Level.INFO);

listeningLog.clearListeners();
}
}

/** */
private void put(String key) {
put(key, null);
}

/** */
private void put(String key, @Nullable ExpiryPolicy plc) {
ConflictResolvableTestData newVal = ConflictResolvableTestData.create();

CacheEntry<String, ConflictResolvableTestData> oldEntry = cache.getEntry(key);
IgniteCache<String, ConflictResolvableTestData> cache0 = plc != null ? cache.withExpiryPolicy(plc) : cache;

CacheEntry<String, ConflictResolvableTestData> oldEntry = cache0.getEntry(key);

cache.put(key, newVal);
cache0.put(key, newVal);

CacheEntry<String, ConflictResolvableTestData> newEntry = cache.getEntry(key);
CacheEntry<String, ConflictResolvableTestData> newEntry = cache0.getEntry(key);

assertNull(((CacheEntryVersion)newEntry.version()).otherClusterVersion());
assertEquals(newVal, cache.get(key));
assertEquals(newVal, cache0.get(key));

if (oldEntry != null)
assertTrue(((CacheEntryVersion)oldEntry.version()).order() < ((CacheEntryVersion)newEntry.version()).order());
Expand All @@ -310,23 +356,39 @@ private void putConflict(String k, long order, boolean success) throws IgniteChe
putConflict(k, new GridCacheVersion(1, order, 1, otherClusterId), success);
}

/** Puts entry via {@link IgniteInternalCache#putAllConflict(Map)}. */
private void putConflict(String k, long order, boolean success, @Nullable ExpiryPolicy plc) throws IgniteCheckedException {
putConflict(k, new GridCacheVersion(1, order, 1, otherClusterId), success, plc);
}

/** Puts entry via {@link IgniteInternalCache#putAllConflict(Map)}. */
private void putConflict(String k, GridCacheVersion newVer, boolean success) throws IgniteCheckedException {
CacheEntry<String, ConflictResolvableTestData> oldEntry = cache.getEntry(k);
putConflict(k, newVer, success, null);
}

/** Puts entry via {@link IgniteInternalCache#putAllConflict(Map)}. */
private void putConflict(String k, GridCacheVersion newVer, boolean success, @Nullable ExpiryPolicy plc) throws IgniteCheckedException {
IgniteCache<String, ConflictResolvableTestData> cache0 = plc != null ? cache.withExpiryPolicy(plc) : cache;
IgniteInternalCache<BinaryObject, BinaryObject> cachex0 = plc != null ? cachex.withExpiryPolicy(plc) : cachex;

CacheEntry<String, ConflictResolvableTestData> oldEntry = cache0.getEntry(k);
ConflictResolvableTestData newVal = ConflictResolvableTestData.create();

KeyCacheObject key = new KeyCacheObjectImpl(k, null, cachex.context().affinity().partition(k));
KeyCacheObject key = new KeyCacheObjectImpl(k, null, cachex0.context().affinity().partition(k));
CacheObject val = new CacheObjectImpl(client.binary().toBinary(newVal), null);

cachex.putAllConflict(singletonMap(key, new GridCacheDrInfo(val, newVer)));
cachex0.putAllConflict(singletonMap(key, new GridCacheDrInfo(val, newVer)));

if (success) {
assertEquals(newVer, ((GridCacheVersion)cache.getEntry(k).version()).conflictVersion());
assertEquals(newVal, cache.get(k));
CacheEntry<String, ConflictResolvableTestData> newEntry = cache0.getEntry(k);

assertNotNull(newEntry);
assertEquals(newVer, ((GridCacheVersion)newEntry.version()).conflictVersion());
assertEquals(newVal, cache0.get(k));
}
else if (oldEntry != null) {
assertEquals(oldEntry.getValue(), cache.get(k));
assertEquals(oldEntry.version(), cache.getEntry(k).version());
assertEquals(oldEntry.getValue(), cache0.get(k));
assertEquals(oldEntry.version(), cache0.getEntry(k).version());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ public class CacheConflictOperationsWithCustomResolverTest extends CacheConflict
GridTestUtils.assertThrows(log, super::testResolveDebug, AssertionError.class, "");
}

/** {@inheritDoc} */
@Test
@Override public void testConflictWithExpiryPolicy() throws Exception {
// LWW strategy resolves conflicts in unexpected way at versioned resolve test.
GridTestUtils.assertThrows(log, super::testConflictWithExpiryPolicy, AssertionError.class, null);
}

/**
*
*/
Expand Down