46 changes: 44 additions & 2 deletions src/ghost/rma/win_allocate.c
@@ -27,6 +27,15 @@ static inline int send_ghost_cmd_param(void *params, size_t size, CSPG_win_t * w
return mpi_errno;
}

/* Send the base offset on the ghost window to an individual user via the local communicator (non-blocking call). */
static inline int send_ghost_offset(MPI_Aint *offset, int dst, int tag, MPI_Request *request, CSPG_win_t * win)
{
int mpi_errno = MPI_SUCCESS;
CSP_CALLMPI(NOSTMT, PMPI_Isend(offset, 1, MPI_AINT, dst, tag,
CSP_PROC.local_comm, request));
return mpi_errno;
}
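
/* A minimal usage sketch (not part of this patch; i, nusers, offsets[] and reqs[] are
 * hypothetical): ghost local rank 0 could post one offset send per local user with the
 * helper above and complete them all at once. Keeping the offsets in an array keeps each
 * send buffer valid until the waitall:
 *
 *     nusers = local_ug_nprocs - CSP_ENV.num_g;
 *     for (i = 0; i < nusers; i++) {
 *         mpi_errno = send_ghost_offset(&offsets[i], CSP_ENV.num_g + i, 74, &reqs[i], win);
 *         CSP_CHKMPIFAIL_JUMP(mpi_errno);
 *     }
 *     CSP_CALLMPI(JUMP, PMPI_Waitall(nusers, reqs, MPI_STATUSES_IGNORE));
 */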

static int init_ghost_win(CSP_cwp_fnc_winalloc_pkt_t * winalloc_pkt, CSPG_win_t * win,
MPI_Info * user_info)
{
@@ -196,10 +205,15 @@ static int alloc_shared_window(MPI_Info user_info, MPI_Aint * size, CSPG_win_t *
MPI_Aint csp_buf_size = CSP_GP_SHARED_SG_SIZE;
int dst, local_ug_rank, local_ug_nprocs;
MPI_Info shared_info = MPI_INFO_NULL;
int flag, non_contig = 0;
char value[6];
int r_disp_unit;
MPI_Aint r_size;
void **user_bases = NULL;
int is_first_nonzero = 1, Nrequest = 0;
MPI_Request *requests = NULL;
MPI_Status *stats = NULL;


CSP_CALLMPI(JUMP, PMPI_Comm_rank(win->local_ug_comm, &local_ug_rank));
CSP_CALLMPI(JUMP, PMPI_Comm_size(win->local_ug_comm, &local_ug_nprocs));
@@ -222,9 +236,12 @@ static int alloc_shared_window(MPI_Info user_info, MPI_Aint * size, CSPG_win_t *
CSP_CALLMPI(JUMP, PMPI_Info_dup(user_info, &shared_info));
CSP_CALLMPI(JUMP, PMPI_Info_set(shared_info, "alloc_shm", "true"));
CSP_CALLMPI(JUMP, PMPI_Info_set(shared_info, "same_size", "false"));
CSP_CALLMPI(JUMP, PMPI_Info_set(shared_info, "alloc_shared_noncontig", "false"));
//CSP_CALLMPI(JUMP, PMPI_Info_set(shared_info, "alloc_shared_noncontig", "false"));
CSP_CALLMPI(JUMP, PMPI_Info_get(shared_info, "alloc_shared_noncontig", 5, value, &flag));
if (flag && strcmp(value, "true")) non_contig = 1;
}


/* -Allocate shared window in CHAR type
* (No local buffer, only need shared buffer on user processes) */
CSP_CALLMPI(JUMP, PMPI_Win_allocate_shared(csp_buf_size, 1, shared_info,
@@ -256,18 +273,43 @@ static int alloc_shared_window(MPI_Info user_info, MPI_Aint * size, CSPG_win_t *
if (r_size > 0 && is_first_nonzero) {
win->base = user_bases[dst];
is_first_nonzero = 0;
/* -If the shared window is allocated noncontiguously, initialize the list
 * of MPI_Requests for the offset sends to all local users */
if (local_ug_rank == 0 && non_contig) {
Nrequest = local_ug_nprocs - dst;
requests = (MPI_Request *) CSP_calloc(Nrequest, sizeof(MPI_Request));
//stats = (MPI_Status *) CSP_calloc(Nrequest, sizeof(MPI_Status));
}
}
/* -Deliver offset info to individual users */
if (!is_first_nonzero && non_contig && dst >= CSP_ENV.num_g && local_ug_rank == 0) {
MPI_Request request;
MPI_Aint offset = ((char *) user_bases[dst] - (char *) win->base);
CSP_CALLMPI(NOSTMT, PMPI_Isend(&offset, 1, MPI_AINT, dst, 74,
CSP_PROC.local_comm, &request));
//send_ghost_offset(&offset, dst, 74, &request, win);
requests[dst - local_ug_nprocs + Nrequest] = request;
}


(*size) += r_size; /* size in byte */
}

CSPG_DBG_PRINT(" Created shared window, base=%p, size=%ld\n", win->base, (*size));

if (non_contig) {
CSP_CALLMPI(JUMP, PMPI_Waitall(Nrequest, requests, MPI_STATUSES_IGNORE));
CSPG_DBG_PRINT(" Offsets delivered to local users.\n");
}
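
/* The matching user-side receive is posted in gather_base_offsets()
 * (src/user/rma/win_allocate.c): each local user calls PMPI_Recv with tag 74 on
 * CSP_PROC.local_comm to obtain its own base offset before the user-level allgather. */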
fn_exit:
if (shared_info && shared_info != MPI_INFO_NULL)
CSP_CALLMPI_EXIT(PMPI_Info_free(&shared_info));
if (user_bases)
free(user_bases);
if (requests)
free(requests);
if (stats)
free(stats);
return mpi_errno;

fn_fail:
153 changes: 106 additions & 47 deletions src/user/rma/win_allocate.c
@@ -274,6 +274,40 @@ static int issue_ghost_cmd(int user_nprocs, MPI_Info info, CSPU_win_t * ug_win)
goto fn_exit;
}

/* Gather base offsets from all the local ghosts (uses non-blocking receives internally). */
static int gather_local_offsets(MPI_Aint *base_g_offsets, CSPU_win_t * ug_win)
{
int mpi_errno = MPI_SUCCESS;
int i, user_nprocs, user_rank;
MPI_Status *stats = NULL;
MPI_Request *reqs = NULL;

CSP_CALLMPI(JUMP, PMPI_Comm_rank(ug_win->user_comm, &user_rank));

reqs = (MPI_Request *) CSP_calloc(CSP_ENV.num_g, sizeof(MPI_Request));
stats = (MPI_Status *) CSP_calloc(CSP_ENV.num_g, sizeof(MPI_Status));

/* Ghosts always start from rank 0 on the local communicator. */
for (i = 0; i < CSP_ENV.num_g; i++) {
CSP_CALLMPI(JUMP, PMPI_Irecv(base_g_offsets + (user_rank * CSP_ENV.num_g) + i,
1, MPI_AINT, i, MPI_ANY_TAG,
CSP_PROC.local_comm, &reqs[i]));
}

CSP_CALLMPI(JUMP, PMPI_Waitall(CSP_ENV.num_g, reqs, stats));

fn_exit:
if (stats)
free(stats);
if (reqs)
free(reqs);
return mpi_errno;

fn_fail:
goto fn_exit;
}
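
/* A sketch of how this helper could replace the single PMPI_Recv in the noncontiguous
 * branch of gather_base_offsets() below (hypothetical wiring, not part of this patch);
 * the buffer then holds one offset per local ghost for this user:
 *
 *     base_g_offsets = CSP_calloc(user_nprocs * CSP_ENV.num_g, sizeof(MPI_Aint));
 *     mpi_errno = gather_local_offsets(base_g_offsets, ug_win);
 *     CSP_CHKMPIFAIL_JUMP(mpi_errno);
 */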


static int gather_ranks(CSPU_win_t * win, int *num_ghosts, int *gp_ranks_in_world,
int *unique_gp_ranks_in_world)
{
Expand Down Expand Up @@ -519,7 +553,7 @@ static int create_communicators(CSPU_win_t * ug_win)
goto fn_exit;
}

static int gather_base_offsets(CSPU_win_t * ug_win, int noncontig)
{
int mpi_errno = MPI_SUCCESS;
MPI_Aint tmp_u_offsets;
@@ -533,7 +567,6 @@ static int gather_base_offsets(CSPU_win_t * ug_win)
CSP_CALLMPI(JUMP, PMPI_Comm_rank(ug_win->user_comm, &user_rank));
CSP_CALLMPI(JUMP, PMPI_Comm_size(ug_win->user_comm, &user_nprocs));


#ifdef CSP_ENABLE_GRANT_LOCK_HIDDEN_BYTE
/* All the ghosts use the byte located on ghost 0. */
@@ -547,37 +580,70 @@ static int gather_base_offsets(CSPU_win_t * ug_win)
root_g_size = CSP_MAX(root_g_size, sizeof(CSP_GRANT_LOCK_DATATYPE));
#endif

if (noncontig) {
base_g_offsets = CSP_calloc(user_nprocs, sizeof(MPI_Aint));

//gather_local_offsets(base_g_offsets, ug_win);
CSP_CALLMPI(JUMP, PMPI_Recv(base_g_offsets + user_rank,
1, MPI_AINT, MPI_ANY_SOURCE, 74,
CSP_PROC.local_comm, MPI_STATUS_IGNORE));

CSP_DBG_PRINT("[%d] local base_g_offset 0x%lx\n", user_rank, base_g_offsets[user_rank]);

/* -Receive the address of all the shared user buffers on Ghost processes. */
CSP_CALLMPI(JUMP, PMPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, base_g_offsets,
1, MPI_AINT, ug_win->user_comm));

for (i = 0; i < user_nprocs; i++) {
for (j = 0; j < CSP_ENV.num_g; j++) {
ug_win->targets[i].base_g_offsets[j] = base_g_offsets[i];
CSP_DBG_PRINT("\t.base_g_offsets[%d] = 0x%lx\n",
j, ug_win->targets[i].base_g_offsets[j]);
}
}
}
else {
/* Calculate my offset on the local shared buffer.
* Note that all the ghosts start the window from baseptr of ghost 0,
* hence all the local ghosts use the same offset of user buffers.
* My offset is the total window size of all ghosts and all users in front of
* me on the node (loop world ranks to get its window size without rank translate).*/
base_g_offsets = CSP_calloc(user_nprocs * CSP_ENV.num_g, sizeof(MPI_Aint));

tmp_u_offsets = root_g_size + CSP_GP_SHARED_SG_SIZE * (CSP_ENV.num_g - 1);

i = 0;
while (i < user_rank) {
if (ug_win->targets[i].node_id == ug_win->node_id) {
tmp_u_offsets += ug_win->targets[i].size; /* size in bytes */
}
i++;
}
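
/* Worked example (hypothetical numbers): with root_g_size = 4096,
 * CSP_GP_SHARED_SG_SIZE = 4096, CSP_ENV.num_g = 2 and two lower-ranked local
 * users of 1 MiB each, this rank's offset from ghost 0's base is
 * 4096 + 4096 * (2 - 1) + 2 * 1048576 = 2105344 bytes. */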

for (j = 0; j < CSP_ENV.num_g; j++) {
base_g_offsets[user_rank * CSP_ENV.num_g + j] = tmp_u_offsets;
}

CSP_DBG_PRINT("[%d] local base_g_offset 0x%lx\n", user_rank, tmp_u_offsets);

/* -Receive the address of all the shared user buffers on Ghost processes. */
CSP_CALLMPI(JUMP, PMPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, base_g_offsets,
CSP_ENV.num_g, MPI_AINT, ug_win->user_comm));

for (i = 0; i < user_nprocs; i++) {
for (j = 0; j < CSP_ENV.num_g; j++) {
ug_win->targets[i].base_g_offsets[j] = base_g_offsets[i * CSP_ENV.num_g + j];
CSP_DBG_PRINT("\t.base_g_offsets[%d] = 0x%lx\n",
j, ug_win->targets[i].base_g_offsets[j]);
}
}

}

fn_exit:
Expand All @@ -597,6 +663,8 @@ static int alloc_shared_window(MPI_Aint size, int disp_unit, MPI_Info info, CSPU
int mpi_errno = MPI_SUCCESS;
MPI_Info shared_info = MPI_INFO_NULL;
int user_rank;
int flag, non_contig = 0;
char value[6];

CSP_CALLMPI(JUMP, PMPI_Comm_rank(ug_win->user_comm, &user_rank));

@@ -610,7 +678,9 @@
CSP_CALLMPI(JUMP, PMPI_Info_dup(info, &shared_info));
CSP_CALLMPI(JUMP, PMPI_Info_set(shared_info, "alloc_shm", "true"));
CSP_CALLMPI(JUMP, PMPI_Info_set(shared_info, "same_size", "false"));
CSP_CALLMPI(JUMP, PMPI_Info_set(shared_info, "alloc_shared_noncontig", "false"));
//CSP_CALLMPI(JUMP, PMPI_Info_set(shared_info, "alloc_shared_noncontig", "false"));
CSP_CALLMPI(JUMP, PMPI_Info_get(shared_info, "alloc_shared_noncontig", 5, value, &flag));
if (flag && strcmp(value, "true")) non_contig = 1;
}

CSP_CALLMPI(JUMP, PMPI_Win_allocate_shared(size, disp_unit, shared_info, ug_win->local_ug_comm,
Expand All @@ -623,7 +693,7 @@ static int alloc_shared_window(MPI_Aint size, int disp_unit, MPI_Info info, CSPU
CSPU_WIN_ERRHAN_SET_INTERN(ug_win->local_ug_win);

/* Gather user offsets on corresponding ghost processes */
mpi_errno = gather_base_offsets(ug_win, non_contig);
CSP_CHKMPIFAIL_JUMP(mpi_errno);

fn_exit:
@@ -705,17 +775,8 @@ int MPI_Win_allocate(MPI_Aint size, int disp_unit, MPI_Info info,
int tmp_bcast_buf[2];

/* Skip internal processing when disabled */
if (CSP_IS_DISABLED)
return PMPI_Win_allocate(size, disp_unit, info, user_comm, baseptr, win);

if (CSP_IS_MODE_DISABLED(RMA)) {
if (user_comm == MPI_COMM_WORLD)
user_comm = CSP_COMM_USER_WORLD;
return PMPI_Win_allocate(size, disp_unit, info, user_comm, baseptr, win);
}

CSPU_ERRHAN_EXTOBJ_LOCAL_DCL();
CSPU_COMM_ERRHAN_SET_EXTOBJ();

ug_win = CSP_calloc(1, sizeof(CSPU_win_t));

@@ -727,21 +788,16 @@ int MPI_Win_allocate(MPI_Aint size, int disp_unit, MPI_Info info,
mpi_errno = read_win_info(info, ug_win);
CSP_CHKMPIFAIL_JUMP(mpi_errno);

/* If user turns off asynchronous redirection, simply return normal window; */
if (ug_win->info_args.async_config == CSP_ASYNC_CONFIG_OFF) {
CSPU_ERRHAN_RESET_EXTOBJ(); /* reset before calling original MPI */
CSP_CALLMPI(NOSTMT, PMPI_Win_allocate(size, disp_unit, info, user_comm, baseptr, win));
CSP_DBG_PRINT("User turns off async in win_allocate, return normal win 0x%x\n", *win);

goto fn_noasync;
}

/* Start allocating casper window */

/* Check any invalid input which can only be checked by MPI calls after ghost joined.
* TODO: how to interrupt ghost win_allocate if MPI error reported on user side ?*/
mpi_errno = check_valid_input(size, disp_unit);
CSP_CHKMPIFAIL_JUMP(mpi_errno);

/* Initialize basic communicators and information. */
if (user_comm == CSP_COMM_USER_WORLD) {
@@ -792,6 +848,9 @@ int MPI_Win_allocate(MPI_Aint size, int disp_unit, MPI_Info info,
ug_win->targets[i].g_ranks_in_ug = CSP_calloc(CSP_ENV.num_g, sizeof(MPI_Aint));
}

/* Gather users' disp_unit, size, ranks and node_id */
tmp_gather_buf = CSP_calloc(user_nprocs * 7, sizeof(MPI_Aint));
tmp_gather_buf[7 * user_rank] = (MPI_Aint) disp_unit;
@@ -962,4 +1021,4 @@ int MPI_Win_allocate(MPI_Aint size, int disp_unit, MPI_Info info,
CSPU_ERRHAN_RESET_EXTOBJ(); /* reset before error handling */
CSPU_COMM_ERRHANLDING(user_comm, &mpi_errno);
goto fn_exit;
}