---
title: "Spark Join Optimizations"
author: "Shabbir Hussain, Manthan Thakar, Tirthraj Parmar"
output:
  pdf_document:
    fig_crop: no
    fig_width: 5
    latex_engine: xelatex
geometry: left=1cm,right=1cm,top=1cm,bottom=2cm
---
```{r setup, echo=F, results='hide', message=F, warning=F, cache=F}
library("ggplot2")
library("tidyr")
library("knitr")
library("kableExtra")
library("scales")
library("plyr")
library("microbenchmark")
library("reshape2")
library("readr")
library("tm")
library("SnowballC")
library("wordcloud")
library("RColorBrewer")
library("grid")
library("gridBase")
library("gridExtra")
library("dplyr")
library("png")
g_legend<-function(aGplot){
tmp <- ggplot_gtable(ggplot_build(aGplot))
leg <- which(sapply(tmp$grobs, function(x) x$name) == "guide-box")
legend <- tmp$grobs[[leg]]
legend
}
```
```{r echo=F, message=F, warning=F, cache=F, results='hide'}
data1 <- read.csv('plugin-benchmark/results/com_timing.csv', sep=" ", header = FALSE)
data1 <- data1[, c("V1", "V2", "V3")]
data1 <- data1 %>% mutate(V1=ifelse(V1=="com_reg", "Control", "Plugin"))
colnames(data1) <- c("Type", "Metric", "Time")
data1$Type <- as.factor(data1$Type)
data1$Time <- as.numeric(data1$Time)
x = ggplot(data=data1, aes(x=Type, y=Time, fill="black")) +
geom_violin() +
labs(x="", y="Time (sec)", title="Fig 2: Compilation time for program with and without plugin") +
theme_bw() + theme(plot.title = element_text(size=8), legend.position = "none") +
scale_fill_grey() +
scale_color_grey()
```
```{r echo=F, cache=TRUE, results='hide'}
data <- read.csv('plugin-benchmark/results/run_timing.csv', sep="\t", header = FALSE)
data <- data[, c("V1", "V3", "V4")]
show_run_plots <- function(metricType, metricName, unit, scale=1, round=0, showx=TRUE, hColor){
dataRun <- subset(data, (substring(data$V1, 0, 3) == "run") & data$V3==metricType)
dataRun$V1 <- substring(dataRun$V1, 5)
dataRun %>%
group_by(V1) %>%
summarise(Mean=mean(V4), Median=median(V4), count = n()) %>%
separate("V1", into = paste("V", 1:3, sep = "_")) -> dataRun
colnames(dataRun) <- c("Type", "Tot", "Used", "Mean", "Median", "Count")
dataRun$Type <- as.factor(dataRun$Type)
dataRun$Tot <- as.integer(dataRun$Tot)
dataRun$Used <- as.integer(dataRun$Used)
dataRun <- subset(dataRun, Tot!=0)
dfReg <- subset(dataRun, Type=="reg")
dfPlu <- subset(dataRun, Type=="plugin")
# Individual HeatMaps
dataRun <- dataRun %>% mutate(Type=ifelse(Type=="reg", "Control", " Plugin"))
g1 <- ggplot(data=dataRun, aes(x=reorder(as.factor(Tot), Tot), y=reorder(as.factor(Used), Used), fill=Median)) +
geom_tile(width=1, height=1) +
scale_fill_gradient(low="white", high=hColor, name=metricName,
labels=function(x) dollar_format(suffix=unit, prefix = "")(round(x/scale, round))) +
label_both(element_blank()) +
facet_grid(.~Type) +
theme_bw() + theme(legend.position="left",
legend.title = element_blank(),
plot.title = element_blank(),
axis.title.x = element_blank(),
axis.title.y = element_blank(),
axis.text.y = element_blank(),
axis.ticks.y = element_blank())
# Combined HeatMap
dfAll <- merge(x = dfReg, y = dfPlu, by = c("Tot", "Used"), all = FALSE)
dfAll$Fill <- round(((-dfAll$Median.y + dfAll$Median.x)*100/dfAll$Median.x), 0)
g3 <- ggplot(data=dfAll, aes(x=reorder(as.factor(Tot), Tot), y=reorder(as.factor(Used), Used), fill=Fill)) +
geom_tile(width=1, height=1) +
scale_y_discrete(position = "right") +
scale_fill_gradient2(low=brewer.pal(3,"Set1")[1], mid="ivory", high=brewer.pal(11, "PiYG")[9],
midpoint=0, name="Delta",
labels=function(x) dollar_format(suffix = "%", prefix = "")(x)) +
label_both(element_blank()) +
geom_text(aes(label=paste0(Fill, "%")), size=2) +
facet_grid(.~ifelse(1==1, "Delta = (Control - Plugin)/Control",""), switch = "y") +
theme_bw() + theme(legend.position="none",
plot.title = element_blank(),
axis.title.y = element_blank(),
axis.title.x = element_blank())
if(showx==F){
g1 <- g1 + theme(axis.text.x = element_blank(), axis.ticks.x = element_blank())
g3 <- g3 + theme(axis.text.x = element_blank(), axis.ticks.x = element_blank())
}
t1 <- textGrob(metricName, gp = gpar(fontface = 3L, fontsize = 12), rot=90)
g1 <- arrangeGrob(t1, g1, ncol=2, widths=c(1, 30))
r <- arrangeGrob(g1, g3, ncol=2, widths=c(2,1))
return(r)
}
```
# Objective
Optimize Spark joins on Resilient Distributed Datasets (RDDs) by applying column pruning and/or broadcast join wherever appropriate.
# Join Optimization
Join operations on RDDs can be expensive. We suspect that one of the biggest factors affecting join performance is the amount of data shuffled in the process. To reduce excessive shuffling, we propose and implement two optimizations: **Column Pruning** and **Broadcast Join**. In the following sections, we discuss both approaches and present our findings.
## Optimization 1: Column Pruning
**Hypothesis**: Pruning unused columns (values in Pair-RDD) of an RDD before performing a join reduces the amount of data shuffled for join and improves join performance.
### Approach
At a high level, column pruning on a Spark join requires the following steps:

- **Identify target RDDs**: Identify the RDDs on which the join is performed.
- **Obtain column usage**: Determine which columns of each RDD are used after the join operation.
- **Transform target RDDs**: Using the column usage information, transform the target RDDs before the join is performed.

To capture the information required for these steps, we built a plugin for the Scala compiler. A Scala compiler plugin can access the abstract syntax tree (AST) of the source code after different compiler phases. This information would be either completely missing or hard to obtain inside Spark's DAG scheduler, so we believe a compiler plugin is the better choice for our purposes.
### Scala Compiler Plugin
The Scala compiler can be extended with compiler plugins. It has 25 phases, including `parser`, `typer`, and `erasure`. A compiler plugin can be placed before or after any of these phases, where it can analyze and modify the AST produced by the previous phase. We use this capability and place our compiler plugin, `JoinOptimizer`, after the Scala compiler's `parser` phase.

When a Scala program is compiled with `JoinOptimizer`, the plugin obtains the AST constructed by the compiler's parser and performs the following steps:

- Identify join operations in the Scala program using Scala's quasiquotes.
- If a join operation is encountered:
    - Capture the trees for the target RDDs and the next `Transformation` that directly follows the `join`.
    - Analyze this `Transformation`'s lambda and obtain the column usage of the target RDDs.
    - Transform the tree before the `join` operation to insert a `mapValues` on each target RDD that emits only the used columns.
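
The column-usage analysis can be illustrated with a small analogue. The sketch below is not the `JoinOptimizer` plugin: it uses Python's standard `ast` module instead of Scala quasiquotes, and the helper name `used_columns` is hypothetical. It only shows the idea of walking a lambda's syntax tree to find which tuple positions of each joined value are read.

```python
import ast


def used_columns(lambda_src, param_pos):
    """Return the tuple indices a lambda reads from one of its parameters.

    Returns None when the parameter is also used as a whole (or with a
    non-constant index), in which case no column can safely be pruned.
    """
    lam = ast.parse(lambda_src, mode="eval").body
    if not isinstance(lam, ast.Lambda):
        raise ValueError("expected a lambda expression")
    name = lam.args.args[param_pos].arg

    used = set()
    indexed_names = set()  # id() of Name nodes consumed by a constant subscript
    for node in ast.walk(lam.body):
        if (isinstance(node, ast.Subscript)
                and isinstance(node.value, ast.Name)
                and node.value.id == name):
            index = node.slice
            if type(index).__name__ == "Index":  # Python 3.8 wraps the index
                index = index.value
            if isinstance(index, ast.Constant) and isinstance(index.value, int):
                used.add(index.value)
                indexed_names.add(id(node.value))

    # Any other reference to the parameter means the whole value is needed.
    for node in ast.walk(lam.body):
        if (isinstance(node, ast.Name) and node.id == name
                and id(node) not in indexed_names):
            return None
    return sorted(used)


# Hypothetical post-join lambda: first column of the left value,
# second column of the right value.
post_join = "lambda left, right: (left[0], right[1])"
print(used_columns(post_join, 0))
print(used_columns(post_join, 1))
```

Returning `None` for whole-value use is a deliberately conservative choice in this sketch: when the analysis cannot prove a column is unused, nothing is pruned.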
```{r echo=F, cache=T, results="hide", fig.width=8, fig.height=2}
code_image = rasterGrob(as.raster(readPNG("plugin-benchmark/results/code-snippet.png")))
grid.arrange(code_image, x, ncol=2)
```
```{r echo=F, cache=TRUE, results="hide", fig.height=3, fig.width=8, eval=F}
header1 = c("Processor", "RAM", "vCores", "Storage", "Spark", "Scala")
header2 = c("2.7 GHz Intel Core i5", "8GB", "4", "256GB", "2.2.0", "2.11.11")
config = data.frame(Config=header1, Value=header2)
tg = tableGrob(config,
theme = ttheme_default(core=list(bg_params = list(fill = brewer.pal(9, "Greys")[1:2],col=NA),
fg_params = list(fontface=3, fontsize=8))))
h <- grobHeight(tg)
w <- grobWidth(tg)
tableText = textGrob("Table 1: Machine Configuration", y=unit(0.5,"npc") + h, gp=gpar(fontsize=8))
grid.arrange(gTree(children=gList(tg, tableText)), x, ncol=2, widths=c(2,1), heights=c(10, 1))
```
Fig 1 shows the source code of the optimized Spark join produced by the steps outlined above.

Two PairRDDs, `rdd1` and `rdd2`, are defined on lines 1 and 2 of Fig 1, both with key type `Long` and value type `Tuple5[Long, ...]`. Line 5 performs a `join` on `rdd1` and `rdd2` and then uses the first column of `rdd1` and the second column of `rdd2` inside the lambda passed to `mapValues`. Four of the five columns in each RDD are therefore unused after the `join` and can be pruned.

Lines 8-10 show the code generated by our compiler plugin. It prunes the columns of both `rdd1` and `rdd2` before the `join` by adding an extra `mapValues` stage. Note that for `rdd2` it uses a `null` value for the first column, since that column is not used. To avoid rewriting the user's `Transformation`s that follow a join, we replace every unused index in a `Tuple` with a `null` value instead of removing it, so the positions of the used columns do not change. We hypothesize that this reduces the number of bytes shuffled.
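
The effect of the rewrite can be mimicked outside Spark. The following is a minimal sketch using plain Python lists of `(key, value)` pairs in place of PairRDDs and `None` in place of `null`; the helpers `map_values`, `join`, and `prune` are illustrative stand-ins, not Spark APIs, and the data is made up. It shows that replacing unused positions with a placeholder leaves the result of the post-join lambda unchanged.

```python
def map_values(pairs, f):
    """Apply f to the value of every (key, value) pair."""
    return [(k, f(v)) for k, v in pairs]


def join(left, right):
    """Inner join of two lists of (key, value) pairs on the key."""
    by_key = {}
    for k, v in right:
        by_key.setdefault(k, []).append(v)
    return [(k, (lv, rv)) for k, lv in left for rv in by_key.get(k, [])]


def prune(used):
    """Build a function that keeps used positions and blanks the rest."""
    used = frozenset(used)
    return lambda row: tuple(v if i in used else None for i, v in enumerate(row))


# Made-up data: two pair collections with five value columns each.
rdd1 = [(1, (10, 11, 12, 13, 14)), (2, (20, 21, 22, 23, 24))]
rdd2 = [(1, (30, 31, 32, 33, 34)), (3, (40, 41, 42, 43, 44))]

# Post-join lambda: first column of rdd1, second column of rdd2.
post_join = lambda pair: (pair[0][0], pair[1][1])

plain = map_values(join(rdd1, rdd2), post_join)
pruned = map_values(
    join(map_values(rdd1, prune({0})), map_values(rdd2, prune({1}))),
    post_join,
)
assert plain == pruned  # pruning does not change the result
```

Because the used columns keep their original positions, `post_join` runs unmodified on the pruned data, which is the reason for using placeholders rather than narrower tuples.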
### Benchmarks
To measure the impact of the `JoinOptimizer` plugin, we compile the same Spark programs with and without the plugin. The Spark program used for benchmarking performs joins between two RDDs, `rdd1` and `rdd2`. We tested multiple combinations of the number of columns available in the source RDDs and the number of columns used after the `join` operation. Note that these configurations are varied only for `rdd1`; for simplicity, we keep `rdd2` constant across all benchmarks. For every configuration, we measure the execution time and the amount of data shuffled.

All benchmarks are run on a local machine with the following configuration: _2.7 GHz Intel Core i5 processor, 8 GB RAM, 4 vCores, 256 GB SSD storage, Spark 2.2.0, Scala 2.11.11_.
**Compilation Time**
Fig 2 shows the distribution of compilation time for the same Spark program with and without the compiler plugin. The violin plot shows that, for most of the distribution, the compiler plugin adds an overhead of about 2 seconds. In the worst case (the outliers), this overhead can exceed 5 seconds.
**Shuffled Data**
Fig 3 (Shuffle) shows the amount of data shuffled by the Spark programs compiled with `JoinOptimizer` (depicted as Plugin) and without it (depicted as Control). Each block in the heatmaps shows the median KB shuffled for one configuration of columns. The X-axis shows the total number of columns before the `join`, and the Y-axis shows the number of columns used after the `join`.

In the Control plot, for a given total number of columns the amount of data shuffled stays roughly the same for every value on the Y-axis. This indicates that **in Control, shuffle is a function of the total number of columns** in the source RDDs. In the Plugin plot, by contrast, the shuffled data increases only when the number of used columns increases: if the number of used columns is constant, the shuffled data stays almost constant regardless of the total number of columns. Thus, **in Plugin, shuffle becomes a function of the number of used columns** in the source RDDs, which supports our initial hypothesis.

Fig 3 also shows the delta in shuffled data between the Control and Plugin benchmarks. The highest gain in network efficiency, $89\%$, is observed when the total number of columns in `rdd1` is $22$ and none of the columns are used after the join. This means that the optimized code shuffles $89\%$ less data than its unoptimized counterpart. Moreover, for each value on the Y-axis the percentage gain increases from left to right: the saving relative to the unoptimized version grows as the total number of columns increases while the number of used columns stays constant. Surprisingly, when the RDD has only one column and none of the columns are used, the optimized version shuffles $23\%$ more data than the unoptimized version. This could be a limitation of our plugin implementation, which converts the RDD values to a `Tuple22` holding $22$ `null` values.
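
For reference, the delta shown in Fig 3 is the relative saving of Plugin over Control, computed from the median values of each configuration. The numbers in the second line are purely illustrative (they are not measurements from our benchmarks) and only show how a positive delta reads as a saving:

$$\Delta = \frac{\text{Control} - \text{Plugin}}{\text{Control}} \times 100\%$$

$$\text{Control} = 100\ \text{KB},\quad \text{Plugin} = 11\ \text{KB} \quad\Rightarrow\quad \Delta = \frac{100 - 11}{100} \times 100\% = 89\%$$

A negative delta therefore means that the Plugin version shuffled more data, or ran longer, than Control.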
```{r fig.align="center", fig.height=3, fig.width=10, cache=F,echo=F, results = 'asis', warning=F, message=F}
tx <- textGrob("Total # of columns", gp = gpar(fontface = 3L, fontsize = 8))
ty <- textGrob("Used # of columns", gp = gpar(fontface = 3L, fontsize = 8), rot=90)
g1 <- show_run_plots("real", metricName="Time", unit="s", scale=1, round=1, hColor="maroon", showx=F)
g2 <- show_run_plots("shuffle", metricName="Shuffle", unit="kb ", scale=1024, round=0, hColor="royalblue", showx=T)
g3 <- arrangeGrob(g1, g2, tx, nrow=3, heights = c(10, 10, 1))
grid.arrange(g3, ty, ncol=2, widths=c(30, 1), bottom="Figure 3: Compiler Plugin Benchmarks")
```
**Execution Time**
Similarly, Fig 3 (Time) shows execution times for the optimized (depicted as Plugin) and unoptimized (depicted as Control) programs. Each block in the heatmaps shows the median execution time in seconds for one configuration of columns in `rdd1`. The patterns observed for shuffled data in both the Control and Plugin plots also appear for execution times, which indicates a strong correlation between shuffled data and execution time.
```{r cache=TRUE,echo=F, results = 'asis', warning=F, message=F}
raw_to_plot_data_exec <- function(d, runType, mulFactor) {
colnames(d) <- c('ID', 'exec', 'Size')
d %>%
group_by(Size) %>%
summarise(TimeMean=mean(exec), TimeMedian=median(exec), Count = n()) -> plot_data
plot_data$Size <- (2^plot_data$Size)*mulFactor
runTypeCol <- cbind(rep(runType, count(plot_data)))
plot_data$RunType <- runTypeCol
return(plot_data)
}
plot_both_time <- function(plot_d, title, xLabel, yLabel) {
titleCol <- rep(title, count(plot_d))
plot_d$Title <- titleCol
ggplot(data = plot_d, aes(x = Size, y = TimeMedian, color = RunType, shape = RunType)) +
geom_point(size = 2.5) +
geom_line() +
scale_x_continuous(breaks=plot_d$Size, trans="log2",
labels = function(x) ifelse((x/1024)>=1,
paste(round(x/1024,1), "gb"),
paste(x, "mb"))) +
theme_bw() +
scale_shape_manual(values=c(17,17,15,15)) +
theme(panel.grid.major = element_blank(), panel.grid.minor = element_blank(),
plot.title = element_blank(),
#axis.title.x = element_blank(),
axis.title.x = element_text(margin = margin(t=10)),
plot.margin=margin(r=20),
axis.text=element_text(size=9),
axis.title=element_text(size=10),
strip.text = element_text(size = 12, face = "bold"),
legend.text=element_text(size=10),
legend.position="left",
legend.title = element_blank()) +
facet_grid(. ~ Title, switch = "y") +
labs(x = xLabel, y = yLabel) -> p
return(p)
}
plot_vs_time <- function(plot_d, title, xLabel, yLabel) {
titleCol <- rep(title, count(plot_d))
plot_d$Title <- titleCol
ggplot(data = plot_d, aes(x = Size, y = TimeMedian, color = RunType, shape = RunType)) +
geom_point(size = 2.5) +
geom_line() +
scale_x_continuous(breaks=plot_d$Size, trans="log2",
labels = function(x) paste(x, "mb")) +
theme_bw() +
scale_shape_manual(values=c(17,15)) +
theme(panel.grid.major = element_blank(), panel.grid.minor = element_blank(),
plot.title = element_blank(),
#axis.title.x = element_blank(),
axis.title.x = element_text(margin = margin(t=10)),
axis.text=element_text(size=9),
plot.margin=margin(l=20),
axis.title=element_text(size=10),
strip.text = element_text(size = 12, face = "bold"),
legend.text=element_text(size=10),
legend.title = element_blank(),
legend.position="right") +
facet_grid(. ~ Title, switch = "y") +
labs(x = xLabel, y = yLabel) -> p
return(p)
}
```
## Optimization 2: Broadcast Join
**Hypothesis**: If one of the two RDDs being joined is much smaller than the other and fits in memory, then broadcasting the small RDD to all nodes and performing a map-side join reduces the amount of data shuffled and improves join performance.
```{r cache=TRUE,echo=F, results = 'asis', warning=F, message=F}
data_path <- 'benchmarks/output/stats/'
big_bj_exec <- read.csv(paste0(data_path, 'vb_big_bj_exec'), sep = ";", header = FALSE)
big_shuffle_exec <- read.csv(paste0(data_path, 'vb_big_shuffle_exec'), sep = ";", header = FALSE)
rs_bj_exec <- read.csv(paste0(data_path, 'vb_rs_bj_exec'), sep = ";", header = FALSE)
rs_shuffle_exec <- read.csv(paste0(data_path, 'vb_rs_shuffle_exec'), sep = ";", header = FALSE)
big_bj_exec <- raw_to_plot_data_exec(big_bj_exec, "BB: BJ", 128)
big_shuffle_exec <- raw_to_plot_data_exec(big_shuffle_exec, "BB: Shuffle", 128)
rs_bj_exec <- raw_to_plot_data_exec(rs_bj_exec, "BS: BJ", 128)
rs_shuffle_exec <- raw_to_plot_data_exec(rs_shuffle_exec, "BS: Shuffle", 128)
vs_rs_bj_exec <- read.csv(paste0(data_path, 'vs_rs_bj_exec'), sep = ";", header = FALSE)
vs_rs_shuffle_exec <- read.csv(paste0(data_path, 'vs_rs_shuffle_exec'), sep = ";", header = FALSE)
vs_rs_bj_exec <- raw_to_plot_data_exec(vs_rs_bj_exec, "BJ", 1)
vs_rs_shuffle_exec <- raw_to_plot_data_exec(vs_rs_shuffle_exec, "Shuffle", 1)
```
### Approach
By default, Spark's join function performs a reduce-side join: it shuffles the records of both target RDDs that share a key to the same node and then combines their data. When one of the RDDs is much smaller, however, this smaller RDD can be grouped by key and broadcast to all nodes. Once the smaller RDD has been transferred to each node, the two RDDs can be joined by adding map stages, which eliminates the extra reduce step required by a reduce-side join. Although a broadcast join sends data to all nodes upfront, it saves the bigger RDD from being shuffled. This is equivalent to performing a map-side join instead of Spark's reduce-side join.
The key components in this proposed approach are:
**1. RDD Size Estimation**
Estimating the size of target RDDs is instrumental in performing broadcast join. We have implemented a size estimator for RDDs that uses the following formula:
$estimated\_size = size(N) \times R / N$
where $R$ is the total number of rows in the RDD, and $size(N)$ is the total size of the $N$ sample rows fetched from each partition.

Since this is a heuristic, it is prone to underestimation when rows contain a variable number of columns. In such cases, broadcasting an RDD might cause the job to fail ungracefully. We therefore keep the estimator configurable, so that the user can provide a custom size estimator suited to the data.

Moreover, a broadcast join requires that the small RDD fit in memory on every node. We treat this limit as a configuration setting of the cluster and keep the memory threshold configurable as well. If the size of an RDD obtained from the given estimator is smaller than the memory threshold, we perform a broadcast join; otherwise, Spark's default shuffle join is performed.
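
The estimator and the threshold check can be sketched in a few lines. This is a simplified illustration under stated assumptions, not our implementation: it treats the data as one in-memory list, samples the first `sample_n` rows instead of sampling every partition, and by default measures a row by its pickled length. The names `estimate_size`, `choose_join`, and `size_of` are hypothetical.

```python
import pickle


def estimate_size(rows, sample_n, size_of=None):
    """Estimate total size as size(N) * R / N from the first N rows.

    size_of is the pluggable per-row measure (assumption: pickled length
    when none is given).
    """
    if sample_n < 1:
        raise ValueError("sample_n must be at least 1")
    if size_of is None:
        size_of = lambda row: len(pickle.dumps(row))
    if not rows:
        return 0
    sample = rows[:sample_n]
    sample_size = sum(size_of(row) for row in sample)  # size(N)
    return sample_size * len(rows) / len(sample)       # size(N) * R / N


def choose_join(estimated_size, threshold):
    """Pick the join strategy from the estimate and the memory threshold."""
    return "broadcast" if estimated_size < threshold else "shuffle"


# Uniform rows: the estimate equals the true total (100 rows of length 4).
uniform = ["abcd"] * 100
print(estimate_size(uniform, 10, size_of=len))

# Variable-width rows: small rows in the sample lead to underestimation.
skewed = ["ab"] * 10 + ["abcdefgh"] * 90
estimate = estimate_size(skewed, 10, size_of=len)
actual = sum(len(row) for row in skewed)
print(estimate, actual, choose_join(estimate, 256), choose_join(actual, 256))
```

The second case shows the failure mode described above: the estimate falls below the threshold while the true size does not, so a broadcast would be chosen for data that is too large.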
**2. Performing map-side join**
If the size estimator determines that one of the target RDDs is smaller than the threshold, we apply `groupByKey` to it and broadcast it to all nodes, where it is cached. Once it is cached, we only need to map over the bigger RDD and look up each of its keys in the cache to join the records. This eliminates both the shuffling of the bigger RDD and the reduce stage.
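
The map-side join itself reduces to a grouped lookup table plus a single pass over the bigger dataset. The sketch below uses plain Python lists of `(key, value)` pairs rather than RDDs, so the broadcast step is only simulated by building the table once; `broadcast_join` is a hypothetical helper name and the data is made up.

```python
from collections import defaultdict


def broadcast_join(big, small):
    """Inner join where `small` is grouped into a lookup table.

    In a cluster, the lookup table is what would be broadcast to every
    node; here it is simply built once in local memory.
    """
    lookup = defaultdict(list)          # groupByKey on the small side
    for key, value in small:
        lookup[key].append(value)

    joined = []
    for key, big_value in big:          # single map pass over the big side
        for small_value in lookup.get(key, ()):
            joined.append((key, (big_value, small_value)))
    return joined


big = [(1, "a"), (2, "b"), (1, "c")]
small = [(1, "x"), (1, "y"), (3, "z")]
print(broadcast_join(big, small))
```

Keys that appear only on one side (2 and 3 here) produce no output, matching inner-join semantics, and the big side is never regrouped or moved.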
```{r cache=TRUE,echo=F, results = 'asis', warning=F, message=F, fig.align="center", fig.width=12, fig.height=2.8}
big_plot_d <- union(big_bj_exec, big_shuffle_exec)
r_plot_d <- union(rs_bj_exec, rs_shuffle_exec)
big_delta <- subset(big_bj_exec, select = -c(TimeMean, Count))
r_delta <- subset(rs_bj_exec, select = -c(TimeMean, Count))
big_delta$TimeMedian <- ((big_shuffle_exec$TimeMedian - big_bj_exec$TimeMedian) / big_shuffle_exec$TimeMedian)*100
r_delta$TimeMedian <- ((rs_shuffle_exec$TimeMedian - rs_bj_exec$TimeMedian) / rs_shuffle_exec$TimeMedian)*100
plot_both_time_d <- union(big_plot_d, r_plot_d)
plot_both_delta_d <- union(big_delta, r_delta)
plot_vs_time_d <- union(vs_rs_bj_exec, vs_rs_shuffle_exec)
bothTimePlot <- plot_both_time(plot_both_time_d, "a) Fixed Size Small RDD", "Big RDD Size", "Time (sec)")
vsTimePlot <- plot_vs_time(plot_vs_time_d, "b) Fixed Size Big RDD", "Small RDD Size", "Time (sec)")
grid.arrange(bothTimePlot, vsTimePlot, ncol=2, widths=c(1.3, 1), bottom="Figure 4: Broadcast Join Benchmarks")
```
**Benchmarks**
Figure 4 shows benchmark results for our broadcast join implementation for varying sizes of both the small RDD (smaller than the threshold) and the big RDD. In each plot, the X-axis shows the size of the RDD being varied while the other RDD is kept constant, and the Y-axis shows the execution time of the join.
**Fixed size small RDD**
To measure the impact of increasing the size of the bigger RDD on broadcast join, we keep the size of the small RDD constant and vary the size of the big RDD. Figure 4(a) shows the execution times of joins for this experiment in two different scenarios.

In the **first scenario**, marked as squares, one RDD is small and the other is big. `BS: Shuffle` shows the execution time of the shuffle join in this scenario for varying sizes of the big RDD, and `BS: BJ` shows the execution time of the broadcast join. This is the ideal scenario for a broadcast join: as the size of the big RDD increases, the broadcast join yields better performance, because the shuffle join has to send more data over the network as the big RDD grows. We observed an improvement of about $50\%$ in performance for the broadcast join of a small RDD to a big RDD.

The **second scenario** is less favorable for broadcast join. Here, both RDDs are big and therefore cannot be broadcast. The execution times for this scenario are marked as triangles in Fig 4(a), where `BB: BJ` denotes execution times with the broadcast join implementation and `BB: Shuffle` denotes execution times with shuffle join. As the size of the big RDD increases, the shuffle join yields better performance. We believe this is due to the estimation overhead incurred by the size estimator. We observed a decrease of about $50\%$ in performance for the broadcast join implementation in this scenario.
**Fixed size big RDD**
To understand the ideal broadcast size for the small RDD, we keep the big RDD constant and vary the size of the small RDD along with the broadcast memory threshold. We plot the execution time of the join operation for different sizes of the small RDD. Figure 4(b) shows the execution times for this setup, with broadcast join times marked as triangles and shuffle join times marked as squares. With our local configuration, we observed that once the size of the broadcast RDD exceeds 256 MB, the performance gains obtained from broadcast join start to diminish.
# Conclusion
1. Why doesn't Spark perform column pruning on RDD joins? We started our project with this simple question. We found that, to perform any optimization of this kind, Spark would need access to the source code and would have to recompile lambdas and/or functions into optimized code.
1. We also found that a Scala compiler plugin could be a very useful Spark companion for optimizing jobs, with considerable shuffle savings for poorly written jobs, provided that adequate documentation for Scala compiler plugins becomes available to help new developers get onboarded.
1. In our tests, broadcast join seems to outperform shuffle join when one RDD is smaller than 256 MB. It could prove very useful when RDD sizes are unknown at the time the job is written but the actual size of one RDD is suspected to be small.
# Future Work
1. To overcome the restriction of needing to view the source code, we would like to explore analyzing source generated by a bytecode decompiler. We could then move this whole logic into Spark's DAG creation, as initially planned.