Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -328,12 +328,18 @@ if(PDC_USE_CRAY_DRC)
endif()

#-----------------------------------------------------------------------------
# DART Suffix Tree mode
# IDIOMS Index
#-----------------------------------------------------------------------------
option(PDC_DART_SUFFIX_TREE_MODE "Enable DART Suffix Tree mode." ON)
if(PDC_DART_SUFFIX_TREE_MODE)
set(PDC_DART_SFX_TREE 1)
# add_compile_definitions(PDC_DART_SFX_TREE=${PDC_DART_SFX_TREE})
option(PDC_ENABLE_IDIOMS "Enable IDIOMS metadata." OFF)
if(PDC_ENABLE_IDIOMS)
set(PDC_ENABLE_IDIOMS 1)
option(PDC_DART_SUFFIX_TREE_MODE "Enable DART Suffix Tree mode." ON)
if(PDC_DART_SUFFIX_TREE_MODE)
set(PDC_DART_SFX_TREE 1)
# add_compile_definitions(PDC_DART_SFX_TREE=${PDC_DART_SFX_TREE})
endif()


endif()

#-----------------------------------------------------------------------------
Expand Down
6 changes: 4 additions & 2 deletions src/api/pdc_client_connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -1255,17 +1255,19 @@ PDC_Client_mercury_init(hg_class_t **hg_class, hg_context_t **hg_context, int po
#endif

if ((hg_transport = getenv("HG_TRANSPORT")) == NULL) {
LOG_INFO("Environment variable HG_TRANSPORT was NOT set\n");
hg_transport = default_hg_transport;
if (pdc_client_mpi_rank_g == 0)
LOG_INFO("Environment variable HG_TRANSPORT was NOT set, default to %s\n", default_hg_transport);
}
else
LOG_INFO("Environment variable HG_TRANSPORT was set\n");
if ((hostname = getenv("HG_HOST")) == NULL) {
LOG_INFO("Environment variable HG_HOST was NOT set\n");
hostname = PDC_malloc(HOSTNAME_LEN);
memset(hostname, 0, HOSTNAME_LEN);
gethostname(hostname, HOSTNAME_LEN - 1);
free_hostname = true;
if (pdc_client_mpi_rank_g == 0)
LOG_INFO("Environment variable HG_HOST was NOT set, default to %s\n", hostname);
}
else
LOG_INFO("Environment variable HG_HOST was set\n");
Expand Down
80 changes: 43 additions & 37 deletions src/api/pdc_region/pdc_region_transfer.c
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,9 @@ register_metadata(pdc_transfer_request_start_all_pkg **transfer_request_input, i
{
FUNC_ENTER(NULL);

/* setLogLevel(LOG_LEVEL_DEBUG); */
/* LOG_DEBUG("Enter %s\n", __func__); */

perr_t ret_value = SUCCEED;
int i, j, index, size, output_size, remain_size, n_objs;
pdc_transfer_request_start_all_pkg **transfer_requests;
Expand All @@ -855,51 +858,49 @@ register_metadata(pdc_transfer_request_start_all_pkg **transfer_request_input, i
remain_size = input_size - size;
output_size = 0;

index = 0;
qsort(transfer_requests, size, sizeof(pdc_transfer_request_start_all_pkg *),
sort_by_metadata_server_start_all);
for (i = 1; i < size; ++i) {
if (transfer_requests[i]->transfer_request->metadata_server_id !=
transfer_requests[i - 1]->transfer_request->metadata_server_id) {
n_objs = i - index;
pack_region_metadata_query(transfer_requests + index, n_objs, &buf, &total_buf_size);
PDC_Client_transfer_request_metadata_query(
&bulk_handle, buf, total_buf_size, n_objs,
transfer_requests[index]->transfer_request->metadata_server_id, is_write, &output_buf_size,
&query_id);
PDCregion_transfer_add_bulk_handle(transfer_requests[index]->transfer_request, bulk_handle);
buf = (char *)PDC_free(buf);
if (query_id) {
output_buf = (char *)PDC_malloc(output_buf_size);
PDC_Client_transfer_request_metadata_query2(
&bulk_handle, output_buf, output_buf_size, query_id,
transfer_requests[index]->transfer_request->metadata_server_id);
PDCregion_transfer_add_bulk_handle(transfer_requests[index]->transfer_request, bulk_handle);
unpack_region_metadata_query(output_buf, transfer_requests + index, &transfer_request_head,
&transfer_request_end, &output_size);
output_buf = (char *)PDC_free(output_buf);
if (transfer_request_front_head) {
previous->next = transfer_request_head;
}
else {
transfer_request_front_head = transfer_request_head;
}
previous = transfer_request_end;
}
index = i;

// Each iteration finds the first transfer that has a target meta server different from the previous one
// index is the first transfer index
int current_unique_idx = 0;
int *unique_server_xfer_idx = NULL;
int *unique_server_nboj = NULL;
if (size > 0) {
unique_server_xfer_idx = (int *)PDC_calloc(size, sizeof(int));
unique_server_nboj = (int *)PDC_calloc(size, sizeof(int));
}

// Iterate through the input array
for (i = 0; i < size; ++i) {
if (i == 0 || transfer_requests[i]->transfer_request->metadata_server_id !=
transfer_requests[i - 1]->transfer_request->metadata_server_id) {
// Check if the current element is different from the previous one
// or if it's the first element
unique_server_xfer_idx[current_unique_idx] = i;
unique_server_nboj[current_unique_idx] = 1;

current_unique_idx++;
}
else {
unique_server_nboj[current_unique_idx - 1]++;
}
}
int num_unique_server_ids = current_unique_idx;

// Now we will try to distribute the metadata requests to different servers across clients
for (i = 0; i < num_unique_server_ids; i++) {
int current_index = (pdc_client_mpi_rank_g + i) % num_unique_server_ids;
index = unique_server_xfer_idx[current_index];
n_objs = unique_server_nboj[current_index];

if (size) {
n_objs = size - index;
pack_region_metadata_query(transfer_requests + index, n_objs, &buf, &total_buf_size);
PDC_Client_transfer_request_metadata_query(
&bulk_handle, buf, total_buf_size, n_objs,
transfer_requests[index]->transfer_request->metadata_server_id, is_write, &output_buf_size,
&query_id);
PDCregion_transfer_add_bulk_handle(transfer_requests[index]->transfer_request, bulk_handle);
buf = (char *)PDC_free(buf);
// If it is a valid query ID, then it means regions are overlapping.
if (query_id) {
output_buf = (char *)PDC_malloc(output_buf_size);
PDC_Client_transfer_request_metadata_query2(
Expand All @@ -909,16 +910,21 @@ register_metadata(pdc_transfer_request_start_all_pkg **transfer_request_input, i
unpack_region_metadata_query(output_buf, transfer_requests + index, &transfer_request_head,
&transfer_request_end, &output_size);
output_buf = (char *)PDC_free(output_buf);
if (transfer_request_front_head) {

if (transfer_request_front_head)
previous->next = transfer_request_head;
}
else {
else
transfer_request_front_head = transfer_request_head;
}

previous = transfer_request_end;
}
}

if (unique_server_xfer_idx)
free(unique_server_xfer_idx);
if (unique_server_nboj)
free(unique_server_nboj);

if (output_size) {
transfer_request_output = (pdc_transfer_request_start_all_pkg **)PDC_malloc(
sizeof(pdc_transfer_request_start_all_pkg *) * (output_size + remain_size));
Expand Down
22 changes: 16 additions & 6 deletions src/server/pdc_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -774,17 +774,19 @@ PDC_Server_init(int port, hg_class_t **hg_class, hg_context_t **hg_context)
total_mem_usage_g += (sizeof(char) + sizeof(char *));

if ((hg_transport = getenv("HG_TRANSPORT")) == NULL) {
LOG_INFO("Environment variable HG_TRANSPORT was NOT set\n");
hg_transport = default_hg_transport;
if (pdc_server_rank_g == 0)
LOG_INFO("Environment variable HG_TRANSPORT was NOT set, default to %s\n", hg_transport);
}
else
LOG_INFO("Environment variable HG_TRANSPORT was set\n");
if ((hostname = getenv("HG_HOST")) == NULL) {
LOG_INFO("Environment variable HG_HOST was NOT set\n");
hostname = PDC_malloc(HOSTNAME_LEN);
memset(hostname, 0, HOSTNAME_LEN);
gethostname(hostname, HOSTNAME_LEN - 1);
free_hostname = true;
if (pdc_server_rank_g == 0)
LOG_INFO("Environment variable HG_HOST was NOT set, default to %s\n", hostname);
}
else
LOG_INFO("Environment variable HG_HOST was set\n");
Expand Down Expand Up @@ -922,8 +924,10 @@ PDC_Server_init(int port, hg_class_t **hg_class, hg_context_t **hg_context)
LOG_INFO("Read cache enabled\n");
#endif

#ifdef PDC_ENABLE_IDIOMS
// Initialize IDIOMS
PDC_Server_metadata_index_init(pdc_server_size_g, pdc_server_rank_g);
#endif

// TODO: support restart with different number of servers than previous run
char checkpoint_file[ADDR_MAX + sizeof(int) + 1];
Expand All @@ -934,7 +938,9 @@ PDC_Server_init(int port, hg_class_t **hg_class, hg_context_t **hg_context)
ret_value = PDC_Server_restart(checkpoint_file);
if (ret_value != SUCCEED)
PGOTO_ERROR(FAIL, "Error with PDC_Server_restart");
#ifdef PDC_ENABLE_IDIOMS
metadata_index_recover(pdc_server_tmp_dir_g, pdc_server_size_g, pdc_server_rank_g);
#endif
}
else {
// We are starting a brand new server
Expand Down Expand Up @@ -1364,7 +1370,9 @@ PDC_Server_checkpoint()
LOG_INFO("Checkpointed %10d objects, with %10d regions \n", all_metadata_size, all_region_count);
}

#ifdef PDC_ENABLE_IDIOMS
metadata_index_dump(pdc_server_tmp_dir_g, pdc_server_rank_g);
#endif

done:
FUNC_LEAVE(ret_value);
Expand Down Expand Up @@ -1417,10 +1425,10 @@ PDC_Server_restart(char *filename)
if (file == NULL)
PGOTO_ERROR(FAIL, "Error with fopen, filename: [%s]", filename);

char *slurm_jobid = getenv("SLURM_JOB_ID");
if (slurm_jobid == NULL) {
LOG_ERROR("Error getting slurm job id from SLURM_JOB_ID\n");
}
/* char *slurm_jobid = getenv("SLURM_JOB_ID"); */
/* if (slurm_jobid == NULL) { */
/* LOG_ERROR("Error getting slurm job id from SLURM_JOB_ID\n"); */
/* } */

if (fread(&n_cont, sizeof(int), 1, file) != 1) {
LOG_ERROR("Read failed for n_count\n");
Expand Down Expand Up @@ -1971,8 +1979,10 @@ PDC_Server_mercury_register()
PDC_region_analysis_release_register(hg_class_g);

// DART Index
#ifdef PDC_ENABLE_IDIOMS
PDC_dart_get_server_info_register(hg_class_g);
PDC_dart_perform_one_server_register(hg_class_g);
#endif

// Server to client RPC
server_lookup_client_register_id_g = PDC_server_lookup_client_register(hg_class_g);
Expand Down
1 change: 1 addition & 0 deletions src/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ set(PROGRAMS
misc/vpicio
misc/vpicio_mts
misc/bdcats
misc/bdcats_mts
deprecated/vpicio_old
deprecated/bdcats_old
tags/kvtag_add_get
Expand Down
Loading
Loading