diff --git a/build.gradle b/build.gradle index a75d093664fe6..4253e13f2c53e 100644 --- a/build.gradle +++ b/build.gradle @@ -173,8 +173,8 @@ task verifyVersions { * the enabled state of every bwc task. It should be set back to true * after the backport of the backcompat code is complete. */ -final boolean bwc_tests_enabled = true -final String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */ +final boolean bwc_tests_enabled = false +final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/issues/32006" // BWC handled at a later time if (bwc_tests_enabled == false) { if (bwc_tests_disabled_issue.isEmpty()) { throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false") diff --git a/server/src/main/java/org/elasticsearch/ElasticsearchException.java b/server/src/main/java/org/elasticsearch/ElasticsearchException.java index 9a02b76b3e038..01c24acccdd81 100644 --- a/server/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/server/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.coordination.CoordinationStateRejectedException; import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; @@ -1024,8 +1025,10 @@ private enum ElasticsearchExceptionHandle { UNKNOWN_NAMED_OBJECT_EXCEPTION(org.elasticsearch.common.xcontent.UnknownNamedObjectException.class, org.elasticsearch.common.xcontent.UnknownNamedObjectException::new, 148, Version.V_5_2_0), TOO_MANY_BUCKETS_EXCEPTION(MultiBucketConsumerService.TooManyBucketsException.class, - MultiBucketConsumerService.TooManyBucketsException::new, 149, - Version.V_7_0_0_alpha1); + MultiBucketConsumerService.TooManyBucketsException::new, 149, Version.V_7_0_0_alpha1), + COORDINATION_STATE_REJECTED_EXCEPTION(CoordinationStateRejectedException.class, + CoordinationStateRejectedException::new, 150, Version.V_7_0_0_alpha1); + final Class exceptionClass; final CheckedFunction constructor; diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java index b7ef075a59afa..4defd09485e6f 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java @@ -76,8 +76,11 @@ protected void masterOperation(final ClusterStateRequest request, final ClusterS ClusterState currentState = clusterService.state(); logger.trace("Serving cluster state request using version {}", currentState.version()); ClusterState.Builder builder = ClusterState.builder(currentState.getClusterName()); + builder.term(currentState.term()); builder.version(currentState.version()); builder.stateUUID(currentState.stateUUID()); + builder.lastCommittedConfiguration(currentState.getLastCommittedConfiguration()); + builder.lastAcceptedConfiguration(currentState.getLastAcceptedConfiguration()); if (request.nodes()) { builder.nodes(currentState.nodes()); } diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java index 276e00a2ba3db..7a377d10090ab 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -22,6 +22,8 @@ import com.carrotsearch.hppc.cursors.IntObjectCursor; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; + +import org.elasticsearch.Version; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -49,8 +51,10 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.VersionedNamedWriteable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; @@ -58,10 +62,14 @@ import org.elasticsearch.discovery.zen.PublishClusterStateAction; import java.io.IOException; +import java.util.Collection; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -154,6 +162,8 @@ default boolean isPrivate() { public static final long UNKNOWN_VERSION = -1; + private final long term; + private final long version; private final String stateUUID; @@ -172,16 +182,23 @@ default boolean isPrivate() { private final boolean wasReadFromDiff; + private final VotingConfiguration lastCommittedConfiguration; + + private final VotingConfiguration lastAcceptedConfiguration; + // built on demand private volatile RoutingNodes routingNodes; - public ClusterState(long version, String stateUUID, ClusterState state) { - this(state.clusterName, version, stateUUID, state.metaData(), state.routingTable(), state.nodes(), state.blocks(), state.customs(), - false); + public ClusterState(long term, long version, String stateUUID, ClusterState state) { + this(state.clusterName, term, version, stateUUID, state.metaData(), state.routingTable(), state.nodes(), state.blocks(), + state.customs(), state.getLastCommittedConfiguration(), state.getLastAcceptedConfiguration(), false); } - public ClusterState(ClusterName clusterName, long version, String stateUUID, MetaData metaData, RoutingTable routingTable, - DiscoveryNodes nodes, ClusterBlocks blocks, ImmutableOpenMap customs, boolean wasReadFromDiff) { + public ClusterState(ClusterName clusterName, long term, long version, String stateUUID, MetaData metaData, RoutingTable routingTable, + DiscoveryNodes nodes, ClusterBlocks blocks, ImmutableOpenMap customs, + VotingConfiguration lastCommittedConfiguration, VotingConfiguration lastAcceptedConfiguration, + boolean wasReadFromDiff) { + this.term = term; this.version = version; this.stateUUID = stateUUID; this.clusterName = clusterName; @@ -190,9 +207,15 @@ public ClusterState(ClusterName clusterName, long version, String stateUUID, Met this.nodes = nodes; this.blocks = blocks; this.customs = customs; + this.lastCommittedConfiguration = lastCommittedConfiguration; + this.lastAcceptedConfiguration = lastAcceptedConfiguration; this.wasReadFromDiff = wasReadFromDiff; } + public long term() { + return term; + } + public long version() { return this.version; } @@ -257,6 +280,14 @@ public ClusterName getClusterName() { return this.clusterName; } + public VotingConfiguration getLastAcceptedConfiguration() { + return lastAcceptedConfiguration; + } + + public VotingConfiguration getLastCommittedConfiguration() { + return lastCommittedConfiguration; + } + // Used for testing and logging to determine how this cluster state was send over the wire public boolean wasReadFromDiff() { return wasReadFromDiff; @@ -277,8 +308,11 @@ public RoutingNodes getRoutingNodes() { public String toString() { StringBuilder sb = new StringBuilder(); sb.append("cluster uuid: ").append(metaData.clusterUUID()).append("\n"); + sb.append("term: ").append(term).append("\n"); sb.append("version: ").append(version).append("\n"); sb.append("state uuid: ").append(stateUUID).append("\n"); + sb.append("last committed config: ").append(getLastCommittedConfiguration()).append("\n"); + sb.append("last accepted config: ").append(getLastAcceptedConfiguration()).append("\n"); sb.append("from_diff: ").append(wasReadFromDiff).append("\n"); sb.append("meta data version: ").append(metaData.version()).append("\n"); final String TAB = " "; @@ -386,8 +420,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("cluster_uuid", metaData().clusterUUID()); if (metrics.contains(Metric.VERSION)) { + builder.field("term", term); builder.field("version", version); builder.field("state_uuid", stateUUID); + builder.field("last_committed_config", lastCommittedConfiguration); + builder.field("last_accepted_config", lastAcceptedConfiguration); } if (metrics.contains(Metric.MASTER_NODE)) { @@ -587,8 +624,11 @@ public static Builder builder(ClusterState state) { public static class Builder { private final ClusterName clusterName; + private long term = 0; private long version = 0; private String uuid = UNKNOWN_UUID; + private VotingConfiguration lastCommittedConfiguration = VotingConfiguration.EMPTY_CONFIG; + private VotingConfiguration lastAcceptedConfiguration = VotingConfiguration.EMPTY_CONFIG; private MetaData metaData = MetaData.EMPTY_META_DATA; private RoutingTable routingTable = RoutingTable.EMPTY_ROUTING_TABLE; private DiscoveryNodes nodes = DiscoveryNodes.EMPTY_NODES; @@ -599,8 +639,11 @@ public static class Builder { public Builder(ClusterState state) { this.clusterName = state.clusterName; + this.term = state.term(); this.version = state.version(); this.uuid = state.stateUUID(); + this.lastCommittedConfiguration = state.getLastCommittedConfiguration(); + this.lastAcceptedConfiguration = state.getLastAcceptedConfiguration(); this.nodes = state.nodes(); this.routingTable = state.routingTable(); this.metaData = state.metaData(); @@ -650,6 +693,11 @@ public Builder blocks(ClusterBlocks blocks) { return this; } + public Builder term(long term) { + this.term = term; + return this; + } + public Builder version(long version) { this.version = version; return this; @@ -666,6 +714,16 @@ public Builder stateUUID(String uuid) { return this; } + public Builder lastCommittedConfiguration(VotingConfiguration config) { + this.lastCommittedConfiguration = config; + return this; + } + + public Builder lastAcceptedConfiguration(VotingConfiguration config) { + this.lastAcceptedConfiguration = config; + return this; + } + public Builder putCustom(String type, Custom custom) { customs.put(type, custom); return this; @@ -690,7 +748,8 @@ public ClusterState build() { if (UNKNOWN_UUID.equals(uuid)) { uuid = UUIDs.randomBase64UUID(); } - return new ClusterState(clusterName, version, uuid, metaData, routingTable, nodes, blocks, customs.build(), fromDiff); + return new ClusterState(clusterName, term, version, uuid, metaData, routingTable, nodes, blocks, customs.build(), + lastCommittedConfiguration, lastAcceptedConfiguration, fromDiff); } public static byte[] toBytes(ClusterState state) throws IOException { @@ -722,8 +781,15 @@ public static Diff readDiffFrom(StreamInput in, DiscoveryNode loca public static ClusterState readFrom(StreamInput in, DiscoveryNode localNode) throws IOException { ClusterName clusterName = new ClusterName(in); Builder builder = new Builder(clusterName); + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + builder.term = in.readLong(); + } builder.version = in.readLong(); builder.uuid = in.readString(); + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + builder.lastCommittedConfiguration(new VotingConfiguration(in)); + builder.lastAcceptedConfiguration(new VotingConfiguration(in)); + } builder.metaData = MetaData.readFrom(in); builder.routingTable = RoutingTable.readFrom(in); builder.nodes = DiscoveryNodes.readFrom(in, localNode); @@ -739,8 +805,15 @@ public static ClusterState readFrom(StreamInput in, DiscoveryNode localNode) thr @Override public void writeTo(StreamOutput out) throws IOException { clusterName.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + out.writeLong(term); + } out.writeLong(version); out.writeString(stateUUID); + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + lastCommittedConfiguration.writeTo(out); + lastAcceptedConfiguration.writeTo(out); + } metaData.writeTo(out); routingTable.writeTo(out); nodes.writeTo(out); @@ -762,6 +835,8 @@ public void writeTo(StreamOutput out) throws IOException { private static class ClusterStateDiff implements Diff { + private final long toTerm; + private final long toVersion; private final String fromUuid; @@ -770,6 +845,10 @@ private static class ClusterStateDiff implements Diff { private final ClusterName clusterName; + private final VotingConfiguration lastCommittedConfiguration; + + private final VotingConfiguration lastAcceptedConfiguration; + private final Diff routingTable; private final Diff nodes; @@ -783,8 +862,11 @@ private static class ClusterStateDiff implements Diff { ClusterStateDiff(ClusterState before, ClusterState after) { fromUuid = before.stateUUID; toUuid = after.stateUUID; + toTerm = after.term; toVersion = after.version; clusterName = after.clusterName; + lastCommittedConfiguration = after.lastCommittedConfiguration; + lastAcceptedConfiguration = after.lastAcceptedConfiguration; routingTable = after.routingTable.diff(before.routingTable); nodes = after.nodes.diff(before.nodes); metaData = after.metaData.diff(before.metaData); @@ -796,7 +878,19 @@ private static class ClusterStateDiff implements Diff { clusterName = new ClusterName(in); fromUuid = in.readString(); toUuid = in.readString(); + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + toTerm = in.readLong(); + } else { + toTerm = 0L; + } toVersion = in.readLong(); + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + lastCommittedConfiguration = new VotingConfiguration(in); + lastAcceptedConfiguration = new VotingConfiguration(in); + } else { + lastCommittedConfiguration = VotingConfiguration.EMPTY_CONFIG; + lastAcceptedConfiguration = VotingConfiguration.EMPTY_CONFIG; + } routingTable = RoutingTable.readDiffFrom(in); nodes = DiscoveryNodes.readDiffFrom(in, localNode); metaData = MetaData.readDiffFrom(in); @@ -809,7 +903,14 @@ public void writeTo(StreamOutput out) throws IOException { clusterName.writeTo(out); out.writeString(fromUuid); out.writeString(toUuid); + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + out.writeLong(toTerm); + } out.writeLong(toVersion); + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + lastCommittedConfiguration.writeTo(out); + lastAcceptedConfiguration.writeTo(out); + } routingTable.writeTo(out); nodes.writeTo(out); metaData.writeTo(out); @@ -828,7 +929,10 @@ public ClusterState apply(ClusterState state) { throw new IncompatibleClusterStateVersionException(state.version, state.stateUUID, toVersion, fromUuid); } builder.stateUUID(toUuid); + builder.term(toTerm); builder.version(toVersion); + builder.lastCommittedConfiguration(lastCommittedConfiguration); + builder.lastAcceptedConfiguration(lastAcceptedConfiguration); builder.routingTable(routingTable.apply(state.routingTable)); builder.nodes(nodes.apply(state.nodes)); builder.metaData(metaData.apply(state.metaData)); @@ -840,4 +944,67 @@ public ClusterState apply(ClusterState state) { } + /** + * A collection of persistent node ids, denoting the voting configuration for cluster state changes. + */ + public static class VotingConfiguration implements Writeable, ToXContentFragment { + + public static final VotingConfiguration EMPTY_CONFIG = new VotingConfiguration(Collections.emptySet()); + + private final Set nodeIds; + + public VotingConfiguration(Set nodeIds) { + this.nodeIds = Collections.unmodifiableSet(new HashSet<>(nodeIds)); + } + + public VotingConfiguration(StreamInput in) throws IOException { + nodeIds = Collections.unmodifiableSet(Sets.newHashSet(in.readStringArray())); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeStringArray(nodeIds.toArray(new String[nodeIds.size()])); + } + + public boolean hasQuorum(Collection votes) { + final HashSet intersection = new HashSet<>(nodeIds); + intersection.retainAll(votes); + return intersection.size() * 2 > nodeIds.size(); + } + + public Set getNodeIds() { + return nodeIds; + } + + @Override + public String toString() { + return "VotingConfiguration{" + String.join(",", nodeIds) + "}"; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + VotingConfiguration that = (VotingConfiguration) o; + return Objects.equals(nodeIds, that.nodeIds); + } + + @Override + public int hashCode() { + return Objects.hash(nodeIds); + } + + public boolean isEmpty() { + return nodeIds.isEmpty(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startArray(); + for (String nodeId : nodeIds) { + builder.value(nodeId); + } + return builder.endArray(); + } + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ApplyCommit.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ApplyCommit.java new file mode 100644 index 0000000000000..1be2350d17fb1 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ApplyCommit.java @@ -0,0 +1,49 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +public class ApplyCommit extends TermVersionRequest { + + public ApplyCommit(DiscoveryNode sourceNode, long term, long version) { + super(sourceNode, term, version); + } + + public ApplyCommit(StreamInput in) throws IOException { + super(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } + + @Override + public String toString() { + return "ApplyCommit{" + + "term=" + term + + ", version=" + version + + '}'; + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java new file mode 100644 index 0000000000000..fb97b0965178f --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java @@ -0,0 +1,485 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterState.VotingConfiguration; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** + * The core class of the cluster state coordination algorithm, directly implementing the + * formal model + */ +public class CoordinationState extends AbstractComponent { + + private final DiscoveryNode localNode; + + // persisted state + private final PersistedState persistedState; + + // transient state + private VoteCollection joinVotes; + private boolean startedJoinSinceLastReboot; + private boolean electionWon; + private long lastPublishedVersion; + private VotingConfiguration lastPublishedConfiguration; + private VoteCollection publishVotes; + + public CoordinationState(Settings settings, DiscoveryNode localNode, PersistedState persistedState) { + super(settings); + + this.localNode = localNode; + + // persisted state + this.persistedState = persistedState; + + // transient state + this.joinVotes = new VoteCollection(); + this.startedJoinSinceLastReboot = false; + this.electionWon = false; + this.lastPublishedVersion = 0L; + this.lastPublishedConfiguration = persistedState.getLastAcceptedState().getLastAcceptedConfiguration(); + this.publishVotes = new VoteCollection(); + } + + public long getCurrentTerm() { + return persistedState.getCurrentTerm(); + } + + public ClusterState getLastAcceptedState() { + return persistedState.getLastAcceptedState(); + } + + public long getLastAcceptedTerm() { + return getLastAcceptedState().term(); + } + + public long getLastAcceptedVersion() { + return getLastAcceptedState().version(); + } + + public VotingConfiguration getLastCommittedConfiguration() { + return getLastAcceptedState().getLastCommittedConfiguration(); + } + + public VotingConfiguration getLastAcceptedConfiguration() { + return getLastAcceptedState().getLastAcceptedConfiguration(); + } + + public long getLastPublishedVersion() { + return lastPublishedVersion; + } + + public boolean electionWon() { + return electionWon; + } + + public boolean isElectionQuorum(VoteCollection votes) { + return votes.isQuorum(getLastCommittedConfiguration()) && votes.isQuorum(getLastAcceptedConfiguration()); + } + + public boolean isPublishQuorum(VoteCollection votes) { + return votes.isQuorum(getLastCommittedConfiguration()) && votes.isQuorum(lastPublishedConfiguration); + } + + public boolean containsJoinVoteFor(DiscoveryNode node) { + return joinVotes.containsVoteFor(node); + } + + public boolean joinVotesHaveQuorumFor(VotingConfiguration votingConfiguration) { + return joinVotes.isQuorum(votingConfiguration); + } + + /** + * Used to bootstrap a cluster by injecting the initial state and configuration. + * + * @param initialState The initial state to use. Must have term 0, version 1, and non-empty configurations. + * @throws CoordinationStateRejectedException if the arguments were incompatible with the current state of this object. + */ + public void setInitialState(ClusterState initialState) { + final long lastAcceptedVersion = getLastAcceptedVersion(); + if (lastAcceptedVersion != 0) { + logger.debug("setInitialState: rejecting since last-accepted version {} > 0", lastAcceptedVersion); + throw new CoordinationStateRejectedException("initial state already set: last-accepted version now " + lastAcceptedVersion); + } + + assert getLastAcceptedTerm() == 0; + assert getLastAcceptedConfiguration().isEmpty(); + assert getLastCommittedConfiguration().isEmpty(); + assert lastPublishedVersion == 0; + assert lastPublishedConfiguration.isEmpty(); + assert electionWon == false; + assert joinVotes.isEmpty(); + assert publishVotes.isEmpty(); + + assert initialState.term() == 0; + assert initialState.version() == 1; + assert initialState.getLastAcceptedConfiguration().isEmpty() == false; + assert initialState.getLastCommittedConfiguration().isEmpty() == false; + + persistedState.setLastAcceptedState(initialState); + } + + /** + * May be safely called at any time to move this instance to a new term. + * + * @param startJoinRequest The startJoinRequest, specifying the node requesting the join. + * @return A Join that should be sent to the target node of the join. + * @throws CoordinationStateRejectedException if the arguments were incompatible with the current state of this object. + */ + public Join handleStartJoin(StartJoinRequest startJoinRequest) { + if (startJoinRequest.getTerm() <= getCurrentTerm()) { + logger.debug("handleStartJoin: ignored as term provided [{}] not greater than current term [{}]", + startJoinRequest.getTerm(), getCurrentTerm()); + throw new CoordinationStateRejectedException("incoming term " + startJoinRequest.getTerm() + + " not greater than current term " + getCurrentTerm()); + } + + logger.debug("handleStartJoin: updating term from [{}] to [{}]", getCurrentTerm(), startJoinRequest.getTerm()); + + persistedState.setCurrentTerm(startJoinRequest.getTerm()); + assert getCurrentTerm() == startJoinRequest.getTerm(); + lastPublishedVersion = 0; + lastPublishedConfiguration = getLastAcceptedConfiguration(); + startedJoinSinceLastReboot = true; + electionWon = false; + joinVotes = new VoteCollection(); + publishVotes = new VoteCollection(); + + return new Join(localNode, startJoinRequest.getSourceNode(), getCurrentTerm(), getLastAcceptedTerm(), getLastAcceptedVersion()); + } + + /** + * May be called on receipt of a Join. + * + * @param join The Join received. + * @return true iff this instance does not already have a join vote from the given source node for this term + * @throws CoordinationStateRejectedException if the arguments were incompatible with the current state of this object. + */ + public boolean handleJoin(Join join) { + assert join.getTargetNode().equals(localNode) : "handling join " + join + " for the wrong node " + localNode; + + if (join.getTerm() != getCurrentTerm()) { + logger.debug("handleJoin: ignored join due to term mismatch (expected: [{}], actual: [{}])", + getCurrentTerm(), join.getTerm()); + throw new CoordinationStateRejectedException( + "incoming term " + join.getTerm() + " does not match current term " + getCurrentTerm()); + } + + if (startedJoinSinceLastReboot == false) { + logger.debug("handleJoin: ignored join as term was not incremented yet after reboot"); + throw new CoordinationStateRejectedException("ignored join as term has not been incremented yet after reboot"); + } + + final long lastAcceptedTerm = getLastAcceptedTerm(); + if (join.getLastAcceptedTerm() > lastAcceptedTerm) { + logger.debug("handleJoin: ignored join as joiner has a better last accepted term (expected: <=[{}], actual: [{}])", + lastAcceptedTerm, join.getLastAcceptedTerm()); + throw new CoordinationStateRejectedException("incoming last accepted term " + join.getLastAcceptedTerm() + + " of join higher than current last accepted term " + lastAcceptedTerm); + } + + if (join.getLastAcceptedTerm() == lastAcceptedTerm && join.getLastAcceptedVersion() > getLastAcceptedVersion()) { + logger.debug("handleJoin: ignored join as joiner has a better last accepted version (expected: <=[{}], actual: [{}])", + getLastAcceptedVersion(), join.getLastAcceptedVersion()); + throw new CoordinationStateRejectedException("incoming last accepted version " + join.getLastAcceptedVersion() + + " of join higher than current last accepted version " + getLastAcceptedVersion()); + } + + if (getLastAcceptedVersion() == 0) { + // We do not check for an election won on setting the initial configuration, so it would be possible to end up in a state where + // we have enough join votes to have won the election immediately on setting the initial configuration. It'd be quite + // complicated to restore all the appropriate invariants when setting the initial configuration (it's not just electionWon) + // so instead we just reject join votes received prior to receiving the initial configuration. + logger.debug("handleJoin: ignored join because initial configuration not set"); + throw new CoordinationStateRejectedException("initial configuration not set"); + } + + boolean added = joinVotes.addVote(join.getSourceNode()); + boolean prevElectionWon = electionWon; + electionWon = isElectionQuorum(joinVotes); + logger.debug("handleJoin: added join {} from [{}] for election, electionWon={} lastAcceptedTerm={} lastAcceptedVersion={}", join, + join.getSourceNode(), electionWon, lastAcceptedTerm, getLastAcceptedVersion()); + + if (electionWon && prevElectionWon == false) { + lastPublishedVersion = getLastAcceptedVersion(); + } + return added; + } + + /** + * May be called in order to prepare publication of the given cluster state + * + * @param clusterState The cluster state to publish. + * @return A PublishRequest to publish the given cluster state + * @throws CoordinationStateRejectedException if the arguments were incompatible with the current state of this object. + */ + public PublishRequest handleClientValue(ClusterState clusterState) { + if (electionWon == false) { + logger.debug("handleClientValue: ignored request as election not won"); + throw new CoordinationStateRejectedException("election not won"); + } + if (lastPublishedVersion != getLastAcceptedVersion()) { + logger.debug("handleClientValue: cannot start publishing next value before accepting previous one"); + throw new CoordinationStateRejectedException("cannot start publishing next value before accepting previous one"); + } + if (clusterState.term() != getCurrentTerm()) { + logger.debug("handleClientValue: ignored request due to term mismatch " + + "(expected: [term {} version >{}], actual: [term {} version {}])", + getCurrentTerm(), lastPublishedVersion, clusterState.term(), clusterState.version()); + throw new CoordinationStateRejectedException("incoming term " + clusterState.term() + " does not match current term " + + getCurrentTerm()); + } + if (clusterState.version() <= lastPublishedVersion) { + logger.debug("handleClientValue: ignored request due to version mismatch " + + "(expected: [term {} version >{}], actual: [term {} version {}])", + getCurrentTerm(), lastPublishedVersion, clusterState.term(), clusterState.version()); + throw new CoordinationStateRejectedException("incoming cluster state version " + clusterState.version() + + " lower or equal to last published version " + lastPublishedVersion); + } + + if (clusterState.getLastAcceptedConfiguration().equals(getLastAcceptedConfiguration()) == false + && getLastCommittedConfiguration().equals(getLastAcceptedConfiguration()) == false) { + logger.debug("handleClientValue: only allow reconfiguration while not already reconfiguring"); + throw new CoordinationStateRejectedException("only allow reconfiguration while not already reconfiguring"); + } + if (joinVotesHaveQuorumFor(clusterState.getLastAcceptedConfiguration()) == false) { + logger.debug("handleClientValue: only allow reconfiguration if joinVotes have quorum for new config"); + throw new CoordinationStateRejectedException("only allow reconfiguration if joinVotes have quorum for new config"); + } + + assert clusterState.getLastCommittedConfiguration().equals(getLastCommittedConfiguration()) : + "last committed configuration should not change"; + + lastPublishedVersion = clusterState.version(); + lastPublishedConfiguration = clusterState.getLastAcceptedConfiguration(); + publishVotes = new VoteCollection(); + + logger.trace("handleClientValue: processing request for version [{}] and term [{}]", lastPublishedVersion, getCurrentTerm()); + + return new PublishRequest(clusterState); + } + + /** + * May be called on receipt of a PublishRequest. + * + * @param publishRequest The publish request received. + * @return A PublishResponse which can be sent back to the sender of the PublishRequest. + * @throws CoordinationStateRejectedException if the arguments were incompatible with the current state of this object. + */ + public PublishResponse handlePublishRequest(PublishRequest publishRequest) { + final ClusterState clusterState = publishRequest.getAcceptedState(); + if (clusterState.term() != getCurrentTerm()) { + logger.debug("handlePublishRequest: ignored publish request due to term mismatch (expected: [{}], actual: [{}])", + getCurrentTerm(), clusterState.term()); + throw new CoordinationStateRejectedException("incoming term " + clusterState.term() + " does not match current term " + + getCurrentTerm()); + } + if (clusterState.term() == getLastAcceptedTerm() && clusterState.version() <= getLastAcceptedVersion()) { + logger.debug("handlePublishRequest: ignored publish request due to version mismatch (expected: >[{}], actual: [{}])", + getLastAcceptedVersion(), clusterState.version()); + throw new CoordinationStateRejectedException("incoming version " + clusterState.version() + + " lower or equal to current version " + getLastAcceptedVersion()); + } + + logger.trace("handlePublishRequest: accepting publish request for version [{}] and term [{}]", + clusterState.version(), clusterState.term()); + persistedState.setLastAcceptedState(clusterState); + assert getLastAcceptedState() == clusterState; + + return new PublishResponse(clusterState.term(), clusterState.version()); + } + + /** + * May be called on receipt of a PublishResponse from the given sourceNode. + * + * @param sourceNode The sender of the PublishResponse received. + * @param publishResponse The PublishResponse received. + * @return An optional ApplyCommit which, if present, may be broadcast to all peers, indicating that this publication + * has been accepted at a quorum of peers and is therefore committed. + * @throws CoordinationStateRejectedException if the arguments were incompatible with the current state of this object. + */ + public Optional handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse) { + if (electionWon == false) { + logger.debug("handlePublishResponse: ignored response as election not won"); + throw new CoordinationStateRejectedException("election not won"); + } + if (publishResponse.getTerm() != getCurrentTerm()) { + logger.debug("handlePublishResponse: ignored publish response due to term mismatch (expected: [{}], actual: [{}])", + getCurrentTerm(), publishResponse.getTerm()); + throw new CoordinationStateRejectedException("incoming term " + publishResponse.getTerm() + + " does not match current term " + getCurrentTerm()); + } + if (publishResponse.getVersion() != lastPublishedVersion) { + logger.debug("handlePublishResponse: ignored publish response due to version mismatch (expected: [{}], actual: [{}])", + lastPublishedVersion, publishResponse.getVersion()); + throw new CoordinationStateRejectedException("incoming version " + publishResponse.getVersion() + + " does not match current version " + lastPublishedVersion); + } + + logger.trace("handlePublishResponse: accepted publish response for version [{}] and term [{}] from [{}]", + publishResponse.getVersion(), publishResponse.getTerm(), sourceNode); + publishVotes.addVote(sourceNode); + if (isPublishQuorum(publishVotes)) { + logger.trace("handlePublishResponse: value committed for version [{}] and term [{}]", + publishResponse.getVersion(), publishResponse.getTerm()); + return Optional.of(new ApplyCommit(localNode, publishResponse.getTerm(), publishResponse.getVersion())); + } + + return Optional.empty(); + } + + /** + * May be called on receipt of an ApplyCommit. Updates the committed configuration accordingly. + * + * @param applyCommit The ApplyCommit received. + * @throws CoordinationStateRejectedException if the arguments were incompatible with the current state of this object. + */ + public void handleCommit(ApplyCommit applyCommit) { + if (applyCommit.getTerm() != getCurrentTerm()) { + logger.debug("handleCommit: ignored commit request due to term mismatch " + + "(expected: [term {} version {}], actual: [term {} version {}])", + getLastAcceptedTerm(), getLastAcceptedVersion(), applyCommit.getTerm(), applyCommit.getVersion()); + throw new CoordinationStateRejectedException("incoming term " + applyCommit.getTerm() + " does not match current term " + + getCurrentTerm()); + } + if (applyCommit.getTerm() != getLastAcceptedTerm()) { + logger.debug("handleCommit: ignored commit request due to term mismatch " + + "(expected: [term {} version {}], actual: [term {} version {}])", + getLastAcceptedTerm(), getLastAcceptedVersion(), applyCommit.getTerm(), applyCommit.getVersion()); + throw new CoordinationStateRejectedException("incoming term " + applyCommit.getTerm() + " does not match last accepted term " + + getLastAcceptedTerm()); + } + if (applyCommit.getVersion() != getLastAcceptedVersion()) { + logger.debug("handleCommit: ignored commit request due to version mismatch (term {}, expected: [{}], actual: [{}])", + getLastAcceptedTerm(), getLastAcceptedVersion(), applyCommit.getVersion()); + throw new CoordinationStateRejectedException("incoming version " + applyCommit.getVersion() + + " does not match current version " + getLastAcceptedVersion()); + } + + logger.trace("handleCommit: applying commit request for term [{}] and version [{}]", applyCommit.getTerm(), + applyCommit.getVersion()); + + persistedState.markLastAcceptedConfigAsCommitted(); + assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration()); + } + + /** + * Pluggable persistence layer for {@link CoordinationState}. + * + */ + public interface PersistedState { + + /** + * Returns the current term + */ + long getCurrentTerm(); + + /** + * Returns the last accepted cluster state + */ + ClusterState getLastAcceptedState(); + + /** + * Sets a new current term. + * After a successful call to this method, {@link #getCurrentTerm()} should return the last term that was set. + * The value returned by {@link #getLastAcceptedState()} should not be influenced by calls to this method. + */ + void setCurrentTerm(long currentTerm); + + /** + * Sets a new last accepted cluster state. + * After a successful call to this method, {@link #getLastAcceptedState()} should return the last cluster state that was set. + * The value returned by {@link #getCurrentTerm()} should not be influenced by calls to this method. + */ + void setLastAcceptedState(ClusterState clusterState); + + /** + * Marks the last accepted cluster state as committed. + * After a successful call to this method, {@link #getLastAcceptedState()} should return the last cluster state that was set, + * with the last committed configuration now corresponding to the last accepted configuration. + */ + default void markLastAcceptedConfigAsCommitted() { + final ClusterState lastAcceptedState = getLastAcceptedState(); + setLastAcceptedState(ClusterState.builder(lastAcceptedState) + .lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration()) + .build()); + } + } + + /** + * A collection of votes, used to calculate quorums. + */ + public static class VoteCollection { + + private final Map nodes; + + public boolean addVote(DiscoveryNode sourceNode) { + return nodes.put(sourceNode.getId(), sourceNode) == null; + } + + public VoteCollection() { + nodes = new HashMap<>(); + } + + public boolean isQuorum(VotingConfiguration configuration) { + return configuration.hasQuorum(nodes.keySet()); + } + + public boolean containsVoteFor(DiscoveryNode node) { + return nodes.containsKey(node.getId()); + } + + public boolean isEmpty() { + return nodes.isEmpty(); + } + + public Collection nodes() { + return Collections.unmodifiableCollection(nodes.values()); + } + + @Override + public String toString() { + return "VoteCollection{" + String.join(",", nodes.keySet()) + "}"; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + VoteCollection that = (VoteCollection) o; + + return nodes.equals(that.nodes); + } + + @Override + public int hashCode() { + return nodes.hashCode(); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationStateRejectedException.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationStateRejectedException.java new file mode 100644 index 0000000000000..fa58d86a1460b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationStateRejectedException.java @@ -0,0 +1,35 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.IOException; + +public class CoordinationStateRejectedException extends ElasticsearchException { + public CoordinationStateRejectedException(String msg, Object... args) { + super(msg, args); + } + + public CoordinationStateRejectedException(StreamInput in) throws IOException { + super(in); + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Join.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Join.java new file mode 100644 index 0000000000000..3ebc45b34ac2d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Join.java @@ -0,0 +1,118 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; + +public class Join implements Writeable { + private final DiscoveryNode sourceNode; + private final DiscoveryNode targetNode; + private final long term; + private final long lastAcceptedTerm; + private final long lastAcceptedVersion; + + public Join(DiscoveryNode sourceNode, DiscoveryNode targetNode, long term, long lastAcceptedTerm, long lastAcceptedVersion) { + assert term >= 0; + assert lastAcceptedTerm >= 0; + assert lastAcceptedVersion >= 0; + + this.sourceNode = sourceNode; + this.targetNode = targetNode; + this.term = term; + this.lastAcceptedTerm = lastAcceptedTerm; + this.lastAcceptedVersion = lastAcceptedVersion; + } + + public Join(StreamInput in) throws IOException { + sourceNode = new DiscoveryNode(in); + targetNode = new DiscoveryNode(in); + term = in.readLong(); + lastAcceptedTerm = in.readLong(); + lastAcceptedVersion = in.readLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + sourceNode.writeTo(out); + targetNode.writeTo(out); + out.writeLong(term); + out.writeLong(lastAcceptedTerm); + out.writeLong(lastAcceptedVersion); + } + + public DiscoveryNode getSourceNode() { + return sourceNode; + } + + public DiscoveryNode getTargetNode() { + return targetNode; + } + + public long getLastAcceptedVersion() { + return lastAcceptedVersion; + } + + public long getTerm() { + return term; + } + + public long getLastAcceptedTerm() { + return lastAcceptedTerm; + } + + @Override + public String toString() { + return "Join{" + + "term=" + term + + ", lastAcceptedTerm=" + lastAcceptedTerm + + ", lastAcceptedVersion=" + lastAcceptedVersion + + ", sourceNode=" + sourceNode + + ", targetNode=" + targetNode + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Join join = (Join) o; + + if (sourceNode.equals(join.sourceNode) == false) return false; + if (targetNode.equals(join.targetNode) == false) return false; + if (lastAcceptedVersion != join.lastAcceptedVersion) return false; + if (term != join.term) return false; + return lastAcceptedTerm == join.lastAcceptedTerm; + } + + @Override + public int hashCode() { + int result = (int) (lastAcceptedVersion ^ (lastAcceptedVersion >>> 32)); + result = 31 * result + sourceNode.hashCode(); + result = 31 * result + targetNode.hashCode(); + result = 31 * result + (int) (term ^ (term >>> 32)); + result = 31 * result + (int) (lastAcceptedTerm ^ (lastAcceptedTerm >>> 32)); + return result; + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PublishRequest.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PublishRequest.java new file mode 100644 index 0000000000000..5357ff022368b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PublishRequest.java @@ -0,0 +1,75 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.transport.TransportRequest; + +import java.io.IOException; +import java.util.Objects; + +public class PublishRequest extends TransportRequest { + + private final ClusterState acceptedState; + + public PublishRequest(ClusterState acceptedState) { + this.acceptedState = acceptedState; + } + + public PublishRequest(StreamInput in, DiscoveryNode localNode) throws IOException { + super(in); + acceptedState = ClusterState.readFrom(in, localNode); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + acceptedState.writeTo(out); + } + + public ClusterState getAcceptedState() { + return acceptedState; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof PublishRequest)) return false; + + PublishRequest that = (PublishRequest) o; + + return acceptedState.term() == that.acceptedState.term() && + acceptedState.version() == that.acceptedState.version(); + } + + @Override + public int hashCode() { + return Objects.hash(acceptedState.term(), acceptedState.version()); + } + + @Override + public String toString() { + return "PublishRequest{term=" + acceptedState.term() + + ", version=" + acceptedState.version() + + ", state=" + acceptedState + '}'; + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PublishResponse.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PublishResponse.java new file mode 100644 index 0000000000000..fb1adb1f21a9a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PublishResponse.java @@ -0,0 +1,48 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +public class PublishResponse extends TermVersionResponse { + + public PublishResponse(long term, long version) { + super(term, version); + } + + public PublishResponse(StreamInput in) throws IOException { + super(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } + + @Override + public String toString() { + return "PublishResponse{" + + "term=" + term + + ", version=" + version + + '}'; + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/StartJoinRequest.java b/server/src/main/java/org/elasticsearch/cluster/coordination/StartJoinRequest.java new file mode 100644 index 0000000000000..6554c4b21d9a8 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/StartJoinRequest.java @@ -0,0 +1,84 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.transport.TransportRequest; + +import java.io.IOException; + +public class StartJoinRequest extends TransportRequest { + + private final DiscoveryNode sourceNode; + + private final long term; + + public StartJoinRequest(DiscoveryNode sourceNode, long term) { + this.sourceNode = sourceNode; + this.term = term; + } + + public StartJoinRequest(StreamInput input) throws IOException { + super(input); + this.sourceNode = new DiscoveryNode(input); + this.term = input.readLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + sourceNode.writeTo(out); + out.writeLong(term); + } + + public DiscoveryNode getSourceNode() { + return sourceNode; + } + + public long getTerm() { + return term; + } + + @Override + public String toString() { + return "StartJoinRequest{" + + "term=" + term + + ",node=" + sourceNode + "}"; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof StartJoinRequest)) return false; + + StartJoinRequest that = (StartJoinRequest) o; + + if (term != that.term) return false; + return sourceNode.equals(that.sourceNode); + } + + @Override + public int hashCode() { + int result = sourceNode.hashCode(); + result = 31 * result + (int) (term ^ (term >>> 32)); + return result; + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/TermVersionRequest.java b/server/src/main/java/org/elasticsearch/cluster/coordination/TermVersionRequest.java new file mode 100644 index 0000000000000..e591803e167a1 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/TermVersionRequest.java @@ -0,0 +1,98 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.transport.TransportRequest; + +import java.io.IOException; + +abstract class TermVersionRequest extends TransportRequest implements Writeable { + protected final DiscoveryNode sourceNode; + protected final long term; + protected final long version; + + TermVersionRequest(DiscoveryNode sourceNode, long term, long version) { + assert term >= 0; + assert version >= 0; + + this.sourceNode = sourceNode; + this.term = term; + this.version = version; + } + + TermVersionRequest(StreamInput in) throws IOException { + super(in); + sourceNode = new DiscoveryNode(in); + term = in.readLong(); + version = in.readLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + sourceNode.writeTo(out); + out.writeLong(term); + out.writeLong(version); + } + + public DiscoveryNode getSourceNode() { + return sourceNode; + } + + public long getTerm() { + return term; + } + + public long getVersion() { + return version; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TermVersionRequest versionTerm = (TermVersionRequest) o; + + if (term != versionTerm.term) return false; + if (version != versionTerm.version) return false; + return sourceNode.equals(versionTerm.sourceNode); + } + + @Override + public int hashCode() { + int result = (int) (term ^ (term >>> 32)); + result = 31 * result + (int) (version ^ (version >>> 32)); + result = 31 * result + sourceNode.hashCode(); + return result; + } + + @Override + public String toString() { + return "TermVersionRequest{" + + "term=" + term + + ", version=" + version + + ", sourceNode=" + sourceNode + + '}'; + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/TermVersionResponse.java b/server/src/main/java/org/elasticsearch/cluster/coordination/TermVersionResponse.java new file mode 100644 index 0000000000000..5eba2e6b732a5 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/TermVersionResponse.java @@ -0,0 +1,82 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.transport.TransportResponse; + +import java.io.IOException; + +abstract class TermVersionResponse extends TransportResponse { + protected final long term; + protected final long version; + + TermVersionResponse(long term, long version) { + assert term >= 0; + assert version >= 0; + + this.term = term; + this.version = version; + } + + TermVersionResponse(StreamInput in) throws IOException { + this(in.readLong(), in.readLong()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(term); + out.writeLong(version); + } + + public long getTerm() { + return term; + } + + public long getVersion() { + return version; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TermVersionResponse response = (TermVersionResponse) o; + + if (term != response.term) return false; + return version == response.version; + } + + @Override + public int hashCode() { + int result = (int) (term ^ (term >>> 32)); + result = 31 * result + (int) (version ^ (version >>> 32)); + return result; + } + + @Override + public String toString() { + return "TermVersionResponse{" + + "term=" + term + + ", version=" + version + + '}'; + } +} diff --git a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index 1f62eb706a84b..091c5846748ba 100644 --- a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.client.AbstractClientHeadersTestCase; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.coordination.CoordinationStateRejectedException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IllegalShardRoutingStateException; import org.elasticsearch.cluster.routing.ShardRouting; @@ -810,6 +811,7 @@ public void testIds() { ids.put(147, org.elasticsearch.env.ShardLockObtainFailedException.class); ids.put(148, UnknownNamedObjectException.class); ids.put(149, MultiBucketConsumerService.TooManyBucketsException.class); + ids.put(150, CoordinationStateRejectedException.class); Map, Integer> reverse = new HashMap<>(); for (Map.Entry> entry : ids.entrySet()) { diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java index 7d671096514f4..20a9035df9aaf 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java @@ -70,8 +70,11 @@ public void testToXContent() throws IOException { " \"acknowledged\" : true,\n" + " \"state\" : {\n" + " \"cluster_uuid\" : \"_na_\",\n" + + " \"term\" : 0,\n" + " \"version\" : 0,\n" + " \"state_uuid\" : \"" + clusterState.stateUUID() + "\",\n" + + " \"last_committed_config\" : [ ],\n" + + " \"last_accepted_config\" : [ ],\n" + " \"master_node\" : \"node0\",\n" + " \"blocks\" : { },\n" + " \"nodes\" : {\n" + @@ -138,8 +141,11 @@ public void testToXContent() throws IOException { " \"acknowledged\" : true,\n" + " \"state\" : {\n" + " \"cluster_uuid\" : \"_na_\",\n" + + " \"term\" : 0,\n" + " \"version\" : 0,\n" + " \"state_uuid\" : \"" + clusterState.stateUUID() + "\",\n" + + " \"last_committed_config\" : [ ],\n" + + " \"last_accepted_config\" : [ ],\n" + " \"master_node\" : \"node0\"\n" + " },\n" + " \"explanations\" : [\n" + diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java b/server/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java index a5d865a274140..da1924140b0cd 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java @@ -97,7 +97,7 @@ public void testClusterStateDiffSerialization() throws Exception { if (i > 0) { clusterState = builder.build(); } - switch (randomInt(4)) { + switch (randomInt(5)) { case 0: builder = randomNodes(clusterState); break; @@ -113,11 +113,14 @@ public void testClusterStateDiffSerialization() throws Exception { case 4: builder = randomMetaDataChanges(clusterState); break; + case 5: + builder = randomVotingConfiguration(clusterState); + break; default: throw new IllegalArgumentException("Shouldn't be here"); } } - clusterState = builder.incrementVersion().build(); + clusterState = builder.incrementVersion().term(randomLong()).build(); if (randomIntBetween(0, 10) < 1) { // Update cluster state via full serialization from time to time @@ -141,7 +144,10 @@ public void testClusterStateDiffSerialization() throws Exception { try { // Check non-diffable elements assertThat(clusterStateFromDiffs.version(), equalTo(clusterState.version())); + assertThat(clusterStateFromDiffs.term(), equalTo(clusterState.term())); assertThat(clusterStateFromDiffs.stateUUID(), equalTo(clusterState.stateUUID())); + assertThat(clusterStateFromDiffs.getLastAcceptedConfiguration(), equalTo(clusterState.getLastAcceptedConfiguration())); + assertThat(clusterStateFromDiffs.getLastCommittedConfiguration(), equalTo(clusterState.getLastCommittedConfiguration())); // Check nodes assertThat(clusterStateFromDiffs.nodes().getNodes(), equalTo(clusterState.nodes().getNodes())); @@ -190,6 +196,20 @@ public void testClusterStateDiffSerialization() throws Exception { } + private ClusterState.Builder randomVotingConfiguration(ClusterState clusterState) { + ClusterState.Builder builder = ClusterState.builder(clusterState); + if (randomBoolean()) { + builder.lastCommittedConfiguration( + new ClusterState.VotingConfiguration(Sets.newHashSet(generateRandomStringArray(10, 10, false)))); + } + if (randomBoolean()) { + builder.lastAcceptedConfiguration( + new ClusterState.VotingConfiguration(Sets.newHashSet(generateRandomStringArray(10, 10, false)))); + } + + return builder; + } + /** * Randomly updates nodes in the cluster state */ diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java index 21c661230eb7d..7d900406e0804 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java @@ -19,10 +19,18 @@ package org.elasticsearch.cluster; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState.VotingConfiguration; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.EqualsHashCodeTestUtils; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -55,6 +63,70 @@ public void testSupersedes() { // state from the same master compare by version assertThat(withMaster1a.supersedes(withMaster1b), equalTo(withMaster1a.version() > withMaster1b.version())); + } + + public void testVotingConfiguration() { + VotingConfiguration config0 = new VotingConfiguration(Sets.newHashSet()); + assertThat(config0, equalTo(VotingConfiguration.EMPTY_CONFIG)); + assertThat(config0.getNodeIds(), equalTo(Sets.newHashSet())); + assertThat(config0.isEmpty(), equalTo(true)); + assertThat(config0.hasQuorum(Sets.newHashSet()), equalTo(false)); + assertThat(config0.hasQuorum(Sets.newHashSet("id1")), equalTo(false)); + + VotingConfiguration config1 = new VotingConfiguration(Sets.newHashSet("id1")); + assertThat(config1.getNodeIds(), equalTo(Sets.newHashSet("id1"))); + assertThat(config1.isEmpty(), equalTo(false)); + assertThat(config1.hasQuorum(Sets.newHashSet("id1")), equalTo(true)); + assertThat(config1.hasQuorum(Sets.newHashSet("id1", "id2")), equalTo(true)); + assertThat(config1.hasQuorum(Sets.newHashSet("id2")), equalTo(false)); + assertThat(config1.hasQuorum(Sets.newHashSet()), equalTo(false)); + + VotingConfiguration config2 = new VotingConfiguration(Sets.newHashSet("id1", "id2")); + assertThat(config2.getNodeIds(), equalTo(Sets.newHashSet("id1", "id2"))); + assertThat(config2.isEmpty(), equalTo(false)); + assertThat(config2.hasQuorum(Sets.newHashSet("id1", "id2")), equalTo(true)); + assertThat(config2.hasQuorum(Sets.newHashSet("id1", "id2", "id3")), equalTo(true)); + assertThat(config2.hasQuorum(Sets.newHashSet("id1")), equalTo(false)); + assertThat(config2.hasQuorum(Sets.newHashSet("id2")), equalTo(false)); + assertThat(config2.hasQuorum(Sets.newHashSet("id3")), equalTo(false)); + assertThat(config2.hasQuorum(Sets.newHashSet("id1", "id3")), equalTo(false)); + assertThat(config2.hasQuorum(Sets.newHashSet()), equalTo(false)); + + VotingConfiguration config3 = new VotingConfiguration(Sets.newHashSet("id1", "id2", "id3")); + assertThat(config3.getNodeIds(), equalTo(Sets.newHashSet("id1", "id2", "id3"))); + assertThat(config3.isEmpty(), equalTo(false)); + assertThat(config3.hasQuorum(Sets.newHashSet("id1", "id2")), equalTo(true)); + assertThat(config3.hasQuorum(Sets.newHashSet("id2", "id3")), equalTo(true)); + assertThat(config3.hasQuorum(Sets.newHashSet("id1", "id3")), equalTo(true)); + assertThat(config3.hasQuorum(Sets.newHashSet("id1", "id2", "id3")), equalTo(true)); + assertThat(config3.hasQuorum(Sets.newHashSet("id1", "id2", "id4")), equalTo(true)); + assertThat(config3.hasQuorum(Sets.newHashSet("id1")), equalTo(false)); + assertThat(config3.hasQuorum(Sets.newHashSet("id2")), equalTo(false)); + assertThat(config3.hasQuorum(Sets.newHashSet("id3")), equalTo(false)); + assertThat(config3.hasQuorum(Sets.newHashSet("id1", "id4")), equalTo(false)); + assertThat(config3.hasQuorum(Sets.newHashSet("id1", "id4", "id5")), equalTo(false)); + assertThat(config3.hasQuorum(Sets.newHashSet()), equalTo(false)); + } + public void testVotingConfigurationSerializationEqualsHashCode() { + VotingConfiguration initialConfig = new VotingConfiguration( + Sets.newHashSet(generateRandomStringArray(randomInt(10), 20, false))); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialConfig, + orig -> ESTestCase.copyWriteable(orig, new NamedWriteableRegistry(Collections.emptyList()), VotingConfiguration::new), + cfg -> { + Set newNodeIds = new HashSet<>(cfg.getNodeIds()); + if (cfg.isEmpty() == false && randomBoolean()) { + // remove random element + newNodeIds.remove(randomFrom(cfg.getNodeIds())); + } else if (cfg.isEmpty() == false && randomBoolean()) { + // change random element + newNodeIds.remove(randomFrom(cfg.getNodeIds())); + newNodeIds.add(randomAlphaOfLength(20)); + } else { + // add random element + newNodeIds.add(randomAlphaOfLength(20)); + } + return new VotingConfiguration(newNodeIds); + }); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java new file mode 100644 index 0000000000000..eeefdfc4831eb --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java @@ -0,0 +1,670 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.Assertions; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterState.VotingConfiguration; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.node.Node; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.EqualsHashCodeTestUtils; +import org.junit.Before; + +import java.util.Collections; +import java.util.Optional; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class CoordinationStateTests extends ESTestCase { + + DiscoveryNode node1; + DiscoveryNode node2; + DiscoveryNode node3; + + ClusterState initialStateNode1; + ClusterState initialStateNode2; + ClusterState initialStateNode3; + + CoordinationState cs1; + CoordinationState cs2; + CoordinationState cs3; + + @Before + public void setupNodes() { + node1 = createNode("node1"); + node2 = createNode("node2"); + node3 = createNode("node3"); + + initialStateNode1 = clusterState(0L, 0L, node1, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 42L); + initialStateNode2 = clusterState(0L, 0L, node2, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 42L); + initialStateNode3 = clusterState(0L, 0L, node3, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 42L); + + cs1 = createCoordinationState(new InMemoryPersistedState(0L, initialStateNode1), node1); + cs2 = createCoordinationState(new InMemoryPersistedState(0L, initialStateNode2), node2); + cs3 = createCoordinationState(new InMemoryPersistedState(0L, initialStateNode3), node2); + } + + private DiscoveryNode createNode(String id) { + return new DiscoveryNode(id, buildNewFakeTransportAddress(), Version.CURRENT); + } + + public void testSetInitialState() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + assertTrue(state1.getLastAcceptedConfiguration().hasQuorum(Collections.singleton(node1.getId()))); + assertTrue(state1.getLastCommittedConfiguration().hasQuorum(Collections.singleton(node1.getId()))); + cs1.setInitialState(state1); + assertThat(cs1.getLastAcceptedState(), equalTo(state1)); + } + + public void testSetInitialStateWhenAlreadySet() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + assertTrue(state1.getLastAcceptedConfiguration().hasQuorum(Collections.singleton(node1.getId()))); + assertTrue(state1.getLastCommittedConfiguration().hasQuorum(Collections.singleton(node1.getId()))); + cs1.setInitialState(state1); + assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.setInitialState(state1)).getMessage(), + containsString("initial state already set")); + } + + public void testStartJoinBeforeBootstrap() { + assertThat(cs1.getCurrentTerm(), equalTo(0L)); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(randomFrom(node1, node2), randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertThat(v1.getTargetNode(), equalTo(startJoinRequest1.getSourceNode())); + assertThat(v1.getSourceNode(), equalTo(node1)); + assertThat(v1.getTerm(), equalTo(startJoinRequest1.getTerm())); + assertThat(v1.getLastAcceptedTerm(), equalTo(initialStateNode1.term())); + assertThat(v1.getLastAcceptedVersion(), equalTo(initialStateNode1.version())); + assertThat(cs1.getCurrentTerm(), equalTo(startJoinRequest1.getTerm())); + + StartJoinRequest startJoinRequest2 = new StartJoinRequest(randomFrom(node1, node2), + randomLongBetween(0, startJoinRequest1.getTerm())); + expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleStartJoin(startJoinRequest2)); + } + + public void testStartJoinAfterBootstrap() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + assertTrue(state1.getLastAcceptedConfiguration().hasQuorum(Collections.singleton(node1.getId()))); + assertTrue(state1.getLastCommittedConfiguration().hasQuorum(Collections.singleton(node1.getId()))); + cs1.setInitialState(state1); + + StartJoinRequest startJoinRequest1 = new StartJoinRequest(randomFrom(node1, node2), randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertThat(v1.getTargetNode(), equalTo(startJoinRequest1.getSourceNode())); + assertThat(v1.getSourceNode(), equalTo(node1)); + assertThat(v1.getTerm(), equalTo(startJoinRequest1.getTerm())); + assertThat(v1.getLastAcceptedTerm(), equalTo(state1.term())); + assertThat(v1.getLastAcceptedVersion(), equalTo(state1.version())); + assertThat(cs1.getCurrentTerm(), equalTo(startJoinRequest1.getTerm())); + + StartJoinRequest startJoinRequest2 = new StartJoinRequest(randomFrom(node1, node2), + randomLongBetween(0, startJoinRequest1.getTerm())); + expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleStartJoin(startJoinRequest2)); + } + + public void testJoinBeforeBootstrap() { + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleJoin(v1)).getMessage(), + containsString("initial configuration not set")); + } + + public void testJoinWithWrongTarget() { + assumeTrue("test only works with assertions enabled", Assertions.ENABLED); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node2, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertThat(expectThrows(AssertionError.class, () -> cs1.handleJoin(v1)).getMessage(), + containsString("wrong node")); + } + + public void testJoinWithBadCurrentTerm() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node2, randomLongBetween(1, 5)); + cs1.handleStartJoin(startJoinRequest1); + Join badJoin = new Join(randomFrom(node1, node2), node1, randomLongBetween(0, startJoinRequest1.getTerm() - 1), + randomNonNegativeLong(), randomNonNegativeLong()); + assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleJoin(badJoin)).getMessage(), + containsString("does not match current term")); + } + + public void testJoinWithHigherAcceptedTerm() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node2, randomLongBetween(1, 5)); + cs1.handleStartJoin(startJoinRequest1); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), randomLongBetween(2, 20), node1, initialConfig, initialConfig, 42L); + cs1.handlePublishRequest(new PublishRequest(state2)); + StartJoinRequest startJoinRequest2 = new StartJoinRequest(node2, randomLongBetween(startJoinRequest1.getTerm() + 1, 10)); + Join v1 = cs1.handleStartJoin(startJoinRequest2); + + Join badJoin = new Join(randomFrom(node1, node2), node1, v1.getTerm(), randomLongBetween(state2.term() + 1, 30), + randomNonNegativeLong()); + assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleJoin(badJoin)).getMessage(), + containsString("higher than current last accepted term")); + } + + public void testJoinWithHigherVersion() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node2, randomLongBetween(1, 5)); + cs1.handleStartJoin(startJoinRequest1); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), randomLongBetween(2, 20), node1, initialConfig, initialConfig, 42L); + cs1.handlePublishRequest(new PublishRequest(state2)); + StartJoinRequest startJoinRequest2 = new StartJoinRequest(node2, randomLongBetween(startJoinRequest1.getTerm() + 1, 10)); + Join v1 = cs1.handleStartJoin(startJoinRequest2); + + Join badJoin = new Join(randomFrom(node1, node2), node1, v1.getTerm(), state2.term(), + randomLongBetween(state2.version() + 1, 30)); + assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleJoin(badJoin)).getMessage(), + containsString("higher than current last accepted version")); + } + + public void testJoinWinsElection() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node2, randomLongBetween(1, 5)); + cs1.handleStartJoin(startJoinRequest1); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), randomLongBetween(2, 20), node1, initialConfig, initialConfig, 42L); + cs1.handlePublishRequest(new PublishRequest(state2)); + StartJoinRequest startJoinRequest2 = new StartJoinRequest(node2, randomLongBetween(startJoinRequest1.getTerm() + 1, 10)); + Join v1 = cs1.handleStartJoin(startJoinRequest2); + + Join join = new Join(node1, node1, v1.getTerm(), randomLongBetween(0, state2.term()), + randomLongBetween(0, state2.version())); + assertTrue(cs1.handleJoin(join)); + assertTrue(cs1.electionWon()); + assertTrue(cs1.containsJoinVoteFor(node1)); + assertFalse(cs1.containsJoinVoteFor(node2)); + assertEquals(cs1.getLastPublishedVersion(), cs1.getLastAcceptedVersion()); + assertFalse(cs1.handleJoin(join)); + } + + public void testJoinDoesNotWinElection() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node2, randomLongBetween(1, 5)); + cs1.handleStartJoin(startJoinRequest1); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), randomLongBetween(2, 20), node1, initialConfig, initialConfig, 42L); + cs1.handlePublishRequest(new PublishRequest(state2)); + StartJoinRequest startJoinRequest2 = new StartJoinRequest(node2, randomLongBetween(startJoinRequest1.getTerm() + 1, 10)); + Join v1 = cs1.handleStartJoin(startJoinRequest2); + + Join join = new Join(node2, node1, v1.getTerm(), randomLongBetween(0, state2.term()), randomLongBetween(0, state2.version())); + assertTrue(cs1.handleJoin(join)); + assertFalse(cs1.electionWon()); + assertEquals(cs1.getLastPublishedVersion(), 0L); + assertFalse(cs1.handleJoin(join)); + } + + public void testHandleClientValue() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + Join v2 = cs2.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + assertTrue(cs1.handleJoin(v2)); + + VotingConfiguration newConfig = new VotingConfiguration(Collections.singleton(node2.getId())); + + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 2L, node1, initialConfig, newConfig, 42L); + PublishRequest publishRequest = cs1.handleClientValue(state2); + assertThat(publishRequest.getAcceptedState(), equalTo(state2)); + assertThat(cs1.getLastPublishedVersion(), equalTo(state2.version())); + } + + public void testHandleClientValueWhenElectionNotWon() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + if (randomBoolean()) { + cs1.setInitialState(state1); + } + assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleClientValue(state1)).getMessage(), + containsString("election not won")); + } + + public void testHandleClientValueDuringOngoingPublication() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 2L, node1, initialConfig, initialConfig, 42L); + cs1.handleClientValue(state2); + + ClusterState state3 = clusterState(startJoinRequest1.getTerm(), 3L, node1, initialConfig, initialConfig, 42L); + assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleClientValue(state3)).getMessage(), + containsString("cannot start publishing next value before accepting previous one")); + } + + public void testHandleClientValueWithBadTerm() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(3, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + + long term = randomBoolean() ? + randomLongBetween(startJoinRequest1.getTerm() + 1, 10) : + randomLongBetween(0, startJoinRequest1.getTerm() - 1); + ClusterState state2 = clusterState(term, 2L, node1, initialConfig, initialConfig, 42L); + assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleClientValue(state2)).getMessage(), + containsString("does not match current term")); + } + + public void testHandleClientValueWithOldVersion() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 1L, node1, initialConfig, initialConfig, 42L); + assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleClientValue(state2)).getMessage(), + containsString("lower or equal to last published version")); + } + + public void testHandleClientValueWithReconfigurationWhileAlreadyReconfiguring() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + Join v2 = cs2.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + assertTrue(cs1.handleJoin(v2)); + + VotingConfiguration newConfig1 = new VotingConfiguration(Collections.singleton(node2.getId())); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 2L, node1, initialConfig, newConfig1, 42L); + PublishRequest publishRequest = cs1.handleClientValue(state2); + cs1.handlePublishRequest(publishRequest); + VotingConfiguration newConfig2 = new VotingConfiguration(Collections.singleton(node3.getId())); + ClusterState state3 = clusterState(startJoinRequest1.getTerm(), 3L, node1, initialConfig, newConfig2, 42L); + assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleClientValue(state3)).getMessage(), + containsString("only allow reconfiguration while not already reconfiguring")); + } + + public void testHandleClientValueWithIllegalCommittedConfigurationChange() { + assumeTrue("test only works with assertions enabled", Assertions.ENABLED); + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + Join v2 = cs2.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + assertTrue(cs1.handleJoin(v2)); + + VotingConfiguration newConfig = new VotingConfiguration(Collections.singleton(node2.getId())); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 2L, node1, newConfig, newConfig, 42L); + assertThat(expectThrows(AssertionError.class, () -> cs1.handleClientValue(state2)).getMessage(), + containsString("last committed configuration should not change")); + } + + public void testHandleClientValueWithConfigurationChangeButNoJoinQuorum() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + + VotingConfiguration newConfig = new VotingConfiguration(Collections.singleton(node2.getId())); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 2L, node1, initialConfig, newConfig, 42L); + assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleClientValue(state2)).getMessage(), + containsString("only allow reconfiguration if joinVotes have quorum for new config")); + } + + public void testHandlePublishRequest() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + if (randomBoolean()) { + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + } + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 2L, node1, initialConfig, initialConfig, 13L); + PublishResponse publishResponse = cs1.handlePublishRequest(new PublishRequest(state2)); + assertThat(publishResponse.getTerm(), equalTo(state2.term())); + assertThat(publishResponse.getVersion(), equalTo(state2.version())); + assertThat(cs1.getLastAcceptedState(), equalTo(state2)); + assertThat(value(cs1.getLastAcceptedState()), equalTo(13L)); + } + + public void testHandlePublishRequestWithBadTerm() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + if (randomBoolean()) { + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + } + long term = randomBoolean() ? + randomLongBetween(startJoinRequest1.getTerm() + 1, 10) : + randomLongBetween(0, startJoinRequest1.getTerm() - 1); + ClusterState state2 = clusterState(term, 2L, node1, initialConfig, initialConfig, 42L); + assertThat(expectThrows(CoordinationStateRejectedException.class, + () -> cs1.handlePublishRequest(new PublishRequest(state2))).getMessage(), + containsString("does not match current term")); + } + + public void testHandlePublishRequestWithOlderVersion() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + if (randomBoolean()) { + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + } + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), randomLongBetween(2, 10), node1, initialConfig, initialConfig, 42L); + cs1.handlePublishRequest(new PublishRequest(state2)); + ClusterState state3 = clusterState(startJoinRequest1.getTerm(), randomLongBetween(0, state2.version()), node1, initialConfig, + initialConfig, 42L); + assertThat(expectThrows(CoordinationStateRejectedException.class, + () -> cs1.handlePublishRequest(new PublishRequest(state3))).getMessage(), + containsString("lower or equal to current version")); + } + + public void testHandlePublishResponseWithCommit() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 2L, node1, initialConfig, initialConfig, 42L); + PublishRequest publishRequest = cs1.handleClientValue(state2); + PublishResponse publishResponse = cs1.handlePublishRequest(publishRequest); + Optional applyCommit = cs1.handlePublishResponse(node1, publishResponse); + assertTrue(applyCommit.isPresent()); + assertThat(applyCommit.get().getSourceNode(), equalTo(node1)); + assertThat(applyCommit.get().getTerm(), equalTo(state2.term())); + assertThat(applyCommit.get().getVersion(), equalTo(state2.version())); + } + + public void testHandlePublishResponseWithoutCommit() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 2L, node1, initialConfig, initialConfig, 42L); + PublishRequest publishRequest = cs1.handleClientValue(state2); + PublishResponse publishResponse = cs1.handlePublishRequest(publishRequest); + Optional applyCommit = cs1.handlePublishResponse(node2, publishResponse); + assertFalse(applyCommit.isPresent()); + } + + public void testHandlePublishResponseWithBadTerm() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), randomLongBetween(2, 10), node1, initialConfig, initialConfig, 42L); + PublishResponse publishResponse = cs1.handlePublishRequest(new PublishRequest(state2)); + long term = randomBoolean() ? + randomLongBetween(startJoinRequest1.getTerm() + 1, 10) : + randomLongBetween(0, startJoinRequest1.getTerm() - 1); + assertThat(expectThrows(CoordinationStateRejectedException.class, + () -> cs1.handlePublishResponse(randomFrom(node1, node2, node3), + new PublishResponse(term, publishResponse.getVersion()))).getMessage(), + containsString("does not match current term")); + } + + public void testHandlePublishResponseWithVersionMismatch() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), randomLongBetween(2, 10), node1, initialConfig, initialConfig, 42L); + PublishResponse publishResponse = cs1.handlePublishRequest(new PublishRequest(state2)); + assertThat(expectThrows(CoordinationStateRejectedException.class, + () -> cs1.handlePublishResponse(randomFrom(node1, node2, node3), publishResponse)).getMessage(), + containsString("does not match current version")); + } + + public void testHandleCommit() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + Join v2 = cs2.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v2)); + VotingConfiguration newConfig = new VotingConfiguration(Collections.singleton(node2.getId())); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 2L, node1, initialConfig, newConfig, 7L); + PublishRequest publishRequest = cs1.handleClientValue(state2); + PublishResponse publishResponse = cs1.handlePublishRequest(publishRequest); + cs1.handlePublishResponse(node1, publishResponse); + Optional applyCommit = cs1.handlePublishResponse(node2, publishResponse); + assertTrue(applyCommit.isPresent()); + assertThat(cs1.getLastCommittedConfiguration(), equalTo(initialConfig)); + cs1.handleCommit(applyCommit.get()); + assertThat(cs1.getLastCommittedConfiguration(), equalTo(newConfig)); + } + + public void testHandleCommitWithBadCurrentTerm() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 2L, node1, initialConfig, initialConfig, 7L); + PublishRequest publishRequest = cs1.handleClientValue(state2); + PublishResponse publishResponse = cs1.handlePublishRequest(publishRequest); + cs1.handlePublishResponse(node1, publishResponse); + long term = randomBoolean() ? + randomLongBetween(startJoinRequest1.getTerm() + 1, 10) : + randomLongBetween(0, startJoinRequest1.getTerm() - 1); + assertThat(expectThrows(CoordinationStateRejectedException.class, + () -> cs1.handleCommit(new ApplyCommit(node1, term, 2L))).getMessage(), + containsString("does not match current term")); + } + + public void testHandleCommitWithBadLastAcceptedTerm() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + assertThat(expectThrows(CoordinationStateRejectedException.class, + () -> cs1.handleCommit(new ApplyCommit(node1, startJoinRequest1.getTerm(), 2L))).getMessage(), + containsString("does not match last accepted term")); + } + + public void testHandleCommitWithBadVersion() { + VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId())); + ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L); + cs1.setInitialState(state1); + StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5)); + Join v1 = cs1.handleStartJoin(startJoinRequest1); + assertTrue(cs1.handleJoin(v1)); + assertTrue(cs1.electionWon()); + ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 2L, node1, initialConfig, initialConfig, 7L); + PublishRequest publishRequest = cs1.handleClientValue(state2); + cs1.handlePublishRequest(publishRequest); + assertThat(expectThrows(CoordinationStateRejectedException.class, + () -> cs1.handleCommit(new ApplyCommit(node1, startJoinRequest1.getTerm(), randomLongBetween(3, 10)))).getMessage(), + containsString("does not match current version")); + } + + public void testVoteCollection() { + final CoordinationState.VoteCollection voteCollection = new CoordinationState.VoteCollection(); + assertTrue(voteCollection.isEmpty()); + voteCollection.addVote(node1); + assertFalse(voteCollection.isEmpty()); + assertTrue(voteCollection.containsVoteFor(node1)); + assertFalse(voteCollection.containsVoteFor(node2)); + assertFalse(voteCollection.containsVoteFor(node3)); + voteCollection.addVote(node2); + assertTrue(voteCollection.containsVoteFor(node1)); + assertTrue(voteCollection.containsVoteFor(node2)); + assertFalse(voteCollection.containsVoteFor(node3)); + assertTrue(voteCollection.isQuorum(new VotingConfiguration(Sets.newHashSet(node1.getId(), node2.getId())))); + assertTrue(voteCollection.isQuorum(new VotingConfiguration(Sets.newHashSet(node1.getId())))); + assertFalse(voteCollection.isQuorum(new VotingConfiguration(Sets.newHashSet(node3.getId())))); + + EqualsHashCodeTestUtils.CopyFunction copyFunction = + vc -> { + CoordinationState.VoteCollection voteCollection1 = new CoordinationState.VoteCollection(); + for (DiscoveryNode node : vc.nodes()) { + voteCollection1.addVote(node); + } + return voteCollection1; + }; + EqualsHashCodeTestUtils.checkEqualsAndHashCode(voteCollection, copyFunction, + vc -> { + CoordinationState.VoteCollection copy = copyFunction.copy(vc); + copy.addVote(createNode(randomAlphaOfLength(10))); + return copy; + }); + } + + public static CoordinationState createCoordinationState(PersistedState storage, DiscoveryNode localNode) { + final Settings initialSettings = Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), localNode.getId()).build(); + return new CoordinationState(initialSettings, localNode, storage); + } + + public static ClusterState clusterState(long term, long version, DiscoveryNode localNode, VotingConfiguration lastCommittedConfig, + VotingConfiguration lastAcceptedConfig, long value) { + return clusterState(term, version, DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).build(), + lastCommittedConfig, lastAcceptedConfig, value); + } + + public static ClusterState clusterState(long term, long version, DiscoveryNodes discoveryNodes, VotingConfiguration lastCommittedConfig, + VotingConfiguration lastAcceptedConfig, long value) { + return setValue(ClusterState.builder(ClusterName.DEFAULT) + .version(version) + .term(term) + .lastCommittedConfiguration(lastCommittedConfig) + .lastAcceptedConfiguration(lastAcceptedConfig) + .nodes(discoveryNodes) + .metaData(MetaData.builder() + .clusterUUID(UUIDs.randomBase64UUID(random()))) // generate cluster UUID deterministically for repeatable tests + .stateUUID(UUIDs.randomBase64UUID(random())) // generate cluster state UUID deterministically for repeatable tests + .build(), value); + } + + public static ClusterState setValue(ClusterState clusterState, long value) { + return ClusterState.builder(clusterState).metaData( + MetaData.builder(clusterState.metaData()) + .persistentSettings(Settings.builder() + .put(clusterState.metaData().persistentSettings()) + .put("value", value) + .build()) + .build()) + .build(); + } + + public static long value(ClusterState clusterState) { + return clusterState.metaData().persistentSettings().getAsLong("value", 0L); + } + + public static class InMemoryPersistedState implements PersistedState { + + private long currentTerm; + private ClusterState acceptedState; + + public InMemoryPersistedState(long term, ClusterState acceptedState) { + this.currentTerm = term; + this.acceptedState = acceptedState; + + assert currentTerm >= 0; + assert getLastAcceptedState().term() <= currentTerm : + "last accepted term " + getLastAcceptedState().term() + " cannot be above current term " + currentTerm; + } + + @Override + public void setCurrentTerm(long currentTerm) { + assert this.currentTerm <= currentTerm; + this.currentTerm = currentTerm; + } + + @Override + public void setLastAcceptedState(ClusterState clusterState) { + this.acceptedState = clusterState; + } + + @Override + public long getCurrentTerm() { + return currentTerm; + } + + @Override + public ClusterState getLastAcceptedState() { + return acceptedState; + } + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java new file mode 100644 index 0000000000000..c4ec5e847a78d --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java @@ -0,0 +1,148 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.EqualsHashCodeTestUtils; + +public class MessagesTests extends ESTestCase { + + private DiscoveryNode createNode(String id) { + return new DiscoveryNode(id, buildNewFakeTransportAddress(), Version.CURRENT); + } + + public void testJoinEqualsHashCodeSerialization() { + Join initialJoin = new Join(createNode(randomAlphaOfLength(10)), createNode(randomAlphaOfLength(10)), randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong()); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialJoin, + join -> copyWriteable(join, writableRegistry(), Join::new), + join -> { + switch (randomInt(4)) { + case 0: + // change sourceNode + return new Join(createNode(randomAlphaOfLength(20)), join.getTargetNode(), join.getTerm(), + join.getLastAcceptedTerm(), join.getLastAcceptedVersion()); + case 1: + // change targetNode + return new Join(join.getSourceNode(), createNode(randomAlphaOfLength(20)), join.getTerm(), + join.getLastAcceptedTerm(), join.getLastAcceptedVersion()); + case 2: + // change term + return new Join(join.getSourceNode(), join.getTargetNode(), + randomValueOtherThan(join.getTerm(), ESTestCase::randomNonNegativeLong), join.getLastAcceptedTerm(), + join.getLastAcceptedVersion()); + case 3: + // change last accepted term + return new Join(join.getSourceNode(), join.getTargetNode(), join.getTerm(), + randomValueOtherThan(join.getLastAcceptedTerm(), ESTestCase::randomNonNegativeLong), + join.getLastAcceptedVersion()); + case 4: + // change version + return new Join(join.getSourceNode(), join.getTargetNode(), + join.getTerm(), join.getLastAcceptedTerm(), + randomValueOtherThan(join.getLastAcceptedVersion(), ESTestCase::randomNonNegativeLong)); + default: + throw new AssertionError(); + } + }); + } + + public void testPublishRequestEqualsHashCodeSerialization() { + PublishRequest initialPublishRequest = new PublishRequest(randomClusterState()); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialPublishRequest, + publishRequest -> copyWriteable(publishRequest, writableRegistry(), + in -> new PublishRequest(in, publishRequest.getAcceptedState().nodes().getLocalNode())), + in -> new PublishRequest(randomClusterState())); + } + + public void testPublishResponseEqualsHashCodeSerialization() { + PublishResponse initialPublishResponse = new PublishResponse(randomNonNegativeLong(), randomNonNegativeLong()); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialPublishResponse, + publishResponse -> copyWriteable(publishResponse, writableRegistry(), PublishResponse::new), + publishResponse -> { + switch (randomInt(1)) { + case 0: + // change term + return new PublishResponse(randomValueOtherThan(publishResponse.getTerm(), ESTestCase::randomNonNegativeLong), + publishResponse.getVersion()); + case 1: + // change version + return new PublishResponse(publishResponse.getTerm(), + randomValueOtherThan(publishResponse.getVersion(), ESTestCase::randomNonNegativeLong)); + default: + throw new AssertionError(); + } + }); + } + + public void testStartJoinRequestEqualsHashCodeSerialization() { + StartJoinRequest initialStartJoinRequest = new StartJoinRequest(createNode(randomAlphaOfLength(10)), randomNonNegativeLong()); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialStartJoinRequest, + startJoinRequest -> copyWriteable(startJoinRequest, writableRegistry(), StartJoinRequest::new), + startJoinRequest -> { + switch (randomInt(1)) { + case 0: + // change sourceNode + return new StartJoinRequest(createNode(randomAlphaOfLength(20)), startJoinRequest.getTerm()); + case 1: + // change term + return new StartJoinRequest(startJoinRequest.getSourceNode(), + randomValueOtherThan(startJoinRequest.getTerm(), ESTestCase::randomNonNegativeLong)); + default: + throw new AssertionError(); + } + }); + } + + public void testApplyCommitEqualsHashCodeSerialization() { + ApplyCommit initialApplyCommit = new ApplyCommit(createNode(randomAlphaOfLength(10)), randomNonNegativeLong(), + randomNonNegativeLong()); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialApplyCommit, + applyCommit -> copyWriteable(applyCommit, writableRegistry(), ApplyCommit::new), + applyCommit -> { + switch (randomInt(2)) { + case 0: + // change sourceNode + return new ApplyCommit(createNode(randomAlphaOfLength(20)), applyCommit.getTerm(), applyCommit.getVersion()); + case 1: + // change term + return new ApplyCommit(applyCommit.getSourceNode(), + randomValueOtherThan(applyCommit.getTerm(), ESTestCase::randomNonNegativeLong), applyCommit.getVersion()); + case 2: + // change version + return new ApplyCommit(applyCommit.getSourceNode(), applyCommit.getTerm(), + randomValueOtherThan(applyCommit.getVersion(), ESTestCase::randomNonNegativeLong)); + default: + throw new AssertionError(); + } + }); + } + + public ClusterState randomClusterState() { + return CoordinationStateTests.clusterState(randomNonNegativeLong(), randomNonNegativeLong(), createNode(randomAlphaOfLength(10)), + new ClusterState.VotingConfiguration(Sets.newHashSet(generateRandomStringArray(10, 10, false))), + new ClusterState.VotingConfiguration(Sets.newHashSet(generateRandomStringArray(10, 10, false))), + randomLong()); + } +} diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java b/server/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java index ac1719269e7ae..0503a4f819d23 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java @@ -487,7 +487,8 @@ public void clusterChanged(ClusterChangedEvent event) { clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.builder() .addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK)).incrementVersion().build(); - ClusterState unserializableClusterState = new ClusterState(clusterState.version(), clusterState.stateUUID(), clusterState) { + ClusterState unserializableClusterState = new ClusterState(clusterState.term(), clusterState.version(), clusterState.stateUUID(), + clusterState) { @Override public Diff diff(ClusterState previousState) { return new Diff() { diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java index 098f4190b0e88..076fe596e52a3 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java @@ -504,8 +504,11 @@ public void testToXContent() throws IOException { + "\"nodes_hash\":1314980060," + "\"status\":\"green\"," + "\"cluster_uuid\":\"_cluster\"," + + "\"term\":0," + "\"version\":12," + "\"state_uuid\":\"_state_uuid\"," + + "\"last_committed_config\":[]," + + "\"last_accepted_config\":[]," + "\"master_node\":\"_node\"," + "\"nodes\":{" + "\"_node_id\":{"