diff --git a/dpdispatcher/contexts/openapi_context.py b/dpdispatcher/contexts/openapi_context.py index 94b25ba8..3dfc0c2d 100644 --- a/dpdispatcher/contexts/openapi_context.py +++ b/dpdispatcher/contexts/openapi_context.py @@ -72,6 +72,7 @@ def __init__( self.init_remote_root = remote_root self.temp_local_root = os.path.abspath(local_root) self.remote_profile = remote_profile + os.makedirs(DP_CLOUD_SERVER_HOME_DIR, exist_ok=True) access_key = ( remote_profile.get("access_key", None) or os.getenv("BOHRIUM_ACCESS_KEY", None) @@ -170,7 +171,7 @@ def upload_job(self, job, common_files=None): object_key = os.path.join(data["storePath"], zip_filename) # type: ignore job.upload_path = object_key job.job_id = data["jobId"] # type: ignore - job.jgid = data["jobGroupId"] # type: ignore + job.jgid = data.get("jobGroupId", "") # type: ignore self.storage.upload_From_file_multi_part( object_key=object_key, file_path=upload_zip, token=token ) diff --git a/dpdispatcher/machines/openapi.py b/dpdispatcher/machines/openapi.py index e5514dce..c6926b6c 100644 --- a/dpdispatcher/machines/openapi.py +++ b/dpdispatcher/machines/openapi.py @@ -129,10 +129,13 @@ def do_submit(self, job): ), "out_files": self._gen_backward_files_list(job), "platform": self.remote_profile.get("platform", "ali"), - "image_address": self.remote_profile.get("image_address", ""), + "image_name": self.remote_profile.get("image_address", ""), } - if job.job_state == JobStatus.unsubmitted: - openapi_params["job_id"] = job.job_id + if "real_user_id" in self.remote_profile: + openapi_params["real_user_id"] = self.remote_profile["real_user_id"] + if "session_id" in self.remote_profile: + openapi_params["session_id"] = self.remote_profile["session_id"] + openapi_params["job_id"] = job.job_id data = self.job.insert(**openapi_params) job.job_id = data.get("jobId", 0) # type: ignore @@ -182,8 +185,8 @@ def check_status(self, job): self.ignore_exit_code, ) if job_state == JobStatus.finished: - job_log = self.job.log(job_id) if self.remote_profile.get("output_log"): + job_log = self.job.log(job_id) print(job_log, end="") self._download_job(job) elif self.remote_profile.get("output_log") and job_state == JobStatus.running: @@ -193,7 +196,7 @@ def check_status(self, job): def _download_job(self, job): data = self.job.detail(job.job_id) - job_url = data["jobFiles"]["outFiles"][0]["url"] # type: ignore + job_url = data["resultUrl"] # type: ignore if not job_url: return job_hash = job.job_hash @@ -243,7 +246,7 @@ def map_dp_job_state(status, exit_code, ignore_exit_code=True): if status not in map_dict: dlog.error(f"unknown job status {status}") return JobStatus.unknown - if status == -1 and exit_code != 0 and ignore_exit_code: + if status == -1 and ignore_exit_code: return JobStatus.finished return map_dict[status]