
Realm support for data movement #1637

Open
elliottslaughter wants to merge 17 commits into flexflow:master from elliottslaughter:realm-data-movement-explicit

Conversation

@elliottslaughter
Contributor

@elliottslaughter elliottslaughter commented Mar 18, 2026

This PR adds support for issuing copies in Realm when operators are spread out over multiple devices. In principle this should enable distributed model parallelism. Other parallel operators (e.g., required for data parallelism) are not implemented.

Overview of changes:

  • Adds CopyAttrs to TrainingOperationAttrs in task-spec to permit copies to be represented in the dynamic graph
  • Makes DynamicValueAttrs track their mapping explicitly
  • Adds an explicit copy-insertion pass over the dynamic graph that fills in mapping on DynamicValueAttrs and inserts copies wherever the mapping would otherwise break edges in the dependence graph
  • Updates shard expansion to expand copies
  • Updates the Realm infrastructure to issue copies when they are present in the dynamic graph
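The copy-insertion step in the list above can be sketched in miniature. The types and names below (Edge, Node, insert_copies) are illustrative stand-ins rather than FlexFlow's actual API; the idea is just that any edge whose producer and consumer live on different devices gets an explicit copy node spliced onto it.

```cpp
#include <string>
#include <vector>

// Simplified model of copy insertion: each edge records the device
// holding the produced value and the device where the consumer runs.
// These types are hypothetical, not FlexFlow's real dynamic-graph types.
struct Node {
  std::string kind; // "op" or "copy"
};

struct Edge {
  int src;     // producing node id
  int dst;     // consuming node id
  int src_dev; // device holding the produced value
  int dst_dev; // device where the consumer runs
};

// Returns the edge list after splicing an explicit copy node onto
// every cross-device edge. New copy nodes are appended to `nodes`.
std::vector<Edge> insert_copies(std::vector<Node> &nodes,
                                std::vector<Edge> const &edges) {
  std::vector<Edge> result;
  for (Edge const &e : edges) {
    if (e.src_dev == e.dst_dev) {
      result.push_back(e); // same device: edge is unchanged
    } else {
      int copy_id = static_cast<int>(nodes.size());
      nodes.push_back(Node{"copy"});
      // producer -> copy stays on the source device;
      // copy -> consumer lands on the destination device
      result.push_back(Edge{e.src, copy_id, e.src_dev, e.src_dev});
      result.push_back(Edge{copy_id, e.dst, e.dst_dev, e.dst_dev});
    }
  }
  return result;
}
```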


@lockshaw lockshaw self-requested a review March 18, 2026 04:15
Collaborator

@lockshaw lockshaw left a comment


@lockshaw reviewed 26 files and all commit messages, and made 13 comments.
Reviewable status: all files reviewed, 12 unresolved discussions (waiting on elliottslaughter).


lib/realm-execution/src/realm-execution/pcg_instance.cc line 163 at r1 (raw file):

}

static Realm::Event spawn_dynamic_node_invocation(

Minor: A small docstring would be nice, as the number of arguments makes it a bit hard to quickly skim for the meaning of this function


lib/realm-execution/src/realm-execution/realm_context.cc line 174 at r1 (raw file):

      /*field_id=*/0,
      /*size=*/
      static_cast<size_t>(int{size_of_datatype(src_piece_shape.data_type)}),

Minor: Slightly clearer/more idiomatic in the codebase

Suggestion:

      static_cast<size_t>(size_of_datatype(src_piece_shape.data_type).int_from_positive_int()),

lib/realm-execution/src/realm-execution/realm_context.cc line 218 at r1 (raw file):

    default:
      PANIC("TensorShape dims greater than REALM_MAX_DIM",
            fmt::to_string(src_piece_shape.dims.ff_ordered.num_dims()));

Minor: I don't think you need the explicit to_string call

Suggestion:

      PANIC("TensorShape dims greater than REALM_MAX_DIM: {}", src_piece_shape.dims.ff_ordered.num_dims());

lib/task-spec/include/task-spec/dynamic_graph/copy_insertion.h line 0 at r1 (raw file):
Add a high-level explanation of copy insertion to dynamic_graph/index.dox and (ideally) link to there from here


lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc line 115 at r1 (raw file):

      auto const &[filtered_source, filtered_use] =
          filter_mapping_to_avoid_degenerate_copies(source_value, use_value);
      DynamicNodeInvocation copy{

FYI: Normally we'd write DynamicNodeInvocation copy = DynamicNodeInvocation{...}, so that would technically be slightly more idiomatic in the codebase, but it really doesn't matter much

Code quote:

      DynamicNodeInvocation copy{
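For readers unfamiliar with the idiom being discussed: the two spellings behave identically for an aggregate, differing only in whether the type name is repeated on the right-hand side. A toy example (InvocationLike is a made-up stand-in, not the real type):

```cpp
#include <string>

// Toy stand-in for DynamicNodeInvocation; illustrative only.
struct InvocationLike {
  std::string name;
};

// Terse form: direct list-initialization.
inline InvocationLike make_terse() {
  InvocationLike copy{"copy"};
  return copy;
}

// Explicit form: repeats the type name, which the comment above
// suggests is the slightly preferred style in this codebase.
inline InvocationLike make_explicit() {
  InvocationLike copy = InvocationLike{"copy"};
  return copy;
}
```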

lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc line 154 at r1 (raw file):

  ASSERT(no_part_of_graph_is_copy_inserted(g));

  std::unordered_map<DynamicValueAttrs, DynamicValueAttrs> sources;

A more specific variable name here would be really helpful, especially since the type declaration is not particularly illuminating

Code quote:

  std::unordered_map<DynamicValueAttrs, DynamicValueAttrs> sources;

lib/task-spec/src/task-spec/dynamic_graph/dynamic_task_type.cc line 6 at r1 (raw file):

namespace FlexFlow {

DynamicTaskType decide_copy_task_type(DynamicTensorRole role) {

Minor: Slightly clearer name. Unless I'm misunderstanding, this function isn't really doing any "deciding", it's really just flattening some nesting of task types

Suggestion:

DynamicTaskType dynamic_task_type_from_tensor_role(DynamicTensorRole role) {

lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc line 49 at r1 (raw file):

        ParallelTensorSpaceCoordinate const &parallel_tensor_coord) {
  return filter_keys(mapping, [&](ParallelTensorSpaceCoordinate const &p) {
    return p == parallel_tensor_coord;

If you're fixing the key, what's the point of returning a bidict over an unordered_set from this function?


lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc line 90 at r1 (raw file):

static std::unordered_set<DynamicNodeInvocation>
    perform_shard_expansion_for_copy(DynamicNodeInvocation const &i) {
  auto const &[input_slot, input] = get_only(i.inputs);

Minor: Generally I discourage assigning references unless necessary, as it opens up more room for lifetime bugs

Suggestion:

 auto [input_slot, input] = get_only(i.inputs);

lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc line 96 at r1 (raw file):

  bidict<ParallelTensorSpaceCoordinate, MachineSpaceCoordinate> output_mapping =
      assert_unwrap(output.mapping);
  require_same(input_mapping.left_values(), output_mapping.left_values());

Minor: Slightly more idiomatic, as that way you don't have to arbitrarily choose which mapping (input or output) to use in the rest of the function

Suggestion:

  bidict<ParallelTensorSpaceCoordinate, MachineSpaceCoordinate> mapping =
    require_same(assert_unwrap(input.mapping), assert_unwrap(output.mapping));

lib/task-spec/test/src/task-spec/dynamic_graph/copy_insertion.cc line 388 at r1 (raw file):

    SUBCASE("copy one tensor, one point") {
      std::unordered_map<DynamicValueAttrs, DynamicValueAttrs> sources_copy1{
          {graph_input1, graph_input1_src_copy1},

It seems that some of the initialization is used by only a single subcase? If so, it would be more readable to move the creation of those subcase-specific values into the subcase itself.

Also, is there any way to shrink the amount of setup needed in this test? Wading through all the construction is not fun, though admittedly just coalescing the setup into the subcases may make the storyline of the setup sufficiently clear that we won't need to do this.

Code quote:

         {graph_input1, graph_input1_src_copy1},

lib/task-spec/test/src/task-spec/dynamic_graph/shard_expansion.cc line 335 at r1 (raw file):

          DynamicNodeAttrs{
              /*task_type=*/std::nullopt,
              /*device_coord=*/device_coord,

What is the meaning of the device placement of a copy? Is it the source or the destination of the copy? Either way, it seems like that's going to run into issues in the backward pass, where the copy will have to operate in the other direction, but I don't see any code for handling that currently?


lib/task-spec/test/src/task-spec/dynamic_graph/shard_expansion.cc line 351 at r1 (raw file):

              },
          },
      };

Minor: Might be clearer to make this a modification of input rather than a full reconstruction? It looks like very little is changed, but it's kinda hard to spot what changes in all the initialization

Code quote:

      return DynamicNodeInvocation{
          /*inputs=*/{
              {
                  mk_slot(TensorSlotName::INPUT),
                  mk_value(0,
                           TensorSlotName::OUTPUT,
                           src_binding,
                           tensor_shard_coord),
              },
          },
          /*node_attrs=*/
          DynamicNodeAttrs{
              /*task_type=*/std::nullopt,
              /*device_coord=*/device_coord,
              /*mapping=*/std::nullopt,
              /*op_attrs=*/TrainingOperationAttrs{CopyAttrs{}},
              /*layer_guid=*/dynamic_layer_guid_t{dynamic_copy_layer_guid_t{}},
              /*per_device_op_state=*/std::nullopt,
          },
          /*outputs=*/
          {
              {
                  mk_slot(TensorSlotName::OUTPUT),
                  mk_value(20,
                           TensorSlotName::OUTPUT,
                           dst_binding,
                           tensor_shard_coord),
              },
          },
      };
