Dynamic Replication #52

@gz

Description

Is your feature request related to a problem? Please describe.

Static replication can consume too much memory.

Currently, CNR/NR set the replication factor at data-structure instantiation and don't allow it to change, which is sub-optimal: as the workload changes, a program might want to invest less memory in replication (at the cost of read performance) and use that memory for something else instead.

Describe the solution you'd like

Varying replicas

[random thoughts, in no particular order, on how this should be implemented]

Remove replica

  • Get the combiner lock on the replica (no more readers)
  • Make sure any future operations from threads registered with the replica we remove go to another replica from now on
    • This might need an indirection table in the NodeReplicated<> struct
    • We also need to make sure that the Replica<> struct uses the right Context<>, since the thread ids those contexts use are “replica-local”
      • Maybe the right way to do this is to have all Contexts allocated in the NodeReplicated<> struct in a flat array, then hand an iterator<Context> to the replicas that yields the right contexts based on the current thread assignment (see the sketch after this list)?
      • Alternative: think of it as thread re-registration (addition/removal of a thread to/from a replica)
  • “Deregister” the replica from the log
  • Deallocate the data-structure
  • Maybe: Drop the Replica struct itself (seems harder as there can be reader threads accessing its members simultaneously); maybe it’s actually easier to keep the replica struct if we ever want to add it again?
    • This would mean we need to know the maximum number of replicas upfront; easy to just set it to #nodes (it doesn’t make sense to have more in most cases)…
    • For example, a thread might notice that the replica is behind while we’re in the process of dropping it and call sync on it (sync will try to acquire the combiner lock; if this happens after the replica is dropped it should probably just return)
  • There might be outstanding operations in the context buffers of the threads (I think this is fine; we just keep them there and apply them on the new replica we’re redirected to)
  • We’ll have cases where a thread is no longer local to the replica’s NUMA node but might become the combiner and allocate on behalf of the data structure. Ideally these allocations should be NUMA-local
    • We can use the existing AffinityChange interface to notify when threads have to change affinity
  • As an optimization: if only one replica is left, we can avoid using the log until we add a second replica again
  • An even bigger optimization: if only one replica is left with a single thread registered, we can route execute and execute_mut to Dispatch directly; no need for flat combining or the log…
    • Nice, because then we’re back to a single-threaded DS with no atomics on the path
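
A minimal sketch of the flat-Context-array idea from the list above. All names here are placeholders: Context is only a stand-in for the crate’s per-thread operation buffer, and contexts_for is invented for illustration.

// Hypothetical sketch, not the real node-replication types.
struct Context {
    tid: usize, // global thread id (not replica-local)
}

struct NodeReplicatedState {
    contexts: Vec<Context>, // one entry per registered thread, flat array
    assignment: Vec<usize>, // assignment[tid] = replica id the thread executes against
}

impl NodeReplicatedState {
    // Yield the contexts of all threads currently assigned to replica `rid`.
    // A replica's combiner would iterate this instead of owning its own
    // replica-local context array, so re-routing a thread only means
    // updating `assignment`.
    fn contexts_for(&self, rid: usize) -> impl Iterator<Item = &Context> + '_ {
        self.contexts
            .iter()
            .filter(move |c| self.assignment[c.tid] == rid)
    }
}

fn main() {
    let state = NodeReplicatedState {
        contexts: vec![Context { tid: 0 }, Context { tid: 1 }, Context { tid: 2 }],
        assignment: vec![0, 1, 0], // tid 1 runs against replica 1, the rest against replica 0
    };
    assert_eq!(state.contexts_for(0).count(), 2);
}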

Add a new replica:

  • We need to get a quiescent copy of the DS of some existing replica in the system
    • Should be fine if we hold the combiner lock
    • We need to make sure we remember where in the log this replica is at when we do the copy
    • We need to make sure we copy to memory of the right NUMA affinity
      • We already require Clone on the DS
      • Copying the DS might be very expensive (which is fine as a first step)
        • One option: while copying, keep a single DS and share it between the two replicas
          • Maybe: log operations again between the two shared replicas
      • We could use copy-on-write for the data structure (e.g., Cow<> in Rust)
      • Might be copy-on-read instead (lazy replication)?
        • Harder, if not impossible, because we don’t have a mutable reference on reads
  • We need to register the replica with the log
    • And set our ctail to the same index as the replica we’ve copied from
    • We should also ensure no wrap-around happens in the log in the meantime, e.g., once we set the ctail the log needs to wait for the replica we’re in the process of adding (and respect its ctail when deciding whether it’s safe to overwrite log entries)
    • Dynamic membership (removing and adding replicas) with the log is a bit tricky since the log is not designed for this
      • It has a single atomic next field that tracks registration
      • The “next” field is used to find the minimum completed tail (by iterating from 0..next)
        • Such functions now need an additional check for whether a replica is active or not
          • Maybe keep an atomic bitmask in addition to next (see the sketch after this list)?
            • Need to figure out when it’s safe to change this mask, as almost all log functions are fully concurrent and the code needs to be vetted for the new assumption that a replica might disappear/appear at any time
  • We need to route certain threads to the new replica
    • One question is whether we also allow threads to register dynamically, or keep it as is, where we expect all threads to register up-front
      • I think we can assume no new threads; then it’s just a matter of deciding which ones to re-route to the new replica
      • Again, we need to make sure the replica has the right thread contexts
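
For the log-membership point above, a minimal sketch of the “atomic bitmask in addition to next” idea. None of these names exist in the crate; the min-completed-tail scan is only meant to show the extra “is this slot active?” check such functions would need.

// Hypothetical sketch: an atomic bitmask tracking which replica slots are
// active, consulted by log functions that iterate over registered replicas.
use std::sync::atomic::{AtomicUsize, Ordering};

struct LogMembership {
    active_mask: AtomicUsize, // bit i set <=> replica slot i is active
}

impl LogMembership {
    fn activate(&self, rid: usize) {
        self.active_mask.fetch_or(1 << rid, Ordering::SeqCst);
    }

    fn deactivate(&self, rid: usize) {
        self.active_mask.fetch_and(!(1 << rid), Ordering::SeqCst);
    }

    fn is_active(&self, rid: usize) -> bool {
        self.active_mask.load(Ordering::SeqCst) & (1 << rid) != 0
    }

    // What a minimum-completed-tail scan could look like: skip inactive
    // slots instead of unconditionally iterating over 0..next.
    fn min_completed_tail(&self, local_tails: &[AtomicUsize]) -> Option<usize> {
        (0..local_tails.len())
            .filter(|&rid| self.is_active(rid))
            .map(|rid| local_tails[rid].load(Ordering::SeqCst))
            .min()
    }
}

fn main() {
    let m = LogMembership { active_mask: AtomicUsize::new(0) };
    m.activate(0);
    m.activate(1);
    m.deactivate(1); // replica 1 removed; its tail no longer holds back the log
    let tails = [AtomicUsize::new(5), AtomicUsize::new(3)];
    assert_eq!(m.min_completed_tail(&tails), Some(5));
}

The hard part, as noted above, is defining when it is safe to flip a bit while other log operations are in flight.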

API pseudo-code: adding and removing replicas

NodeReplicated {
  max_replicas: 4 // #NUMA nodes
  active: Vec<usize> // currently active replicas
  routing: HashMap<tid, rid> // maps threads to replicas
}

fn add_replica(rid: usize) -> Result {
  assert(rid < max_replicas);
  assert(!active.contains(rid), "not already active");

  // possible thread routing policy:
  // migrates tids that were originally registered to `rid` back to execute against `rid`
  ensures(active.contains(rid));
}

fn remove_replica(rid: usize, new_tid_affinity: usize) {
  assert(rid < max_replicas);
  assert(new_tid_affinity < max_replicas);
  assert(active.contains(rid));
  assert(active.contains(new_tid_affinity));

  // route all threads registered with rid to `new_tid_affinity`
  // should it be possible to do more complex assignment?
  // then maybe we can provide a function that implements a routing
  // policy and assigns them or provide a HashMap with tid->new_rid mappings
  // and a default?

  ensures(!active.contains(rid));
  ensures(active.contains(new_tid_affinity));
  ensures(!routing.values.contains(rid));
  ensures(routing.len == old(routing).len);
}
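
For concreteness, the same API rendered as compilable Rust. This is only a sketch of the bookkeeping: it leaves out the actual replica setup/teardown and the log interaction, and none of these names match the real NodeReplicated<D> type in the crate.

// Compilable sketch of the pseudo-code above; placeholder names only.
use std::collections::HashMap;

type ThreadId = usize;
type ReplicaId = usize;

#[derive(Debug)]
enum ReplicaError {
    OutOfRange,
    AlreadyActive,
    NotActive,
}

struct NodeReplicatedSketch {
    max_replicas: usize,                   // e.g. #NUMA nodes
    active: Vec<ReplicaId>,                // currently active replicas
    routing: HashMap<ThreadId, ReplicaId>, // maps threads to replicas
}

impl NodeReplicatedSketch {
    fn add_replica(&mut self, rid: ReplicaId) -> Result<(), ReplicaError> {
        if rid >= self.max_replicas {
            return Err(ReplicaError::OutOfRange);
        }
        if self.active.contains(&rid) {
            return Err(ReplicaError::AlreadyActive);
        }
        // Possible thread routing policy: migrate tids that were originally
        // registered to `rid` back to execute against `rid` (not shown).
        self.active.push(rid);
        Ok(())
    }

    fn remove_replica(&mut self, rid: ReplicaId, new_tid_affinity: ReplicaId) -> Result<(), ReplicaError> {
        if rid >= self.max_replicas || new_tid_affinity >= self.max_replicas {
            return Err(ReplicaError::OutOfRange);
        }
        if !self.active.contains(&rid) || !self.active.contains(&new_tid_affinity) {
            return Err(ReplicaError::NotActive);
        }
        // Route all threads registered with `rid` to `new_tid_affinity`;
        // a more flexible policy (a closure or a tid->new_rid map) could go here.
        for target in self.routing.values_mut() {
            if *target == rid {
                *target = new_tid_affinity;
            }
        }
        self.active.retain(|&r| r != rid);
        Ok(())
    }
}

fn main() {
    let mut nr = NodeReplicatedSketch {
        max_replicas: 4,
        active: vec![0, 1],
        routing: HashMap::from([(0, 0), (1, 1)]),
    };
    nr.remove_replica(1, 0).unwrap(); // fold replica 1 into replica 0
    assert_eq!(nr.routing[&1], 0);
    nr.add_replica(1).unwrap();       // bring it back later
}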

How do we show it works

NR micro-benchmarks: measure memory consumption and read throughput while varying the number of replicas over time

Integration with network and IO

(this is mostly future work, to connect IO + network once we have these mechanisms in place)

  • Socket for each replica:
    • Spawn a new receive path each time we add a new replica
    • Route requests to the right replica
    • Similar framework to NR for the network
    • Seastar: shared-nothing, DPDK queue per core
      • Similar, but optimized for request ingress
    • Shuffle from IO to NR
      • Becomes a bottleneck if the NIC is not smart about routing
        • FLOEM/FlexNIC from Antoine

Describe alternatives you've considered

Alternative approaches

  • We could stop the world, delete the whole NR instance, extract the DS from the replicas, and reconstruct a new NR instance (rough sketch below)
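
A rough sketch of that alternative, with Instance as a placeholder for the real NR types; the point is only the shape of the operation (quiesce, extract the DS from one replica, rebuild with the new replica count):

// Hypothetical stop-the-world resize; `Instance` is a placeholder, not the
// real NodeReplicated/Replica types.
struct Instance<D> {
    replicas: Vec<D>, // one data-structure copy per replica
}

fn resize<D: Clone + Default>(old: Instance<D>, new_replicas: usize) -> Instance<D> {
    // All threads are assumed to be stopped here (no concurrent readers or writers).
    // Extract the data structure from one replica...
    let ds = old.replicas.into_iter().next().unwrap_or_default();
    // ...and reconstruct a fresh instance with the desired replica count.
    Instance {
        replicas: vec![ds; new_replicas],
    }
}

fn main() {
    let old = Instance { replicas: vec![vec![1u64, 2, 3]; 4] };
    let resized = resize(old, 2);
    assert_eq!(resized.replicas.len(), 2);
}

This is simple but implies a full stop and a full copy, which is what the in-place add/remove mechanism above tries to avoid.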
