
Commit 1ed1565

Merge pull request #64 from ttedeschi/AF20-experimental
Af20 experimental
2 parents: 8fe2823 + 698f9c4

File tree

9 files changed: +88, -35 lines

dask_remote_jobqueue/dask_remote_jobqueue.py
dask_remote_jobqueue/templates/job_rm.sh
dask_remote_jobqueue/templates/job_submit.sh
dask_remote_jobqueue/templates/scheduler.sh
dask_remote_jobqueue/templates/scheduler.sub
dask_remote_jobqueue/templates/start_scheduler.py
dask_remote_jobqueue/utils.py
requirements.txt
tests/main.py

dask_remote_jobqueue/dask_remote_jobqueue.py

Lines changed: 9 additions & 0 deletions
@@ -51,11 +51,14 @@ def __init__(
         self,
         ssh_namespace="default",
         user: str = "NONE",
+        ssh_url: str = "",
         ssh_url_port: int = 8122,
         asynchronous: bool = True,  # Set by dask-labextension but not used in this class
         sitename: str = "",
         singularity_wn_image = "/cvmfs/images.dodas.infn.it/registry.hub.docker.com/dodasts/root-in-docker:ubuntu22-kernel-v1",
         debug: bool = True,
+        user_cores: int = 1,
+        user_memory: str = "2 GiB",
     ):
         self.__debug = debug

@@ -112,6 +115,7 @@ def __init__(
         logger.debug(f"generated -> controller_port: {self.controller_port}")

         # Custom ssh port for the tunnel
+        self.ssh_url: str = ssh_url
         self.ssh_url_port: int = ssh_url_port

         self.cluster_id: str = ""
@@ -148,6 +152,10 @@ def __init__(
         self.client_id = os.environ.get("IAM_CLIENT_ID")
         self.client_secret = os.environ.get("IAM_CLIENT_SECRET")

+        # Dask worker spec
+        self.user_cores: int = user_cores
+        self.user_memory: str = user_memory
+
         ##
         # Dask labextension variables
         #
@@ -422,6 +430,7 @@ async def _make_connections(self):
             self.connection_process_q,
             cluster_id=self.cluster_id,
             ssh_namespace=self.ssh_namespace,
+            ssh_url=self.ssh_url,
             ssh_url_port=self.ssh_url_port,
             username=self.username,
             token=self.token,

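The new keyword arguments expose the SSH tunnel endpoint and the per-worker resources to the caller; an empty ssh_url preserves the old in-cluster default (see the utils.py hunk below). A minimal usage sketch with illustrative values, modelled on the new tests/main.py:

    from dask_remote_jobqueue import RemoteHTCondor

    # Endpoint, site, and resource values here are illustrative, not defaults.
    cluster = RemoteHTCondor(
        user="myuser",
        ssh_url="cms-it-hub.cloud.cnaf.infn.it",  # new: explicit tunnel host
        ssh_url_port=31023,
        sitename="T2_LNL_PD",
        user_cores=2,           # new: cores per Dask worker
        user_memory="4 GiB",    # new: memory per Dask worker
        asynchronous=False,
    )
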
dask_remote_jobqueue/templates/job_rm.sh

Lines changed: 2 additions & 2 deletions
@@ -4,8 +4,8 @@
 # This software is released under the MIT License.
 # https://opensource.org/licenses/MIT

-source /cvmfs/cms.dodas.infn.it/miniconda3/etc/profile.d/conda.sh
-conda activate af
+#source /cvmfs/cms.dodas.infn.it/miniconda3/etc/profile.d/conda.sh
+#conda activate af

 export _condor_AUTH_SSL_CLIENT_CAFILE={{ htc_ca }}
 export _condor_TOOL_DEBUG={{ htc_debug }}

dask_remote_jobqueue/templates/job_submit.sh

Lines changed: 3 additions & 2 deletions
@@ -4,8 +4,8 @@
 # This software is released under the MIT License.
 # https://opensource.org/licenses/MIT

-source /cvmfs/cms.dodas.infn.it/miniconda3/etc/profile.d/conda.sh
-conda activate af
+#source /cvmfs/cms.dodas.infn.it/miniconda3/etc/profile.d/conda.sh
+#conda activate af

 export _condor_AUTH_SSL_CLIENT_CAFILE={{ htc_ca }}
 export _condor_TOOL_DEBUG={{ htc_debug }}
@@ -15,4 +15,5 @@ export _condor_SCHEDD_NAME={{ htc_schedd_name }}
 export _condor_SCITOKENS_FILE={{ htc_scitoken_file }}
 export _condor_SEC_DEFAULT_AUTHENTICATION_METHODS={{ htc_sec_method}}

+cat $@
 condor_submit -spool $@

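The added cat $@ prints the rendered submit description to stdout before submission, which makes failed condor_submit -spool calls easier to debug from the logs. On the scheduler side this wrapper replaces the direct condor_submit call: the start_scheduler.py hunk below points dask-jobqueue at it. A sketch of that override, assuming dask_jobqueue's HTCondorJob interface:

    from dask_jobqueue.htcondor import HTCondorJob

    class MyHTCondorJob(HTCondorJob):
        # Route submission and removal through the templated wrappers,
        # which export the HTCondor auth environment before invoking the CLI.
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.submit_command = "./job_submit.sh"  # wraps: condor_submit -spool $@
            self.cancel_command = "./job_rm.sh"
            self.executable = "/bin/bash"
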
dask_remote_jobqueue/templates/scheduler.sh

Lines changed: 9 additions & 8 deletions
@@ -6,7 +6,7 @@

 chmod +x job_submit.sh
 chmod +x job_rm.sh
-chmod +x entrypoint.sh
+#chmod +x entrypoint.sh

 while true; do
     curl -d grant_type=urn:ietf:params:oauth:grant-type:token-exchange \
@@ -20,11 +20,12 @@ while true; do
     sleep 72000
 done &

-source /cvmfs/cms.dodas.infn.it/miniconda3/bin/activate
-conda activate af-test
+#source /cvmfs/cms.dodas.infn.it/miniconda3/bin/activate
+#conda activate af-test

-if command -V tini &>/dev/null; then
-    tini -s python3 -- start_scheduler.py
-else
-    python3 start_scheduler.py
-fi
+#if command -V tini &>/dev/null; then
+#    tini -s python3 -- start_scheduler.py
+#else
+#    python3 start_scheduler.py
+#fi
+python3 start_scheduler.py

dask_remote_jobqueue/templates/scheduler.sub

Lines changed: 3 additions & 2 deletions
@@ -5,8 +5,9 @@ executable = scheduler.sh
 log = dask_scheduler.log
 output = dask_scheduler.out
 error = dask_scheduler.error
-+SingularityImage = "/cvmfs/images.dodas.infn.it/registry.hub.docker.com/dodasts/dask-scheduler:v1"
+#+SingularityImage = "/cvmfs/unpacked.cern.ch/registry.hub.docker.com/dodasts/dask-scheduler:v1"
++SingularityImage = {{ singularity_wn_image }}
 {{ selected_sitename }}
 transfer_input_files = start_scheduler.py, scheduler.sh, /ca.crt, /tmp/token, job_submit.sh, job_rm.sh, .bashrc, config.yaml
-environment = HOME=./; OIDC_CONFIG_DIR=./.oidc-agent; JHUB_TOKEN={{ token }};JHUB_USER={{ name }};SCHED_PORT={{ sched_port }};DASH_PORT={{ dash_port }};CONTROLLER_PORT={{ controller_port }};REFRESH_TOKEN={{ refresh_token }};IAM_SERVER={{ iam_server }};IAM_CLIENT_ID={{ client_id }};IAM_CLIENT_SECRET={{ client_secret }};_condor_AUTH_SSL_CLIENT_CAFILE={{ htc_ca }};_condor_TOOL_DEBUG={{ htc_debug }};_condor_COLLECTOR_HOST={{ htc_collector }}; _condor_SCHEDD_HOST={{ htc_schedd_host }};_condor_SCHEDD_NAME={{ htc_schedd_name }};_condor_SCITOKENS_FILE={{ htc_scitoken_file }};_condor_SEC_DEFAULT_AUTHENTICATION_METHODS={{ htc_sec_method}};SINGULARITY_WN_IMAGE={{ singularity_wn_image }}
+environment = HOME=./; OIDC_CONFIG_DIR=./.oidc-agent; JHUB_TOKEN={{ token }};JHUB_USER={{ name }};SCHED_PORT={{ sched_port }};DASH_PORT={{ dash_port }};CONTROLLER_PORT={{ controller_port }};REFRESH_TOKEN={{ refresh_token }};IAM_SERVER={{ iam_server }};IAM_CLIENT_ID={{ client_id }};IAM_CLIENT_SECRET={{ client_secret }};_condor_AUTH_SSL_CLIENT_CAFILE={{ htc_ca }};_condor_TOOL_DEBUG={{ htc_debug }};_condor_COLLECTOR_HOST={{ htc_collector }}; _condor_SCHEDD_HOST={{ htc_schedd_host }};_condor_SCHEDD_NAME={{ htc_schedd_name }};_condor_SCITOKENS_FILE={{ htc_scitoken_file }};_condor_SEC_DEFAULT_AUTHENTICATION_METHODS={{ htc_sec_method}};SINGULARITY_WN_IMAGE={{ singularity_wn_image }};USER_CORES={{ user_cores }};USER_MEMORY={{ user_memory }}
 queue

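USER_CORES and USER_MEMORY reach the scheduler job through the submit file's environment list, alongside the existing SINGULARITY_WN_IMAGE variable. A minimal rendering sketch, assuming jinja2 (already in requirements.txt); the values are illustrative, and the remaining placeholders are filled in by StartDaskScheduler.run() in utils.py:

    from jinja2 import Environment, FileSystemLoader

    env = Environment(loader=FileSystemLoader("dask_remote_jobqueue/templates"))
    template = env.get_template("scheduler.sub")

    # Only the new/changed placeholders are shown; token, ports, and the
    # HTCondor settings are elided here and render empty by default.
    print(template.render(
        singularity_wn_image="/cvmfs/unpacked.cern.ch/registry.hub.docker.com/dodasts/root-in-docker:ubuntu22-kernel-v1",
        user_cores=2,
        user_memory="4 GiB",
    ))
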
dask_remote_jobqueue/templates/start_scheduler.py

Lines changed: 19 additions & 17 deletions
@@ -45,7 +45,11 @@ def __init__(self, *args, **kwargs):
             **kwargs,
             death_timeout=60 * 5,  # 5min
             python="python3",
+            #submit_command_extra = "-spool"
         )
+        self.submit_command = "./job_submit.sh"
+        self.cancel_command = "./job_rm.sh"
+        self.executable = "/bin/bash"


 ##
@@ -81,7 +85,12 @@ def __init__(self, *args, **kwargs):
 sched_port = int(os.environ.get("SCHED_PORT", "42000"))
 dash_port = int(os.environ.get("DASH_PORT", "42001"))
 controller_port = int(os.environ.get("CONTROLLER_PORT", "42002"))
-singularity_wn_image = os.environ.get("SINGULARITY_WN_IMAGE", "/cvmfs/images.dodas.infn.it/registry.hub.docker.com/dodasts/root-in-docker:ubuntu22-kernel-v1")
+singularity_wn_image = os.environ.get("SINGULARITY_WN_IMAGE", "/cvmfs/unpacked.cern.ch/registry.hub.docker.com/dodasts/root-in-docker:ubuntu22-kernel-v1")
+
+user_cores = int(os.environ.get("USER_CORES", 1))
+user_memory = os.environ.get("USER_MEMORY", "2 GiB")
+if user_memory == "":
+    user_memory = "2 GiB"

 logger.debug(f"name: {name}")
 logger.debug(f"token: {token}")
@@ -173,13 +182,16 @@ def _worker_spec(self) -> dict:
     def run(self):
         self.cluster = HTCondorCluster(
             job_cls=MyHTCondorJob,
-            cores=1,
-            memory="2 GiB", # ref: https://github.com/dask/dask/blob/e4799c0498b5e5877705bb5542d8d01116ee1320/dask/utils.py#L1404
+            cores = user_cores,
+            memory = user_memory,
+            #cores=1,
+            #memory="2 GiB", # ref: https://github.com/dask/dask/blob/e4799c0498b5e5877705bb5542d8d01116ee1320/dask/utils.py#L1404
             disk="1 GB",
             scheduler_options=scheduler_options_vars,
             job_extra=job_extra_vars,
             # silence_logs="debug",
             local_directory="./scratch",
+            job_script_prologue=['eval "$(conda shell.bash hook)"']
         )

         while self.cluster.status != Status.running:
@@ -296,7 +308,7 @@ def run(self):
 async def tunnel_scheduler():
     logger.debug("start tunnel scheduler")
     connection = await asyncssh.connect(
-        "jhub.131.154.96.124.myip.cloud.infn.it",
+        "jhub.131.154.98.185.myip.cloud.infn.it",
         port=31022,
         username=name,
         password=token,
@@ -311,7 +323,7 @@ async def tunnel_scheduler():
 async def tunnel_dashboard():
     logger.debug("start tunnel dashboard")
     connection = await asyncssh.connect(
-        "jhub.131.154.96.124.myip.cloud.infn.it",
+        "jhub.131.154.98.185.myip.cloud.infn.it",
         port=31022,
         username=name,
         password=token,
@@ -326,7 +338,7 @@ async def tunnel_dashboard():
 async def tunnel_controller():
     logger.debug("start tunnel controller")
     connection = await asyncssh.connect(
-        "jhub.131.154.96.124.myip.cloud.infn.it",
+        "jhub.131.154.98.185.myip.cloud.infn.it",
         port=31022,
         username=name,
         password=token,
@@ -389,7 +401,7 @@ def get(self):
     def prepare(self):
         logger.debug(self.request.arguments)

-
+
 class LogsHandler(tornado.web.RequestHandler):
     def initialize(self, sched_q: Queue, controller_q: Queue):
         self.sched_q: Queue = sched_q
@@ -424,22 +436,18 @@ async def get(self):
             font-size: 15px;
             border-bottom:
         }
-
         .active, .collapsible:hover {
             background-color: #ec8f72;
         }
-
         .content {
             padding: 0 18px;
             display: none;
             overflow: hidden;
             background-color: #fafafa;
         }
-
         table, th, td {
             border: 1px solid black;
         }
-
         table {
             width: 100%;
         }
@@ -545,7 +553,6 @@ async def get(self):
         """<script>
         var coll = document.getElementsByClassName("collapsible");
         var i;
-
         for (i = 0; i < coll.length; i++) {
             coll[i].addEventListener("click", function() {
                 this.classList.toggle("active");
@@ -557,20 +564,16 @@ async def get(self):
                 }
             });
         }
-
         window.onscroll = function() {myFunction()};
-
         var header = document.getElementById("myHeader");
         var sticky = header.offsetTop;
-
         function myFunction() {
             if (window.pageYOffset > sticky) {
                 header.classList.add("sticky");
             } else {
                 header.classList.remove("sticky");
             }
         }
-
         var origin_location = window.location.href;
         function reload() {
             window.location.href = origin_location;
@@ -666,7 +669,6 @@ def initialize(self, sched_q: Queue, controller_q: Queue):

     def get(self):
         """Return a descriptive dictionary of worker specs.
-
         Example worker_spec:
         {
             "HTCondorCluster-0": {

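One detail worth noting in the USER_MEMORY handling above: os.environ.get only falls back to its default when the variable is unset, not when it is set to an empty string, which is exactly what an unfilled {{ user_memory }} placeholder would export, hence the explicit guard. A compact equivalent, with a validity check through dask.utils (the module the removed ref comment pointed at); this is a sketch of the pattern, not the committed code:

    import os
    from dask.utils import parse_bytes

    # An empty value (e.g. from an unfilled template placeholder) falls
    # back to the default, matching the guard in start_scheduler.py.
    user_memory = os.environ.get("USER_MEMORY", "2 GiB") or "2 GiB"

    # parse_bytes accepts strings like "2 GiB" and raises on malformed
    # specs, so a bad value fails fast instead of reaching HTCondorCluster.
    assert parse_bytes("2 GiB") == 2 * 1024**3
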
dask_remote_jobqueue/utils.py

Lines changed: 21 additions & 3 deletions
@@ -37,7 +37,10 @@ def __init__(
         asyncio.set_event_loop(self.cur_loop)
         self.cluster_id: str = cluster_id
         self.ssh_namespace: str = ssh_namespace
-        self.ssh_url: str = f"ssh-listener.{self.ssh_namespace}.svc.cluster.local"
+        if ssh_url:
+            self.ssh_url: str = ssh_url
+        else:
+            self.ssh_url: str = f"ssh-listener.{self.ssh_namespace}.svc.cluster.local"
         self.ssh_url_port: int = ssh_url_port
         self.username: str = username
         self.token: str = token
@@ -103,7 +106,7 @@ def run(self):
             connected = self._connection_ok(1)
             logger.debug(f"[ConnectionManager][attempt: {attempt}][{connected}]")

-            if attempt >= 42:
+            if attempt >= 10000:
                 self.connection_manager_q.put(
                     f"ERROR - ATTEMPT TO CONNECT EXCEEDED # {attempt}"
                 )
@@ -412,6 +415,9 @@ def __init__(
         self._htc_scitoken_file: str = ""
         self._htc_sec_method: str = ""

+        self._user_cores: int = 1
+        self._user_memory: str = "2 GiB"
+
     def _copy_attributes(self):
         try:
             self._sitename = getattr(self._remoteHTCondor, "sitename")
@@ -468,6 +474,16 @@ def _copy_attributes(self):
             logger.debug(
                 f"[StartDaskScheduler][copy of htc_sec_method: {self._htc_sec_method}]"
             )
+            self._user_cores = getattr(self._remoteHTCondor, "user_cores")
+            logger.debug(
+                f"[StartDaskScheduler][copy of user_cores: {self._user_cores}]"
+            )
+            self._user_memory = getattr(self._remoteHTCondor, "user_memory")
+            logger.debug(
+                f"[StartDaskScheduler][copy of user_memory: {self._user_memory}]"
+            )
+
+
         except AttributeError as exc:
             logger.debug(f"[StartDaskScheduler][copy error: {exc}]")
             raise
@@ -516,7 +532,9 @@ def run(self):
             htc_scitoken_file=self._htc_scitoken_file,
             htc_sec_method=self._htc_sec_method,
             selected_sitename=selected_sitename,
-            singularity_wn_image=self.singularity_wn_image
+            singularity_wn_image=self.singularity_wn_image,
+            user_cores=self._user_cores,
+            user_memory=self._user_memory
         )

         logger.debug(f"[StartDaskScheduler][run][{dest.name}]")

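The connection manager now honours an explicit ssh_url and only derives the in-cluster Kubernetes service name when none is given; the retry ceiling also rises from 42 to 10000 attempts, which effectively disables the give-up path. The fallback logic as a standalone helper, shown purely for illustration:

    def resolve_ssh_url(ssh_url: str, ssh_namespace: str = "default") -> str:
        # Prefer an explicit endpoint (e.g. a public hostname reachable from
        # outside the cluster); otherwise fall back to the service DNS name.
        if ssh_url:
            return ssh_url
        return f"ssh-listener.{ssh_namespace}.svc.cluster.local"

    assert resolve_ssh_url("", "jhub") == "ssh-listener.jhub.svc.cluster.local"
    assert resolve_ssh_url("cms-it-hub.cloud.cnaf.infn.it") == "cms-it-hub.cloud.cnaf.infn.it"
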
requirements.txt

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 asyncssh
-dask==2021.11.1
+dask
 dask_jobqueue
 httpx
 jinja2

tests/main.py

Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
+from dask_remote_jobqueue import RemoteHTCondor
+
+def main():
+    cluster = RemoteHTCondor(
+        user = "ttedesch",
+        ssh_url = "cms-it-hub.cloud.cnaf.infn.it",
+        ssh_url_port = 31023,
+        sitename = "T2_LNL_PD",
+        singularity_wn_image = "/cvmfs/unpacked.cern.ch/registry.hub.docker.com/dodasts/root-in-docker:ubuntu22-kernel-v1",
+        asynchronous = False,
+        debug = False
+    )
+
+    cluster.start()
+
+    print(cluster.scheduler_info)
+
+    cluster.close()
+
+if __name__=='__main__':
+    main()
