diff --git a/hadoopy/_runner.py b/hadoopy/_runner.py index 8902727..7c7704b 100644 --- a/hadoopy/_runner.py +++ b/hadoopy/_runner.py @@ -25,6 +25,7 @@ import tempfile import hadoopy._freeze +import pdb def _find_hstreaming(): """Finds the whole path to the hadoop streaming jar. @@ -39,19 +40,29 @@ def _find_hstreaming(): try: search_root = os.environ['HADOOP_HOME'] except KeyError: - search_root = '/' - cmd = 'find %s -name hadoop*streaming*.jar' % (search_root) - p = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, + # cloudera default install + if os.path.isdir('/usr/lib/hadoop'): + sys.stderr.write('HADOOP_HOME not set, found /usr/lib/hadoop, searching for streaming jar\n') + search_root = '/usr/lib/hadoop' + else: + sys.stderr.write('HADOOP_HOME not set, falling back to /, please set HADOOP_HOME\n') + search_root = '/' + cmd = "find -L %s -name 'hadoop*streaming*.jar'" % (search_root) + p = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE, stderr=subprocess.PIPE) - return p.communicate()[0].split('\n')[0] + streamingjar = p.communicate()[0].split('\n')[0] + if not os.path.isfile(streamingjar): + raise Exception('cannot find streaming jar') + else: + return streamingjar def launch(in_name, out_name, script_path, mapper=True, reducer=True, combiner=False, partitioner=False, files=(), jobconfs=(), cmdenvs=(), copy_script=True, hstreaming=None, name=None, - use_typedbytes=True, use_seqoutput=True, use_autoinput=True, - pretend=False, add_python=True, config=None, - python_cmd="python", num_mappers=None, num_reducers=None, + use_typedbytes=False, use_seqoutput=False, use_autoinput=True, + pretend=False, add_python=False, config=None, + python_cmd=None, num_mappers=None, num_reducers=None, script_dir='',**kw): """Run Hadoop given the parameters @@ -98,12 +109,14 @@ def launch(in_name, out_name, script_path, mapper=True, reducer=True, subprocess.CalledProcessError: Hadoop error. OSError: Hadoop streaming not found. """ - try: + if hstreaming: hadoop_cmd = 'hadoop jar ' + hstreaming - except TypeError: + else: hadoop_cmd = 'hadoop jar ' + _find_hstreaming() job_name = os.path.basename(script_path) if add_python: + if not python_cmd: + python_cmd = sys.executable script_name = '%s %s' % (python_cmd, os.path.basename(script_path)) else: script_name = os.path.basename(script_path) @@ -120,7 +133,7 @@ def launch(in_name, out_name, script_path, mapper=True, reducer=True, if isinstance(in_name, str): in_name = [in_name] for f in in_name: - cmd += ['-input', f] + cmd += ['-input', "'%s'" % (f,)] # Add mapper/reducer cmd += ['-mapper', '"%s"' % (mapper)] @@ -137,9 +150,9 @@ def launch(in_name, out_name, script_path, mapper=True, reducer=True, cmd += ['-partitioner', '"%s"' % (partitioner)] if num_mappers: - cmd += ['-numMapTasks', "'%i'"%(int(num_mappers))] + cmd += ['-numMapTasks', '%i' % (int(num_mappers),)] if num_reducers: - cmd += ['-numReduceTasks', "'%i'"%(int(num_reducers))] + cmd += ['-numReduceTasks', '%i' %(int(num_reducers),)] # Add files if isinstance(files, str): files = [files] @@ -157,7 +170,7 @@ def launch(in_name, out_name, script_path, mapper=True, reducer=True, del new_files # END BUG for f in files: - cmd += ['-file', f] + cmd += ['-file', "'%s'" % (f,)] # Add jobconfs if isinstance(jobconfs, str): jobconfs = [jobconfs]