Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 31 additions & 5 deletions AdminServer/appscale/admin/instance_manager/instance_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import monotonic
import json
import os
import socket
import urllib2

from tornado import gen
Expand Down Expand Up @@ -442,21 +443,40 @@ def _stop_app_instance(self, instance):

yield self._clean_old_sources()

def _get_lowest_port(self):
def _get_lowest_port(self, excluded_ports):
""" Determines the lowest usuable port for a new instance.

Args:
excluded_ports: A set of ports that the caller is planning on using.
This is used to make sure AppServers do not pick the same port since
it's possible for the port to not be in self._running_instances yet.
Returns:
An integer specifying a free port.
"""
existing_ports = {instance.port for instance in self._running_instances}
port = STARTING_INSTANCE_PORT
while True:
if port in existing_ports:
if port in existing_ports or port in excluded_ports or not self._try_port(port):
port += 1
continue

return port

def _try_port(self, port):
""" Helper method to check if the port is actually free or not.

Returns:
True if port is free, False otherwise.
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = False
try:
sock.bind(("0.0.0.0", port))
result = True
except:
logger.info("Port {} is in use".format(port))
sock.close()
return result

@gen.coroutine
def _restart_unrouted_instances(self):
""" Restarts instances that the router considers offline. """
Expand Down Expand Up @@ -535,6 +555,11 @@ def _fulfill_assignments(self):

for instance in to_stop:
yield self._stop_app_instance(instance)
excluded_ports = set()
for _, assigned_ports in self._assignments.items():
for port in assigned_ports:
if port != -1:
excluded_ports.add(port)

for version_key, assigned_ports in self._assignments.items():
try:
Expand All @@ -546,7 +571,6 @@ def _fulfill_assignments(self):

# The number of required instances that don't have an assigned port.
new_assignment_count = sum(port == -1 for port in assigned_ports)

# Stop instances that aren't assigned. If the assignment list includes
# any -1s, match them to running instances that aren't in the assigned
# ports list.
Expand All @@ -571,7 +595,9 @@ def _fulfill_assignments(self):
and instance.port not in assigned_ports]
to_start = max(new_assignment_count - len(candidates), 0)
for _ in range(to_start):
yield self._start_instance(version, self._get_lowest_port())
port = self._get_lowest_port(excluded_ports)
excluded_ports.add(port)
yield self._start_instance(version, port)

@gen.coroutine
def _enforce_instance_details(self):
Expand Down
74 changes: 74 additions & 0 deletions AdminServer/tests/test_instance_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
service_helper,
testing
)
from appscale.admin.instance_manager.instance import Instance
from appscale.common.service_helper import ServiceOperator

options.define('login_ip', '127.0.0.1')
Expand Down Expand Up @@ -303,5 +304,78 @@ def test_wait_for_app(self):
instance_started = yield instance_manager._wait_for_app(port)
self.assertEqual(False, instance_started)

@gen_test
def test_parallel_appservers(self):
testing.disable_logging()
version_details = {'runtime': 'python27',
'revision': 1,
'deployment': {
'zip': {'sourceUrl': 'source.tar.gz'}},
'appscaleExtensions': {'httpPort': '8080'}
}
version_manager = flexmock(version_details=version_details,
project_id = 'test',
revision_key = 'test_default_v1_1',
version_key = 'test_default_v1')
deployment_config = flexmock(
get_config = lambda x: {'default_max_appserver_memory': 400})


source_manager = flexmock()
response = Future()
response.set_result(None)
source_manager.should_receive('ensure_source'). \
with_args('test_default_v1_1', 'source.tar.gz', 'python27'). \
and_return(response)

instance_manager = InstanceManager(
None, None, None, None, deployment_config,
source_manager, None, None, None)

# Start instance mocks
response = Future()
response.set_result((19999, []))
flexmock(instance_manager).should_receive('_ensure_api_server').\
and_return(response)

# write env
flexmock(file_io).should_receive('write').and_return()

response = Future()
response.set_result(None)
flexmock(ServiceOperator).should_receive('start_async').\
and_return(response)

instance_manager._zk_client = flexmock()
instance_manager._zk_client.should_receive('ensure_path')

# Within add_routing
response = Future()
response.set_result(True)
flexmock(instance_manager).should_receive('_wait_for_app').\
and_return(response)
instance_manager._routing_client = flexmock()
instance_manager._routing_client.should_receive('register_instance').and_return()

flexmock(utils).should_receive("setup_logrotate").and_return()


# Fulfill assignments mocks
instance_manager._service_operator = flexmock(
start_async=lambda service, wants, properties: response)

instance_manager._login_server = '192.168.33.10'
instance_manager._running_instances = {Instance('test_default_v1_1', 20000)}
instance_manager._assignments = {'test_default_v1': [20000, -1, -1, -1, -1]}
instance_manager._projects_manager = flexmock()
instance_manager._projects_manager.should_receive('version_from_key')\
.and_return(version_manager)

yield instance_manager._fulfill_assignments()
should_be_running = {Instance('test_default_v1_1', x) for x in
range(20000, 20005)}
assert instance_manager._running_instances == should_be_running


if __name__ == "__main__":
unittest.main()