-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathBenchmarkTrianglesNetflowABC.java
More file actions
273 lines (237 loc) · 8.8 KB
/
BenchmarkTrianglesNetflowABC.java
File metadata and controls
273 lines (237 loc) · 8.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
package edu.cu.boulder.cs.flink.triangles;
import org.apache.commons.cli.*;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* This benchmarks finding triangles, A->B, B->C, C->A, where the times of
* the edges are strictly increasing. The edges are netflows, and the
* netflow representation is kept throughout (similar to the SAM
* implementation).
*
* The approach is to use an interval join
* (https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/joining.html#interval-join)
* which allows you to specify an interval of time from an event.
* The interval join is applied between the netflow stream and itself to create
* triads (two edges), and then between the triads and the netflow stream to
* find the triangles.
*/
public class BenchmarkTrianglesNetflow {
/**
 * Keys a netflow edge by its source IP. The dataflow in {@code main}
 * uses this selector on one side of the edge-to-edge join that builds
 * triads.
 */
private static class SourceKeySelector implements KeySelector<Netflow, String> {

    /**
     * @param flow the netflow edge to key
     * @return the {@code sourceIp} field of {@code flow}
     */
    @Override
    public String getKey(final Netflow flow) {
        final String source = flow.sourceIp;
        return source;
    }
}
/**
 * Keys a netflow edge by its destination IP. The data pipeline in
 * {@code main} uses this selector on one side of the edge-to-edge join
 * that builds triads.
 */
private static class DestKeySelector implements KeySelector<Netflow, String> {

    /**
     * @param flow the netflow edge to key
     * @return the {@code destIp} field of {@code flow}
     */
    @Override
    public String getKey(final Netflow flow) {
        final String destination = flow.destIp;
        return destination;
    }
}
/**
 * Keys a netflow edge by the pair (destination IP, source IP), i.e. the
 * target of the edge first and its source second. Used for the edge that
 * is joined against a triad.
 */
private static class LastEdgeKeySelector
        implements KeySelector<Netflow, Tuple2<String, String>> {

    /**
     * @param closingEdge the netflow edge to key
     * @return a tuple of {@code (destIp, sourceIp)} taken from the edge
     */
    @Override
    public Tuple2<String, String> getKey(final Netflow closingEdge) {
        final String target = closingEdge.destIp;
        final String source = closingEdge.sourceIp;
        return new Tuple2<>(target, source);
    }
}
/**
 * Two edges that are meant to share a vertex. This class is a plain
 * holder: it does not check that the edges are actually connected; that
 * constraint comes from how the dataflow joins the edges.
 */
private static class Triad {

    /** First edge of the pair. */
    Netflow e1;

    /** Second edge of the pair. */
    Netflow e2;

    /**
     * @param e1 first edge
     * @param e2 second edge
     */
    public Triad(Netflow e1, Netflow e2) {
        this.e1 = e1;
        this.e2 = e2;
    }

    /**
     * @return the string forms of both edges, separated by one space
     */
    public String toString() {
        return String.join(" ", e1.toString(), e2.toString());
    }
}
/**
 * Keys a triad by the pair (source IP of its first edge, destination IP
 * of its second edge).
 */
private static class TriadKeySelector
        implements KeySelector<Triad, Tuple2<String, String>> {

    /**
     * @param pair the triad to key
     * @return a tuple of {@code (e1.sourceIp, e2.destIp)}
     */
    @Override
    public Tuple2<String, String> getKey(final Triad pair) {
        final String firstSource = pair.e1.sourceIp;
        final String secondDest = pair.e2.destIp;
        return new Tuple2<>(firstSource, secondDest);
    }
}
/**
 * Three edges intended to form the cycle A->B->C->A. This class only
 * stores the edges; the topological and temporal constraints are applied
 * by the dataflow that constructs it.
 */
private static class Triangle {

    /** First edge. */
    Netflow e1;

    /** Second edge. */
    Netflow e2;

    /** Third edge. */
    Netflow e3;

    /**
     * @param e1 first edge
     * @param e2 second edge
     * @param e3 third edge
     */
    public Triangle(Netflow e1, Netflow e2, Netflow e3) {
        this.e1 = e1;
        this.e2 = e2;
        this.e3 = e3;
    }

    /**
     * @return the string forms of the three edges, separated by single spaces
     */
    public String toString() {
        return String.join(" ", e1.toString(), e2.toString(), e3.toString());
    }
}
/**
 * Join function that turns a pair of edges into a {@link Triad} when the
 * pair satisfies the temporal constraints.
 */
private static class EdgeJoiner
        extends ProcessJoinFunction<Netflow, Netflow, Triad> {

    /** Maximum allowed time difference between the two edges. */
    private final double queryWindow;

    /**
     * @param queryWindow upper bound on {@code e2.timeSeconds - e1.timeSeconds}
     */
    public EdgeJoiner(double queryWindow) {
        this.queryWindow = queryWindow;
    }

    /**
     * Emits {@code new Triad(e1, e2)} when {@code e1} is strictly earlier
     * than {@code e2} and the two are no more than {@code queryWindow}
     * apart; otherwise emits nothing.
     *
     * @param e1  edge from the left side of the join
     * @param e2  edge from the right side of the join
     * @param ctx join context (unused)
     * @param out collector receiving the triad
     */
    @Override
    public void processElement(Netflow e1, Netflow e2, Context ctx, Collector<Triad> out) {
        final boolean strictlyOrdered = e1.timeSeconds < e2.timeSeconds;
        if (strictlyOrdered && e2.timeSeconds - e1.timeSeconds <= queryWindow) {
            out.collect(new Triad(e1, e2));
        }
    }
}
/**
 * Join function that combines a {@link Triad} with a third edge into a
 * {@link Triangle} when the temporal constraints hold.
 */
private static class TriadJoiner
        extends ProcessJoinFunction<Triad, Netflow, Triangle> {

    /** Maximum allowed time span from the triad's first edge to the third edge. */
    private final double queryWindow;

    /**
     * @param queryWindow upper bound on {@code e3.timeSeconds - triad.e1.timeSeconds}
     */
    public TriadJoiner(double queryWindow) {
        this.queryWindow = queryWindow;
    }

    /**
     * Emits {@code new Triangle(triad.e1, triad.e2, e3)} when {@code e3} is
     * strictly later than the triad's second edge and no more than
     * {@code queryWindow} after the triad's first edge; otherwise emits
     * nothing.
     *
     * @param triad the two already-joined edges
     * @param e3    candidate third edge
     * @param ctx   join context (unused)
     * @param out   collector receiving the triangle
     */
    @Override
    public void processElement(Triad triad, Netflow e3, Context ctx, Collector<Triangle> out) {
        final boolean afterSecondEdge = triad.e2.timeSeconds < e3.timeSeconds;
        if (afterSecondEdge && e3.timeSeconds - triad.e1.timeSeconds <= queryWindow) {
            out.collect(new Triangle(triad.e1, triad.e2, e3));
        }
    }
}
/**
 * Parses the command line, builds the triangle-finding dataflow and runs it.
 *
 * <p>Options: {@code parseFile}, {@code queryWindow} and {@code outputFile}
 * are required; {@code outputNetflow} and {@code outputTriads} are optional
 * and, when given, cause the raw netflows / intermediate triads to be
 * written out as text. On a command-line parse error the message and usage
 * are printed and the JVM exits with status 1.
 *
 * @param args command-line arguments
 * @throws Exception if building or executing the Flink job fails
 */
public static void main(String[] args) throws Exception {
    Options options = new Options();
    Option parseFile = new Option("file", "parseFile", true,
        "Input the CSV for a netflow file");
    Option queryWindowOption = new Option("qw", "queryWindow", true,
        "The length of the query in seconds.");
    Option outputFileOption = new Option("out", "outputFile", true,
        "Where the output should go.");
    Option outputNetflowOption = new Option("net", "outputNetflow", true,
        "Where the netflows should go (optional).");
    Option outputTriadOption = new Option("triad", "outputTriads", true,
        "Where the triads should go (optional).");

    parseFile.setRequired(true);
    queryWindowOption.setRequired(true);
    outputFileOption.setRequired(true);

    options.addOption(parseFile);
    options.addOption(queryWindowOption);
    options.addOption(outputFileOption);
    options.addOption(outputNetflowOption);
    options.addOption(outputTriadOption);

    CommandLineParser parser = new DefaultParser();
    HelpFormatter formatter = new HelpFormatter();
    CommandLine cmd = null;
    try {
        cmd = parser.parse(options, args);
    } catch (ParseException e) {
        System.out.println(e.getMessage());
        formatter.printHelp("utility-name", options);
        System.exit(1);
    }

    // NOTE(review): fileToParse is read but never used below; the source is
    // still constructed from generator parameters. Presumably the netflows
    // were meant to come from this CSV file -- confirm and wire it in.
    String fileToParse = cmd.getOptionValue("parseFile");
    double queryWindow = Double.parseDouble(cmd.getOptionValue("queryWindow"));
    String outputFile = cmd.getOptionValue("outputFile");
    String outputNetflowFile = cmd.getOptionValue("outputNetflow");
    String outputTriadFile = cmd.getOptionValue("outputTriads");

    // Upper bound of both interval joins, in milliseconds. The multiplication
    // must happen before the cast: "(long) queryWindow * 1000" truncates the
    // window to whole seconds first (0.5 s became 0 ms), which made the join
    // interval narrower than the window the joiners filter on.
    final Time joinUpperBound = Time.milliseconds((long) (queryWindow * 1000));

    // get the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // NOTE(review): numSources, numEvents, numIps and rate are not declared
    // anywhere in this method; the command-line options that used to supply
    // them (numSources, numNetflows, numIps, rate) were removed. These two
    // statements are left unchanged and need to be resolved together with
    // the unused fileToParse above.
    env.setParallelism(numSources);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // Get a stream of netflows from the NetflowSource
    NetflowSource netflowSource = new NetflowSource(numEvents, numIps, rate);
    DataStreamSource<Netflow> netflows = env.addSource(netflowSource);

    // If specified, we write out the raw data we see to a file.
    if (outputNetflowFile != null) {
        netflows.writeAsText(outputNetflowFile, FileSystem.WriteMode.OVERWRITE);
    }

    // Transforms the netflows into a stream of triads.
    // The left (earlier) edge is keyed by its destination and the right
    // (later) edge by its source, so that e1.destIp == e2.sourceIp, i.e.
    // A->B followed by B->C. Keying the other way round makes
    // e1.sourceIp == e2.destIp, in which case TriadKeySelector produces
    // (X, X) and only self-loop edges could ever close a triangle.
    DataStream<Triad> triads = netflows
        .keyBy(new DestKeySelector())
        .intervalJoin(netflows.keyBy(new SourceKeySelector()))
        .between(Time.milliseconds(0), joinUpperBound)
        .process(new EdgeJoiner(queryWindow));

    // If specified, we spit out the triads we found to disk.
    if (outputTriadFile != null) {
        triads.writeAsText(outputTriadFile, FileSystem.WriteMode.OVERWRITE);
    }

    // Transforms the stream of triads into triangles: a triad keyed by
    // (e1.sourceIp, e2.destIp) = (A, C) is matched with an edge keyed by
    // (destIp, sourceIp) = (A, C), i.e. the closing edge C->A.
    DataStream<Triangle> triangles = triads
        .keyBy(new TriadKeySelector())
        .intervalJoin(netflows.keyBy(new LastEdgeKeySelector()))
        .between(Time.milliseconds(0), joinUpperBound)
        .process(new TriadJoiner(queryWindow));

    // Write the triangles we found to disk.
    triangles.writeAsText(outputFile, FileSystem.WriteMode.OVERWRITE);

    env.execute();
}
}