From 3048e343200a2ed69afda74de072a51263e92708 Mon Sep 17 00:00:00 2001 From: Tim <43156029+AtlantaPepsi@users.noreply.github.com> Date: Mon, 10 Aug 2020 23:18:11 -0400 Subject: [PATCH 1/2] Update win_allocate.c --- src/ghost/rma/win_allocate.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ghost/rma/win_allocate.c b/src/ghost/rma/win_allocate.c index 5279e2b..e80b96a 100644 --- a/src/ghost/rma/win_allocate.c +++ b/src/ghost/rma/win_allocate.c @@ -222,7 +222,7 @@ static int alloc_shared_window(MPI_Info user_info, MPI_Aint * size, CSPG_win_t * CSP_CALLMPI(JUMP, PMPI_Info_dup(user_info, &shared_info)); CSP_CALLMPI(JUMP, PMPI_Info_set(shared_info, "alloc_shm", "true")); CSP_CALLMPI(JUMP, PMPI_Info_set(shared_info, "same_size", "false")); - CSP_CALLMPI(JUMP, PMPI_Info_set(shared_info, "alloc_shared_noncontig", "false")); + //CSP_CALLMPI(JUMP, PMPI_Info_set(shared_info, "alloc_shared_noncontig", "false")); } /* -Allocate shared window in CHAR type From 7578b4181492ad5d3af923616b263fc600bbe831 Mon Sep 17 00:00:00 2001 From: AtlantaPepsi Date: Sun, 24 Jan 2021 06:17:26 -0500 Subject: [PATCH 2/2] add offset exchange --- src/ghost/rma/win_allocate.c | 46 ++++++++++- src/user/rma/win_allocate.c | 153 ++++++++++++++++++++++++----------- 2 files changed, 150 insertions(+), 49 deletions(-) diff --git a/src/ghost/rma/win_allocate.c b/src/ghost/rma/win_allocate.c index 5279e2b..8315835 100644 --- a/src/ghost/rma/win_allocate.c +++ b/src/ghost/rma/win_allocate.c @@ -27,6 +27,15 @@ static inline int send_ghost_cmd_param(void *params, size_t size, CSPG_win_t * w return mpi_errno; } +/* Send base offsets of ghost to individual user to respective users via local communicator (non-blocking call). 
*/ +static inline int send_ghost_offset(ptrdiff_t *offset, int dst, int src, MPI_Request *request, CSPG_win_t * win) +{ + int mpi_errno = MPI_SUCCESS; + CSP_CALLMPI(NOSTMT, PMPI_Isend(offset, 1, MPI_AINT, dst, src, + CSP_PROC.local_comm, request)); + return mpi_errno; +} + static int init_ghost_win(CSP_cwp_fnc_winalloc_pkt_t * winalloc_pkt, CSPG_win_t * win, MPI_Info * user_info) { @@ -196,10 +205,15 @@ static int alloc_shared_window(MPI_Info user_info, MPI_Aint * size, CSPG_win_t * MPI_Aint csp_buf_size = CSP_GP_SHARED_SG_SIZE; int dst, local_ug_rank, local_ug_nprocs; MPI_Info shared_info = MPI_INFO_NULL; + int flag, non_contig = 0; + char value[6]; int r_disp_unit; MPI_Aint r_size; void **user_bases = NULL; - int is_first_nonzero = 1; + int is_first_nonzero = 1, Nrequest = 0; /* defined even on ranks that never allocate requests */ + MPI_Request *requests = NULL; + MPI_Status *stats = NULL; + CSP_CALLMPI(JUMP, PMPI_Comm_rank(win->local_ug_comm, &local_ug_rank)); CSP_CALLMPI(JUMP, PMPI_Comm_size(win->local_ug_comm, &local_ug_nprocs)); @@ -222,9 +236,12 @@ static int alloc_shared_window(MPI_Info user_info, MPI_Aint * size, CSPG_win_t * CSP_CALLMPI(JUMP, PMPI_Info_dup(user_info, &shared_info)); CSP_CALLMPI(JUMP, PMPI_Info_set(shared_info, "alloc_shm", "true")); CSP_CALLMPI(JUMP, PMPI_Info_set(shared_info, "same_size", "false")); - CSP_CALLMPI(JUMP, PMPI_Info_set(shared_info, "alloc_shared_noncontig", "false")); + //CSP_CALLMPI(JUMP, PMPI_Info_set(shared_info, "alloc_shared_noncontig", "false")); + CSP_CALLMPI(JUMP, PMPI_Info_get(shared_info, "alloc_shared_noncontig", 5, value, &flag)); + if (flag && strcmp(value, "true") == 0) non_contig = 1; /* strcmp() returns 0 on a match */ } + /* -Allocate shared window in CHAR type * (No local buffer, only need shared buffer on user processes) */ CSP_CALLMPI(JUMP, PMPI_Win_allocate_shared(csp_buf_size, 1, shared_info, @@ -256,18 +273,43 @@ static int alloc_shared_window(MPI_Info user_info, MPI_Aint * size, CSPG_win_t * if (r_size > 0 && is_first_nonzero) { win->base = user_bases[dst]; is_first_nonzero = 0; + /* -If shared 
window is noncontiguously allocated, initialize the list + * of MPI_Requests for communications with all local users */ + if (local_ug_rank == 0 && non_contig) { + Nrequest = local_ug_nprocs - dst; + requests = (MPI_Request *) CSP_calloc(Nrequest, sizeof(MPI_Request)); + { int k; for (k = 0; k < Nrequest; k++) requests[k] = MPI_REQUEST_NULL; } /* slots that are never filled must still be valid handles for Waitall */ + } + } + /* -Deliver offset info to individual users */ + if (!is_first_nonzero && non_contig && dst >= CSP_ENV.num_g && local_ug_rank == 0) { + MPI_Request request; + MPI_Aint offset = ((char *) user_bases[dst] - (char *) win->base); + CSP_CALLMPI(JUMP, PMPI_Isend(&offset, 1, MPI_AINT, dst, 74, + CSP_PROC.local_comm, &request)); + CSP_CALLMPI(JUMP, PMPI_Wait(&request, MPI_STATUS_IGNORE)); /* offset is block-local: finish the send before it goes out of scope */ + requests[dst - local_ug_nprocs + Nrequest] = request; } + (*size) += r_size; /* size in byte */ } CSPG_DBG_PRINT(" Created shared window, base=%p, size=%ld\n", win->base, (*size)); + if (non_contig && requests) { + CSP_CALLMPI(JUMP, PMPI_Waitall(Nrequest, requests, MPI_STATUSES_IGNORE)); + CSPG_DBG_PRINT(" Offsets delivered to local users.\n"); + } fn_exit: if (shared_info && shared_info != MPI_INFO_NULL) CSP_CALLMPI_EXIT(PMPI_Info_free(&shared_info)); if (user_bases) free(user_bases); + if (requests) + free(requests); + if (stats) + free(stats); return mpi_errno; fn_fail: diff --git a/src/user/rma/win_allocate.c b/src/user/rma/win_allocate.c index fe36222..b201042 100644 --- a/src/user/rma/win_allocate.c +++ b/src/user/rma/win_allocate.c @@ -274,6 +274,40 @@ static int issue_ghost_cmd(int user_nprocs, MPI_Info info, CSPU_win_t * ug_win) goto fn_exit; } +/* Gather base offsets from all the local ghosts (posts non-blocking receives, then waits for all of them). 
*/ +static int gather_local_offsets(MPI_Aint *base_g_offsets, CSPU_win_t * ug_win) +{ + int mpi_errno = MPI_SUCCESS; + int i, user_rank; + MPI_Status *stats = NULL; + MPI_Request *reqs = NULL; + + CSP_CALLMPI(JUMP, PMPI_Comm_rank(ug_win->user_comm, &user_rank)); + + reqs = (MPI_Request *) CSP_calloc(CSP_ENV.num_g, sizeof(MPI_Request)); + stats = (MPI_Status *) CSP_calloc(CSP_ENV.num_g, sizeof(MPI_Status)); + + /* Ghosts always start from rank 0 on the local communicator; ghost i fills slot i of this user's row. */ + for (i = 0; i < CSP_ENV.num_g; i++) { + CSP_CALLMPI(JUMP, PMPI_Irecv(base_g_offsets + (user_rank * CSP_ENV.num_g) + i, + 1, MPI_AINT, i, MPI_ANY_TAG, + CSP_PROC.local_comm, &reqs[i])); + } + + CSP_CALLMPI(JUMP, PMPI_Waitall(CSP_ENV.num_g, reqs, stats)); + + fn_exit: + if (stats) + free(stats); + if (reqs) + free(reqs); + return mpi_errno; + + fn_fail: + goto fn_exit; +} + + static int gather_ranks(CSPU_win_t * win, int *num_ghosts, int *gp_ranks_in_world, int *unique_gp_ranks_in_world) { @@ -519,7 +553,7 @@ static int create_communicators(CSPU_win_t * ug_win) goto fn_exit; } -static int gather_base_offsets(CSPU_win_t * ug_win) +static int gather_base_offsets(CSPU_win_t * ug_win, int noncontig) { int mpi_errno = MPI_SUCCESS; MPI_Aint tmp_u_offsets; @@ -533,7 +567,6 @@ static int gather_base_offsets(CSPU_win_t * ug_win) CSP_CALLMPI(JUMP, PMPI_Comm_rank(ug_win->user_comm, &user_rank)); CSP_CALLMPI(JUMP, PMPI_Comm_size(ug_win->user_comm, &user_nprocs)); - base_g_offsets = CSP_calloc(user_nprocs * CSP_ENV.num_g, sizeof(MPI_Aint)); #ifdef CSP_ENABLE_GRANT_LOCK_HIDDEN_BYTE /* All the ghosts use the byte located on ghost 0. */ @@ -547,37 +580,70 @@ static int gather_base_offsets(CSPU_win_t * ug_win) root_g_size = CSP_MAX(root_g_size, sizeof(CSP_GRANT_LOCK_DATATYPE)); #endif - /* Calculate my offset on the local shared buffer. - * Note that all the ghosts start the window from baseptr of ghost 0, - * hence all the local ghosts use the same offset of user buffers. 
- My offset is the total window size of all ghosts and all users in front of - * me on the node (loop world ranks to get its window size without rank translate).*/ - tmp_u_offsets = root_g_size + CSP_GP_SHARED_SG_SIZE * (CSP_ENV.num_g - 1); - - i = 0; - while (i < user_rank) { - if (ug_win->targets[i].node_id == ug_win->node_id) { - tmp_u_offsets += ug_win->targets[i].size; /* size in bytes */ + + if (noncontig) { + base_g_offsets = CSP_calloc(user_nprocs, sizeof(MPI_Aint)); + + //gather_local_offsets(base_g_offsets, ug_win); + CSP_CALLMPI(JUMP, PMPI_Recv(base_g_offsets + user_rank, /* my offset, sent by a local ghost with tag 74 */ + 1, MPI_AINT, MPI_ANY_SOURCE, 74, + CSP_PROC.local_comm, MPI_STATUS_IGNORE)); + + CSP_DBG_PRINT("[%d] local base_g_offset 0x%lx\n", user_rank, base_g_offsets[user_rank]); + + /* -Receive the address of all the shared user buffers on Ghost processes. */ + CSP_CALLMPI(JUMP, PMPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, base_g_offsets, + 1, MPI_AINT, ug_win->user_comm)); + + for (i = 0; i < user_nprocs; i++) { + for (j = 0; j < CSP_ENV.num_g; j++) { + ug_win->targets[i].base_g_offsets[j] = base_g_offsets[i]; + CSP_DBG_PRINT("\t.base_g_offsets[%d] = 0x%lx\n", + j, ug_win->targets[i].base_g_offsets[j]); + } } - i++; - } - for (j = 0; j < CSP_ENV.num_g; j++) { - base_g_offsets[user_rank * CSP_ENV.num_g + j] = tmp_u_offsets; - } - CSP_DBG_PRINT("[%d] local base_g_offset 0x%lx\n", user_rank, tmp_u_offsets); - /* -Receive the address of all the shared user buffers on Ghost processes. */ - CSP_CALLMPI(JUMP, PMPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, base_g_offsets, - CSP_ENV.num_g, MPI_AINT, ug_win->user_comm)); + } + else { + /* Calculate my offset on the local shared buffer. + * Note that all the ghosts start the window from baseptr of ghost 0, + * hence all the local ghosts use the same offset of user buffers. 
+ * My offset is the total window size of all ghosts and all users in front of + * me on the node (loop world ranks to get its window size without rank translate).*/ + base_g_offsets = CSP_calloc(user_nprocs * CSP_ENV.num_g, sizeof(MPI_Aint)); + + tmp_u_offsets = root_g_size + CSP_GP_SHARED_SG_SIZE * (CSP_ENV.num_g - 1); + + i = 0; + while (i < user_rank) { + if (ug_win->targets[i].node_id == ug_win->node_id) { + tmp_u_offsets += ug_win->targets[i].size; /* size in bytes */ + } + i++; + } - for (i = 0; i < user_nprocs; i++) { for (j = 0; j < CSP_ENV.num_g; j++) { - ug_win->targets[i].base_g_offsets[j] = base_g_offsets[i * CSP_ENV.num_g + j]; - CSP_DBG_PRINT("\t.base_g_offsets[%d] = 0x%lx\n", - j, ug_win->targets[i].base_g_offsets[j]); + base_g_offsets[user_rank * CSP_ENV.num_g + j] = tmp_u_offsets; + } + + + + CSP_DBG_PRINT("[%d] local base_g_offset 0x%lx\n", user_rank, tmp_u_offsets); + + /* -Receive the address of all the shared user buffers on Ghost processes. */ + CSP_CALLMPI(JUMP, PMPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, base_g_offsets, + CSP_ENV.num_g, MPI_AINT, ug_win->user_comm)); + + for (i = 0; i < user_nprocs; i++) { + for (j = 0; j < CSP_ENV.num_g; j++) { + ug_win->targets[i].base_g_offsets[j] = base_g_offsets[i * CSP_ENV.num_g + j]; + CSP_DBG_PRINT("\t.base_g_offsets[%d] = 0x%lx\n", + j, ug_win->targets[i].base_g_offsets[j]); + } } + } fn_exit: @@ -597,6 +663,8 @@ static int alloc_shared_window(MPI_Aint size, int disp_unit, MPI_Info info, CSPU int mpi_errno = MPI_SUCCESS; MPI_Info shared_info = MPI_INFO_NULL; int user_rank; + int flag, non_contig = 0; + char value[6]; CSP_CALLMPI(JUMP, PMPI_Comm_rank(ug_win->user_comm, &user_rank)); @@ -610,7 +678,9 @@ static int alloc_shared_window(MPI_Aint size, int disp_unit, MPI_Info info, CSPU CSP_CALLMPI(JUMP, PMPI_Info_dup(info, &shared_info)); CSP_CALLMPI(JUMP, PMPI_Info_set(shared_info, "alloc_shm", "true")); CSP_CALLMPI(JUMP, PMPI_Info_set(shared_info, "same_size", "false")); - CSP_CALLMPI(JUMP, 
PMPI_Info_set(shared_info, "alloc_shared_noncontig", "false")); + //CSP_CALLMPI(JUMP, PMPI_Info_set(shared_info, "alloc_shared_noncontig", "false")); + CSP_CALLMPI(JUMP, PMPI_Info_get(shared_info, "alloc_shared_noncontig", 5, value, &flag)); + if (flag && strcmp(value, "true") == 0) non_contig = 1; /* strcmp() returns 0 on a match */ } CSP_CALLMPI(JUMP, PMPI_Win_allocate_shared(size, disp_unit, shared_info, ug_win->local_ug_comm, @@ -623,7 +693,7 @@ static int alloc_shared_window(MPI_Aint size, int disp_unit, MPI_Info info, CSPU CSPU_WIN_ERRHAN_SET_INTERN(ug_win->local_ug_win); /* Gather user offsets on corresponding ghost processes */ - mpi_errno = gather_base_offsets(ug_win); + mpi_errno = gather_base_offsets(ug_win, non_contig); CSP_CHKMPIFAIL_JUMP(mpi_errno); fn_exit: @@ -705,17 +775,8 @@ int MPI_Win_allocate(MPI_Aint size, int disp_unit, MPI_Info info, int tmp_bcast_buf[2]; /* Skip internal processing when disabled */ - if (CSP_IS_DISABLED) - return PMPI_Win_allocate(size, disp_unit, info, user_comm, baseptr, win); - if (CSP_IS_MODE_DISABLED(RMA)) { - if (user_comm == MPI_COMM_WORLD) - user_comm = CSP_COMM_USER_WORLD; - return PMPI_Win_allocate(size, disp_unit, info, user_comm, baseptr, win); - } - CSPU_ERRHAN_EXTOBJ_LOCAL_DCL(); - CSPU_COMM_ERRHAN_SET_EXTOBJ(); ug_win = CSP_calloc(1, sizeof(CSPU_win_t)); @@ -727,21 +788,16 @@ int MPI_Win_allocate(MPI_Aint size, int disp_unit, MPI_Info info, mpi_errno = read_win_info(info, ug_win); CSP_CHKMPIFAIL_JUMP(mpi_errno); - /* If user turns off asynchronous redirection, simply return normal window; */ - if (ug_win->info_args.async_config == CSP_ASYNC_CONFIG_OFF) { - CSPU_ERRHAN_RESET_EXTOBJ(); /* reset before calling original MPI */ - CSP_CALLMPI(NOSTMT, PMPI_Win_allocate(size, disp_unit, info, user_comm, baseptr, win)); - CSP_DBG_PRINT("User turns off async in win_allocate, return normal win 0x%x\n", *win); - - goto fn_noasync; - } /* Start allocating casper window */ /* Check any invalid input which can only be checked by MPI calls after ghost joined. 
* TODO: how to interrupt ghost win_allocate if MPI error reported on user side ?*/ mpi_errno = check_valid_input(size, disp_unit); - CSP_CHKMPIFAIL_JUMP(mpi_errno); + CSP_CHKMPIFAIL_JUMP(mpi_errno); + + + /* Initialize basic communicators and information. */ if (user_comm == CSP_COMM_USER_WORLD) { @@ -792,6 +848,9 @@ int MPI_Win_allocate(MPI_Aint size, int disp_unit, MPI_Info info, ug_win->targets[i].g_ranks_in_ug = CSP_calloc(CSP_ENV.num_g, sizeof(MPI_Aint)); } + + + /* Gather users' disp_unit, size, ranks and node_id */ tmp_gather_buf = CSP_calloc(user_nprocs * 7, sizeof(MPI_Aint)); tmp_gather_buf[7 * user_rank] = (MPI_Aint) disp_unit; @@ -962,4 +1021,4 @@ int MPI_Win_allocate(MPI_Aint size, int disp_unit, MPI_Info info, CSPU_ERRHAN_RESET_EXTOBJ(); /* reset before error handling */ CSPU_COMM_ERRHANLDING(user_comm, &mpi_errno); goto fn_exit; -} +} \ No newline at end of file