Skip to content

Commit 91f60e7

Browse files
WIP
1 parent fe451d5 commit 91f60e7

File tree

5 files changed

+529
-320
lines changed

5 files changed

+529
-320
lines changed

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,8 @@
2323
import org.apache.ignite.binary.BinaryObject;
2424
import org.apache.ignite.internal.IgniteEx;
2525
import org.apache.ignite.internal.processors.cache.CacheObject;
26-
import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
2726
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
2827
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
29-
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
3028
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
3129
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
3230
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -84,28 +82,16 @@ public CdcEventsIgniteApplier(IgniteEx ignite, int maxBatchSize, IgniteLogger lo
8482

8583
/** {@inheritDoc} */
8684
@Override protected KeyCacheObject toKey(CdcEvent evt) {
87-
Object key = evt.key();
88-
89-
if (key instanceof KeyCacheObject)
90-
return (KeyCacheObject)key;
91-
else
92-
return new KeyCacheObjectImpl(key, null, evt.partition());
85+
return evt.keyCacheObject();
9386
}
9487

9588
/** {@inheritDoc} */
9689
@Override protected GridCacheDrInfo toValue(int cacheId, CdcEvent evt, GridCacheVersion ver) {
97-
CacheObject cacheObj;
98-
99-
Object val = evt.value();
100-
101-
if (val instanceof CacheObject)
102-
cacheObj = (CacheObject)val;
103-
else
104-
cacheObj = new CacheObjectImpl(val, null);
90+
CacheObject val = evt.valueCacheObject();
10591

10692
return evt.expireTime() != EXPIRE_TIME_ETERNAL ?
107-
new GridCacheDrExpirationInfo(cacheObj, ver, TTL_ETERNAL, evt.expireTime()) :
108-
new GridCacheDrInfo(cacheObj, ver);
93+
new GridCacheDrExpirationInfo(val, ver, TTL_ETERNAL, evt.expireTime()) :
94+
new GridCacheDrInfo(val, ver);
10995
}
11096

11197
/** @return Cache. */

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,16 @@
1717

1818
package org.apache.ignite.cdc.conflictresolve;
1919

20+
import java.util.Objects;
2021
import org.apache.ignite.IgniteLogger;
2122
import org.apache.ignite.binary.BinaryObject;
23+
import org.apache.ignite.internal.processors.cache.CacheObject;
24+
import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
2225
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
26+
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
27+
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
2328
import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
29+
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
2430
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
2531
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
2632
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -88,11 +94,12 @@ public CacheVersionConflictResolverImpl(
8894
CacheObjectValueContext ctx,
8995
GridCacheVersionedEntryEx<K, V> oldEntry,
9096
GridCacheVersionedEntryEx<K, V> newEntry,
97+
Object prevStateMeta,
9198
boolean atomicVerComparator
9299
) {
93100
GridCacheVersionConflictContext<K, V> res = new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry);
94101

95-
boolean useNew = isUseNew(ctx, oldEntry, newEntry);
102+
boolean useNew = isUseNew(ctx, oldEntry, newEntry, prevStateMeta);
96103

97104
if (log.isDebugEnabled())
98105
debugResolve(ctx, useNew, oldEntry, newEntry);
@@ -117,7 +124,8 @@ public CacheVersionConflictResolverImpl(
117124
protected <K, V> boolean isUseNew(
118125
CacheObjectValueContext ctx,
119126
GridCacheVersionedEntryEx<K, V> oldEntry,
120-
GridCacheVersionedEntryEx<K, V> newEntry
127+
GridCacheVersionedEntryEx<K, V> newEntry,
128+
Object prevStateMeta
121129
) {
122130
if (newEntry.dataCenterId() == clusterId) // Update made on the local cluster always win.
123131
return true;
@@ -139,8 +147,8 @@ protected <K, V> boolean isUseNew(
139147
}
140148

141149
if (conflictResolveFieldEnabled) {
142-
Object oldVal = oldEntry.value(ctx);
143150
Object newVal = newEntry.value(ctx);
151+
Object oldVal = oldEntry.value(ctx);
144152

145153
if (oldVal != null && newVal != null) {
146154
try {
@@ -153,6 +161,17 @@ protected <K, V> boolean isUseNew(
153161
);
154162
}
155163
}
164+
165+
Object field = oldVal != null ? value(oldVal) : null;
166+
167+
if (Objects.equals(field, prevStateMeta)) // Previous value synchronized.
168+
return true;
169+
}
170+
else {
171+
GridCacheVersion oldVer = oldEntry.value(ctx) != null ? oldEntry.version() : null; // TODO null value version (entry vs row)
172+
173+
if (Objects.equals(oldVer, prevStateMeta)) // Previous value synchronized.
174+
return true;
156175
}
157176

158177
log.error("Conflict can't be resolved, " + (newEntry.value(ctx) == null ? "remove" : "update") + " ignored " +
@@ -162,6 +181,30 @@ protected <K, V> boolean isUseNew(
162181
return false;
163182
}
164183

184+
/**
185+
* {@inheritDoc}
186+
*/
187+
@Override public Object previousStateMetadata(GridCacheEntryEx entry) {
188+
if (conflictResolveFieldEnabled) {
189+
CacheObjectValueContext ctx = entry.context().cacheObjectContext();
190+
CacheObject val = entry.rawGet();
191+
192+
return val != null ?
193+
value(CacheObjectUtils.unwrapBinaryIfNeeded(ctx, val, true, true, null)) :
194+
null;
195+
}
196+
else {
197+
try {
198+
GridCacheVersion ver = entry.version();
199+
200+
return ver != null ? ver.conflictVersion() : null;
201+
}
202+
catch (GridCacheEntryRemovedException e) { // TODO
203+
throw new RuntimeException(e);
204+
}
205+
}
206+
}
207+
165208
/** @return Conflict resolve field value. */
166209
protected Comparable value(Object val) {
167210
return (val instanceof BinaryObject)

0 commit comments

Comments
 (0)