Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.ignite.cdc.conflictresolve;

import java.util.Objects;
import org.apache.ignite.IgniteCommonsSystemProperties;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
Expand Down Expand Up @@ -113,7 +115,6 @@ public CacheVersionConflictResolverImpl(
* @param <V> Key type.
* @return {@code True} is should use new entry.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
protected <K, V> boolean isUseNew(
CacheObjectValueContext ctx,
GridCacheVersionedEntryEx<K, V> oldEntry,
Expand Down Expand Up @@ -147,28 +148,57 @@ protected <K, V> boolean isUseNew(
return value(oldVal).compareTo(value(newVal)) < 0;
}
catch (Exception e) {
log.error(
"Error while resolving replication conflict. [field=" + conflictResolveField + ", key=" + newEntry.key() + ']',
e
);
log.error("Error during field-based conflicts resolving " +
"[key=" + safeKeyToString(newEntry) +
", field=" + conflictResolveField + ']',
e);
}
}
else {
log.warning(
"Field-based conflicts resolving is enabled, but at least one of entries has null value " +
"[key=" + safeKeyToString(newEntry) +
", oldValIsNull=" + (oldVal == null) +
", newValIsNull=" + (newVal == null) + ']');
}
}
else
log.warning("Field-based conflicts resolving is not enabled: key=" + safeKeyToString(newEntry));

log.error("Conflict can't be resolved, " + (newEntry.value(ctx) == null ? "remove" : "update") + " ignored " +
"[key=" + newEntry.key() + ", fromCluster=" + newEntry.dataCenterId() + ", toCluster=" + oldEntry.dataCenterId() + ']');
"[key=" + safeKeyToString(newEntry) +
", fromCluster=" + newEntry.dataCenterId() +
", toCluster=" + oldEntry.dataCenterId() + ']');

// Ignoring update.
return false;
}

/** @return Conflict resolve field value. */
protected Comparable value(Object val) {
protected <T> Comparable<T> value(Object val) {
return (val instanceof BinaryObject)
? ((BinaryObject)val).field(conflictResolveField)
: U.field(val, conflictResolveField);
}

/** @return Sensitive-safe string representation of an entry key. */
private static <K, V> String safeKeyToString(GridCacheVersionedEntryEx<K, V> entry) {
return safeToString(entry.key());
}

/**
* @param obj Object.
*
* @return Sensitive-safe string representation of an object.
* @see IgniteCommonsSystemProperties#IGNITE_TO_STRING_INCLUDE_SENSITIVE
*/
private static String safeToString(Object obj) {
if (obj instanceof BinaryObject)
return Objects.toString(obj);

return S.includeSensitive() ? Objects.toString(obj) : "[sensitiveDataHash=" + Objects.hashCode(obj) + ']';
}

/** */
private <K, V> void debugResolve(
CacheObjectValueContext ctx,
Expand All @@ -179,32 +209,39 @@ private <K, V> void debugResolve(
Object oldVal = conflictResolveFieldEnabled ? oldEntry.value(ctx) : null;
Object newVal = conflictResolveFieldEnabled ? newEntry.value(ctx) : null;

String keyStr = safeKeyToString(newEntry);

if (oldVal != null)
oldVal = debugValue(oldVal);
oldVal = debugValue(keyStr, oldVal);

if (newVal != null)
newVal = debugValue(newVal);
newVal = debugValue(keyStr, newVal);

log.debug("isUseNew[" +
"start=" + oldEntry.isStartVersion() +
log.debug("isUseNew [" +
"key=" + keyStr +
", start=" + oldEntry.isStartVersion() +
", oldVer=" + oldEntry.version() +
", newVer=" + newEntry.version() +
", oldExpire=[" + oldEntry.ttl() + "," + oldEntry.expireTime() + ']' +
", newExpire=[" + newEntry.ttl() + "," + newEntry.expireTime() + ']' +
", old=" + oldVal +
", new=" + newVal +
", oldFieldVal=" + oldVal +
", newFieldVal=" + newVal +
", res=" + useNew + ']');
}

/** @return Conflict resolve field value, or specified {@code val} if the field not found. */
private Object debugValue(Object val) {
private Object debugValue(String keyStr, Object val) {
try {
return value(val);
return safeToString(value(val));
}
catch (Exception e) {
log.debug("Can't resolve field value [field=" + conflictResolveField + ", val=" + val + ']');
log.error("Can't resolve field value " +
"[key=" + keyStr +
", field=" + conflictResolveField +
", val=" + safeToString(val) + ']',
e);

return val;
return null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheEntry;
Expand All @@ -44,9 +46,11 @@
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand Down Expand Up @@ -134,6 +138,8 @@ public static Collection<?> parameters() {

cachex = client.cachex(DEFAULT_CACHE_NAME);
}

listeningLog.clearListeners();
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -259,33 +265,120 @@ public void testUpdatesConflict() throws Exception {
putConflict(key, 5, conflictResolveField() != null);
}

/** Test switching debug log level for ConflictResolver during runtime */
/** Test switching debug log level for ConflictResolver during runtime. */
@Test
@WithSystemProperty(key = IgniteSystemProperties.IGNITE_TO_STRING_INCLUDE_SENSITIVE, value = "true")
public void testResolveDebug() throws Exception {
testResolveDebug(true);
}

/**
* Test switching debug log level for ConflictResolver during runtime.
* Sensitive data should be hidden.
*/
@Test
@WithSystemProperty(key = IgniteSystemProperties.IGNITE_TO_STRING_INCLUDE_SENSITIVE, value = "false")
public void testResolveDebugExcludeSensitive() throws Exception {
testResolveDebug(false);
}

/** */
private void testResolveDebug(boolean includeSensitive) throws IgniteCheckedException {
String key = key("UpdateClusterUpdateReorder", otherClusterId);

LogListener lsnr = LogListener.matches("isUseNew").build();
String expKeyStr = includeSensitive ? key : "[sensitiveDataHash=" + key.hashCode() + "]";

LogListener lsnr = LogListener.matches("isUseNew [key=" + expKeyStr).build();
listeningLog.registerListener(lsnr);

LogListener resolveFieldLsnr = LogListener.matches(newValueString(includeSensitive)).build();
listeningLog.registerListener(resolveFieldLsnr);

Configurator.setLevel(CacheVersionConflictResolverImpl.class.getName(), Level.DEBUG);

try {
putConflict(key, 1, true);
assertFalse(lsnr.check());
assertFalse(resolveFieldLsnr.check());

putConflict(key, 1, false);
put(key);

assertTrue(lsnr.check());
assertTrue(resolveFieldLsnr.check());
}
finally {
Configurator.setLevel(CacheVersionConflictResolverImpl.class.getName(), Level.INFO);
}

lsnr.reset();
resolveFieldLsnr.reset();

putConflict(key, 1, false);
put(key);

assertFalse(lsnr.check());
assertFalse(resolveFieldLsnr.check());
}

/** Gets expected conflict resolvable field output in log. */
private String newValueString(boolean includeSensitive) {
String newValExpStr = null;

if (conflictResolveField() != null) {
// Incremented in ConflictResolvableTestData#create during put.
long expReqId = ConflictResolvableTestData.REQUEST_ID.get() + 1;

newValExpStr = includeSensitive ? String.valueOf(expReqId) : "[sensitiveDataHash=" +
Objects.hashCode(expReqId);
}

return "newFieldVal=" + newValExpStr;
}

/** Test log of resolving error. */
@Test
@WithSystemProperty(key = IgniteSystemProperties.IGNITE_TO_STRING_INCLUDE_SENSITIVE, value = "true")
public void testResolveError() throws Exception {
testResolveError("TestErrorLoggingINCLUDE", true);
}

/** Test log of resolving error with hidden sensitive data. */
@Test
@WithSystemProperty(key = IgniteSystemProperties.IGNITE_TO_STRING_INCLUDE_SENSITIVE, value = "false")
public void testResolveErrorExcludeSensitive() throws Exception {
testResolveError("TestErrorLoggingEXCLUDE", false);
}

/** */
private void testResolveError(String keyVal, boolean includeSensitive) throws IgniteCheckedException {
Assume.assumeTrue("Should not run with enabled field", conflictResolveField() == null);

String key = key(keyVal, otherClusterId);

String expKeyStr = includeSensitive ? key : "[sensitiveDataHash=" + key.hashCode() + "]";

LogListener warnLsnr = LogListener.matches("Field-based conflicts resolving is not enabled: key=" +
expKeyStr).build();

LogListener errLsnr = LogListener.matches("Conflict can't be resolved, update ignored " +
"[key=" + expKeyStr + ", fromCluster=" + otherClusterId + ", toCluster=" + SECOND_CLUSTER_ID + "]").build();

listeningLog.registerListener(warnLsnr);
listeningLog.registerListener(errLsnr);

Configurator.setLevel(CacheVersionConflictResolverImpl.class.getName(), Level.DEBUG);

try {
put(key);
assertFalse(warnLsnr.check());
assertFalse(errLsnr.check());

putConflict(key, 1, false);

assertTrue(warnLsnr.check());
assertTrue(errLsnr.check());
}
finally {
Configurator.setLevel(CacheVersionConflictResolverImpl.class.getName(), Level.INFO);
}
}

/** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Ignore;
import org.junit.Test;

/** Cache conflict operations test with a custom resolver. */
Expand Down Expand Up @@ -53,9 +54,30 @@ public class CacheConflictOperationsWithCustomResolverTest extends CacheConflict

/** {@inheritDoc} */
@Test
@Ignore("LwwConflictResolver does not have logging.")
@Override public void testResolveDebug() throws Exception {
// LWW strategy resolves conflicts in unexpected way at versioned resolve test.
GridTestUtils.assertThrows(log, super::testResolveDebug, AssertionError.class, "");
// No-op.
}

/** {@inheritDoc} */
@Test
@Ignore("LwwConflictResolver does not have logging.")
@Override public void testResolveDebugExcludeSensitive() throws Exception {
// No-op.
}

/** {@inheritDoc} */
@Test
@Ignore("LwwConflictResolver does not have logging.")
@Override public void testResolveError() throws Exception {
// No-op.
}

/** {@inheritDoc} */
@Test
@Ignore("LwwConflictResolver does not have logging.")
@Override public void testResolveErrorExcludeSensitive() throws Exception {
// No-op.
}

/**
Expand Down