diff --git a/AdminServer/appscale/admin/instance_manager/instance_manager.py b/AdminServer/appscale/admin/instance_manager/instance_manager.py index 772018b89e..6e182f7c74 100644 --- a/AdminServer/appscale/admin/instance_manager/instance_manager.py +++ b/AdminServer/appscale/admin/instance_manager/instance_manager.py @@ -4,6 +4,7 @@ import monotonic import json import os +import socket import urllib2 from tornado import gen @@ -442,21 +443,40 @@ def _stop_app_instance(self, instance): yield self._clean_old_sources() - def _get_lowest_port(self): + def _get_lowest_port(self, excluded_ports): """ Determines the lowest usuable port for a new instance. - + Args: + excluded_ports: A set of ports that the caller is planning on using. + This is used to make sure AppServers do not pick the same port since + it's possible for the port to not be in self._running_instances yet. Returns: An integer specifying a free port. """ existing_ports = {instance.port for instance in self._running_instances} port = STARTING_INSTANCE_PORT while True: - if port in existing_ports: + if port in existing_ports or port in excluded_ports or not self._try_port(port): port += 1 continue return port + def _try_port(self, port): + """ Helper method to check if the port is actually free or not. + + Returns: + True if port is free, False otherwise. + """ + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + result = False + try: + sock.bind(("0.0.0.0", port)) + result = True + except: + logger.info("Port {} is in use".format(port)) + sock.close() + return result + @gen.coroutine def _restart_unrouted_instances(self): """ Restarts instances that the router considers offline. """ @@ -535,6 +555,11 @@ def _fulfill_assignments(self): for instance in to_stop: yield self._stop_app_instance(instance) + excluded_ports = set() + for _, assigned_ports in self._assignments.items(): + for port in assigned_ports: + if port != -1: + excluded_ports.add(port) for version_key, assigned_ports in self._assignments.items(): try: @@ -546,7 +571,6 @@ def _fulfill_assignments(self): # The number of required instances that don't have an assigned port. new_assignment_count = sum(port == -1 for port in assigned_ports) - # Stop instances that aren't assigned. If the assignment list includes # any -1s, match them to running instances that aren't in the assigned # ports list. @@ -571,7 +595,9 @@ def _fulfill_assignments(self): and instance.port not in assigned_ports] to_start = max(new_assignment_count - len(candidates), 0) for _ in range(to_start): - yield self._start_instance(version, self._get_lowest_port()) + port = self._get_lowest_port(excluded_ports) + excluded_ports.add(port) + yield self._start_instance(version, port) @gen.coroutine def _enforce_instance_details(self): diff --git a/AdminServer/tests/test_instance_manager.py b/AdminServer/tests/test_instance_manager.py index c489fe1369..29b62b7e10 100644 --- a/AdminServer/tests/test_instance_manager.py +++ b/AdminServer/tests/test_instance_manager.py @@ -28,6 +28,7 @@ service_helper, testing ) +from appscale.admin.instance_manager.instance import Instance from appscale.common.service_helper import ServiceOperator options.define('login_ip', '127.0.0.1') @@ -303,5 +304,78 @@ def test_wait_for_app(self): instance_started = yield instance_manager._wait_for_app(port) self.assertEqual(False, instance_started) + @gen_test + def test_parallel_appservers(self): + testing.disable_logging() + version_details = {'runtime': 'python27', + 'revision': 1, + 'deployment': { + 'zip': {'sourceUrl': 'source.tar.gz'}}, + 'appscaleExtensions': {'httpPort': '8080'} + } + version_manager = flexmock(version_details=version_details, + project_id = 'test', + revision_key = 'test_default_v1_1', + version_key = 'test_default_v1') + deployment_config = flexmock( + get_config = lambda x: {'default_max_appserver_memory': 400}) + + + source_manager = flexmock() + response = Future() + response.set_result(None) + source_manager.should_receive('ensure_source'). \ + with_args('test_default_v1_1', 'source.tar.gz', 'python27'). \ + and_return(response) + + instance_manager = InstanceManager( + None, None, None, None, deployment_config, + source_manager, None, None, None) + + # Start instance mocks + response = Future() + response.set_result((19999, [])) + flexmock(instance_manager).should_receive('_ensure_api_server').\ + and_return(response) + + # write env + flexmock(file_io).should_receive('write').and_return() + + response = Future() + response.set_result(None) + flexmock(ServiceOperator).should_receive('start_async').\ + and_return(response) + + instance_manager._zk_client = flexmock() + instance_manager._zk_client.should_receive('ensure_path') + + # Within add_routing + response = Future() + response.set_result(True) + flexmock(instance_manager).should_receive('_wait_for_app').\ + and_return(response) + instance_manager._routing_client = flexmock() + instance_manager._routing_client.should_receive('register_instance').and_return() + + flexmock(utils).should_receive("setup_logrotate").and_return() + + + # Fulfill assignments mocks + instance_manager._service_operator = flexmock( + start_async=lambda service, wants, properties: response) + + instance_manager._login_server = '192.168.33.10' + instance_manager._running_instances = {Instance('test_default_v1_1', 20000)} + instance_manager._assignments = {'test_default_v1': [20000, -1, -1, -1, -1]} + instance_manager._projects_manager = flexmock() + instance_manager._projects_manager.should_receive('version_from_key')\ + .and_return(version_manager) + + yield instance_manager._fulfill_assignments() + should_be_running = {Instance('test_default_v1_1', x) for x in + range(20000, 20005)} + assert instance_manager._running_instances == should_be_running + + if __name__ == "__main__": unittest.main()