diff --git a/bin/run_beam_simulator.sh b/bin/run_beam_simulator.sh new file mode 100755 index 0000000000..80b1db7f0d --- /dev/null +++ b/bin/run_beam_simulator.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +VERSION=$(mvn -q \ + -Dexec.executable=echo -Dexec.args='${project.version}' \ + --non-recursive exec:exec) + +java -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000 -Dlog4j.configuration=file://`pwd`/log4j.properties -cp examples/beam/target/nemo-examples-beam-${VERSION}-shaded.jar:client/target/nemo-client-${VERSION}-shaded.jar:`$YARN_HOME/bin/yarn classpath` org.apache.nemo.client.SimulatorLauncher "$@" diff --git a/client/src/main/java/org/apache/nemo/client/JobLauncher.java b/client/src/main/java/org/apache/nemo/client/JobLauncher.java index acd9a2224d..f4601c63e1 100644 --- a/client/src/main/java/org/apache/nemo/client/JobLauncher.java +++ b/client/src/main/java/org/apache/nemo/client/JobLauncher.java @@ -18,52 +18,29 @@ */ package org.apache.nemo.client; -import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; import org.apache.nemo.common.exception.InvalidUserMainException; import 
org.apache.commons.lang3.SerializationUtils; import org.apache.nemo.common.Util; import org.apache.nemo.common.ir.IRDAG; -import org.apache.nemo.compiler.backend.nemo.NemoPlanRewriter; import org.apache.nemo.conf.JobConf; -import org.apache.nemo.driver.NemoDriver; import org.apache.nemo.runtime.common.comm.ControlMessage; -import org.apache.nemo.runtime.common.message.MessageEnvironment; -import org.apache.nemo.runtime.common.message.MessageParameters; -import org.apache.nemo.runtime.common.plan.PlanRewriter; -import org.apache.nemo.runtime.master.scheduler.Scheduler; -import org.apache.reef.client.DriverConfiguration; import org.apache.reef.client.DriverLauncher; -import org.apache.reef.client.parameters.JobMessageHandler; -import org.apache.reef.io.network.naming.LocalNameResolverConfiguration; -import org.apache.reef.io.network.naming.NameServerConfiguration; -import org.apache.reef.io.network.util.StringIdentifierFactory; -import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration; -import org.apache.reef.runtime.yarn.client.YarnClientConfiguration; import org.apache.reef.tang.*; -import org.apache.reef.tang.annotations.Name; import org.apache.reef.tang.exceptions.InjectionException; -import org.apache.reef.tang.formats.CommandLine; -import org.apache.reef.util.EnvironmentUtils; import org.apache.reef.util.Optional; -import org.apache.reef.wake.IdentifierFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Paths; import java.util.*; import java.util.concurrent.CountDownLatch; /** * Job launcher. 
*/ -public final class JobLauncher { +public final class JobLauncher extends Launcher { static { System.out.println( @@ -78,7 +55,6 @@ public final class JobLauncher { private static final Tang TANG = Tang.Factory.getTang(); private static final Logger LOG = LoggerFactory.getLogger(JobLauncher.class.getName()); - private static final int LOCAL_NUMBER_OF_EVALUATORS = 100; // hopefully large enough for our use.... private static Configuration jobAndDriverConf = null; private static Configuration deployModeConf = null; private static Configuration builtJobConf = null; @@ -91,7 +67,6 @@ public final class JobLauncher { private static CountDownLatch jobDoneLatch; private static String serializedDAG; private static final List COLLECTED_DATA = new ArrayList<>(); - private static final String[] EMPTY_USER_ARGS = new String[0]; /** * private constructor. @@ -215,29 +190,6 @@ public static void shutdown() { } } - /** - * Validate the configuration of the application's main method. - * @param jobConf Configuration of the application. - * @throws InvalidUserMainException when the user main is invalid (e.g., non-existing class/method). - */ - private static void validateJobConfig(final Configuration jobConf) throws InvalidUserMainException { - final Injector injector = TANG.newInjector(jobConf); - try { - final String className = injector.getNamedInstance(JobConf.UserMainClass.class); - final Class userCode = Class.forName(className); - final Method method = userCode.getMethod("main", String[].class); - if (!Modifier.isStatic(method.getModifiers())) { - throw new InvalidUserMainException("User Main Method not static"); - } - if (!Modifier.isPublic(userCode.getModifiers())) { - throw new InvalidUserMainException("User Main Class not public"); - } - - } catch (final InjectionException | ClassNotFoundException | NoSuchMethodException e) { - throw new InvalidUserMainException(e); - } - } - /** * Launch application using the application DAG. 
* Notice that we launch the DAG one at a time, as the result of a DAG has to be immediately returned to the @@ -315,195 +267,6 @@ public static void launchDAG(final IRDAG dag, } } - /** - * Run user-provided main method. - * - * @param jobConf the job configuration - * @throws Exception on any exceptions on the way - */ - private static void runUserProgramMain(final Configuration jobConf) throws Exception { - final Injector injector = TANG.newInjector(jobConf); - final String className = injector.getNamedInstance(JobConf.UserMainClass.class); - final String userArgsString = injector.getNamedInstance(JobConf.UserMainArguments.class); - final String[] args = userArgsString.isEmpty() ? EMPTY_USER_ARGS : userArgsString.split(" "); - final Class userCode = Class.forName(className); - final Method method = userCode.getMethod("main", String[].class); - - LOG.info("User program started"); - method.invoke(null, (Object) args); - LOG.info("User program finished"); - } - - /** - * @return client configuration. - */ - private static Configuration getClientConf() { - final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder(); - jcb.bindNamedParameter(JobMessageHandler.class, NemoClient.JobMessageHandler.class); - return jcb.build(); - } - - /** - * Fetch scheduler configuration. - * - * @param jobConf job configuration. - * @return the scheduler configuration. - * @throws ClassNotFoundException exception while finding the class. - * @throws InjectionException exception while injection (REEF Tang). 
- */ - private static Configuration getSchedulerConf(final Configuration jobConf) - throws ClassNotFoundException, InjectionException { - final Injector injector = TANG.newInjector(jobConf); - final String classImplName = injector.getNamedInstance(JobConf.SchedulerImplClassName.class); - final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder(); - final Class schedulerImpl = ((Class) Class.forName(classImplName)); - jcb.bindImplementation(Scheduler.class, schedulerImpl); - jcb.bindImplementation(PlanRewriter.class, NemoPlanRewriter.class); - return jcb.build(); - } - - /** - * Get driver ncs configuration. - * - * @return driver ncs configuration. - */ - private static Configuration getDriverNcsConf() { - return Configurations.merge(NameServerConfiguration.CONF.build(), - LocalNameResolverConfiguration.CONF.build(), - TANG.newConfigurationBuilder() - .bindImplementation(IdentifierFactory.class, StringIdentifierFactory.class) - .build()); - } - - /** - * Get driver message configuration. - * - * @return driver message configuration. - */ - private static Configuration getDriverMessageConf() { - return TANG.newConfigurationBuilder() - .bindNamedParameter(MessageParameters.SenderId.class, MessageEnvironment.MASTER_COMMUNICATION_ID) - .build(); - } - - /** - * Get driver configuration. - * - * @param jobConf job Configuration to get job id and driver memory. - * @return driver configuration. - * @throws InjectionException exception while injection. 
- */ - private static Configuration getDriverConf(final Configuration jobConf) throws InjectionException { - final Injector injector = TANG.newInjector(jobConf); - final String jobId = injector.getNamedInstance(JobConf.JobId.class); - final int driverMemory = injector.getNamedInstance(JobConf.DriverMemMb.class); - return DriverConfiguration.CONF - .setMultiple(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getAllClasspathJars()) - .set(DriverConfiguration.ON_DRIVER_STARTED, NemoDriver.StartHandler.class) - .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, NemoDriver.AllocatedEvaluatorHandler.class) - .set(DriverConfiguration.ON_CONTEXT_ACTIVE, NemoDriver.ActiveContextHandler.class) - .set(DriverConfiguration.ON_EVALUATOR_FAILED, NemoDriver.FailedEvaluatorHandler.class) - .set(DriverConfiguration.ON_CONTEXT_FAILED, NemoDriver.FailedContextHandler.class) - .set(DriverConfiguration.ON_DRIVER_STOP, NemoDriver.DriverStopHandler.class) - .set(DriverConfiguration.DRIVER_IDENTIFIER, jobId) - .set(DriverConfiguration.DRIVER_MEMORY, driverMemory) - .build(); - } - - /** - * Get job configuration. - * - * @param args arguments to be processed as command line. - * @return job configuration. - * @throws IOException exception while processing command line. 
- */ - @VisibleForTesting - public static Configuration getJobConf(final String[] args) throws IOException { - final JavaConfigurationBuilder confBuilder = TANG.newConfigurationBuilder(); - final CommandLine cl = new CommandLine(confBuilder); - cl.registerShortNameOfClass(JobConf.JobId.class); - cl.registerShortNameOfClass(JobConf.UserMainClass.class); - cl.registerShortNameOfClass(JobConf.UserMainArguments.class); - cl.registerShortNameOfClass(JobConf.DAGDirectory.class); - cl.registerShortNameOfClass(JobConf.OptimizationPolicy.class); - cl.registerShortNameOfClass(JobConf.DeployMode.class); - cl.registerShortNameOfClass(JobConf.ExecutorType.class); - cl.registerShortNameOfClass(JobConf.DriverMemMb.class); - cl.registerShortNameOfClass(JobConf.ExecutorJSONPath.class); - cl.registerShortNameOfClass(JobConf.BandwidthJSONPath.class); - cl.registerShortNameOfClass(JobConf.JVMHeapSlack.class); - cl.registerShortNameOfClass(JobConf.IORequestHandleThreadsTotal.class); - cl.registerShortNameOfClass(JobConf.MaxTaskAttempt.class); - cl.registerShortNameOfClass(JobConf.FileDirectory.class); - cl.registerShortNameOfClass(JobConf.GlusterVolumeDirectory.class); - cl.registerShortNameOfClass(JobConf.PartitionTransportServerPort.class); - cl.registerShortNameOfClass(JobConf.PartitionTransportServerBacklog.class); - cl.registerShortNameOfClass(JobConf.PartitionTransportServerNumListeningThreads.class); - cl.registerShortNameOfClass(JobConf.PartitionTransportServerNumWorkingThreads.class); - cl.registerShortNameOfClass(JobConf.PartitionTransportClientNumThreads.class); - cl.registerShortNameOfClass(JobConf.MaxNumDownloadsForARuntimeEdge.class); - cl.registerShortNameOfClass(JobConf.SchedulerImplClassName.class); - cl.registerShortNameOfClass(JobConf.ScheduleSerThread.class); - cl.registerShortNameOfClass(JobConf.MaxOffheapRatio.class); - cl.registerShortNameOfClass(JobConf.ChunkSizeKb.class); - cl.processCommandLine(args); - return confBuilder.build(); - } - - /** - * Get deploy 
mode configuration. - * - * @param jobConf job configuration to get deploy mode. - * @return deploy mode configuration. - * @throws InjectionException exception while injection. - */ - private static Configuration getDeployModeConf(final Configuration jobConf) throws InjectionException { - final Injector injector = TANG.newInjector(jobConf); - final String deployMode = injector.getNamedInstance(JobConf.DeployMode.class); - switch (deployMode) { - case "local": - return LocalRuntimeConfiguration.CONF - .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, LOCAL_NUMBER_OF_EVALUATORS) - .build(); - case "yarn": - return YarnClientConfiguration.CONF - .set(YarnClientConfiguration.JVM_HEAP_SLACK, injector.getNamedInstance(JobConf.JVMHeapSlack.class) - + injector.getNamedInstance(JobConf.MaxOffheapRatio.class)) - // Off-heap memory size is added to memory slack so that JVM heap region does not invade the off-heap region. - .build(); - default: - throw new UnsupportedOperationException(deployMode); - } - } - - /** - * Read json file and return its contents as configuration parameter. - * - * @param jobConf job configuration to get json path. - * @param pathParameter named parameter represents path to the json file, or an empty string - * @param contentsParameter named parameter represents contents of the file - * @param defaultContent the default configuration - * @return configuration with contents of the file, or an empty string as value for {@code contentsParameter} - * @throws InjectionException exception while injection. - */ - private static Configuration getJSONConf(final Configuration jobConf, - final Class> pathParameter, - final Class> contentsParameter, - final String defaultContent) - throws InjectionException { - final Injector injector = TANG.newInjector(jobConf); - try { - final String path = injector.getNamedInstance(pathParameter); - final String contents = path.isEmpty() ? 
defaultContent - : new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8); - return TANG.newConfigurationBuilder() - .bindNamedParameter(contentsParameter, contents) - .build(); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - /** * Get the built job configuration. * It can be {@code null} if this method is not called by the process which called the main function of this class. diff --git a/client/src/main/java/org/apache/nemo/client/Launcher.java b/client/src/main/java/org/apache/nemo/client/Launcher.java new file mode 100644 index 0000000000..3834da4a8a --- /dev/null +++ b/client/src/main/java/org/apache/nemo/client/Launcher.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.nemo.client; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.nemo.common.exception.InvalidUserMainException; +import org.apache.nemo.compiler.backend.nemo.NemoPlanRewriter; +import org.apache.nemo.conf.JobConf; +import org.apache.nemo.driver.NemoDriver; +import org.apache.nemo.runtime.common.message.MessageEnvironment; +import org.apache.nemo.runtime.common.message.MessageParameters; +import org.apache.nemo.runtime.common.plan.PlanRewriter; +import org.apache.nemo.runtime.master.scheduler.Scheduler; +import org.apache.reef.client.DriverConfiguration; +import org.apache.reef.client.parameters.JobMessageHandler; +import org.apache.reef.io.network.naming.LocalNameResolverConfiguration; +import org.apache.reef.io.network.naming.NameServerConfiguration; +import org.apache.reef.io.network.util.StringIdentifierFactory; +import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration; +import org.apache.reef.runtime.yarn.client.YarnClientConfiguration; +import org.apache.reef.tang.*; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.tang.formats.CommandLine; +import org.apache.reef.util.EnvironmentUtils; +import org.apache.reef.wake.IdentifierFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; + +/** + * Job launcher. 
+ */ +public abstract class Launcher { + + static { + System.out.println( + "\nPowered by\n" + + " _ __ \n" + + " / | / /__ ____ ___ ____ \n" + + " / |/ / _ \\/ __ `__ \\/ __ \\\n" + + " / /| / __/ / / / / / /_/ /\n" + + "/_/ |_/\\___/_/ /_/ /_/\\____/ \n" + ); + } + + private static final Tang TANG = Tang.Factory.getTang(); + private static final Logger LOG = LoggerFactory.getLogger(Launcher.class.getName()); + private static final int LOCAL_NUMBER_OF_EVALUATORS = 100; // hopefully large enough for our use.... + + private static final String[] EMPTY_USER_ARGS = new String[0]; + + /** + * Validate the configuration of the application's main method. + * @param jobConf Configuration of the application. + * @throws InvalidUserMainException when the user main is invalid (e.g., non-existing class/method). + */ + static void validateJobConfig(final Configuration jobConf) throws InvalidUserMainException { + final Injector injector = TANG.newInjector(jobConf); + try { + final String className = injector.getNamedInstance(JobConf.UserMainClass.class); + final Class userCode = Class.forName(className); + final Method method = userCode.getMethod("main", String[].class); + if (!Modifier.isStatic(method.getModifiers())) { + throw new InvalidUserMainException("User Main Method not static"); + } + if (!Modifier.isPublic(userCode.getModifiers())) { + throw new InvalidUserMainException("User Main Class not public"); + } + + } catch (final InjectionException | ClassNotFoundException | NoSuchMethodException e) { + throw new InvalidUserMainException(e); + } + } + + /** + * Run user-provided main method. 
+ * + * @param jobConf the job configuration + * @throws Exception on any exceptions on the way + */ + static void runUserProgramMain(final Configuration jobConf) throws Exception { + final Injector injector = TANG.newInjector(jobConf); + final String className = injector.getNamedInstance(JobConf.UserMainClass.class); + final String userArgsString = injector.getNamedInstance(JobConf.UserMainArguments.class); + final String[] args = userArgsString.isEmpty() ? EMPTY_USER_ARGS : userArgsString.split(" "); + final Class userCode = Class.forName(className); + final Method method = userCode.getMethod("main", String[].class); + + LOG.info("User program started"); + method.invoke(null, (Object) args); + LOG.info("User program finished"); + } + + /** + * @return client configuration. + */ + static Configuration getClientConf() { + final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder(); + jcb.bindNamedParameter(JobMessageHandler.class, NemoClient.JobMessageHandler.class); + return jcb.build(); + } + + /** + * Fetch scheduler configuration. + * + * @param jobConf job configuration. + * @return the scheduler configuration. + * @throws ClassNotFoundException exception while finding the class. + * @throws InjectionException exception while injection (REEF Tang). + */ + static Configuration getSchedulerConf(final Configuration jobConf) + throws ClassNotFoundException, InjectionException { + final Injector injector = TANG.newInjector(jobConf); + final String classImplName = injector.getNamedInstance(JobConf.SchedulerImplClassName.class); + final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder(); + final Class schedulerImpl = ((Class) Class.forName(classImplName)); + jcb.bindImplementation(Scheduler.class, schedulerImpl); + jcb.bindImplementation(PlanRewriter.class, NemoPlanRewriter.class); + return jcb.build(); + } + + /** + * Get driver ncs configuration. + * + * @return driver ncs configuration. 
+ */ + static Configuration getDriverNcsConf() { + return Configurations.merge(NameServerConfiguration.CONF.build(), + LocalNameResolverConfiguration.CONF.build(), + TANG.newConfigurationBuilder() + .bindImplementation(IdentifierFactory.class, StringIdentifierFactory.class) + .build()); + } + + /** + * Get driver message configuration. + * + * @return driver message configuration. + */ + static Configuration getDriverMessageConf() { + return TANG.newConfigurationBuilder() + .bindNamedParameter(MessageParameters.SenderId.class, MessageEnvironment.MASTER_COMMUNICATION_ID) + .build(); + } + + /** + * Get driver configuration. + * + * @param jobConf job Configuration to get job id and driver memory. + * @return driver configuration. + * @throws InjectionException exception while injection. + */ + static Configuration getDriverConf(final Configuration jobConf) throws InjectionException { + final Injector injector = TANG.newInjector(jobConf); + final String jobId = injector.getNamedInstance(JobConf.JobId.class); + final int driverMemory = injector.getNamedInstance(JobConf.DriverMemMb.class); + return DriverConfiguration.CONF + .setMultiple(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getAllClasspathJars()) + .set(DriverConfiguration.ON_DRIVER_STARTED, NemoDriver.StartHandler.class) + .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, NemoDriver.AllocatedEvaluatorHandler.class) + .set(DriverConfiguration.ON_CONTEXT_ACTIVE, NemoDriver.ActiveContextHandler.class) + .set(DriverConfiguration.ON_EVALUATOR_FAILED, NemoDriver.FailedEvaluatorHandler.class) + .set(DriverConfiguration.ON_CONTEXT_FAILED, NemoDriver.FailedContextHandler.class) + .set(DriverConfiguration.ON_DRIVER_STOP, NemoDriver.DriverStopHandler.class) + .set(DriverConfiguration.DRIVER_IDENTIFIER, jobId) + .set(DriverConfiguration.DRIVER_MEMORY, driverMemory) + .build(); + } + + /** + * Get job configuration. + * + * @param args arguments to be processed as command line. + * @return job configuration. 
+ * @throws IOException exception while processing command line. + */ + @VisibleForTesting + public static Configuration getJobConf(final String[] args) throws IOException { + final JavaConfigurationBuilder confBuilder = TANG.newConfigurationBuilder(); + final CommandLine cl = new CommandLine(confBuilder); + cl.registerShortNameOfClass(JobConf.JobId.class); + cl.registerShortNameOfClass(JobConf.UserMainClass.class); + cl.registerShortNameOfClass(JobConf.UserMainArguments.class); + cl.registerShortNameOfClass(JobConf.DAGDirectory.class); + cl.registerShortNameOfClass(JobConf.OptimizationPolicy.class); + cl.registerShortNameOfClass(JobConf.DeployMode.class); + cl.registerShortNameOfClass(JobConf.ExecutorType.class); + cl.registerShortNameOfClass(JobConf.DriverMemMb.class); + cl.registerShortNameOfClass(JobConf.ExecutorJSONPath.class); + cl.registerShortNameOfClass(JobConf.BandwidthJSONPath.class); + cl.registerShortNameOfClass(JobConf.NodeSpecJsonPath.class); + cl.registerShortNameOfClass(JobConf.JVMHeapSlack.class); + cl.registerShortNameOfClass(JobConf.IORequestHandleThreadsTotal.class); + cl.registerShortNameOfClass(JobConf.MaxTaskAttempt.class); + cl.registerShortNameOfClass(JobConf.FileDirectory.class); + cl.registerShortNameOfClass(JobConf.GlusterVolumeDirectory.class); + cl.registerShortNameOfClass(JobConf.PartitionTransportServerPort.class); + cl.registerShortNameOfClass(JobConf.PartitionTransportServerBacklog.class); + cl.registerShortNameOfClass(JobConf.PartitionTransportServerNumListeningThreads.class); + cl.registerShortNameOfClass(JobConf.PartitionTransportServerNumWorkingThreads.class); + cl.registerShortNameOfClass(JobConf.PartitionTransportClientNumThreads.class); + cl.registerShortNameOfClass(JobConf.MaxNumDownloadsForARuntimeEdge.class); + cl.registerShortNameOfClass(JobConf.SchedulerImplClassName.class); + cl.registerShortNameOfClass(JobConf.ScheduleSerThread.class); + cl.registerShortNameOfClass(JobConf.MaxOffheapRatio.class); + 
cl.registerShortNameOfClass(JobConf.ChunkSizeKb.class); + cl.processCommandLine(args); + return confBuilder.build(); + } + + /** + * Get deploy mode configuration. + * + * @param jobConf job configuration to get deploy mode. + * @return deploy mode configuration. + * @throws InjectionException exception while injection. + */ + static Configuration getDeployModeConf(final Configuration jobConf) throws InjectionException { + final Injector injector = TANG.newInjector(jobConf); + final String deployMode = injector.getNamedInstance(JobConf.DeployMode.class); + switch (deployMode) { + case "local": + return LocalRuntimeConfiguration.CONF + .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, LOCAL_NUMBER_OF_EVALUATORS) + .build(); + case "yarn": + return YarnClientConfiguration.CONF + .set(YarnClientConfiguration.JVM_HEAP_SLACK, injector.getNamedInstance(JobConf.JVMHeapSlack.class) + + injector.getNamedInstance(JobConf.MaxOffheapRatio.class)) + // Off-heap memory size is added to memory slack so that JVM heap region does not invade the off-heap region. + .build(); + default: + throw new UnsupportedOperationException(deployMode); + } + } + + /** + * Read json file and return its contents as configuration parameter. + * + * @param jobConf job configuration to get json path. + * @param pathParameter named parameter represents path to the json file, or an empty string + * @param contentsParameter named parameter represents contents of the file + * @param defaultContent the default configuration + * @return configuration with contents of the file, or an empty string as value for {@code contentsParameter} + * @throws InjectionException exception while injection. 
+ */ + static Configuration getJSONConf(final Configuration jobConf, + final Class> pathParameter, + final Class> contentsParameter, + final String defaultContent) + throws InjectionException { + final Injector injector = TANG.newInjector(jobConf); + try { + final String path = injector.getNamedInstance(pathParameter); + final String contents = path.isEmpty() ? defaultContent + : new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8); + return TANG.newConfigurationBuilder() + .bindNamedParameter(contentsParameter, contents) + .build(); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/client/src/main/java/org/apache/nemo/client/SimulatorLauncher.java b/client/src/main/java/org/apache/nemo/client/SimulatorLauncher.java new file mode 100644 index 0000000000..0e45a7237e --- /dev/null +++ b/client/src/main/java/org/apache/nemo/client/SimulatorLauncher.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.nemo.client; + +import org.apache.nemo.client.beam.NemoRunner; +import org.apache.nemo.common.Util; +import org.apache.nemo.common.exception.InvalidUserMainException; +import org.apache.nemo.common.ir.IRDAG; +import org.apache.nemo.compiler.backend.Backend; +import org.apache.nemo.compiler.backend.nemo.NemoBackend; +import org.apache.nemo.compiler.backend.nemo.NemoPlanRewriter; +import org.apache.nemo.compiler.optimizer.NemoOptimizer; +import org.apache.nemo.compiler.optimizer.Optimizer; +import org.apache.nemo.conf.JobConf; +import org.apache.nemo.runtime.common.message.ClientRPC; +import org.apache.nemo.runtime.common.message.MessageEnvironment; +import org.apache.nemo.runtime.common.message.local.LocalMessageEnvironment; +import org.apache.nemo.runtime.common.plan.PhysicalPlan; +import org.apache.nemo.runtime.common.plan.PlanRewriter; +import org.apache.nemo.runtime.master.BroadcastManagerMaster; +import org.apache.nemo.runtime.master.scheduler.*; +import org.apache.reef.tang.*; +import org.apache.reef.tang.exceptions.InjectionException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; +import java.io.IOException; +import java.io.Serializable; +import java.util.*; + +/** + * Job launcher. + */ +public final class SimulatorLauncher extends Launcher { + private static final Tang TANG = Tang.Factory.getTang(); + private static final Logger LOG = LoggerFactory.getLogger(SimulatorLauncher.class.getName()); + private static Configuration jobAndDriverConf = null; + private static Configuration builtJobConf = null; + private static PlanSimulator planSimulator; + + private static Optimizer optimizer = null; + private static Backend backend = null; + private static PlanRewriter planRewriter = null; + + /** + * private construct or. + */ + @Inject + private SimulatorLauncher() { + // empty + } + + /** + * Main JobLauncher method. + * + * @param args arguments. + * @throws Exception exception on the way. 
+ */ + public static void main(final String[] args) throws Exception { + try { + LOG.info("start simulator launcher"); + setup(args); + runUserProgramMain(builtJobConf); + } catch (final InjectionException e) { + throw new RuntimeException(e); + } + LOG.info("end main"); + } + + /** + * Set up the driver, etc. before the actual execution. + * + * @param args arguments. + * @throws InjectionException injection exception from REEF. + * @throws ClassNotFoundException class not found exception. + * @throws IOException IO exception. + */ + public static void setup(final String[] args) + throws InjectionException, ClassNotFoundException, IOException, InvalidUserMainException { + // Set launcher of NemoRunner + NemoRunner.setJobLauncher(SimulatorLauncher.class); + LOG.info("Project Root Path: {}", Util.fetchProjectRootPath()); + + // Get Job and Driver Confs + builtJobConf = getJobConf(args); + validateJobConfig(builtJobConf); + + // Registers actions for launching the DAG. + + final Configuration driverMessageConfig = getDriverMessageConf(); + final String defaultExecutorResourceConfig = "[{\"type\":\"Transient\",\"memory_mb\":512,\"capacity\":5}," + + "{\"type\":\"Reserved\",\"memory_mb\":512,\"capacity\":5}]"; + final Configuration executorResourceConfig = getJSONConf(builtJobConf, JobConf.ExecutorJSONPath.class, + JobConf.ExecutorJSONContents.class, defaultExecutorResourceConfig); + final Configuration bandwidthConfig = getJSONConf(builtJobConf, JobConf.BandwidthJSONPath.class, + JobConf.BandwidthJSONContents.class, ""); + final Configuration nodeConfig = getJSONConf(builtJobConf, JobConf.NodeSpecJsonPath.class, + JobConf.NodeSpecJsonContents.class, ""); + final Configuration schedulerConf = getSchedulerConf(builtJobConf); + + // Merge Job and Driver Confs + jobAndDriverConf = Configurations.merge(builtJobConf, executorResourceConfig, driverMessageConfig, + bandwidthConfig, schedulerConf, nodeConfig); + + Injector injector = TANG.newInjector(jobAndDriverConf); + + 
optimizer = injector.getInstance(NemoOptimizer.class); + backend = injector.getInstance(NemoBackend.class); + planRewriter = injector.getInstance(NemoPlanRewriter.class); + + planSimulator = injector.getInstance(StreamingPlanSimulator.class); + + + // Start Driver and launch user program. + if (jobAndDriverConf == null) { + throw new RuntimeException("Configuration for launching driver is not ready"); + } + } + + public static Configuration getSchedulerConf(final Configuration jobConf) + throws InjectionException { + final Injector injector = TANG.newInjector(jobConf); + final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder(); + String classImplName = injector.getNamedInstance(JobConf.SchedulerImplClassName.class); + if (classImplName.equals(StreamingScheduler.class.getName())) { + jcb.bindImplementation(ScheduleSimulator.class, StreamingScheduleSimulator.class); + jcb.bindImplementation(PlanSimulator.class, StreamingPlanSimulator.class); + } else { + throw new RuntimeException("batch simulator is not ready"); + } + jcb.bindImplementation(MessageEnvironment.class, LocalMessageEnvironment.class); + jcb.bindImplementation(ClientRPC.class, ClientRPCSimulator.class); + jcb.bindImplementation(PlanRewriter.class, NemoPlanRewriter.class); + return jcb.build(); + } + + /** + * Launch application using the application DAG. + * Notice that we launch the DAG one at a time, as the result of a DAG has to be immediately returned to the + * Java variable before the application can be resumed. + * + * @param dag the application DAG. + */ + // When modifying the signature of this method, see CompilerTestUtil#compileDAG and make corresponding changes + public static void launchDAG(final IRDAG dag) { + launchDAG(dag, Collections.emptyMap(), ""); + } + + /** + * @param dag the application DAG. + * @param jobId job ID. 
+ */ + public static void launchDAG(final IRDAG dag, final String jobId) { + launchDAG(dag, Collections.emptyMap(), jobId); + } + + /** + * @param dag the application DAG. + * @param broadcastVariables broadcast variables (can be empty). + * @param jobId job ID. + */ + public static void launchDAG(final IRDAG dag, + final Map broadcastVariables, + final String jobId) { + // launch driver if it hasn't been already + LOG.info("Launching DAG..."); + try { + final IRDAG optimizedDAG = optimizer.optimizeAtCompileTime(dag); + ((NemoPlanRewriter) planRewriter).setCurrentIRDAG(optimizedDAG); + final PhysicalPlan physicalPlan = backend.compile(optimizedDAG); + + ((NemoPlanRewriter) planRewriter).setCurrentPhysicalPlan(physicalPlan); + BroadcastManagerMaster.registerBroadcastVariablesFromClient(broadcastVariables); + + planSimulator.simulate(physicalPlan, 2); + LOG.info("end execution"); + planSimulator.terminate(); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } +} + diff --git a/client/src/main/java/org/apache/nemo/client/beam/NemoRunner.java b/client/src/main/java/org/apache/nemo/client/beam/NemoRunner.java index bf3323dbb2..783bff475f 100644 --- a/client/src/main/java/org/apache/nemo/client/beam/NemoRunner.java +++ b/client/src/main/java/org/apache/nemo/client/beam/NemoRunner.java @@ -24,9 +24,11 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.nemo.client.JobLauncher; +import org.apache.nemo.common.ir.IRDAG; import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions; import org.apache.nemo.compiler.frontend.beam.PipelineVisitor; +import java.lang.reflect.InvocationTargetException; import java.util.concurrent.CompletableFuture; /** @@ -34,6 +36,7 @@ */ public final class NemoRunner extends PipelineRunner { private final NemoPipelineOptions nemoPipelineOptions; + private static Class jobLauncher = JobLauncher.class; /** * BEAM Pipeline Runner. 
@@ -86,9 +89,21 @@ public NemoPipelineResult run(final Pipeline pipeline) {
     final PipelineVisitor pipelineVisitor = new PipelineVisitor(pipeline, nemoPipelineOptions);
     pipeline.traverseTopologically(pipelineVisitor);
     final NemoPipelineResult nemoPipelineResult = new NemoPipelineResult();
-    CompletableFuture.runAsync(() ->
-      JobLauncher.launchDAG(pipelineVisitor.getConvertedPipeline(), nemoPipelineOptions.getJobName()))
-      .thenRun(nemoPipelineResult::setJobDone);
+
+    CompletableFuture.runAsync(() -> {
+      try {
+        Class[] methodParamClass = new Class[] {IRDAG.class, String.class};
+        Object[] methodParamObject = new Object[] {pipelineVisitor.getConvertedPipeline(),
+          nemoPipelineOptions.getJobName()};
+        jobLauncher.getMethod("launchDAG", methodParamClass).invoke(null, methodParamObject);
+      } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+        e.printStackTrace();
+      }
+    }).thenRun(nemoPipelineResult::setJobDone);
     return nemoPipelineResult;
   }
+
+  public static void setJobLauncher(final Class jobLauncher) {
+    NemoRunner.jobLauncher = jobLauncher;
+  }
 }
diff --git a/conf/src/main/java/org/apache/nemo/conf/JobConf.java b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
index b7ce27328e..f2468fe067 100644
--- a/conf/src/main/java/org/apache/nemo/conf/JobConf.java
+++ b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
@@ -325,6 +325,20 @@ public final class PartitionTransportClientNumThreads implements Name {
   public final class ChunkSizeKb implements Name {
   }
 
+  /**
+   * Path to the JSON file that specifies node specification.
+   */
+  @NamedParameter(doc = "Path to the JSON file that specifies node specification", short_name = "node_json")
+  public final class NodeSpecJsonPath implements Name {
+  }
+
+  /**
+   * Contents of JSON file that specifies node specification.
+ */ + @NamedParameter(doc = "Contents of JSON file that specifies node specification") + public final class NodeSpecJsonContents implements Name { + } + //////////////////////////////// Intermediate Configurations /** diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/ClientRPC.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/ClientRPC.java index 4317bb1553..2ee6934e7a 100644 --- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/ClientRPC.java +++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/ClientRPC.java @@ -18,57 +18,15 @@ */ package org.apache.nemo.runtime.common.message; -import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.nemo.conf.JobConf; import org.apache.nemo.runtime.common.comm.ControlMessage; -import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.tang.annotations.DefaultImplementation; import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.impl.SyncStage; -import org.apache.reef.wake.remote.Encoder; -import org.apache.reef.wake.remote.address.LocalAddressProvider; -import org.apache.reef.wake.remote.impl.TransportEvent; -import org.apache.reef.wake.remote.transport.Link; -import org.apache.reef.wake.remote.transport.LinkListener; -import org.apache.reef.wake.remote.transport.Transport; -import org.apache.reef.wake.remote.transport.TransportFactory; - -import javax.inject.Inject; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; /** - * Driver-side RPC implementation for communication from/to Nemo Client. + * This class handles communication from/to clients. 
*/ -public final class ClientRPC { - private static final DriverToClientMessageEncoder ENCODER = new DriverToClientMessageEncoder(); - private static final ClientRPCLinkListener LINK_LISTENER = new ClientRPCLinkListener(); - private static final int RETRY_COUNT = 10; - private static final int RETRY_TIMEOUT = 100; - - private final Map> - handlers = new ConcurrentHashMap<>(); - private final Transport transport; - private final Link link; - private volatile boolean isClosed = false; - - @Inject - private ClientRPC(final TransportFactory transportFactory, - final LocalAddressProvider localAddressProvider, - @Parameter(JobConf.ClientSideRPCServerHost.class) final String clientHost, - @Parameter(JobConf.ClientSideRPCServerPort.class) final int clientPort) { - transport = transportFactory.newInstance(localAddressProvider.getLocalAddress(), - 0, new SyncStage<>(new RPCEventHandler()), null, RETRY_COUNT, RETRY_TIMEOUT); - final SocketAddress clientAddress = new InetSocketAddress(clientHost, clientPort); - try { - link = transport.open(clientAddress, ENCODER, LINK_LISTENER); - } catch (final IOException e) { - throw new IllegalStateException("Failed to setup an RPC connection to the Client. " - + "A failure at the client-side is suspected."); - } - } +@DefaultImplementation(ClientRPCImpl.class) +public interface ClientRPC { /** * Registers handler for the given type of message. @@ -77,102 +35,18 @@ private ClientRPC(final TransportFactory transportFactory, * @param handler handler implementation * @return {@code this} */ - public ClientRPC registerHandler(final ControlMessage.ClientToDriverMessageType type, - final EventHandler handler) { - if (handlers.putIfAbsent(type, handler) != null) { - throw new RuntimeException(String.format("A handler for %s already registered", type)); - } - return this; - } + ClientRPC registerHandler(ControlMessage.ClientToDriverMessageType type, + EventHandler handler); /** * Shuts down the transport. 
*/ - public void shutdown() { - ensureRunning(); - try { - transport.close(); - } catch (final Exception e) { - throw new RuntimeException(e); - } finally { - isClosed = true; - } - } + void shutdown(); /** * Write message to client. * * @param message message to send. */ - public void send(final ControlMessage.DriverToClientMessage message) { - ensureRunning(); - link.write(message); - } - - /** - * Handles message from client. - * - * @param message message to process - */ - private void handleMessage(final ControlMessage.ClientToDriverMessage message) { - final ControlMessage.ClientToDriverMessageType type = message.getType(); - final EventHandler handler = handlers.get(type); - if (handler == null) { - throw new RuntimeException(String.format("Handler for message type %s not registered", type)); - } else { - handler.onNext(message); - } - } - - /** - * Provides event handler for messages from client. - */ - private final class RPCEventHandler implements EventHandler { - @Override - public void onNext(final TransportEvent transportEvent) { - try { - final byte[] data = transportEvent.getData(); - final ControlMessage.ClientToDriverMessage message = ControlMessage.ClientToDriverMessage.parseFrom(data); - handleMessage(message); - } catch (final InvalidProtocolBufferException e) { - throw new RuntimeException(e); - } - } - } - - /** - * Ensure the Transport is running. - */ - private void ensureRunning() { - if (isClosed) { - throw new RuntimeException("The ClientRPC is already closed"); - } - } - - /** - * Provides encoder for {@link org.apache.nemo.runtime.common.comm.ControlMessage.DriverToClientMessage}. - */ - private static final class DriverToClientMessageEncoder implements Encoder { - @Override - public byte[] encode(final ControlMessage.DriverToClientMessage driverToClientMessage) { - return driverToClientMessage.toByteArray(); - } - } - - /** - * Provides {@link LinkListener}. 
- */ - private static final class ClientRPCLinkListener implements LinkListener { - - @Override - public void onSuccess(final ControlMessage.DriverToClientMessage driverToClientMessage) { - } - - @Override - public void onException(final Throwable throwable, - final SocketAddress socketAddress, - final ControlMessage.DriverToClientMessage driverToClientMessage) { - throw new RuntimeException(throwable); - } - } + void send(ControlMessage.DriverToClientMessage message); } diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/ClientRPCImpl.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/ClientRPCImpl.java new file mode 100644 index 0000000000..cfce019844 --- /dev/null +++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/ClientRPCImpl.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.nemo.runtime.common.message; + +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.nemo.conf.JobConf; +import org.apache.nemo.runtime.common.comm.ControlMessage; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.impl.SyncStage; +import org.apache.reef.wake.remote.Encoder; +import org.apache.reef.wake.remote.address.LocalAddressProvider; +import org.apache.reef.wake.remote.impl.TransportEvent; +import org.apache.reef.wake.remote.transport.Link; +import org.apache.reef.wake.remote.transport.LinkListener; +import org.apache.reef.wake.remote.transport.Transport; +import org.apache.reef.wake.remote.transport.TransportFactory; + +import javax.inject.Inject; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Driver-side RPC implementation for communication from/to Nemo Client. 
+ */ +public final class ClientRPCImpl implements ClientRPC { + private static final DriverToClientMessageEncoder ENCODER = new DriverToClientMessageEncoder(); + private static final ClientRPCLinkListener LINK_LISTENER = new ClientRPCLinkListener(); + private static final int RETRY_COUNT = 10; + private static final int RETRY_TIMEOUT = 100; + + private final Map> + handlers = new ConcurrentHashMap<>(); + private final Transport transport; + private final Link link; + private volatile boolean isClosed = false; + + @Inject + private ClientRPCImpl(final TransportFactory transportFactory, + final LocalAddressProvider localAddressProvider, + @Parameter(JobConf.ClientSideRPCServerHost.class) final String clientHost, + @Parameter(JobConf.ClientSideRPCServerPort.class) final int clientPort) { + transport = transportFactory.newInstance(localAddressProvider.getLocalAddress(), + 0, new SyncStage<>(new RPCEventHandler()), null, RETRY_COUNT, RETRY_TIMEOUT); + final SocketAddress clientAddress = new InetSocketAddress(clientHost, clientPort); + try { + link = transport.open(clientAddress, ENCODER, LINK_LISTENER); + } catch (final IOException e) { + throw new IllegalStateException("Failed to setup an RPC connection to the Client. " + + "A failure at the client-side is suspected."); + } + } + + /** + * Registers handler for the given type of message. + * + * @param type the type of message + * @param handler handler implementation + * @return {@code this} + */ + @Override + public ClientRPC registerHandler(final ControlMessage.ClientToDriverMessageType type, + final EventHandler handler) { + if (handlers.putIfAbsent(type, handler) != null) { + throw new RuntimeException(String.format("A handler for %s already registered", type)); + } + return this; + } + + /** + * Shuts down the transport. 
+ */ + @Override + public void shutdown() { + ensureRunning(); + try { + transport.close(); + } catch (final Exception e) { + throw new RuntimeException(e); + } finally { + isClosed = true; + } + } + + /** + * Write message to client. + * + * @param message message to send. + */ + @Override + public void send(final ControlMessage.DriverToClientMessage message) { + ensureRunning(); + link.write(message); + } + + /** + * Handles message from client. + * + * @param message message to process + */ + private void handleMessage(final ControlMessage.ClientToDriverMessage message) { + final ControlMessage.ClientToDriverMessageType type = message.getType(); + final EventHandler handler = handlers.get(type); + if (handler == null) { + throw new RuntimeException(String.format("Handler for message type %s not registered", type)); + } else { + handler.onNext(message); + } + } + + /** + * Provides event handler for messages from client. + */ + private final class RPCEventHandler implements EventHandler { + @Override + public void onNext(final TransportEvent transportEvent) { + try { + final byte[] data = transportEvent.getData(); + final ControlMessage.ClientToDriverMessage message = ControlMessage.ClientToDriverMessage.parseFrom(data); + handleMessage(message); + } catch (final InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } + } + + /** + * Ensure the Transport is running. + */ + private void ensureRunning() { + if (isClosed) { + throw new RuntimeException("The ClientRPC is already closed"); + } + } + + /** + * Provides encoder for {@link org.apache.nemo.runtime.common.comm.ControlMessage.DriverToClientMessage}. + */ + private static final class DriverToClientMessageEncoder implements Encoder { + @Override + public byte[] encode(final ControlMessage.DriverToClientMessage driverToClientMessage) { + return driverToClientMessage.toByteArray(); + } + } + + /** + * Provides {@link LinkListener}. 
+ */ + private static final class ClientRPCLinkListener implements LinkListener { + + @Override + public void onSuccess(final ControlMessage.DriverToClientMessage driverToClientMessage) { + } + + @Override + public void onException(final Throwable throwable, + final SocketAddress socketAddress, + final ControlMessage.DriverToClientMessage driverToClientMessage) { + throw new RuntimeException(throwable); + } + } +} diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/ClientRPCSimulator.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/ClientRPCSimulator.java new file mode 100644 index 0000000000..35340a8d62 --- /dev/null +++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/ClientRPCSimulator.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nemo.runtime.master.scheduler; + +import org.apache.nemo.runtime.common.comm.ControlMessage; +import org.apache.nemo.runtime.common.message.ClientRPC; +import org.apache.reef.wake.EventHandler; + +import javax.inject.Inject; + +/** + * This is a Simulator for Driver-side RPC. + * It manages communication from/to Nemo Client. 
+ */ +// TODO #XXX: This part should support XGBoost optimization. +public final class ClientRPCSimulator implements ClientRPC { + + @Inject + public ClientRPCSimulator() { + super(); + } + + /** + * Registers handler for the given type of message. + * + * @param type the type of message + * @param handler handler implementation + * @return {@code this} + */ + @Override + public ClientRPCSimulator registerHandler(final ControlMessage.ClientToDriverMessageType type, + final EventHandler handler) { + return this; + } + + /** + * Shuts down the transport. + */ + @Override + public void shutdown() { + } + + /** + * Write message to client. + * + * @param message message to send. + */ + @Override + public void send(final ControlMessage.DriverToClientMessage message) { + } +} diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/ContainerManageSimulator.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/ContainerManageSimulator.java new file mode 100644 index 0000000000..f9d0e09382 --- /dev/null +++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/ContainerManageSimulator.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nemo.runtime.master.scheduler; + +import com.fasterxml.jackson.core.TreeNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nemo.common.Pair; +import org.apache.nemo.common.exception.JsonParseException; +import org.apache.nemo.runtime.common.plan.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +/** + * This class manages virtual nodes and data transfer size between tasks. + */ +final class ContainerManageSimulator { + private static final Logger LOG = LoggerFactory.getLogger(ContainerManageSimulator.class.getName()); + private final Map nodeSimulatorMap; + private final Map executorIdToNodeId; + private final Map taskIdToNodeId; + private final Map taskMap; + private final NetworkSimulator networkSimulator; + + ContainerManageSimulator() { + nodeSimulatorMap = new HashMap<>(); + executorIdToNodeId = new HashMap<>(); + taskIdToNodeId = new HashMap<>(); + taskMap = new HashMap<>(); + networkSimulator = new NetworkSimulator(); + } + + /** + * Utility method for parsing the node specification string. 
+ */ + public void parseNodeSpecificationString(final String nodeSpecficationString) { + final Map, Long> latencyMap = new HashMap<>(); + final Map, Long> bandwidthMap = new HashMap<>(); + + try { + final TreeNode jsonRootNode = new ObjectMapper().readTree(nodeSpecficationString); + + // parse network specification + final TreeNode networkNode = jsonRootNode.get("network"); + for (Iterator it = networkNode.fieldNames(); it.hasNext();) { + String pairs = it.next(); + + final TreeNode pairNode = networkNode.get(pairs); + List hostNames = Arrays.asList(pairs.split("/")); + if (hostNames.size() != 2) { + continue; + } + + Pair nodePair = Pair.of(hostNames.get(0), hostNames.get(1)); + + final long bandwidth = Long.parseLong(pairNode.get("bw").traverse().nextTextValue().split(" ")[0]); + final long latency = Long.parseLong(pairNode.get("latency").traverse().nextTextValue().split(" ")[0]); + latencyMap.put(nodePair, latency / 1000); + bandwidthMap.put(nodePair, bandwidth / 1000); + } + + // update bandwidth and latency to networkSimulator + this.networkSimulator.updateBandwidth(bandwidthMap); + this.networkSimulator.updateLatency(latencyMap); + + // parse node names + final TreeNode nodeNode = jsonRootNode.get("nodes"); + for (int i = 0; i < nodeNode.size(); i++) { + final String nodeName = nodeNode.get(i).traverse().nextTextValue(); + this.nodeSimulatorMap.put(nodeName, new NodeSimulator(nodeName, networkSimulator)); + } + + } catch (final Exception e) { + throw new JsonParseException(e); + } + } + + /** + * allocated executor to virtual node. + * Assign in order in nodeSimulatorMap. + * + * @param executorId Executor Id to allocate. + * @param capacity The number of tasks that can be processed at the same time in the executor. + */ + // TODO XXX: We need to check how hadoop assigns executors. 
+ public void allocateExecutor(final String executorId, final int capacity) throws Exception { + for (NodeSimulator simulator : this.nodeSimulatorMap.values()) { + if (!simulator.isAllocated()) { + simulator.allocateExecutor(executorId, capacity); + executorIdToNodeId.put(executorId, simulator.getNodeName()); + return; + } + } + throw new Exception(); + } + + /** + * assign task to executor. + * + * @param executorId Executor Id to allocate. + * @param task task to allocate. + */ + public void onTaskReceived(final String executorId, final Task task) { + nodeSimulatorMap.get(executorIdToNodeId.get(executorId)).onTaskReceived(task); + taskIdToNodeId.put(task.getTaskId(), executorIdToNodeId.get(executorId)); + taskMap.put(task.getTaskId(), task); + + // get upstream task + networkSimulator.updateRuntimeEdgeSrcIndexToNode(task.getTaskOutgoingEdges(), + task.getTaskId(), executorIdToNodeId.get(executorId)); + networkSimulator.updateRuntimeEdgeDstIndestToTaskId(task.getTaskIncomingEdges(), task.getTaskId()); + } + + /** + * trigger prepare to every executors. + */ + public void prepare() { + nodeSimulatorMap.values().forEach(NodeSimulator::prepare); + } + + public void reset() { + nodeSimulatorMap.clear(); + executorIdToNodeId.clear(); + taskIdToNodeId.clear(); + taskMap.clear(); + networkSimulator.reset(); + } + /** + * get every tasks of job. + * + * @return list of all tasks. + */ + public List getTasks() { + return new ArrayList<>(taskMap.values()); + } + + /** + * get every executors of job. + * + * @return list of all nodeSimulators + */ + public List getNodeSimulators() { + return new ArrayList<>(this.nodeSimulatorMap.values()); + } + + /** + * simulate task for duration. + * When task and durations are entered, the number of tuples that can be processed in a given period is calculated + * and the number of tuples that the downstream task needs to process is updated. + * + * @param task task to simulate. + * @param duration The duration of the simulation. 
+ */ + public void simulate(final Task task, final long duration) throws Exception { + String srcNodeName = this.taskIdToNodeId.get(task.getTaskId()); + // simulate task + Map, Long>> numOfProcessedTuples = + nodeSimulatorMap.get(srcNodeName).simulate(task, duration); + + // add the number of tuples to process of downstream tasks. + numOfProcessedTuples.forEach((srcVertexId, numberOfProcessedTupleMap) -> { + numberOfProcessedTupleMap.forEach((pair, numberOfProcessedTuples) -> { + addNumOfTuplesToProcess(pair.left(), pair.right(), srcVertexId, srcNodeName, numberOfProcessedTuples); + }); + }); + } + + /** + * transfer the number of tuples to process to nodeSimulator. + */ + public void addNumOfTuplesToProcess(final String dstTaskId, + final String dstVertexId, + final String srcVertexId, + final String srcNodeName, + final long numOfTuples) { + String nodeName = this.taskIdToNodeId.get(dstTaskId); + nodeSimulatorMap.get(nodeName).addNumOfTuplesToProcess(dstTaskId, dstVertexId, srcVertexId, + srcNodeName, numOfTuples); + } +} diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/NetworkSimulator.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/NetworkSimulator.java new file mode 100644 index 0000000000..c119bd9455 --- /dev/null +++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/NetworkSimulator.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.nemo.runtime.master.scheduler; + +import org.apache.nemo.common.Pair; +import org.apache.nemo.runtime.common.RuntimeIdManager; +import org.apache.nemo.runtime.common.plan.StageEdge; + +import java.util.*; + +/** + * This is a Simulator for managing network environment. + * It manages network bandwidth and latency between node pairs. + */ +final class NetworkSimulator { + private static final long DEFAULT_BANDWIDTH = 1000000; //kbps + private static final long LOCAL_BANDWIDTH = 1000000; //kbps + private static final long DEFAULT_LATENCY = 100; // ms + private static final long LOCAL_LATENCY = 0; // ms + + // hostname1, hostname 2 pair to bandwidth + private final Map, Long> bandwidthMap; + + // hostname1, hostname 2 pair to latency + private final Map, Long> latencyMap; + + // runtime edge id, edge index pair to upstream node name. + private final Map, String> runtimeEdgeSrcIndexToNode; + + // runtime edge id, edge index pair to downstream task id. + private final Map, String> runtimeEdgeDstIndexToTaskId; + + NetworkSimulator() { + bandwidthMap = new HashMap<>(); + latencyMap = new HashMap<>(); + runtimeEdgeDstIndexToTaskId = new HashMap<>(); + runtimeEdgeSrcIndexToNode = new HashMap<>(); + } + + public void reset() { + bandwidthMap.clear(); + latencyMap.clear(); + runtimeEdgeDstIndexToTaskId.clear(); + runtimeEdgeSrcIndexToNode.clear(); + } + + /** + * update bandwidthMap. + */ + public void updateBandwidth(final Map, Long> newBandwidthMap) { + this.bandwidthMap.putAll(newBandwidthMap); + } + + /** + * update latencyMap. 
+ */ + public void updateLatency(final Map, Long> newLatencyMap) { + this.latencyMap.putAll(newLatencyMap); + } + + /** + * get network bandwidth between two nodes. + */ + public long getBandwidth(final String nodeName1, final String nodeName2) { + // If two nodes are the same, It is the data transfer in the same node. + if (nodeName1.equals(nodeName2)) { + return LOCAL_BANDWIDTH; + } else if (bandwidthMap.containsKey(Pair.of(nodeName1, nodeName2))) { + return bandwidthMap.get(Pair.of(nodeName1, nodeName2)); + } else { + return bandwidthMap.getOrDefault(Pair.of(nodeName2, nodeName1), DEFAULT_BANDWIDTH); + } + } + + /** + * get network latency between two nodes. + */ + public long getLatency(final String nodeName1, final String nodeName2) { + // If two nodes are the same, It is the data transfer in the same node. + if (nodeName1.equals(nodeName2)) { + return LOCAL_LATENCY; + } else if (latencyMap.containsKey(Pair.of(nodeName1, nodeName2))) { + return latencyMap.get(Pair.of(nodeName1, nodeName2)); + } else { + return latencyMap.getOrDefault(Pair.of(nodeName2, nodeName1), DEFAULT_LATENCY); + } + } + + /** + * update runtimeEdgeSrcIndexToNode. + * + * @param edges outgoing edges from task. + * @param srcTaskId task id. + * @param nodeName node name where the task is. + */ + public void updateRuntimeEdgeSrcIndexToNode(final List edges, + final String srcTaskId, + final String nodeName) { + edges.forEach(edge -> { + final Pair keyPair = + Pair.of(edge.getId(), RuntimeIdManager.getIndexFromTaskId(srcTaskId)); + + runtimeEdgeSrcIndexToNode.put(keyPair, nodeName); + }); + } + + /** + * update runtimeEdgeSrcIndexToNode. + * + * @param edges incoming edges of task. + * @param taskId the task id. 
+ */ + public void updateRuntimeEdgeDstIndestToTaskId(final List edges, final String taskId) { + edges.forEach(edge -> { + final Pair keyPair = + Pair.of(edge.getId(), RuntimeIdManager.getIndexFromTaskId(taskId)); + + runtimeEdgeDstIndexToTaskId.put(keyPair, taskId); + }); + } + /** + * get the node name of upstream task by edge id and edge index. + * + * @param edgeId edge id. + * @param dstIndex index of edge. + * + * @return node name of upstream task. + */ + public String getUpstreamNode(final String edgeId, final int dstIndex) { + return runtimeEdgeSrcIndexToNode.get(Pair.of(edgeId, dstIndex)); + } + + /** + * get the task id of downstream task by edge id and edge index. + * + * @param edgeId edge id. + * @param srcIndex index of edge. + * + * @return task id of downstream task. + */ + public String getDownstreamTaskId(final String edgeId, final int srcIndex) { + return runtimeEdgeDstIndexToTaskId.get(Pair.of(edgeId, srcIndex)); + } +} diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/NodeSimulator.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/NodeSimulator.java new file mode 100644 index 0000000000..751899d211 --- /dev/null +++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/NodeSimulator.java @@ -0,0 +1,554 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.nemo.runtime.master.scheduler; + +import org.apache.commons.lang3.SerializationUtils; +import org.apache.nemo.common.Pair; +import org.apache.nemo.common.dag.DAG; +import org.apache.nemo.common.exception.UnsupportedCommPatternException; +import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty; +import org.apache.nemo.common.ir.vertex.IRVertex; +import org.apache.nemo.common.ir.vertex.OperatorVertex; +import org.apache.nemo.common.ir.vertex.SourceVertex; +import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty; +import org.apache.nemo.runtime.common.RuntimeIdManager; +import org.apache.nemo.runtime.common.plan.RuntimeEdge; +import org.apache.nemo.runtime.common.plan.StageEdge; +import org.apache.nemo.runtime.common.plan.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.DoubleStream; +import java.util.stream.LongStream; + +/** + * Class for simulating node. + * This class manages resource distribution for each tasks. + */ +public final class NodeSimulator { + private static final Logger LOG = LoggerFactory.getLogger(NodeSimulator.class.getName()); + private static final int DEFAULT_BYTES_PER_LINE = 5000; + // TODO XXX: Strict definition for CPU Resource required. 
+ private static final int DEFAULT_CPU_RESOURCE = 100; + private static final double DEFAULT_TRANSFORM_CPU_UTILIZATION = 1.3; // tuples / ms + + private final NetworkSimulator networkSimulator; + + // whether it allocate executor of not. + private boolean allocated = false; + private final String nodeName; + private String executorId = ""; + private int capacity; + private final Map taskMap; + + private int cpuResource; + + NodeSimulator(final String nodeName, final NetworkSimulator networkSimulator) { + this.nodeName = nodeName; + this.networkSimulator = networkSimulator; + this.cpuResource = DEFAULT_CPU_RESOURCE; + this.taskMap = new HashMap<>(); + } + + /** + * add the number of tuples to process. + * + * @param taskId Target task id to add the number of tuples to process + * @param dstVertexId Vertex id that consume tuples. + * @param srcVertexId Vertex id that process tuples. + * @param srcNodeName the name of node that process tuples. + * @param numOfTuples the number of tuples to process that processed from upstream task. + */ + public void addNumOfTuplesToProcess(final String taskId, + final String dstVertexId, + final String srcVertexId, + final String srcNodeName, + final long numOfTuples) { + taskMap.get(taskId).addNumOfTuplesToProcess(dstVertexId, srcVertexId, srcNodeName, numOfTuples); + } + + /** + * Distribute CPU resource to tasks. + * All tasks get resource equally. + */ + // TODO XXX: It is required to distribute resource according to the number of tuples that can be processed. + public void distributeResource() { + long distributedCPU = this.cpuResource / (taskMap.size() == 0 ? 1 : taskMap.size()); + this.taskMap.values().forEach(taskHarness -> { + taskHarness.setAllocatedCPU(distributedCPU); + }); + } + + public String getNodeName() { + return nodeName; + } + + /** + * allocated executor. + * + * @param targetExecutorId Executor Id to allocate. 
+ * @param executorCapacity The number of tasks that can be processed at the same time in the executor. + */ + public void allocateExecutor(final String targetExecutorId, final int executorCapacity) { + LOG.info(String.format("put executor %s to %s", executorId, nodeName)); + this.executorId = executorId; + this.capacity = executorCapacity; + this.allocated = true; + } + + /** + * simulate task for duration. + * + * @param task task to simulate. + * @param duration The duration of the simulation. + */ + public Map, Long>> simulate(final Task task, final long duration) { + return taskMap.get(task.getTaskId()).simulate(duration); + } + + /** + * assign task to executor. + * + * @param task the task to execute. + */ + public void onTaskReceived(final Task task) { + LOG.info(String.format("task receive %s %s", executorId, task.getTaskId())); + taskMap.put(task.getTaskId(), new TaskHarness(task)); + } + + public void prepare() { + taskMap.values().forEach(TaskHarness::prepare); + } + + + boolean isAllocated() { + return allocated; + } + + /** + * Class for simulating node. + * This class manages resource distribution for each tasks. + */ + private class TaskHarness { + private final Task task; + + // the time it takes until the first tuples is processed, including latency. + private long waitUntilProduceFirstTuple; + + // allocated CPU resource from node. + private long allocatedCPU; + + // CPU utilization of task. + private double cpuUtilization; + + // dstVertexId, srcVertexId pair of stage edge to dataFetcher + private final Map, DataFetchSimulator> dataFetcherMap; + + // srcVertexId of stage edge to writer + private final Map writerMap; + + TaskHarness(final Task task) { + this.task = task; + this.allocatedCPU = 0; + this.waitUntilProduceFirstTuple = Long.MAX_VALUE; + this.cpuUtilization = 0; + this.dataFetcherMap = new HashMap<>(); + this.writerMap = new HashMap<>(); + } + + /** + * Decompose task into a structure that is easy to simulate. 
+ */ + public void prepare() { + final DAG> irDag = + SerializationUtils.deserialize(task.getSerializedIRDag()); + + for (IRVertex irVertex : irDag.getTopologicalSort()) { + if (irVertex instanceof SourceVertex) { + // Source vertex DataFetcher + String vertexId = irVertex.getId(); + dataFetcherMap.put(Pair.of(vertexId, vertexId), + new DataFetchSimulator(Collections.singletonList(nodeName), true)); + } + + // Set Data Fetcher of incoming edge + task.getTaskIncomingEdges() + .stream() + .filter(stageEdge -> stageEdge.getDstIRVertex().getId().equals(irVertex.getId())) // edge to this vertex + .forEach(stageEdge -> { + if (irVertex instanceof OperatorVertex) { + + List nodeNames = getDataFetcherNodeNames(stageEdge); + String srcVertexId = stageEdge.getSrcIRVertex().getId(); + String dstVertexId = irVertex.getId(); + + dataFetcherMap.put(Pair.of(dstVertexId, srcVertexId), new DataFetchSimulator(nodeNames, false)); + } + }); + + // Set Writer + task.getTaskOutgoingEdges() + .stream() + .filter(stageEdge -> stageEdge.getSrcIRVertex().getId().equals(irVertex.getId())) // edge to this vertex + .forEach(stageEdge -> { + if (irVertex instanceof OperatorVertex) { + WriteSimulator writeSimulator = getWriter(stageEdge); + writerMap.put(irVertex.getId(), writeSimulator); + } + }); + } + + // Set CPU utilization of task. + this.cpuUtilization = DEFAULT_TRANSFORM_CPU_UTILIZATION; + } + + /** + * add the number of tuples to process. + * + * @param dstVertexId Vertex id that consume tuples. + * @param srcVertexId Vertex id that process tuples. + * @param srcNodeName the name of node that process tuples. + * @param numOfTuples the number of tuples to process that processed from upstream task. 
+ */ + public void addNumOfTuplesToProcess(final String dstVertexId, + final String srcVertexId, + final String srcNodeName, + final Long numOfTuples) { + dataFetcherMap.get(Pair.of(dstVertexId, srcVertexId)).addNumOfTuplesToProcess(srcNodeName, numOfTuples); + } + + /** + * set allocate CPU resource. + */ + public void setAllocatedCPU(final long cpu) { + this.allocatedCPU = cpu; + } + + /** + * calculate latency. + * + * @return latency of task. + */ + public long calculateExpectedTaskLatency() { + long networkLatency = dataFetcherMap.values().stream() + .map(DataFetchSimulator::getMinimumLatency) + .reduce(Long.MAX_VALUE, Math::min); + + long transformLatency = (long) (1L / (cpuUtilization * allocatedCPU)); // i have to sophisticated here + return networkLatency + transformLatency; // ms + } + + /** + * calculate maximum throughput when there are infinite tuples to process. + * + * @return maximum bandwidth of task. + */ + public double calculateExpectedMaximumTaskThroughput() { + double networkBandwidth = dataFetcherMap.values().stream() + .map(DataFetchSimulator::getMaximumBandwidth) + .flatMapToDouble(DoubleStream::of) + .average() + .orElse(0) / DEFAULT_BYTES_PER_LINE; + + double transformThroughput = cpuUtilization * allocatedCPU; + return Math.min(networkBandwidth, transformThroughput); + } + + /** + * get total number of tuples to process from all data fetchers. + * + * @return the number of tuples to process + */ + public long getTotalNumOfTuplesToProcess() { + return this.dataFetcherMap.values().stream() + .map(DataFetchSimulator::getTotalNumOfTuplesToProcess) + .flatMapToLong(LongStream::of) + .sum(); + } + + /** + * calculates the number of tuples that are processed for duration. + * Consumes the number of processed tuples and returns the number of tuples to transfer to downstream task. + * + * @param duration The duration of the simulation. + * @return the number of tuples per downstream tasks. 
+ */ + public Map, Long>> simulate(final long duration) { + long totalNumOfTupelsToProcess = getTotalNumOfTuplesToProcess(); + long executionDuration; + + // It means there are no tuples to process. + if (totalNumOfTupelsToProcess == 0) { + return new HashMap<>(); + } + + // It means that it starts to process tuples. + if (waitUntilProduceFirstTuple == Long.MAX_VALUE) { + waitUntilProduceFirstTuple = calculateExpectedTaskLatency(); + } + + if (waitUntilProduceFirstTuple < 0) { + executionDuration = duration; + } else { + executionDuration = duration - waitUntilProduceFirstTuple; + } + waitUntilProduceFirstTuple -= duration; + + // It means that it until process first tuples. + if (executionDuration <= 0) { + return new HashMap<>(); + } + + // calculate the number of processed tuples. + + long maximumNumOfProcessedTuple = (long) calculateExpectedMaximumTaskThroughput() * executionDuration; + long numOfprocessedTuple = Math.min(maximumNumOfProcessedTuple, totalNumOfTupelsToProcess); + + // calculate rate to total number of tuples to process. + double consumeRate = (double) numOfprocessedTuple / totalNumOfTupelsToProcess; + + // consume tuples from data fetcher. + long finalNumOfprocessedTuple = dataFetcherMap.values().stream() + .map(dataFetcher -> dataFetcher.consume(consumeRate)) + .flatMapToLong(LongStream::of) + .sum(); + + // return the number of tuples to transfer to downstream tasks. + return writerMap.values().stream() + .map(writer -> writer.write(finalNumOfprocessedTuple)) + .collect(Collectors.toMap(Pair::left, Pair::right)); + } + + /** + * Create writer according to the runtimeEdge. + * + * @param runtimeEdge runtimeEdge. + * @return WriterSimulator corresponds to edge. 
+ */ + public WriteSimulator getWriter(final StageEdge runtimeEdge) { + final Optional comValueOptional = + runtimeEdge.getPropertyValue(CommunicationPatternProperty.class); + final CommunicationPatternProperty.Value comm = comValueOptional.orElseThrow(IllegalStateException::new); + + if (comm == CommunicationPatternProperty.Value.ONE_TO_ONE) { + String taskId = networkSimulator.getDownstreamTaskId(runtimeEdge.getId(), + RuntimeIdManager.getIndexFromTaskId(task.getTaskId())); + return new WriteSimulator(Collections.singletonList(taskId), runtimeEdge.getDstIRVertex().getId(), + runtimeEdge.getSrcIRVertex().getId(), 1); + } + String taskId; + final List taskIds = new LinkedList<>(); + final int numDstTasks = runtimeEdge.getDstIRVertex().getPropertyValue(ParallelismProperty.class).get(); + for (int dstTaskIdx = 0; dstTaskIdx < numDstTasks; dstTaskIdx++) { + taskId = networkSimulator.getDownstreamTaskId(runtimeEdge.getId(), + RuntimeIdManager.getIndexFromTaskId(task.getTaskId())); + taskIds.add(taskId); + } + int partition; + if (comm == CommunicationPatternProperty.Value.BROADCAST) { + partition = 1; + } else { + partition = numDstTasks; + } + return new WriteSimulator(taskIds, runtimeEdge.getDstIRVertex().getId(), + runtimeEdge.getSrcIRVertex().getId(), partition); + } + + /** + * get the list of node name of upstream tasks. + * + * @param runtimeEdge runtimeEdge. + * @return list of node name of upstream task. 
+ */ + public List getDataFetcherNodeNames(final StageEdge runtimeEdge) { + final Optional comValueOptional = + runtimeEdge.getPropertyValue(CommunicationPatternProperty.class); + final CommunicationPatternProperty.Value comm = comValueOptional.orElseThrow(IllegalStateException::new); + + switch (comm) { + case ONE_TO_ONE: + return Collections.singletonList(networkSimulator.getUpstreamNode(runtimeEdge.getId(), + RuntimeIdManager.getIndexFromTaskId(task.getTaskId()))); + case BROADCAST: + case SHUFFLE: + final List nodeNames = new LinkedList<>(); + final int numDstTasks = runtimeEdge.getDstIRVertex().getPropertyValue(ParallelismProperty.class).get(); + for (int dstTaskIdx = 0; dstTaskIdx < numDstTasks; dstTaskIdx++) { + nodeNames.add(networkSimulator.getUpstreamNode(runtimeEdge.getId(), + RuntimeIdManager.getIndexFromTaskId(task.getTaskId()))); + } + return nodeNames; + default: + throw new UnsupportedCommPatternException(new Exception("Communication pattern not supported")); + } + } + } + + /** + * This class manages downstream tasks and edge type. + */ + private class WriteSimulator { + private final List taskIds; + private final String srcVertexId; + private final String dstVertexId; + private final int partition; + + WriteSimulator(final List taskIds, + final String dstVertexId, + final String srcVertexId, + final int partition) { + this.taskIds = taskIds; + this.dstVertexId = dstVertexId; + this.srcVertexId = srcVertexId; + this.partition = partition; + } + + /** + * calculate the number of tuples to transfer to downstream tasks. + * + * @param numOfProcessedTuples the number of processed tuple. + * @return the number of tuples to transfer downstream tasks. 
+ */ + public Pair, Long>> write(final long numOfProcessedTuples) { + Map, Long> numOfProcessedTUples = new HashMap<>(); + long numOfTuplesToTransfer = numOfProcessedTuples / partition; + for (String taskId : taskIds) { + numOfProcessedTUples.put(Pair.of(taskId, dstVertexId), numOfTuplesToTransfer); + } + return Pair.of(srcVertexId, numOfProcessedTUples); + } + } + + /** + * This class manages upstream tasks and node name where it is. + */ + private class DataFetchSimulator { + private final boolean isSource; + private final Map numOfTuplesToProcess; + + DataFetchSimulator(final List nodeNames, final boolean isSource) { + this.numOfTuplesToProcess = new HashMap<>(); + this.isSource = isSource; + for (String srcNodeName : nodeNames) { + if (isSource) { + this.numOfTuplesToProcess.put(srcNodeName, Long.MAX_VALUE); + } else { + this.numOfTuplesToProcess.put(srcNodeName, 0L); + } + } + } + + /** + * add the number of tuples to process. + * + * @param srcNodeName node name of upstream task. + * @param numOfTuplesToAdd the number of tuples to add. + */ + public void addNumOfTuplesToProcess(final String srcNodeName, final long numOfTuplesToAdd) { + long beforeNumOfTuplesToProcess = this.numOfTuplesToProcess.get(srcNodeName); + this.numOfTuplesToProcess.put(srcNodeName, beforeNumOfTuplesToProcess + numOfTuplesToAdd); + } + + /** + * get total number of tuples to process. + */ + public long getTotalNumOfTuplesToProcess() { + if (isSource) { + return Long.MAX_VALUE; + } else { + return this.numOfTuplesToProcess.values().stream().flatMapToLong(LongStream::of).sum(); + } + } + + /** + * calculate current latency considering whether there is tuples to process for each upstream task. 
+ */ + public long getCurrLatency() { + return this.numOfTuplesToProcess.entrySet().stream() + .filter(entry -> entry.getValue() > 0) + .map(Map.Entry::getKey) + .map(srcNodeName -> networkSimulator.getLatency(nodeName, srcNodeName)) + .reduce(Long.MAX_VALUE, Math::min); + } + + /** + * calculate the minimum latency assuming there is tuples to process for every upstream task. + */ + public long getMinimumLatency() { + return this.numOfTuplesToProcess.keySet().stream() + .map(srcNodeName -> networkSimulator.getLatency(nodeName, srcNodeName)) + .reduce(Long.MAX_VALUE, Math::min); + } + + /** + * calculate current latency considering the ratio of tuples to be processed for each upstream task. + */ + public long getCurrBandwidth() { + long numWeighted = 0; + long sum = 0; + + for (Map.Entry entry : numOfTuplesToProcess.entrySet()) { + if (entry.getValue() <= 0) { + continue; + } + + numWeighted += entry.getValue(); + sum += entry.getValue() * networkSimulator.getBandwidth(nodeName, entry.getKey()); + } + + if (numWeighted == 0) { + return 0; + } + return sum / numWeighted; + } + + /** + * calculate Maximum latency assuming that every upstream task has the same number of the tuples to process. + */ + public double getMaximumBandwidth() { + return this.numOfTuplesToProcess.keySet().stream() + .flatMapToLong(srcNodeName -> LongStream.of(networkSimulator.getBandwidth(nodeName, srcNodeName))) + .average() + .orElse(0); + } + + /** + * Consume tuples. + * + * @param consumeRate consume rate to consume tuples. + * @return the number of the tuples that are processed. 
+ */ + public long consume(final double consumeRate) { + long numOfConsumedTuples = 0; + for (String srcNodeName : numOfTuplesToProcess.keySet()) { + long beforeNumOfTuples = numOfTuplesToProcess.get(srcNodeName); + long numOfProcessedTuples = (long) Math.ceil(numOfTuplesToProcess.get(srcNodeName) * consumeRate); + numOfTuplesToProcess.put(srcNodeName, beforeNumOfTuples - numOfProcessedTuples); + numOfConsumedTuples += numOfProcessedTuples; + } + return numOfConsumedTuples; + } + } +} + + + diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/PlanSimulator.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/PlanSimulator.java new file mode 100644 index 0000000000..960a136b76 --- /dev/null +++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/PlanSimulator.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nemo.runtime.master.scheduler; + +import org.apache.commons.lang3.SerializationUtils; +import org.apache.nemo.common.Pair; +import org.apache.nemo.common.Util; +import org.apache.nemo.common.exception.*; +import org.apache.nemo.common.ir.executionproperty.ResourceSpecification; +import org.apache.nemo.runtime.common.comm.ControlMessage; +import org.apache.nemo.runtime.common.message.*; +import org.apache.nemo.runtime.common.plan.*; +import org.apache.nemo.runtime.master.resource.DefaultExecutorRepresenter; +import org.apache.nemo.runtime.master.resource.ExecutorRepresenter; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.evaluator.EvaluatorDescriptor; +import org.apache.reef.tang.Configuration; +import org.apache.reef.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.NotThreadSafe; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Simulator for simulating an execution based on functional model. + * This class manages running environment. 
+ */ +@NotThreadSafe +public abstract class PlanSimulator { + private static final Logger LOG = LoggerFactory.getLogger(PlanSimulator.class.getName()); + private final String resourceSpecificationString; + private final String nodeSpecficationString; + + private final ExecutorService serializationExecutorService; + private final ScheduleSimulator scheduler; + private final ContainerManageSimulator containerManager; + + public PlanSimulator(final ScheduleSimulator scheduler, + final String resourceSpecificationString, + final String nodeSpecificationString) throws Exception { + this.scheduler = scheduler; + this.resourceSpecificationString = resourceSpecificationString; + this.nodeSpecficationString = nodeSpecificationString; + this.containerManager = new ContainerManageSimulator(); + this.serializationExecutorService = new ExecutorServiceSimulator(); + setUpExecutors(); + } + + /** + * Construct virtual executors for simulation. + */ + private void setUpExecutors() throws Exception { + LOG.info("Starts to setup executors"); + long startTimestamp = System.currentTimeMillis(); + this.containerManager.parseNodeSpecificationString(this.nodeSpecficationString); + + final List> resourceSpecs = + Util.parseResourceSpecificationString(resourceSpecificationString); + + final AtomicInteger executorIdGenerator = new AtomicInteger(0); + for (Pair p : resourceSpecs) { + for (int i = 0; i < p.left(); i++) { + final ActiveContext ac = new SimulationEvaluatorActiveContext(executorIdGenerator.getAndIncrement()); + ExecutorRepresenter executorRepresenter = new DefaultExecutorRepresenter(ac.getId(), p.right(), + new SimulationMessageSender(ac.getId(), this.containerManager), ac, serializationExecutorService, ac.getId()); + this.containerManager.allocateExecutor(executorRepresenter.getExecutorId(), p.right().getCapacity()); + this.scheduler.onExecutorAdded(executorRepresenter); + } + } + LOG.info("Time To setup Executors: " + (System.currentTimeMillis() - startTimestamp)); + } + + /** 
+ * Reset the instance to its initial state. + */ + public void reset() throws Exception { + scheduler.reset(); + containerManager.reset(); + setUpExecutors(); + } + + /** + * terminates the instances before terminate. + */ + public void terminate() { + scheduler.terminate(); + serializationExecutorService.shutdown(); + } + + /** + * get serializationExecutorService. + */ + public ExecutorService getSerializationExecutorService() { + return serializationExecutorService; + } + + /** + * get scheduler. + */ + public ScheduleSimulator getScheduler() { + return scheduler; + } + + /** + * get containerManager. + */ + public ContainerManageSimulator getContainerManager() { + return containerManager; + } + + /** + * Submits the {@link PhysicalPlan} to Simulator. + * + * @param plan to execute + * @param maxScheduleAttempt the max number of times this plan/sub-part of the plan should be attempted. + */ + public abstract void simulate(PhysicalPlan plan, int maxScheduleAttempt) throws Exception; + + /** + * Evaluator ActiveContext for the Simulation. + */ + private final class SimulationEvaluatorActiveContext implements ActiveContext { + private final Integer id; + + /** + * Default constructor. + * + * @param id Evaluator ID. 
+ */ + SimulationEvaluatorActiveContext(final Integer id) { + this.id = id; + } + + @Override + public void close() { + // do nothing + } + + @Override + public void submitTask(final Configuration taskConf) { + // do nothing + } + + @Override + public void submitContext(final Configuration contextConfiguration) { + // do nothing + } + + @Override + public void submitContextAndService(final Configuration contextConfiguration, + final Configuration serviceConfiguration) { + // do nothing + } + + @Override + public void sendMessage(final byte[] message) { + // do nothing + } + + @Override + public String getEvaluatorId() { + return getId(); + } + + @Override + public Optional getParentId() { + return null; + } + + @Override + public EvaluatorDescriptor getEvaluatorDescriptor() { + return null; + } + + @Override + public String getId() { + return "Evaluator" + id; + } + } + + /** + * ExecutorService for Simulations. + */ + private final class ExecutorServiceSimulator implements ExecutorService { + + @Override + public void shutdown() { + + } + + @Override + public List shutdownNow() { + return null; + } + + @Override + public boolean isShutdown() { + return false; + } + + @Override + public boolean isTerminated() { + return false; + } + + @Override + public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException { + return false; + } + + @Override + public Future submit(final Callable task) { + return null; + } + + @Override + public Future submit(final Runnable task, final T result) { + return null; + } + + @Override + public Future submit(final Runnable task) { + return null; + } + + @Override + public List> invokeAll(final Collection> tasks) throws InterruptedException { + return null; + } + + @Override + public List> invokeAll(final Collection> tasks, + final long timeout, final TimeUnit unit) + throws InterruptedException { + return null; + } + + @Override + public T invokeAny(final Collection> tasks) + throws 
InterruptedException, ExecutionException { + return null; + } + + @Override + public T invokeAny(final Collection> tasks, final long timeout, final TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return null; + } + + @Override + public void execute(final Runnable command) { + command.run(); + } + } + + /** + * MessageSender for Simulations. + */ + private final class SimulationMessageSender implements MessageSender { + private final String executorId; + private final ContainerManageSimulator containerManager; + + /** + * Constructor for the message sender that simply passes on the messages, instead of sending actual messages. + * + * @param executorId the simulated executor id of where the message sender communicates from. + * @param containerManager the simulation scheduler to communicate with. + */ + SimulationMessageSender(final String executorId, final ContainerManageSimulator containerManager) { + this.executorId = executorId; + this.containerManager = containerManager; + } + + @Override + public void send(final ControlMessage.Message message) { + switch (message.getType()) { + // Messages sent to the master + case TaskStateChanged: + case ExecutorFailed: + case RunTimePassMessage: + case MetricMessageReceived: + break; + case ScheduleTask: + final ControlMessage.ScheduleTaskMsg scheduleTaskMsg = message.getScheduleTaskMsg(); + final Task task = + SerializationUtils.deserialize(scheduleTaskMsg.getTask().toByteArray()); + this.containerManager.onTaskReceived(this.executorId, task); + break; + // No metric messaging in simulation. + case MetricFlushed: + case RequestMetricFlush: + break; + default: + throw new IllegalMessageException( + new Exception("This message should not be received by Master or the Executor :" + message.getType())); + } + } + + @Override + public CompletableFuture request(final ControlMessage.Message message) { + return null; + } + + @Override + public void close() { + // do nothing. 
+ } + } +} diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/ScheduleSimulator.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/ScheduleSimulator.java new file mode 100644 index 0000000000..01378f449b --- /dev/null +++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/ScheduleSimulator.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nemo.runtime.master.scheduler; + +import org.apache.commons.lang3.mutable.MutableObject; +import org.apache.nemo.common.exception.UnknownExecutionStateException; +import org.apache.nemo.conf.JobConf; +import org.apache.nemo.runtime.common.plan.PhysicalPlan; +import org.apache.nemo.runtime.common.plan.Task; +import org.apache.nemo.runtime.common.state.TaskState; +import org.apache.nemo.runtime.master.PlanStateManager; +import org.apache.nemo.runtime.master.resource.ExecutorRepresenter; +import org.apache.reef.tang.annotations.Parameter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.inject.Inject; +import java.util.*; +import java.util.stream.Collectors; + +/** + * Scheduler for simulating an execution not controlled by the runtime master. + */ +public abstract class ScheduleSimulator implements Scheduler { + /** + * Components related to scheduling the given plan. The role of each class can be found in {@link BatchScheduler}. + */ + private TaskDispatchSimulator taskDispatcher; + private final PendingTaskCollectionPointer pendingTaskCollectionPointer; + private ExecutorRegistry executorRegistry; + private PlanStateManager planStateManager; + + /** + * Components that tell how to schedule the given tasks. + */ + private final SchedulingConstraintRegistry schedulingConstraintRegistry; + private final SchedulingPolicy schedulingPolicy; + /** + * String to generate simulated executors from. 
+ */ + private final String dagDirectory; + + + @Inject + public ScheduleSimulator(final SchedulingConstraintRegistry schedulingConstraintRegistry, + final SchedulingPolicy schedulingPolicy, + @Parameter(JobConf.DAGDirectory.class) final String dagDirectory) { + this.pendingTaskCollectionPointer = PendingTaskCollectionPointer.newInstance(); + this.executorRegistry = ExecutorRegistry.newInstance(); + this.schedulingConstraintRegistry = schedulingConstraintRegistry; + this.schedulingPolicy = schedulingPolicy; + this.dagDirectory = dagDirectory; + this.planStateManager = PlanStateManager.newInstance(dagDirectory); + this.taskDispatcher = new TaskDispatchSimulator(schedulingConstraintRegistry, schedulingPolicy, + pendingTaskCollectionPointer, executorRegistry, planStateManager); + } + + /** + * Reset the instance to its initial state. + */ + public void reset() { + this.terminate(); + this.executorRegistry = ExecutorRegistry.newInstance(); + this.planStateManager = PlanStateManager.newInstance(dagDirectory); + this.pendingTaskCollectionPointer.getAndSetNull(); + this.taskDispatcher = new TaskDispatchSimulator(schedulingConstraintRegistry, schedulingPolicy, + pendingTaskCollectionPointer, executorRegistry, planStateManager); + } + + /** + * get planStateManager. + */ + public PlanStateManager getPlanStateManager() { + return planStateManager; + } + + /** + * get pendingTaskCollectionPointer. + */ + public PendingTaskCollectionPointer getPendingTaskCollectionPointer() { + return pendingTaskCollectionPointer; + } + + /** + * get taskDispatcher. + */ + public TaskDispatchSimulator getTaskDispatcher() { + return taskDispatcher; + } + + /** + * Schedules a given plan. + * + * @param physicalPlan the physical plan to schedule. + * @param maxScheduleAttempt the max number of times this plan/sub-part of the plan should be attempted. 
+ */ + @Override + public abstract void schedulePlan(PhysicalPlan physicalPlan, int maxScheduleAttempt); + + /** + * Update the physical plan and maximum attempt. + * But it is not supported yet. + * + * @param newPhysicalPlan the physical plan to manage. + */ + @Override + public abstract void updatePlan(PhysicalPlan newPhysicalPlan); + + /** + * Called when an executor is added to Simulator. + * + * @param executorRepresenter a representation of the added executor. + */ + @Override + public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) { + executorRegistry.registerExecutor(executorRepresenter); + taskDispatcher.onExecutorSlotAvailable(); + } + + /** + * Called when an executor is removed from Simulator. + * But it is not supported yet. + * + * @param executorId of the executor that has been removed. + */ + @Override + public void onExecutorRemoved(final String executorId) { + // we don't simulate executor removed case yet. + throw new UnsupportedOperationException(); + } + + /** + * Process the RuntimePassMessage. + * + * @param taskId that generated the message. + * @param data of the message. + */ + public void onRunTimePassMessage(final String taskId, final Object data) { + // we don't simulate runtime optimization yet. + throw new UnsupportedOperationException(); + } + + /** + * Handles task state transition notifications sent from executors. + * + * @param executorId the id of the executor where the message was sent from. + * @param taskId whose state has changed + * @param taskAttemptIndex of the task whose state has changed + * @param newState the state to change to + * @param vertexPutOnHold the ID of vertex that is put on hold. It is null otherwise. 
+ */ + @Override + public void onTaskStateReportFromExecutor(final String executorId, + final String taskId, + final int taskAttemptIndex, + final TaskState.State newState, + @Nullable final String vertexPutOnHold, + final TaskState.RecoverableTaskFailureCause failureCause) { + planStateManager.onTaskStateChanged(taskId, newState); + + switch (newState) { + case COMPLETE: + // Do nothing. + break; + case ON_HOLD: + case FAILED: + case SHOULD_RETRY: + // TODO #226: StreamingScheduler Fault Tolerance + throw new UnsupportedOperationException(); + case READY: + case EXECUTING: + throw new RuntimeException("The states READY/EXECUTING cannot occur at this point"); + default: + throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + newState)); + } + } + + /** + * Called to check for speculative execution. + */ + @Override + public void onSpeculativeExecutionCheck() { + // we don't simulate speculate execution yet. + return; + } + + /** + * To be called when a job should be terminated. + * Any clean up code should be implemented in this method. + */ + @Override + public void terminate() { + this.taskDispatcher.terminate(); + this.executorRegistry.terminate(); + } + + /** + * TaskDispatcher for simulating an execution not controlled by the runtime master. + * This class follows the structure of {@link TaskDispatcher}, so when a change has to be made on TaskDispatcher, + * it also means that it should be reflected in this class as well. 
+ */ + static final class TaskDispatchSimulator { + private static final Logger LOG = LoggerFactory.getLogger(TaskDispatcher.class.getName()); + private final PendingTaskCollectionPointer pendingTaskCollectionPointer; + private PlanStateManager planStateManager; + + private final ExecutorRegistry executorRegistry; + private final SchedulingConstraintRegistry schedulingConstraintRegistry; + private final SchedulingPolicy schedulingPolicy; + + @Inject + private TaskDispatchSimulator(final SchedulingConstraintRegistry schedulingConstraintRegistry, + final SchedulingPolicy schedulingPolicy, + final PendingTaskCollectionPointer pendingTaskCollectionPointer, + final ExecutorRegistry executorRegistry, + final PlanStateManager planStateManager) { + this.pendingTaskCollectionPointer = pendingTaskCollectionPointer; + this.planStateManager = planStateManager; + this.executorRegistry = executorRegistry; + this.schedulingPolicy = schedulingPolicy; + this.schedulingConstraintRegistry = schedulingConstraintRegistry; + } + + private void doScheduleTaskList() { + final java.util.Optional> taskListOptional = pendingTaskCollectionPointer.getAndSetNull(); + if (!taskListOptional.isPresent()) { + // Task list is empty + LOG.debug("PendingTaskCollectionPointer is empty. Awaiting for more Tasks..."); + return; + } + + final Collection taskList = taskListOptional.get(); + final List couldNotSchedule = new ArrayList<>(); + for (final Task task : taskList) { + if (!planStateManager.getTaskState(task.getTaskId()).equals(TaskState.State.READY)) { + // Guard against race conditions causing duplicate task launches + LOG.debug("Skipping {} as it is not READY", task.getTaskId()); + continue; + } + + executorRegistry.viewExecutors(executors -> { + final MutableObject> candidateExecutors = new MutableObject<>(executors); + // Filter out the candidate executors that do not meet scheduling constraints. 
+ task.getExecutionProperties().forEachProperties(property -> {
+ final Optional<SchedulingConstraint> constraint = schedulingConstraintRegistry.get(property.getClass());
+ if (constraint.isPresent() && !candidateExecutors.getValue().isEmpty()) {
+ candidateExecutors.setValue(candidateExecutors.getValue().stream()
+ .filter(e -> constraint.get().testSchedulability(e, task))
+ .collect(Collectors.toSet()));
+ }
+ });
+ if (!candidateExecutors.getValue().isEmpty()) {
+ // Select executor
+ final ExecutorRepresenter selectedExecutor
+ = schedulingPolicy.selectExecutor(candidateExecutors.getValue(), task);
+ // update metadata first
+ planStateManager.onTaskStateChanged(task.getTaskId(), TaskState.State.EXECUTING);
+
+ LOG.info("{} scheduled to {}", task.getTaskId(), selectedExecutor.getExecutorId());
+ // send the task
+ selectedExecutor.onTaskScheduled(task);
+ } else {
+ couldNotSchedule.add(task);
+ }
+ });
+ }
+
+ LOG.debug("All except {} were scheduled among {}", new Object[]{couldNotSchedule, taskList});
+ if (couldNotSchedule.size() > 0) {
+ // Try these again, if no new task list has been set
+ pendingTaskCollectionPointer.setIfNull(couldNotSchedule);
+ }
+ }
+
+ /**
+ * Signals to the condition on executor slot availability.
+ */
+ void onExecutorSlotAvailable() {
+ doScheduleTaskList();
+ }
+
+ /**
+ * Signals to the condition on the Task collection availability.
+ */ + void onNewPendingTaskCollectionAvailable() { + doScheduleTaskList(); + } + + void terminate() { + doScheduleTaskList(); + } + } +} diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/StreamingPlanSimulator.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/StreamingPlanSimulator.java new file mode 100644 index 0000000000..bfb0ff3321 --- /dev/null +++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/StreamingPlanSimulator.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.nemo.runtime.master.scheduler; + +import org.apache.nemo.conf.JobConf; +import org.apache.nemo.runtime.common.metric.Metric; +import org.apache.nemo.runtime.common.plan.PhysicalPlan; +import org.apache.nemo.runtime.common.plan.Task; +import org.apache.nemo.runtime.master.metric.MetricStore; +import org.apache.reef.tang.annotations.Parameter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; +import java.util.ArrayList; +import java.util.List; + +/** + * Simulator for stream processing. 
+ */
+public class StreamingPlanSimulator extends PlanSimulator {
+ private static final Logger LOG = LoggerFactory.getLogger(StreamingPlanSimulator.class.getName());
+
+ /**
+ * The metric store for the simulation.
+ */
+ private MetricStore metricStore;
+
+ /**
+ * The below variables depend on the submitted plan to execute.
+ */
+ @Inject
+ public StreamingPlanSimulator(final ScheduleSimulator scheduler,
+ @Parameter(JobConf.ExecutorJSONContents.class) final String resourceSpecificationString,
+ @Parameter(JobConf.NodeSpecJsonContents.class) final String nodeSpecificationString)
+ throws Exception {
+ super(scheduler, resourceSpecificationString, nodeSpecificationString);
+ LOG.info("Start StreamPlanSimulator");
+ this.metricStore = MetricStore.newInstance();
+ }
+
+ /**
+ * Reset the instance to its initial state.
+ */
+ public void reset() throws Exception {
+ super.reset();
+ this.terminate();
+ metricStore = MetricStore.newInstance();
+ }
+
+ /**
+ * Simulate Plan.
+ *
+ * @param plan to execute
+ * @param maxScheduleAttempt the max number of times this plan/sub-part of the plan should be attempted.
+ */
+ public void simulate(final PhysicalPlan plan,
+ final int maxScheduleAttempt) throws Exception {
+ // distribute tasks to executors
+ getScheduler().schedulePlan(plan, maxScheduleAttempt);
+
+ // prepare tasks to simulate.
+ getContainerManager().prepare();
+
+ // distribute resources to tasks.
+ getContainerManager().getNodeSimulators().forEach(NodeSimulator::distributeResource);
+ List<Task> tasks = getContainerManager().getTasks();
+
+ // get topological sorted tasks.
+ List<Task> sortedTasks = new ArrayList<>();
+ plan.getStageDAG().getTopologicalSort().forEach(stage -> {
+ for (Task task : tasks) {
+ if (task.getStageId().equals(stage.getId())) {
+ sortedTasks.add(task);
+ }
+ }
+ });
+
+ long startTimestamp = System.currentTimeMillis();
+
+ // TODO XXX: Interval and maximum timestamp should be researched.
+ long maximumTimeStamp = 200000;
+ long timestamp = 0;
+ long duration = 2000;
+
+ LOG.info("Start simulation");
+
+ // iterate until timestamp reached maximumTimeStamp;
+ while (maximumTimeStamp > timestamp) {
+ for (Task task : sortedTasks) {
+ getContainerManager().simulate(task, duration);
+ }
+ timestamp += duration;
+ }
+ LOG.info("Time to simulate: " + (System.currentTimeMillis() - startTimestamp));
+ }
+
+ /**
+ * Record metrics in the metricStore.
+ */
+ public void onMetricMessageReceived(final String metricType,
+ final String metricId,
+ final String metricField,
+ final byte[] metricValue) {
+ final Class<Metric> metricClass = metricStore.getMetricClassByName(metricType);
+ metricStore.getOrCreateMetric(metricClass, metricId).processMetricMessage(metricField, metricValue);
+ }
+
+ /**
+ * get throughput.
+ */
+ // TODO XXX: Implement this method
+ public float getThroughput() {
+ return 0;
+ }
+
+ /**
+ * get latency.
+ */
+ // TODO XXX: Implement this method
+ public float getLatency() {
+ return 0;
+ }
+}
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/StreamingScheduleSimulator.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/StreamingScheduleSimulator.java
new file mode 100644
index 0000000000..8e328e52a0
--- /dev/null
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/StreamingScheduleSimulator.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.nemo.runtime.master.scheduler; + +import org.apache.nemo.common.ir.Readable; +import org.apache.nemo.conf.JobConf; +import org.apache.nemo.runtime.common.RuntimeIdManager; +import org.apache.nemo.runtime.common.plan.*; +import org.apache.reef.tang.annotations.Parameter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; +import java.util.*; +import java.util.stream.Collectors; + +/** + * Scheduler for stream processing. This class follows the structure of + * {@link StreamingScheduler}, so when a change has to be made on StreamingScheduler, it also means that it should be + * reflected in this class as well. + */ +public final class StreamingScheduleSimulator extends ScheduleSimulator { + private static final Logger LOG = LoggerFactory.getLogger(StreamingScheduleSimulator.class.getName()); + + @Inject + private StreamingScheduleSimulator(final SchedulingConstraintRegistry schedulingConstraintRegistry, + final SchedulingPolicy schedulingPolicy, + @Parameter(JobConf.DAGDirectory.class) final String dagDirectory) { + super(schedulingConstraintRegistry, schedulingPolicy, dagDirectory); + } + + /** + * The entrance point of the simulator. Simulate a plan by submitting a plan through this method. + * + * @param submittedPhysicalPlan the plan to simulate. + * @param maxScheduleAttempt the max number of times this plan/sub-part of the plan should be attempted. 
+ */
+ @Override
+ public void schedulePlan(final PhysicalPlan submittedPhysicalPlan, final int maxScheduleAttempt) {
+ long startTimeStamp = System.currentTimeMillis();
+
+ // Housekeeping stuff
+ getPlanStateManager().updatePlan(submittedPhysicalPlan, maxScheduleAttempt);
+ getPlanStateManager().storeJSON("submitted");
+
+ // Prepare tasks
+ final List<Stage> allStages = submittedPhysicalPlan.getStageDAG().getTopologicalSort();
+ final List<Task> allTasks = allStages.stream().flatMap(stageToSchedule -> {
+ // Helper variables for this stage
+ final List<StageEdge> stageIncomingEdges =
+ submittedPhysicalPlan.getStageDAG().getIncomingEdgesOf(stageToSchedule.getId());
+ final List<StageEdge> stageOutgoingEdges =
+ submittedPhysicalPlan.getStageDAG().getOutgoingEdgesOf(stageToSchedule.getId());
+ final List<Map<String, Readable>> vertexIdToReadables = stageToSchedule.getVertexIdToReadables();
+ final List<String> taskIdsToSchedule = getPlanStateManager().getTaskAttemptsToSchedule(stageToSchedule.getId());
+
+ // Create tasks of this stage
+ return taskIdsToSchedule.stream().map(taskId -> new Task(
+ submittedPhysicalPlan.getPlanId(),
+ taskId,
+ stageToSchedule.getExecutionProperties(),
+ stageToSchedule.getSerializedIRDAG(),
+ stageIncomingEdges,
+ stageOutgoingEdges,
+ vertexIdToReadables.get(RuntimeIdManager.getIndexFromTaskId(taskId))));
+ }).collect(Collectors.toList());
+
+ // Schedule everything at once
+ getPendingTaskCollectionPointer().setToOverwrite(allTasks);
+ getTaskDispatcher().onNewPendingTaskCollectionAvailable();
+ LOG.info("Time to assign task to executor: " + (System.currentTimeMillis() - startTimeStamp));
+ }
+
+ @Override
+ public void updatePlan(final PhysicalPlan newPhysicalPlans) {
+ // TODO #227: StreamingScheduler Dynamic Optimization
+ throw new UnsupportedOperationException();
+ }
+}