# Proposals for module loader for lambdas in the server
See Proposal 5 below for the final working proposal.
We have been having several design discussions around Issue 12 and wanted a place to capture the architectural proposals/ideas we are investigating, as this is a relatively complicated problem to solve. The crux of the issue is that we need a way to pass any modules a user's lambda functions depend on from the driver to the worker node that will execute the function. For example, consider the following:
```javascript
var sc = new SparkContext("local[*]", "Linear Regression Test");
var data = sc.textFile("examples/data/lpsa.data").cache();
var scopeVars = {};
var parsedData = data.map(function(s) {
    var parts = s.split(",");
    var features = parts[1].split(" ");
    return new LabeledPoint(parts[0], new DenseVector(features));
});
var t = parsedData.take(5);
print("take 5 = " + JSON.stringify(parsedData.take(5)));
```

The `LabeledPoint` and `DenseVector` used in the map function need to be "required" so they are available on the worker node.
We investigated the use of "amp/requires"; the key drawback to this approach is that we must know the variable name the required result is assigned to in order to use it in the lambda functions.
```javascript
var myLP = require("LabeledPoint");
var DenseVector = require("DenseVector");
var sc = new SparkContext("local[*]", "Linear Regression Test");
var data = sc.textFile("examples/data/lpsa.data").cache();
var scopeVars = {};
var parsedData = data.map(function(s) {
    var parts = s.split(",");
    var features = parts[1].split(" ");
    /*
     * We need to know that the user assigned LabeledPoint to myLP
     * and duplicate the require in the lambda.
     */
    return new myLP(parts[0], new DenseVector(features));
});
var t = parsedData.take(5);
print("take 5 = " + JSON.stringify(parsedData.take(5)));
```

## Proposal 1

- Have the user 'require' modules needed by lambda functions inside the lambda functions themselves.
## Proposal 2

- We currently load all of our Spark files in SparkBootstrap.js so they are all available on the driver. This will stay the same.
- Add a method to our SparkContext wrapper with which the user can register files to be used in lambda functions (e.g. `SparkContext.addJavaScriptFile(pathToLocalFile)`). The user only has to call addJavaScriptFile for those modules used directly in lambdas. This function will:
  - Call SparkContext.addFile() so the file will be available to the worker nodes that execute the lambda functions.
  - Keep track of what JavaScript modules have been added (e.g. 'required') by lambda functions (by className?).
- When we construct the wrapper for the lambda function we will consult SparkContext (or Utils??) to see what modules it requires and use SparkFiles.get(fileName) to find each one's download location.
- In org.eclairjs.nashorn.Utils.javaToJs() we invoke the JS function src.main.resources.Utils.createJavaWrapperObject(), where we know the className that can be used to look up what modules need to be added from previous addJavaScriptFile calls.
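The bookkeeping in this proposal can be sketched as a small driver-side registry. The code below is a simulation only, assuming the hypothetical `addJavaScriptFile` API named above: the real implementation would also call the underlying `SparkContext.addFile()` and would resolve worker-side locations with `SparkFiles.get()`, which are reduced to comments here.

```javascript
// Simulation of Proposal 2's bookkeeping: a registry recording which
// JavaScript files have been added for use in lambdas, keyed by class
// name as suggested above. Not the real EclairJS code.
var requiredFiles = {};

function addJavaScriptFile(className, pathToLocalFile) {
  // The real method would also do: sparkContext.addFile(pathToLocalFile)
  requiredFiles[className] = pathToLocalFile;
}

function modulesForLambda(classNames) {
  // When wrapping a lambda, look up the file registered for each module
  // it uses (the real code would call SparkFiles.get(fileName) to find
  // the download location on the worker node).
  return classNames.map(function (name) {
    return requiredFiles[name];
  });
}

addJavaScriptFile("LabeledPoint", "mllib/regression/LabeledPoint.js");
addJavaScriptFile("DenseVector", "mllib/linalg/DenseVector.js");

var files = modulesForLambda(["LabeledPoint", "DenseVector"]);
```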
## Proposal 3

- We currently load all of our Spark files in SparkBootstrap.js so they are all available on the driver. This will stay the same.
- With the code delivered for Issue 111 we now have the ability to bind variables to lambda functions. Since a 'require' is really just a variable declaration, we can use this solution to pass module bindings to the lambda functions that use them.
- This improves on Proposal 1 because the require only has to be done once at the top of the file and can be bound to multiple lambdas if need be, whereas with Proposal 1 the user would have to set up the variable declaration multiple times if the same module was needed in more than one lambda function.
- This improves on Proposal 2 because only the lambda functions that actually need the modules will get them.
- When we construct the JSFunction, everything we need will be bound to the lambda, so we will know what modules need to be loaded on the worker node.
This seems to work for simple cases (see example). We have had to create our own require (just jvm-npm with some minor modifications for now). We can require a simple module and assign it to a var that gets bound to a lambda function and serialized as a string, which is then converted back into JS in org/eclairjs/nashorn/Utils.javaToJs(). A problem still remains for more complicated required modules (proper classes) like mllib/regression/LabeledPoint.js: we can serialize the class (to a string only), but deserialization remains an issue. This work is being done on the 'lambdaRequire' branch. The basic flow is this:
- Module gets 'required' and passed through the lambda as a bound argument, e.g. from linearregression.js:

```javascript
var LabeledPoint = require('mllib/regression/LabeledPoint');
var parsedData = data.map(function(s, LabeledPoint) {
    var parts = s.split(",");
    var features = parts[1].split(" ");
    return new LabeledPoint(parts[0], new DenseVector(features));
}, [LabeledPoint]);
```

- Under the lambda function (in this case RDD.map()) we call createLambdaFunction() in Utils.js, which examines and unwraps any bound arguments. Here we can recognize the bound module and serialize it as a String. It is not a live object at this point because we don't have an instance of it yet (recall the user is passing it through to use inside the lambda function), so I think all we can do is serialize it as a String, although it does implement JavaWrapper (once it's a live object) and does have a member function getJavaObject(). I think this is one of the stumbling points, e.g. in Utils.unwrapObject() the else gets executed on the return:
```javascript
return (obj && obj.getJavaObject) ? obj.getJavaObject() : obj;
```

- On deserialization we have a similar problem. In Utils.javaToJs we drop all the way into the final else (obviously because it's getting serialized as java.lang.String instead of its proper type), and then Utils.convertJavaStringToJSFunction() is invoked on it to make it an instance of the module class (which I actually think is working). But then, if the user does a "new" on the required class/module to return out of the lambda as in the example above, we invoke Utils.jsToJava, where again getJavaObject() is not a member because we don't have a live instance yet (the user is "new"-ing it), so it falls through to the `else if (o instanceof JSObject)` branch, which is not what we want.
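The fall-through described above can be reproduced in isolation. This is a simulation, not the EclairJS source: a live wrapper object exposes getJavaObject(), but a module serialized to a string does not, so the ternary returns it unchanged.

```javascript
// Simulation of the Utils.unwrapObject() behavior discussed above:
// live wrapper objects expose getJavaObject(), but a module that has
// been serialized to a string does not, so it falls through the else.
function unwrapObject(obj) {
  return (obj && obj.getJavaObject) ? obj.getJavaObject() : obj;
}

// A "live" wrapper (has a backing Java object).
var liveWrapper = {
  getJavaObject: function () { return "the-java-object"; }
};

// A required module serialized as a string -- no getJavaObject yet.
var serializedModule = "function LabeledPoint(label, features) { /* ... */ }";

var a = unwrapObject(liveWrapper);      // unwrapped to the backing object
var b = unwrapObject(serializedModule); // falls through unchanged
```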
### Update 1

Both simple and linearregression requires bound to lambda functions are working with a slight modification to Proposal 3 above. The route to get it working is the following:
- Pulled in Brian's work that does serialization of args and invocation of lambda functions all on the JS side.
- Still supplying our own 'require':
  - This is still being done using jvm-npm, but with a few minor modifications.
  - Using 'Module' to keep track of module body, name, and exports; returning the entire module on require.
- Keeping track of required modules with Utils.addRequiredFile.
  - Dependency modules (e.g. LabeledPoint needs to require JavaWrapper) are correctly being added to the list of required files when the parent required file is read in (e.g. JavaWrapper is added in Utils when its 'require' is encountered while reading in LabeledPoint). We still need to pass dependencies to the lambda function - right now they are still in SparkBootstrap - and need to go through and make sure all dependencies are properly 'required'.
- Using argBinding of modules to lambda functions as originally proposed.
- In evaluating argBindings, if an arg is discovered to be a module by consulting Utils.isRequiredFile, it is serialized in [Serialize.jsModule](https://github.com/EclairJS/eclairjs-nashorn/blob/lambdaRequire/src/main/resources/Serialize.js) by doing an 'eval' on the module body to get the JS code into the Nashorn JavaScript engine for the current worker node; its exports are then returned from the serialization function to be passed as args to the lambda function (this actually seems to be working decently).
Note: There is an error encountered on 'parsedData.take(5)' in the linearregression.js example (after the map call - require test). It seems to be on the Serialize.javaToJs(value) in RDD.take() while looping through the results. Putting LabeledPoint back into SparkBootstrap and not using module binding to the lambda function, the same error persists. This needs further investigation to make sure the example works in its entirety, although it doesn't seem to be a result of the require code.

Note 2: If we go this route we will still need to make sure all of our modules properly require their dependencies and use module.exports.
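The require flow in the steps above can be sketched as follows. This is a simplified stand-in for the modified jvm-npm, with module bodies held in an in-memory map instead of being read from the EclairJS JAR or file system; `Module`, the required-file tracking, and `isRequiredFile` mirror the names used above, and `requireModule` is named to avoid clashing with a host environment's own `require`.

```javascript
// Simplified sketch of the modified jvm-npm require described above.
// Module bodies live in an in-memory map here; the real loader reads
// them from the classpath.
var moduleSources = {
  "simpleMath": "exports.add = function (a, b) { return a + b; };"
};

var requiredFileList = {};            // stand-in for Utils.addRequiredFile's list

function Module(name, body) {
  // Track body, name, and exports so the body can later be re-evaluated
  // ('eval'-ed) on a worker node, as in Serialize.jsModule.
  this.name = name;
  this.body = body;
  this.exports = {};
}

function requireModule(name) {
  var mod = new Module(name, moduleSources[name]);
  requiredFileList[name] = mod;       // keep track of required modules
  new Function("exports", "module", mod.body)(mod.exports, mod);
  return mod;                         // entire module returned on require
}

function isRequiredFile(name) {       // stand-in for Utils.isRequiredFile
  return Object.prototype.hasOwnProperty.call(requiredFileList, name);
}

var simpleMath = requireModule("simpleMath");
```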
### Update 2

Both simple and linearregression requires bound to lambda functions are working with a slight modification to Proposal 3 above. The route to get it working is the following:
- Pulled in Brian's work that does serialization of args and invocation of lambda functions all on the JS side.
- Still supplying our own 'require':
  - This is still being done using jvm-npm, but with a few minor modifications.
  - Using 'Module' to keep track of module body, name, and exports; returning the entire module on require.
  - Verified EclairJS modules are getting loaded out of the EclairJS JAR and not off the file system.
- Keeping track of required modules with Utils.addRequiredFile.
  - Dependency modules (e.g. LabeledPoint needs to require JavaWrapper) are correctly being added to the list of required files when the parent required file is read in (e.g. JavaWrapper is added in Utils when its 'require' is encountered while reading in LabeledPoint). We still need to pass dependencies to the lambda function - right now they are still in SparkBootstrap - and need to go through and make sure all dependencies are properly 'required'.
- Using argBinding of modules to lambda functions as originally proposed.
- In evaluating argBindings, if an arg is discovered to be a module by consulting Utils.isRequiredFile, it is serialized in [Serialize.jsModule](https://github.com/EclairJS/eclairjs-nashorn/blob/lambdaRequire/src/main/resources/Serialize.js) by doing an 'eval' on the module body to get the JS code into the Nashorn JavaScript engine for the current worker node; its exports are then returned from the serialization function to be passed as args to the lambda function (this actually seems to be working decently).
- To fix 'parsedData.take(5)' in the linearregression.js example, we had to slightly modify RDD.take() to call Serialize.javaToJs(value) on the JSList and let Serialize.javaToJs serialize each individual element in the list (i.e. do it all in the Serialize function).
- Had to add a TypeError exception check in Serialize.javaSparkObject() to handle a lambda like:
```javascript
var valuesAndPreds = parsedData.mapToPair(function(lp, linearRegressionModel, delta) {
    <function body>
}, [linearRegressionModel, delta]);
```

  where the first argument is the data for the function and is an instance of a class for which we have a 'require' of the module (as in the example, where 'lp' is of type LabeledPoint). The lambda would not have LabeledPoint available as a class at the time of evaluation in javaSparkObject(), even if it was a bound module argument (which would get evaluated after the 'lp' argument), so we had to add a check for a TypeError exception when we try to do a 'new' on the Spark class.
- In order to get 'dependency requires' working (for example, LabeledPoint is dependent on JavaWrapper which in turn is dependent on Logger), we had to add to Utils.addRequiredFile such that when a module that has a parent is encountered (i.e. it's in the dependency chain), it is loaded into the Nashorn ScriptEngine so it is available to the parent module when the parent is evaluated. This seems to be working nicely even for a 3-level-deep dependency chain.
Note: Given the dependency-requires change above, we have to go through all of our Spark wrapper objects and make sure all of our modules properly require their dependencies and use module.exports.
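The dependency-chain handling can be sketched like this. Everything here is a simulation under stated assumptions: the module names and bodies are stand-ins, and a plain object stands in for the Nashorn ScriptEngine scope. The point is only the ordering: dependencies are evaluated into the engine before the parent module's body runs.

```javascript
// Simulation of the dependency-chain loading described above: when a
// required module itself requires other modules, the dependencies are
// loaded first so they are defined by the time the parent body is
// evaluated (LabeledPoint -> JavaWrapper -> Logger as the example).
var sources = {
  "Logger":       { deps: [],              body: "this.Logger = function () {};" },
  "JavaWrapper":  { deps: ["Logger"],      body: "this.JavaWrapper = function () {};" },
  "LabeledPoint": { deps: ["JavaWrapper"], body: "this.LabeledPoint = function () {};" }
};

var loadOrder = [];

function addRequiredFile(name, engine) {
  // Load dependencies first (depth-first), then evaluate the parent
  // body in the shared "engine" scope -- the stand-in here for the
  // Nashorn ScriptEngine.
  sources[name].deps.forEach(function (dep) {
    if (loadOrder.indexOf(dep) < 0) addRequiredFile(dep, engine);
  });
  new Function(sources[name].body).call(engine);
  loadOrder.push(name);
}

var engine = {};                      // stand-in for the script engine
addRequiredFile("LabeledPoint", engine);
```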
## Proposal 4

When testing with a cluster we found that scripts were not getting loaded into the Nashorn engine of the Spark worker node, so Proposal 4 is a modification to Proposal 3 to accommodate this without having to serialize the entire JS module body, which could get quite large with dependency requires. We have to introduce the use of SparkFiles, since it appears a Nashorn-only solution is not going to completely work without large overhead.

Similar to Proposal 3 above, the steps under investigation are:

- Have our own require to figure out which modules to load for lambdas and keep track of them.
- Bind modules as args to the lambdas that need them.
- Serialize module path/load information so each module can be found.
- On the createLambdaFunction call, pass the SparkContext along for the current app.
- Use SparkContext.addFile to add the required JS files for the Spark job to run on the worker node before it gets invoked as a Java Function wrapper.
- Once it passes through the Java Function wrapper via Invocable into Utils_invoke, SparkFiles.get can be used to get the location of the required scripts that were downloaded onto the worker node; they are then loaded (via eval) into the Nashorn engine as before when a bound module is found. The difference is that instead of the JS body being serialized as part of the bound arg, it is retrieved via SparkFiles.
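The steps above can be sketched end to end. This is a simulation only: `SparkContext.addFile` and `SparkFiles.get` are faked with an in-memory map standing in for Spark's file distribution (the real `addFile` takes just a path and reads the file itself), and a plain object stands in for the worker's Nashorn engine scope. Only the shape of the flow matches the proposal.

```javascript
// End-to-end simulation of the Proposal 4 flow. None of this is the
// real Spark API; an in-memory map fakes the file distribution.
var distributedFiles = {};            // fake per-job distributed cache

var sparkContext = {
  addFile: function (path, body) {    // driver: ship the JS file
    distributedFiles[path] = body;    // (real addFile reads the file from disk)
  }
};

var SparkFiles = {
  get: function (path) {              // worker: resolve the downloaded script
    return distributedFiles[path];
  }
};

// Driver side: our require records the module path and adds the file.
sparkContext.addFile("mllib/regression/LabeledPoint.js",
  "this.LabeledPoint = function (label) { this.label = label; };");

// Worker side (inside Utils_invoke): fetch the script and eval it into
// the engine scope before the lambda runs.
var engine = {};
new Function(SparkFiles.get("mllib/regression/LabeledPoint.js")).call(engine);
var lp = new engine.LabeledPoint(1.0);
```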
## Proposal 5

This is the final architecture that we found to work (on branch load-module) and that got merged to master:

- Have our own require to figure out which modules to load for lambdas and keep track of them (a version of jvm-npm got used for this).
- Have the user "require" any module they will use at the top of the file.
  - module.exports gets assigned to the user's variable name.
- All EclairJS modules require what they themselves need/use and define their exports.
  - A rule of thumb for good module definition is one export per module.
- Bind modules as args to the lambdas that need them.
- Serialize module path/load information so each module can be found.
- On createLambdaFunction, pass the SparkContext along for the current app.
  - The SparkContext is used to determine if the user is in a clustered environment and, if so, prepares any custom modules for download to the worker node.
  - SparkContext.addFile is used to add the required JS files for the Spark job to run on the worker node before it gets invoked as a Java Function wrapper (custom modules are zipped up to preserve path information).
- Once it passes through the Java Function wrapper via Invocable into Utils_invoke, SparkFiles.get is used to get the location of any custom modules passed via the zipfile that was downloaded onto the worker node. The zipfile is unzipped and the modules used by the lambda function are loaded.
- Both driver and worker nodes still get the entire EclairJS JAR, but only what is required by the running code (either locally or just the lambda on the worker node) is loaded, to reduce the memory footprint and only load what is needed.
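The "one export per module" rule above can be illustrated with a hypothetical module body. This is a sketch, not the real EclairJS LabeledPoint: `fakeRequire` and the stub `JavaWrapper` are stand-ins for the jvm-npm based require and the real base class, and `loadModule` is a minimal CommonJS-style evaluator used only to show the pattern.

```javascript
// Minimal CommonJS-style loader used only to demonstrate the module
// pattern; the real loading is done by EclairJS's modified jvm-npm.
function loadModule(body) {
  var module = { exports: {} };
  new Function("module", "exports", "require", body)(module, module.exports, fakeRequire);
  return module.exports;
}

function fakeRequire(name) {
  // Stand-in for EclairJS's require; JavaWrapper here is a stub.
  if (name === "JavaWrapper") {
    return function JavaWrapper() {};
  }
  throw new Error("unknown module: " + name);
}

// Hypothetical LabeledPoint.js following the Proposal 5 rules: it
// requires its own dependencies and assigns exactly one export.
var labeledPointBody = [
  "var JavaWrapper = require('JavaWrapper');",
  "function LabeledPoint(label, features) {",
  "  this.label = label;",
  "  this.features = features;",
  "}",
  "LabeledPoint.prototype = Object.create(JavaWrapper.prototype);",
  "module.exports = LabeledPoint;  // one export per module"
].join("\n");

var LabeledPoint = loadModule(labeledPointBody);
var lp = new LabeledPoint(1.0, [0.5, 0.5]);
```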