diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java index 1e275a7be..b6531911d 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java @@ -125,6 +125,10 @@ protected boolean isUseNew( if (oldEntry.isStartVersion()) // Entry absent (new entry). return true; + // Old entry has expired. + if (oldEntry.value(ctx) == null && newEntry.value(ctx) != null) + return true; + if (oldEntry.dataCenterId() == newEntry.dataCenterId()) { int cmp = newEntry.version().compareTo(oldEntry.version()); @@ -179,6 +183,8 @@ private void debugResolve( Object oldVal = conflictResolveFieldEnabled ? oldEntry.value(ctx) : null; Object newVal = conflictResolveFieldEnabled ? newEntry.value(ctx) : null; + boolean oldExpired = oldEntry.value(ctx) == null && newEntry.value(ctx) != null; + if (oldVal != null) oldVal = debugValue(oldVal); @@ -190,6 +196,7 @@ private void debugResolve( ", oldVer=" + oldEntry.version() + ", newVer=" + newEntry.version() + ", oldExpire=[" + oldEntry.ttl() + "," + oldEntry.expireTime() + ']' + + ", isOldExpired=" + oldExpired + ", newExpire=[" + newEntry.ttl() + "," + newEntry.expireTime() + ']' + ", old=" + oldVal + ", new=" + newVal + diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java index 9a5e18c7f..42617774a 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java @@ -42,6 +42,7 @@ import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl; import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; @@ -69,7 +70,11 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.spi.metric.LongMetric; import org.apache.ignite.spi.metric.ObjectMetric; +import org.apache.ignite.testframework.ListeningTestLogger; +import org.apache.ignite.testframework.LogListener; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.config.Configurator; import org.jetbrains.annotations.Nullable; import org.junit.Test; import org.junit.runner.RunWith; @@ -115,6 +120,9 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest { @Parameterized.Parameter(3) public int backups; + /** Listening test logger. */ + private final ListeningTestLogger listeningLog = new ListeningTestLogger(log); + /** @return Test parameters. */ @Parameterized.Parameters(name = "clientType={0}, atomicity={1}, mode={2}, backupCnt={3}") public static Collection parameters() { @@ -213,6 +221,8 @@ private enum WaitDataMode { .setWalForceArchiveTimeout(5_000); cfg.setConsistentId(igniteInstanceName); + + cfg.setGridLogger(listeningLog); } return cfg; @@ -220,6 +230,8 @@ private enum WaitDataMode { /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { + listeningLog.clearListeners(); + cleanPersistenceDir(); cdcs.clear(); @@ -562,6 +574,49 @@ public void testWithExpiryPolicy() throws Exception { } } + /** */ + @Test + public void testConflictResolveWithExpiryPolicy() throws Exception { + LogListener listener = LogListener.matches("Conflict can't be resolved, update ignored").build(); + + listeningLog.registerListener(listener); + + int millis = 10000; + + Factory factory = () -> new CreatedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, millis)); + + IgniteCache srcCache = createCache(srcCluster[0], ACTIVE_ACTIVE_CACHE, factory); + + IgniteCache destCache = createCache(destCluster[0], ACTIVE_ACTIVE_CACHE, factory); + + List> futs = startActiveActiveCdc(); + + try { + Configurator.setLevel(CacheVersionConflictResolverImpl.class.getName(), Level.DEBUG); + + log.warning(">>>>>> Put new entries in the source cluster"); + IntStream.range(0, KEYS_CNT).forEach(i -> srcCache.put(i, ConflictResolvableTestData.create())); + + log.warning(">>>>>> Waiting for last entry in the destination cluster"); + assertTrue(waitForCondition(() -> destCache.containsKey(KEYS_CNT - 1), getTestTimeout())); + + doSleep(millis); + + log.warning(">>>>>> Put updated entries in the destination cluster"); + IntStream.range(0, KEYS_CNT).forEach(i -> destCache.put(i, ConflictResolvableTestData.create())); + + doSleep(millis); + + assertFalse("Unresolved conflicts found", listener.check()); + } + finally { + Configurator.setLevel(CacheVersionConflictResolverImpl.class.getName(), Level.INFO); + + for (IgniteInternalFuture fut : futs) + fut.cancel(); + } + } + /** */ public Runnable generateData(String cacheName, IgniteEx ign, IntStream keys) { return () -> { diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java index 86ef675fb..a3cb8b422 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java @@ -24,6 +24,10 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import javax.cache.expiry.CreatedExpiryPolicy; +import javax.cache.expiry.Duration; +import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.binary.BinaryObject; @@ -47,6 +51,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.core.config.Configurator; +import org.jetbrains.annotations.Nullable; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -288,18 +293,59 @@ public void testResolveDebug() throws Exception { assertFalse(lsnr.check()); } + /** Test conflict resolution during old entry expiration. */ + @Test + public void testConflictWithExpiryPolicy() throws Exception { + String key = key("UpdateClusterUpdateReorder", otherClusterId); + + LogListener lsnr = LogListener.matches("isOldExpired=true").build(); + + listeningLog.registerListener(lsnr); + + Configurator.setLevel(CacheVersionConflictResolverImpl.class.getName(), Level.DEBUG); + + try { + CreatedExpiryPolicy plc = new CreatedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, 3000)); + + put(key, plc); + + doSleep(3000); + + // Put just after expiration. + putConflict(key, 2, true, plc); + + putConflict(key, 3, true, plc); + + put(key, plc); + + assertTrue(lsnr.check()); + } + finally { + Configurator.setLevel(CacheVersionConflictResolverImpl.class.getName(), Level.INFO); + + listeningLog.clearListeners(); + } + } + /** */ private void put(String key) { + put(key, null); + } + + /** */ + private void put(String key, @Nullable ExpiryPolicy plc) { ConflictResolvableTestData newVal = ConflictResolvableTestData.create(); - CacheEntry oldEntry = cache.getEntry(key); + IgniteCache cache0 = plc != null ? cache.withExpiryPolicy(plc) : cache; + + CacheEntry oldEntry = cache0.getEntry(key); - cache.put(key, newVal); + cache0.put(key, newVal); - CacheEntry newEntry = cache.getEntry(key); + CacheEntry newEntry = cache0.getEntry(key); assertNull(((CacheEntryVersion)newEntry.version()).otherClusterVersion()); - assertEquals(newVal, cache.get(key)); + assertEquals(newVal, cache0.get(key)); if (oldEntry != null) assertTrue(((CacheEntryVersion)oldEntry.version()).order() < ((CacheEntryVersion)newEntry.version()).order()); @@ -310,23 +356,39 @@ private void putConflict(String k, long order, boolean success) throws IgniteChe putConflict(k, new GridCacheVersion(1, order, 1, otherClusterId), success); } + /** Puts entry via {@link IgniteInternalCache#putAllConflict(Map)}. */ + private void putConflict(String k, long order, boolean success, @Nullable ExpiryPolicy plc) throws IgniteCheckedException { + putConflict(k, new GridCacheVersion(1, order, 1, otherClusterId), success, plc); + } + /** Puts entry via {@link IgniteInternalCache#putAllConflict(Map)}. */ private void putConflict(String k, GridCacheVersion newVer, boolean success) throws IgniteCheckedException { - CacheEntry oldEntry = cache.getEntry(k); + putConflict(k, newVer, success, null); + } + + /** Puts entry via {@link IgniteInternalCache#putAllConflict(Map)}. */ + private void putConflict(String k, GridCacheVersion newVer, boolean success, @Nullable ExpiryPolicy plc) throws IgniteCheckedException { + IgniteCache cache0 = plc != null ? cache.withExpiryPolicy(plc) : cache; + IgniteInternalCache cachex0 = plc != null ? cachex.withExpiryPolicy(plc) : cachex; + + CacheEntry oldEntry = cache0.getEntry(k); ConflictResolvableTestData newVal = ConflictResolvableTestData.create(); - KeyCacheObject key = new KeyCacheObjectImpl(k, null, cachex.context().affinity().partition(k)); + KeyCacheObject key = new KeyCacheObjectImpl(k, null, cachex0.context().affinity().partition(k)); CacheObject val = new CacheObjectImpl(client.binary().toBinary(newVal), null); - cachex.putAllConflict(singletonMap(key, new GridCacheDrInfo(val, newVer))); + cachex0.putAllConflict(singletonMap(key, new GridCacheDrInfo(val, newVer))); if (success) { - assertEquals(newVer, ((GridCacheVersion)cache.getEntry(k).version()).conflictVersion()); - assertEquals(newVal, cache.get(k)); + CacheEntry newEntry = cache0.getEntry(k); + + assertNotNull(newEntry); + assertEquals(newVer, ((GridCacheVersion)newEntry.version()).conflictVersion()); + assertEquals(newVal, cache0.get(k)); } else if (oldEntry != null) { - assertEquals(oldEntry.getValue(), cache.get(k)); - assertEquals(oldEntry.version(), cache.getEntry(k).version()); + assertEquals(oldEntry.getValue(), cache0.get(k)); + assertEquals(oldEntry.version(), cache0.getEntry(k).version()); } } diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java index 24bd8489d..426523242 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithCustomResolverTest.java @@ -58,6 +58,13 @@ public class CacheConflictOperationsWithCustomResolverTest extends CacheConflict GridTestUtils.assertThrows(log, super::testResolveDebug, AssertionError.class, ""); } + /** {@inheritDoc} */ + @Test + @Override public void testConflictWithExpiryPolicy() throws Exception { + // LWW strategy resolves conflicts in unexpected way at versioned resolve test. + GridTestUtils.assertThrows(log, super::testConflictWithExpiryPolicy, AssertionError.class, null); + } + /** * */