diff --git a/dga-giraph/build.gradle b/dga-giraph/build.gradle index 841252f..d3477e2 100644 --- a/dga-giraph/build.gradle +++ b/dga-giraph/build.gradle @@ -22,12 +22,12 @@ configurations { dependencies { compile project(':dga-core') - compile('org.apache.giraph:giraph-core:1.1.0-hadoop2') { + compile('org.apache.giraph:giraph-core:1.2.0-SNAPSHOT') { exclude module: 'guava' exclude module: 'zookeeper' } hadoopProvided group: 'org.apache.hadoop', name: 'hadoop-client', version: cdh_version - testCompile 'org.apache.giraph:giraph-core:1.1.0-hadoop2' + testCompile 'org.apache.giraph:giraph-core:1.2.0' testCompile "org.mockito:mockito-core:1.9.5" testCompile 'junit:junit:4.11' testCompile 'commons-httpclient:commons-httpclient:3.0.1' @@ -50,7 +50,6 @@ task distConf(dependsOn: 'assemble', type: Copy) { task dist(dependsOn: 'distJars', type: Copy) { from "src/main/resources/" - include "dga-mr1-giraph" - include "dga-yarn-giraph" + include "dga-*-giraph" into "${buildDir}/dist/bin" } diff --git a/dga-giraph/gradle.properties b/dga-giraph/gradle.properties index 42e2cef..9baf37f 100644 --- a/dga-giraph/gradle.properties +++ b/dga-giraph/gradle.properties @@ -1,2 +1,2 @@ -cdh5version=2.6.0-cdh5.4.0 +cdh5version=2.6.0-cdh5.4.2 cdh4version=2.0.0-mr1-cdh4.7.0 diff --git a/dga-giraph/src/main/java/com/soteradefense/dga/BfsTree/BfsTreeComputation.java b/dga-giraph/src/main/java/com/soteradefense/dga/BfsTree/BfsTreeComputation.java new file mode 100644 index 0000000..b31ec82 --- /dev/null +++ b/dga-giraph/src/main/java/com/soteradefense/dga/BfsTree/BfsTreeComputation.java @@ -0,0 +1,102 @@ +package com.soteradefense.dga.BfsTree; + +import com.soteradefense.dga.DGALoggingUtil; +import org.apache.giraph.comm.WorkerClientRequestProcessor; +import org.apache.giraph.graph.BasicComputation; +import org.apache.giraph.graph.GraphState; +import org.apache.giraph.graph.GraphTaskManager; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.worker.WorkerContext; +import 
org.apache.giraph.worker.WorkerGlobalCommUsage; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Iterator; + +/** + * + * Starting from a provided search key (vertex ID) we create a BFS tree from the graph. + * + * All vertex ids must be > 0 for valid results due to default vertex values being set to 0 and + * vertices with no output edges aren't loaded in step 0, so their default vertex (parent id) value can + * not be reliable set to -1. Instead we must check for parent values < 1 and set them explicitly to -1 at each step. + * + * input: int value csv. + * + * Created by ekimbrel on 9/24/15. + */ +public class BfsTreeComputation extends BasicComputation { + + private static final Logger logger = LoggerFactory.getLogger(BfsTreeComputation.class); + + @Override + public void initialize(GraphState graphState, WorkerClientRequestProcessor workerClientRequestProcessor, GraphTaskManager graphTaskManager, WorkerGlobalCommUsage workerGlobalCommUsage, WorkerContext workerContext) { + super.initialize(graphState, workerClientRequestProcessor, graphTaskManager, workerGlobalCommUsage, workerContext); + DGALoggingUtil.setDGALogLevel(this.getConf()); + } + + @Override + public void compute(Vertex vertex, Iterable messages) throws IOException { + + long step = getSuperstep(); + int thisVertexId = vertex.getId().get(); + if (thisVertexId < 1){ + throw new IllegalStateException("Invalid vertex id: all ids must be > 0 for bfs-tree-computation"); + } + + + // on step 0 send original messages from root node to its adjacent nodes. 
+ if (0 == step){ + + // default all parent values to -1 + vertex.setValue(new IntWritable(-1)); + + // get the search key from the global aggregator (set by BfsTreeMasterCompute) + int searchKey = ( (IntWritable) getAggregatedValue(BfsTreeMasterCompute.SEARCH_KEY_AGG)).get(); + + // if the search key matches this vertex set the parent to itself + // and send out thisVertexId to all adjacent nodes. + if (searchKey == thisVertexId){ + vertex.setValue(new IntWritable(thisVertexId)); + this.sendMessageToAllEdges(vertex,vertex.getId()); + } + + } + + /* + on each step after step 0: + if this node gets any messages: + if this node's parent is -1 + set its parent to the first message value + send out this node's id to all adjacent nodes + */ + + else { + + // 0 can be used as a default value for vertices that aren't loaded until later in the computation due to having no out edges + // as a result we must check and replace any 0's with -1, and all graph vertices must have id >= 1 + if (vertex.getValue().get() < 1){ + vertex.setValue(new IntWritable(-1)); + } + + int thisParentId = vertex.getValue().get(); + if (-1 == thisParentId){ + // to ensure consistent results on multiple runs take the max value + int maxValue = -1; + for (IntWritable message: messages) maxValue = Math.max(maxValue,message.get()); + + if (maxValue > -1){ + vertex.setValue(new IntWritable(maxValue)); + sendMessageToAllEdges(vertex,vertex.getId()); + } + } + } + + // all nodes vote to halt after every super step. 
+ vertex.voteToHalt(); + + } +} diff --git a/dga-giraph/src/main/java/com/soteradefense/dga/BfsTree/BfsTreeMasterCompute.java b/dga-giraph/src/main/java/com/soteradefense/dga/BfsTree/BfsTreeMasterCompute.java new file mode 100644 index 0000000..fb2f32e --- /dev/null +++ b/dga-giraph/src/main/java/com/soteradefense/dga/BfsTree/BfsTreeMasterCompute.java @@ -0,0 +1,47 @@ +package com.soteradefense.dga.BfsTree; + +import com.soteradefense.dga.DGALoggingUtil; +import org.apache.giraph.master.DefaultMasterCompute; +import org.apache.hadoop.io.IntWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.giraph.aggregators.IntOverwriteAggregator; + +/** + * Created by ekimbrel on 9/24/15. + * + * For algorithm description see BfsTreeComputation. + * + * The job of the master compute class in the BFS tree computation is to set the initial search key + * in a global aggregator before super step 0. + * + * + */ +public class BfsTreeMasterCompute extends DefaultMasterCompute { + + + private static final Logger logger = LoggerFactory.getLogger(BfsTreeMasterCompute.class); + public static final String SEARCH_KEY_AGG = "com.soteradefense.dga.BfsTree.searchKeyAggregator"; + public static final String SEARCH_KEY_CONF = "dga.bfstree.searchkey"; + private int searchKey; + + + @Override + public void initialize() throws InstantiationException, IllegalAccessException { + DGALoggingUtil.setDGALogLevel(this.getConf()); + registerAggregator(SEARCH_KEY_AGG,IntOverwriteAggregator.class); + searchKey = Integer.parseInt(getConf().get(SEARCH_KEY_CONF,"-1")); + if (searchKey == -1){ + throw new IllegalArgumentException("Search Key value must be set to a postive integer. 
set -ca "+SEARCH_KEY_CONF+"= when running BfsTree"); + } + } + + @Override + public void compute() { + if (0 == this.getSuperstep()){ + setAggregatedValue(SEARCH_KEY_AGG,new IntWritable(searchKey)); + } + } + + +} diff --git a/dga-giraph/src/main/java/com/soteradefense/dga/DGACommandLineUtil.java b/dga-giraph/src/main/java/com/soteradefense/dga/DGACommandLineUtil.java index 230cbe7..a0c1769 100644 --- a/dga-giraph/src/main/java/com/soteradefense/dga/DGACommandLineUtil.java +++ b/dga-giraph/src/main/java/com/soteradefense/dga/DGACommandLineUtil.java @@ -43,7 +43,9 @@ public static void printUsageAndExit(Options options, int exitCode) { "\t\thbse - High Betweenness Set Extraction\n" + "\t\twcc - Weakly Connected Components\n" + "\t\tlc - Leaf Compression\n" + - "\t\tpr - Page Rank\n"; + "\t\tpr - Page Rank\n" + + "\t\ttricount - Triangle Counting\n" + + "\t\tbfstree - BFS Tree\n"; HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(commandLine, options); System.exit(exitCode); diff --git a/dga-giraph/src/main/java/com/soteradefense/dga/DGARunner.java b/dga-giraph/src/main/java/com/soteradefense/dga/DGARunner.java index 6916589..fcb2d32 100644 --- a/dga-giraph/src/main/java/com/soteradefense/dga/DGARunner.java +++ b/dga-giraph/src/main/java/com/soteradefense/dga/DGARunner.java @@ -19,13 +19,21 @@ package com.soteradefense.dga; +import com.soteradefense.dga.BfsTree.BfsTreeComputation; +import com.soteradefense.dga.BfsTree.BfsTreeMasterCompute; +import com.soteradefense.dga.combiners.IntIntMaxMessageCombiner; import com.soteradefense.dga.hbse.HBSEComputation; import com.soteradefense.dga.hbse.HBSEConfigurationConstants; import com.soteradefense.dga.hbse.HBSEMasterCompute; import com.soteradefense.dga.io.formats.*; import com.soteradefense.dga.lc.LeafCompressionComputation; +import com.soteradefense.dga.pr.NormalizedPageRankComputation; import com.soteradefense.dga.pr.PageRankComputation; import com.soteradefense.dga.pr.PageRankMasterCompute; +import 
com.soteradefense.dga.scan1.Scan1Computation; +import com.soteradefense.dga.triangles.TriangleCountCombiner; +import com.soteradefense.dga.triangles.TriangleCountComputation; +import com.soteradefense.dga.triangles.TriangleCountMasterCompute; import com.soteradefense.dga.wcc.WeaklyConnectedComponentComputation; import org.apache.commons.cli.Options; import org.apache.giraph.GiraphRunner; @@ -49,6 +57,10 @@ public class DGARunner { supportedAnalytics.add("hbse"); supportedAnalytics.add("lc"); supportedAnalytics.add("pr"); + supportedAnalytics.add("pr-norm"); + supportedAnalytics.add("scan1"); + supportedAnalytics.add("tricount"); + supportedAnalytics.add("bfstree"); } public void run(String[] args) throws Exception { @@ -162,8 +174,85 @@ public void run(String[] args) throws Exception { String[] giraphArgs = finalConf.convertToCommandLineArguments(PageRankComputation.class.getCanonicalName()); System.exit(ToolRunner.run(new GiraphRunner(), giraphArgs)); + } else if (analytic.equals("pr-norm")) { + + logger.info("Analytic: PageRank-Normalized"); + DGAConfiguration requiredConf = new DGAConfiguration(); + requiredConf.setDGAGiraphProperty("-eif", DGATextEdgeValueInputFormat.class.getCanonicalName()); + requiredConf.setDGAGiraphProperty("-vof", PageRankVertexOutputFormat.class.getCanonicalName()); + requiredConf.setDGAGiraphProperty("-eip", inputPath); + requiredConf.setDGAGiraphProperty("-mc", PageRankMasterCompute.class.getCanonicalName()); + requiredConf.setDGAGiraphProperty("-op", outputPath); + requiredConf.setDGAGiraphProperty("-vsd", outputPath); + requiredConf.setCustomProperty(DGAEdgeTDTOutputFormat.WRITE_VERTEX_VALUE, "true"); + DGAConfiguration finalConf = DGAConfiguration.coalesce(fileConf, commandLineConf, requiredConf); + + finalConf.setLibDir(libDir); + + String[] giraphArgs = finalConf.convertToCommandLineArguments(NormalizedPageRankComputation.class.getCanonicalName()); + System.exit(ToolRunner.run(new GiraphRunner(), giraphArgs)); + + } else if 
(analytic.equals("scan1")) { + logger.info("Analytic: Scan1"); + DGAConfiguration requiredConf = new DGAConfiguration(); + requiredConf.setDGAGiraphProperty("-eif", UndirectedIntCsvEdgeInputFormat.class.getCanonicalName()); + requiredConf.setDGAGiraphProperty("-vof", IntVertexOutputFormat.class.getCanonicalName()); + requiredConf.setDGAGiraphProperty("-eip", inputPath); + requiredConf.setDGAGiraphProperty("-op", outputPath); + requiredConf.setDGAGiraphProperty("-vsd", outputPath); + requiredConf.setCustomProperty("io.edge.reverse.duplicator", "true"); + DGAConfiguration minimalDefaults = new DGAConfiguration(); + minimalDefaults.setCustomProperty(DGAEdgeTTTOutputFormat.WRITE_VERTEX_VALUE, "true"); + DGAConfiguration finalConf = DGAConfiguration.coalesce(minimalDefaults, fileConf, commandLineConf, requiredConf); + + finalConf.setLibDir(libDir); + + String[] giraphArgs = finalConf.convertToCommandLineArguments(Scan1Computation.class.getCanonicalName()); + System.exit(ToolRunner.run(new GiraphRunner(), giraphArgs)); + + } else if (analytic.equals("tricount")) { + logger.info("Analytic: Triangle Counting"); + DGAConfiguration requiredConf = new DGAConfiguration(); + requiredConf.setDGAGiraphProperty("-eif", UndirectedIntCsvEdgeInputFormat.class.getCanonicalName()); + requiredConf.setDGAGiraphProperty("-vof", IntVertexOutputFormat.class.getCanonicalName()); + requiredConf.setDGAGiraphProperty("-mc", TriangleCountMasterCompute.class.getCanonicalName()); + requiredConf.setDGAGiraphProperty("-eip", inputPath); + requiredConf.setDGAGiraphProperty("-op", outputPath); + requiredConf.setDGAGiraphProperty("-vsd", outputPath); + //requiredConf.setCustomProperty("giraph.messageCombinerClass", TriangleCountCombiner.class.getCanonicalName()); + DGAConfiguration minimalDefaults = new DGAConfiguration(); + minimalDefaults.setCustomProperty(DGAEdgeTTTOutputFormat.WRITE_VERTEX_VALUE, "true"); + DGAConfiguration finalConf = DGAConfiguration.coalesce(minimalDefaults, fileConf, 
commandLineConf, requiredConf); + + finalConf.setLibDir(libDir); + + String[] giraphArgs = finalConf.convertToCommandLineArguments(TriangleCountComputation.class.getCanonicalName()); + System.exit(ToolRunner.run(new GiraphRunner(), giraphArgs)); + + } else if (analytic.equals("bfstree")) { + logger.info("Analytic: BFS Tree"); + DGAConfiguration requiredConf = new DGAConfiguration(); + requiredConf.setDGAGiraphProperty("-eif", DirectedIntCsvEdgeInputFormat.class.getCanonicalName()); + requiredConf.setDGAGiraphProperty("-vof", IntVertexOutputFormat.class.getCanonicalName()); + requiredConf.setDGAGiraphProperty("-mc", BfsTreeMasterCompute.class.getCanonicalName()); + requiredConf.setDGAGiraphProperty("-eip", inputPath); + requiredConf.setDGAGiraphProperty("-op", outputPath); + requiredConf.setDGAGiraphProperty("-vsd", outputPath); + requiredConf.setCustomProperty("giraph.messageCombinerClass", IntIntMaxMessageCombiner.class.getCanonicalName()); + DGAConfiguration minimalDefaults = new DGAConfiguration(); + minimalDefaults.setCustomProperty(DGAEdgeTTTOutputFormat.WRITE_VERTEX_VALUE, "true"); + DGAConfiguration finalConf = DGAConfiguration.coalesce(minimalDefaults, fileConf, commandLineConf, requiredConf); + + finalConf.setLibDir(libDir); + + String[] giraphArgs = finalConf.convertToCommandLineArguments(BfsTreeComputation.class.getCanonicalName()); + System.exit(ToolRunner.run(new GiraphRunner(), giraphArgs)); } + + + + } catch (Exception e) { logger.error("Unable to run analytic; ", e); } diff --git a/dga-giraph/src/main/java/com/soteradefense/dga/combiners/IntIntMaxMessageCombiner.java b/dga-giraph/src/main/java/com/soteradefense/dga/combiners/IntIntMaxMessageCombiner.java new file mode 100644 index 0000000..985db45 --- /dev/null +++ b/dga-giraph/src/main/java/com/soteradefense/dga/combiners/IntIntMaxMessageCombiner.java @@ -0,0 +1,28 @@ +package com.soteradefense.dga.combiners; + + +import org.apache.hadoop.io.IntWritable; +import 
org.apache.giraph.combiner.MessageCombiner; + +/** + * For IntWritable Vertex Ids sending IntWritable messages where only a single message per vertex + * is desired. Simply use the message with the highest value, and throw away others. + * + * Created by ekimbrel on 9/24/15. + * + */ +public class IntIntMaxMessageCombiner implements MessageCombiner { + + public void combine(IntWritable vertexId, IntWritable originalMessage, IntWritable messageToCombine) { + int original = originalMessage.get(); + int other = messageToCombine.get(); + if (other > original){ + originalMessage.set(other); + } + } + + + public IntWritable createInitialMessage() { + return new IntWritable(Integer.MIN_VALUE); + } +} diff --git a/dga-giraph/src/main/java/com/soteradefense/dga/io/formats/DirectedIntCsvEdgeInputFormat.java b/dga-giraph/src/main/java/com/soteradefense/dga/io/formats/DirectedIntCsvEdgeInputFormat.java new file mode 100644 index 0000000..5997afa --- /dev/null +++ b/dga-giraph/src/main/java/com/soteradefense/dga/io/formats/DirectedIntCsvEdgeInputFormat.java @@ -0,0 +1,52 @@ +package com.soteradefense.dga.io.formats; + +import com.soteradefense.dga.DGALoggingUtil; +import org.apache.giraph.io.EdgeReader; +import org.apache.giraph.io.ReverseEdgeDuplicator; +import org.apache.giraph.io.formats.TextEdgeInputFormat; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.giraph.utils.IntPair; + +import java.io.IOException; + +/** + * Read a simple directed Edge List in csv format. "VertexId,VertexId" no edge values. 
+ */ +public class DirectedIntCsvEdgeInputFormat extends TextEdgeInputFormat{ + + public EdgeReader createEdgeReader(InputSplit split, TaskAttemptContext context) throws IOException { + return new DirectedIntCsvEdgeReader(); + } + + public class DirectedIntCsvEdgeReader extends TextEdgeReaderFromEachLineProcessed { + @Override + protected IntPair preprocessLine(Text line) throws IOException { + String[] tokens = line.toString().split(","); + return new IntPair(Integer.parseInt(tokens[0]),Integer.parseInt(tokens[1])); + } + + @Override + protected IntWritable getSourceVertexId(IntPair endpoints) + throws IOException { + return new IntWritable(endpoints.getFirst()); + } + + @Override + protected IntWritable getTargetVertexId(IntPair endpoints) + throws IOException { + return new IntWritable(endpoints.getSecond()); + } + + @Override + protected NullWritable getValue(IntPair endpoints) throws IOException { + return NullWritable.get(); + } + + + } + +} diff --git a/dga-giraph/src/main/java/com/soteradefense/dga/io/formats/IntVertexOutputFormat.java b/dga-giraph/src/main/java/com/soteradefense/dga/io/formats/IntVertexOutputFormat.java new file mode 100644 index 0000000..adaac24 --- /dev/null +++ b/dga-giraph/src/main/java/com/soteradefense/dga/io/formats/IntVertexOutputFormat.java @@ -0,0 +1,44 @@ +package com.soteradefense.dga.io.formats; + +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.io.formats.TextVertexOutputFormat; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import java.io.IOException; + +/** + * Created by ekimbrel on 8/21/15. 
+ */ + + + +public class IntVertexOutputFormat extends TextVertexOutputFormat { + + @Override + public TextVertexOutputFormat.TextVertexWriter createVertexWriter(TaskAttemptContext context) throws IOException, + InterruptedException { + return new SimpleVertexWriter(); + } + + /** + * A simple vertex writer that writes the VertexId and Vertex Value + */ + public class SimpleVertexWriter extends TextVertexWriter { + + /** + * Writes a Vertex + * + * @param vertex Vertex to Write + * @throws IOException + * @throws InterruptedException + */ + public void writeVertex(Vertex vertex) throws IOException, InterruptedException { + getRecordWriter().write(new Text(vertex.getId().toString()), new Text(vertex.getValue().toString())); + } + + } + +} diff --git a/dga-giraph/src/main/java/com/soteradefense/dga/io/formats/PageRankVertexOutputFormat.java b/dga-giraph/src/main/java/com/soteradefense/dga/io/formats/PageRankVertexOutputFormat.java new file mode 100644 index 0000000..49e605d --- /dev/null +++ b/dga-giraph/src/main/java/com/soteradefense/dga/io/formats/PageRankVertexOutputFormat.java @@ -0,0 +1,40 @@ +package com.soteradefense.dga.io.formats; + +import com.soteradefense.dga.pr.PageRankData; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.io.formats.TextVertexOutputFormat; +import org.apache.hadoop.io.ArrayPrimitiveWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import java.util.Arrays; + +import java.io.IOException; + +public class PageRankVertexOutputFormat extends TextVertexOutputFormat { + + @Override + public TextVertexOutputFormat.TextVertexWriter createVertexWriter(TaskAttemptContext context) throws IOException, + InterruptedException { + return new SimpleVertexWriter(); + } + + /** + * A simple vertex writer that writes the Vertex and its PageRank value. 
+ */ + public class SimpleVertexWriter extends TextVertexWriter { + + /** + * Writes a Vertex + * + * @param vertex Vertex to Write + * @throws IOException + * @throws InterruptedException + */ + public void writeVertex(Vertex vertex) throws IOException, InterruptedException { + getRecordWriter().write(new Text(vertex.getId().toString()), new Text(Double.toString(vertex.getValue().rank))); + } + + } + +} diff --git a/dga-giraph/src/main/java/com/soteradefense/dga/io/formats/UndirectedIntCsvEdgeInputFormat.java b/dga-giraph/src/main/java/com/soteradefense/dga/io/formats/UndirectedIntCsvEdgeInputFormat.java new file mode 100644 index 0000000..c93480e --- /dev/null +++ b/dga-giraph/src/main/java/com/soteradefense/dga/io/formats/UndirectedIntCsvEdgeInputFormat.java @@ -0,0 +1,53 @@ +package com.soteradefense.dga.io.formats; + +import com.soteradefense.dga.DGALoggingUtil; +import org.apache.giraph.io.EdgeReader; +import org.apache.giraph.io.ReverseEdgeDuplicator; +import org.apache.giraph.io.formats.TextEdgeInputFormat; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.giraph.utils.IntPair; + +import java.io.IOException; + +/** + * Read a simple undirected Edge List in csv format. "VertexId,VertexId" no edge values. 
+ */ +public class UndirectedIntCsvEdgeInputFormat extends TextEdgeInputFormat{ + + public EdgeReader createEdgeReader(InputSplit split, TaskAttemptContext context) throws IOException { + EdgeReader reader = new ReverseEdgeDuplicator(new UndirectedIntCsvEdgeReader()); + return reader; + } + + public class UndirectedIntCsvEdgeReader extends TextEdgeReaderFromEachLineProcessed { + @Override + protected IntPair preprocessLine(Text line) throws IOException { + String[] tokens = line.toString().split(","); + return new IntPair(Integer.parseInt(tokens[0]),Integer.parseInt(tokens[1])); + } + + @Override + protected IntWritable getSourceVertexId(IntPair endpoints) + throws IOException { + return new IntWritable(endpoints.getFirst()); + } + + @Override + protected IntWritable getTargetVertexId(IntPair endpoints) + throws IOException { + return new IntWritable(endpoints.getSecond()); + } + + @Override + protected NullWritable getValue(IntPair endpoints) throws IOException { + return NullWritable.get(); + } + + + } + +} diff --git a/dga-giraph/src/main/java/com/soteradefense/dga/pr/NormalizedPageRankComputation.java b/dga-giraph/src/main/java/com/soteradefense/dga/pr/NormalizedPageRankComputation.java new file mode 100644 index 0000000..c49237d --- /dev/null +++ b/dga-giraph/src/main/java/com/soteradefense/dga/pr/NormalizedPageRankComputation.java @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.soteradefense.dga.pr; + +import com.kenai.jffi.Array; +import com.soteradefense.dga.DGALoggingUtil; +import org.apache.giraph.comm.WorkerClientRequestProcessor; +import org.apache.giraph.graph.BasicComputation; +import org.apache.giraph.graph.GraphState; +import org.apache.giraph.graph.GraphTaskManager; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.worker.WorkerContext; +import org.apache.giraph.worker.WorkerGlobalCommUsage; +import org.apache.hadoop.io.ArrayPrimitiveWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Arrays; +import java.io.IOException; + + +/** + * Normalized Pagerank implementaiton to match as closely as possible the built in graphX algorithm + */ +public class NormalizedPageRankComputation extends BasicComputation { + + private static final Logger logger = LoggerFactory.getLogger(PageRankComputation.class); + + public static final String MAX_EPSILON = "com.soteradefense.dga.max.epsilon"; + public static final String DAMPING_FACTOR = "damping.factor"; + public static final double DAMPING_FACTOR_DEFAULT_VALUE = 0.85; + + @Override + public void initialize(GraphState graphState, WorkerClientRequestProcessor workerClientRequestProcessor, GraphTaskManager graphTaskManager, WorkerGlobalCommUsage workerGlobalCommUsage, WorkerContext workerContext) { + super.initialize(graphState, workerClientRequestProcessor, graphTaskManager, workerGlobalCommUsage, workerContext); + 
DGALoggingUtil.setDGALogLevel(this.getConf()); + } + + @Override + public void compute(Vertex vertex, Iterable messages) throws IOException { + + double dampingFactor = this.getConf().getDouble(DAMPING_FACTOR, DAMPING_FACTOR_DEFAULT_VALUE); + + long step = getSuperstep(); + + if (step == 0) { + // initialize the starting page rank and pagerank delta values + vertex.setValue(new PageRankData(1.0,1.0)); + } else { + + PageRankData state = vertex.getValue(); + //if (state.delta > PageRankMasterCompute.EPSILON){ + double rank = 0; + for (DoubleWritable partial : messages) { + rank += partial.get(); + } + rank = ((1 - dampingFactor) ) + (dampingFactor * rank); + double delta = Math.abs(rank - state.rank); + aggregate(MAX_EPSILON, new DoubleWritable(delta)); + vertex.setValue(new PageRankData(rank,delta)); + //} + + } + distributeRank(vertex); + } + + + private void distributeRank(Vertex vertex) { + double rank = vertex.getValue().rank / vertex.getNumEdges(); + sendMessageToAllEdges(vertex, new DoubleWritable(rank)); + } + +} diff --git a/dga-giraph/src/main/java/com/soteradefense/dga/pr/PageRankComputation.java b/dga-giraph/src/main/java/com/soteradefense/dga/pr/PageRankComputation.java index f490d83..533318c 100644 --- a/dga-giraph/src/main/java/com/soteradefense/dga/pr/PageRankComputation.java +++ b/dga-giraph/src/main/java/com/soteradefense/dga/pr/PageRankComputation.java @@ -65,6 +65,7 @@ public void compute(Vertex vertex, Iterable{ + @Override + public void aggregate(ArrayPrimitiveWritable value) { + ArrayPrimitiveWritable aggregatedValue = getAggregatedValue(); + int[] scanIdValue = (int[]) value.get(); + int[] oldIdValue = (int[]) aggregatedValue.get(); + + if (scanIdValue[1] > oldIdValue[1]){ + aggregatedValue.set(value.get()); + } + } + + @Override + public ArrayPrimitiveWritable createInitialValue() { + return new ArrayPrimitiveWritable(new int[]{0,0}); + } + +} diff --git a/dga-giraph/src/main/java/com/soteradefense/dga/scan1/Scan1Computation.java 
b/dga-giraph/src/main/java/com/soteradefense/dga/scan1/Scan1Computation.java new file mode 100644 index 0000000..e831449 --- /dev/null +++ b/dga-giraph/src/main/java/com/soteradefense/dga/scan1/Scan1Computation.java @@ -0,0 +1,117 @@ +package com.soteradefense.dga.scan1; + +import com.soteradefense.dga.DGALoggingUtil; +import org.apache.giraph.comm.WorkerClientRequestProcessor; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.graph.BasicComputation; +import org.apache.giraph.graph.GraphState; +import org.apache.giraph.graph.GraphTaskManager; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.worker.WorkerContext; +import org.apache.giraph.worker.WorkerGlobalCommUsage; +import org.apache.hadoop.io.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.validation.constraints.Null; +import java.io.IOException; +import java.util.HashSet; + +/** + * Created by ekimbrel on 8/20/15. + */ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + + +/** + * Scan1 determines the neighborhood size of each vertex. And reports the max value found. + * Neighborhood size here is defined as the number of edges in the local sub-graph made up of a node and its neighboring vertices. 
+ + * + * I - Vertex id + * V - Vertex Data + * E - Edge Data + * M - Message type + * + */ +public class Scan1Computation extends BasicComputation { + + private static final Logger logger = LoggerFactory.getLogger(Scan1Computation.class); + static final String MAX_AGG = "com.soteradefense.dga.scan1.MAX_AGG"; + + @Override + public void initialize(GraphState graphState, WorkerClientRequestProcessor workerClientRequestProcessor, GraphTaskManager graphTaskManager, WorkerGlobalCommUsage workerGlobalCommUsage, WorkerContext workerContext) { + super.initialize(graphState, workerClientRequestProcessor, graphTaskManager, workerGlobalCommUsage, workerContext); + DGALoggingUtil.setDGALogLevel(this.getConf()); + } + + @Override + public void compute(Vertex vertex, Iterable messages) throws IOException { + try { + if (getSuperstep() == 0) { + broadcastNeighbors(vertex); + return; + } + + HashSet neighbors = new HashSet(); + + int thisNode = (int) vertex.getId().get(); + for (Edge edge : vertex.getEdges()) { + int otherNode = (int) edge.getTargetVertexId().get(); + if (thisNode != otherNode) neighbors.add(otherNode); + } + + double neighboorhoodSize = 0.0; + for (ArrayPrimitiveWritable incomingMessage : messages) { + neighboorhoodSize += 1; + int[] oneHopNeighbors = (int[]) incomingMessage.get(); + for (int node : oneHopNeighbors){ + //because these edges will be counted twice only count them as half + if (neighbors.contains(node)) neighboorhoodSize+=0.5; + } + } + + this.aggregate(MAX_AGG, new ArrayPrimitiveWritable(new int[]{thisNode,(int) neighboorhoodSize})); + vertex.setValue(new IntWritable( (int) neighboorhoodSize)); + + } catch (Exception e) { + System.err.print(e.toString()); + } + } + + + /** + * Send list of neighbors to all neighbors + * + * @param vertex The current vertex being operated on. 
+ */ + private void broadcastNeighbors(Vertex vertex) { + + int [] neighbors = new int[vertex.getNumEdges()]; + int i = 0; + for (Edge edge : vertex.getEdges()) { + neighbors[i++] = edge.getTargetVertexId().get(); + } + sendMessageToAllEdges(vertex,new ArrayPrimitiveWritable(neighbors)); + + } + + +} diff --git a/dga-giraph/src/main/java/com/soteradefense/dga/scan1/Scan1MasterCompute.java b/dga-giraph/src/main/java/com/soteradefense/dga/scan1/Scan1MasterCompute.java new file mode 100644 index 0000000..12679b1 --- /dev/null +++ b/dga-giraph/src/main/java/com/soteradefense/dga/scan1/Scan1MasterCompute.java @@ -0,0 +1,82 @@ +package com.soteradefense.dga.scan1; + +import com.soteradefense.dga.DGALoggingUtil; +import org.apache.giraph.master.DefaultMasterCompute; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.ArrayPrimitiveWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.Arrays; + +/** + * Created by ekimbrel on 8/20/15. + */ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +/** + * Scan1 determines the neighborhood size of each vertex. And reports the max value found. + * Neighborhood size here is defined as the number of edges in the local sub-graph made up of a node and its neighboring vertices. + * + * The master compute class simple collects the aggreated max value and writes to an empty with + * the name _vertex_vertexId_value_maxNeighborhoodSize + * An empty file with data in the filename is used to avoid consuming an entire block in hdfs for a few bytes of storage. + * + */ +public class Scan1MasterCompute extends DefaultMasterCompute { + + + private static final Logger logger = LoggerFactory.getLogger(Scan1MasterCompute.class); + + @Override + public void initialize() throws InstantiationException, IllegalAccessException { + DGALoggingUtil.setDGALogLevel(this.getConf()); + registerAggregator(Scan1Computation.MAX_AGG, MaxScanAggregator.class); + } + + + @Override + public void compute() { + long step = this.getSuperstep(); + if (step == 2) { + int[] maxScanValue = (int[]) ((ArrayPrimitiveWritable) this.getAggregatedValue(Scan1Computation.MAX_AGG)).get(); + logger.info("max scan: {}", step, Arrays.toString(maxScanValue)); + + String dir = getConf().get("mapred.output.dir", getConf().get("mapreduce.output.fileoutputformat.outputdir")); + String path = dir + "/_vertex_"+maxScanValue[0]+"_value_"+maxScanValue[1]; + Path pt = new Path(path); + + try { + FileSystem fs = FileSystem.get(new Configuration()); + BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(pt, true))); + br.close(); + } catch (IOException e) { + e.printStackTrace(); + throw new IllegalStateException("Could not write to file: " + path); + }finally { + this.haltComputation(); + } + } + } + +} diff --git a/dga-giraph/src/main/java/com/soteradefense/dga/triangles/TriangleCountCombiner.java b/dga-giraph/src/main/java/com/soteradefense/dga/triangles/TriangleCountCombiner.java new file mode 100644 index 0000000..c7089e1 --- /dev/null +++ 
b/dga-giraph/src/main/java/com/soteradefense/dga/triangles/TriangleCountCombiner.java @@ -0,0 +1,25 @@ +package com.soteradefense.dga.triangles; + +import org.apache.giraph.combiner.MessageCombiner; +import org.apache.hadoop.io.ArrayPrimitiveWritable; +import org.apache.hadoop.io.IntWritable; + +/** + * Created by ekimbrel on 9/23/15. + */ +public class TriangleCountCombiner implements MessageCombiner{ + + public void combine(IntWritable vertexId, ArrayPrimitiveWritable originalMessage, ArrayPrimitiveWritable messageToCombine){ + int [] original = (int []) originalMessage.get(); + int[] other = (int []) messageToCombine.get(); + int[] combined = new int[ original.length + other.length]; + int i = 0; + for (int source: original) combined[i++] = source; + for (int source: other) combined[i++] = source; + originalMessage.set( combined ); + } + + public ArrayPrimitiveWritable createInitialMessage(){ + return new ArrayPrimitiveWritable(new int[0]); + } +} diff --git a/dga-giraph/src/main/java/com/soteradefense/dga/triangles/TriangleCountComputation.java b/dga-giraph/src/main/java/com/soteradefense/dga/triangles/TriangleCountComputation.java new file mode 100644 index 0000000..0bf4762 --- /dev/null +++ b/dga-giraph/src/main/java/com/soteradefense/dga/triangles/TriangleCountComputation.java @@ -0,0 +1,139 @@ +package com.soteradefense.dga.triangles; + + +import com.soteradefense.dga.DGALoggingUtil; +import org.apache.giraph.comm.WorkerClientRequestProcessor; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.graph.BasicComputation; +import org.apache.giraph.graph.GraphState; +import org.apache.giraph.graph.GraphTaskManager; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.worker.WorkerContext; +import org.apache.giraph.worker.WorkerGlobalCommUsage; +import org.apache.hadoop.io.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.HashSet; + +/** + * Created by ekimbrel on 9/22/15. 
 */
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *     http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */



/**
 * Counts the number of triangles (or loops of length 3) in the graph. Assumes an undirected graph.
 *
 * This algorithm will also work for directed graph triangle counting, except that the number of triangles is divided by two for each node
 * in the final superstep of this computation. This is because for undirected graphs each triangle is counted twice.
+ * + * + * + * I - Vertex id + * V - Vertex Data + * E - Edge Data + * M - Message type + * + */ +public class TriangleCountComputation extends BasicComputation { + + private static final Logger logger = LoggerFactory.getLogger(TriangleCountComputation.class); + + + @Override + public void initialize(GraphState graphState, WorkerClientRequestProcessor workerClientRequestProcessor, GraphTaskManager graphTaskManager, WorkerGlobalCommUsage workerGlobalCommUsage, WorkerContext workerContext) { + super.initialize(graphState, workerClientRequestProcessor, graphTaskManager, workerGlobalCommUsage, workerContext); + DGALoggingUtil.setDGALogLevel(this.getConf()); + } + + @Override + public void compute(Vertex vertex, Iterable messages) throws IOException { + + + long step = getSuperstep(); + IntWritable thisVertexId = vertex.getId(); + + // keep our neighbors in a set for fast searching and remove the effect of duplicate edges if they exist. + HashSet neighbors = new HashSet(); + for (Edge edge : vertex.getEdges()) { + neighbors.add(edge.getTargetVertexId().get()); + } + // remove self edges + neighbors.remove(thisVertexId.get()); + + + // step 0, send vertex.id to all neighbors + if (step == 0L) { + for (int target: neighbors){ + sendMessage(new IntWritable(target),thisVertexId); + } + } + + + // step 1, for each message "sourceId" forward the message to all neighbors except the source + else if (step == 1L) { + for (IntWritable source : messages) { + // create a new outMessage instead of reusing the input message because giraph does some funny business with object reuse of the input messages + IntWritable outMessage = new IntWritable(source.get()); + for (int target: neighbors){ + IntWritable destination = new IntWritable(target); + if ( source.get() != target ) { // ignore edges to the source + sendMessage(destination, outMessage); + } + } + } + } + + + // step 2, for each message forward it to the originating source + // if this vertex as an edge to the source + else if 
(step == 2L) { + + for (IntWritable message : messages) { + int source = message.get(); + if (neighbors.contains(source)){ + sendMessage(new IntWritable(source),new IntWritable(source)); + } + } + } + + + // step 3, count messages and aggregate total triangle count + else if (step == 3L) { + int numTriangles = 0; + for (IntWritable source : messages) { + numTriangles++; + } + + // because messages go in both directions we'll have counted each triangle twice at each node + numTriangles = numTriangles / 2; + vertex.setValue(new IntWritable(numTriangles)); + this.aggregate(TriangleCountMasterCompute.TRI_COUNT_AGG, new LongWritable(numTriangles)); + } + + else{ + vertex.voteToHalt(); + } + + } + + +} \ No newline at end of file diff --git a/dga-giraph/src/main/java/com/soteradefense/dga/triangles/TriangleCountComputationWithCombiner.java b/dga-giraph/src/main/java/com/soteradefense/dga/triangles/TriangleCountComputationWithCombiner.java new file mode 100644 index 0000000..923d46c --- /dev/null +++ b/dga-giraph/src/main/java/com/soteradefense/dga/triangles/TriangleCountComputationWithCombiner.java @@ -0,0 +1,159 @@ +package com.soteradefense.dga.triangles; + + +import com.soteradefense.dga.DGALoggingUtil; +import org.apache.giraph.comm.WorkerClientRequestProcessor; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.graph.BasicComputation; +import org.apache.giraph.graph.GraphState; +import org.apache.giraph.graph.GraphTaskManager; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.worker.WorkerContext; +import org.apache.giraph.worker.WorkerGlobalCommUsage; +import org.apache.hadoop.io.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.HashSet; +import java.util.ArrayList; + + + +/** + * Created by ekimbrel on 9/22/15. + */ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + + +/** + * Counts the number of triangles (or loops of length 3) in the Graph. Assumes an undirected Graph. + * + * + * + * I - Vertex id + * V - Vertex Data + * E - Edge Data + * M - Message type + * + */ +public class TriangleCountComputationWithCombiner extends BasicComputation { + + private static final Logger logger = LoggerFactory.getLogger(TriangleCountComputation.class); + + + @Override + public void initialize(GraphState graphState, WorkerClientRequestProcessor workerClientRequestProcessor, GraphTaskManager graphTaskManager, WorkerGlobalCommUsage workerGlobalCommUsage, WorkerContext workerContext) { + super.initialize(graphState, workerClientRequestProcessor, graphTaskManager, workerGlobalCommUsage, workerContext); + DGALoggingUtil.setDGALogLevel(this.getConf()); + } + + @Override + public void compute(Vertex vertex, Iterable messages) throws IOException { + + + long step = getSuperstep(); + IntWritable thisVertexId = vertex.getId(); + + // keep our neighbors in a set for fast searching and remove the effect of duplicate edges if they exist. 
+ HashSet neighbors = new HashSet(); + for (Edge edge : vertex.getEdges()) { + neighbors.add(edge.getTargetVertexId().get()); + } + // remove self edges + neighbors.remove(thisVertexId.get()); + + + // step 0, send vertex.id to all neighbors + if (step == 0L) { + ArrayPrimitiveWritable outMessage = new ArrayPrimitiveWritable( new int[] {thisVertexId.get()}); + for (int target: neighbors){ + sendMessage(new IntWritable(target),outMessage); + } + } + + + + // step 1, for each message "sourceId" forward the message to all neighbors except the source + else if (step == 1L) { + ArrayList sources = new ArrayList(); + for (ArrayPrimitiveWritable message : messages) { + int[] messageArray = ((int[]) message.get()); + for (int source: messageArray) sources.add(source); + } + int[] primitiveArray = new int[sources.size()]; + int i = 0; + for (int source: sources) primitiveArray[i++] = source; + ArrayPrimitiveWritable outMessage = new ArrayPrimitiveWritable(primitiveArray); + for (int target: neighbors) { + IntWritable destination = new IntWritable(target); + sendMessage(destination, outMessage); + } + + } + + + // step 2, for each message forward it to the originating source + // if this vertex as an edge to the source + else if (step == 2L) { + + // collect all sources recieved in a single array + int thisId = thisVertexId.get(); + ArrayList sources = new ArrayList(); + for (ArrayPrimitiveWritable message : messages) { + int[] messageArray = ((int[]) message.get()); + for (int sourceId: messageArray){ + if (sourceId != thisId) { + sources.add(sourceId); + } + } + } + + for (int source: sources) { + if (neighbors.contains(source)){ + sendMessage(new IntWritable(source),new ArrayPrimitiveWritable(new int[]{source})); + } + } + } + + + // step 3, count messages and aggregate total triangle count + else if (step == 3L) { + + int numTriangles = 0; + + for (ArrayPrimitiveWritable message : messages) { + int[] messageArray = ((int[]) message.get()); + numTriangles += 
messageArray.length; + } + + // because messages go in both directions we'll have counted each triangle twice at each node + numTriangles = numTriangles / 2; + vertex.setValue(new IntWritable(numTriangles)); + this.aggregate(TriangleCountMasterCompute.TRI_COUNT_AGG, new LongWritable(numTriangles)); + } + + else{ + vertex.voteToHalt(); + } + + } + + +} diff --git a/dga-giraph/src/main/java/com/soteradefense/dga/triangles/TriangleCountMasterCompute.java b/dga-giraph/src/main/java/com/soteradefense/dga/triangles/TriangleCountMasterCompute.java new file mode 100644 index 0000000..7936bb1 --- /dev/null +++ b/dga-giraph/src/main/java/com/soteradefense/dga/triangles/TriangleCountMasterCompute.java @@ -0,0 +1,79 @@ +package com.soteradefense.dga.triangles; + +import com.soteradefense.dga.DGALoggingUtil; +import org.apache.giraph.aggregators.LongSumAggregator; +import org.apache.giraph.master.DefaultMasterCompute; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.Arrays; + +/** + * Created by ekimbrel on 9/22/15. + */ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + +public class TriangleCountMasterCompute extends DefaultMasterCompute { + + + private static final Logger logger = LoggerFactory.getLogger(TriangleCountMasterCompute.class); + public static final String TRI_COUNT_AGG = "com.soteradefense.dga.triangles.count_aggregator"; + + + @Override + public void initialize() throws InstantiationException, IllegalAccessException { + DGALoggingUtil.setDGALogLevel(this.getConf()); + registerAggregator(TRI_COUNT_AGG, LongSumAggregator.class); + } + + + @Override + public void compute() { + long step = this.getSuperstep(); + if (step == 4L) { + long totalTriangles = ((LongWritable) getAggregatedValue(TRI_COUNT_AGG)).get(); + double distinctTriangles = totalTriangles / 3.0; + + logger.info("triangles: {}", step, distinctTriangles); + + String dir = getConf().get("mapred.output.dir", getConf().get("mapreduce.output.fileoutputformat.outputdir")); + String path = dir + "/_triangles_"+distinctTriangles; + Path pt = new Path(path); + + try { + FileSystem fs = FileSystem.get(new Configuration()); + BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(pt, true))); + br.close(); + } catch (IOException e) { + e.printStackTrace(); + throw new IllegalStateException("Could not write to file: " + path); + }finally { + this.haltComputation(); + } + } + } + +} diff --git a/dga-giraph/src/main/resources/dga-yarn-giraph b/dga-giraph/src/main/resources/dga-yarn-giraph index c099d01..adc0013 100755 --- a/dga-giraph/src/main/resources/dga-yarn-giraph +++ b/dga-giraph/src/main/resources/dga-yarn-giraph @@ -4,6 +4,10 
@@
 CWD=$(pwd)
 LIB_DIR=$CWD/lib/

+ZK_LIST=localhost:2181
+WORKER_MEMORY=1024
+WORKER_CORES=1
+
 export HADOOP_USER_CLASSPATH_FIRST=true
 export HADOOP_CLASSPATH=lib/*:conf/

@@ -12,4 +16,5 @@ do
 export JARS="$JARS,$i"
 done

-hadoop jar lib/dga-giraph-0.0.1.jar com.soteradefense.dga.DGAYarnRunner $LIB_DIR "${@:1}" -yj "$JARS"
+hadoop jar lib/dga-giraph-0.0.1.jar com.soteradefense.dga.DGAYarnRunner $LIB_DIR "${@:1}" -yj "$JARS" -Dmapred.job.tracker=yarnRM -Dgiraph.zkList="$ZK_LIST" -Dmapreduce.map.memory.mb=$WORKER_MEMORY -Dmapreduce.map.cpu.vcores=$WORKER_CORES
+