Skip to content

Commit aecb91d

Browse files
committed
Adapt worker and nanny to ipv6
1 parent 1ee4756 commit aecb91d

File tree

4 files changed

+24
-6
lines changed

4 files changed

+24
-6
lines changed

distributed/nanny.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from distributed import preloading
2828
from distributed._async_taskgroup import AsyncTaskGroupClosedError
2929
from distributed.comm import get_address_host
30-
from distributed.comm.addressing import address_from_user_args
30+
from distributed.comm.addressing import address_from_user_args, unparse_host_port
3131
from distributed.compatibility import asyncio_run
3232
from distributed.config import get_loop_factory
3333
from distributed.core import (
@@ -48,7 +48,7 @@
4848
from distributed.protocol.serialize import _is_dumpable
4949
from distributed.security import Security
5050
from distributed.utils import (
51-
get_ip,
51+
get_ip_adaptive,
5252
get_mp_context,
5353
json_load_robust,
5454
log_errors,
@@ -276,7 +276,9 @@ def __init__( # type: ignore[no-untyped-def]
276276
and not interface
277277
and not self.scheduler_addr.startswith("inproc://")
278278
):
279-
host = get_ip(get_address_host(self.scheduler.address))
279+
host = unparse_host_port(
280+
get_ip_adaptive(get_address_host(self.scheduler.address))
281+
)
280282

281283
self._start_port = port
282284
self._start_host = host

distributed/node.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,13 @@ def start_http_server(
176176
bound_addresses = get_tcp_server_addresses(self.http_server)
177177

178178
# If more than one address is configured we just use the first here
179-
self.http_server.address, self.http_server.port = bound_addresses[0]
179+
# Socket addresses representation: https://docs.python.org/3/library/socket.html#socket-families
180+
if len(bound_addresses[0]) == 2: # (host, port) for ipv4
181+
self.http_server.address, self.http_server.port = bound_addresses[0]
182+
elif len(bound_addresses[0]) == 4: # (host, port, flowinfo, scope_id) for ipv6
183+
self.http_server.address, self.http_server.port = bound_addresses[0][:2]
184+
else:
185+
raise RuntimeError("Unexpected bound address format")
180186
self.services["dashboard"] = self.http_server
181187

182188
# Warn on port changes

distributed/utils.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,16 @@ def get_ipv6(host="2001:4860:4860::8888", port=80):
226226
return _get_ip(host, port, family=socket.AF_INET6)
227227

228228

229+
def get_ip_adaptive(host="8.8.8.8", port=80):
230+
"""
231+
Get ipv4 or ipv6 adaptively depending on the host to reach.
232+
"""
233+
if ":" in host:
234+
return get_ipv6(host, port)
235+
else:
236+
return get_ip(host, port)
237+
238+
229239
def get_ip_interface(ifname):
230240
"""
231241
Get the local IPv4 address of a network interface.

distributed/worker.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@
9696
from distributed.threadpoolexecutor import secede as tpe_secede
9797
from distributed.utils import (
9898
TimeoutError,
99-
get_ip,
99+
get_ip_adaptive,
100100
has_arg,
101101
in_async_call,
102102
iscoroutinefunction,
@@ -1405,7 +1405,7 @@ async def start_unsafe(self):
14051405
kwargs = self.security.get_listen_args("worker")
14061406
if self._protocol in ("tcp", "tls"):
14071407
kwargs = kwargs.copy()
1408-
kwargs["default_host"] = get_ip(
1408+
kwargs["default_host"] = get_ip_adaptive(
14091409
get_address_host(self.scheduler.address)
14101410
)
14111411
try:

0 commit comments

Comments
 (0)