Skip to content

Commit 605499c

Browse files
committed
[ZEPPELIN-6340] Add ZeppelinEventBus
- Add ZeppelinEventBus - Update NotebookServer to handle NoteRemoveEvent using EventBus
1 parent ae68cf5 commit 605499c

File tree

19 files changed

+332
-30
lines changed

19 files changed

+332
-30
lines changed

conf/zeppelin-site.xml.template

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -824,4 +824,10 @@
824824
<description>fields to be excluded from being saved in note files, with Paragraph prefix mean the fields in Paragraph, e.g. Paragraph.results</description>
825825
</property>
826826

827+
<property>
828+
<name>zeppelin.eventbus.enabled</name>
829+
<value>false</value>
830+
<description>Enables the new event-driven architecture using an in-process EventBus</description>
831+
</property>
832+
827833
</configuration>

zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -895,6 +895,8 @@ public boolean isPrometheusMetricEnabled() {
895895
return getBoolean(ConfVars.ZEPPELIN_METRIC_ENABLE_PROMETHEUS);
896896
}
897897

898+
public boolean isEventBusEnabled() { return getBoolean(ConfVars.ZEPPELIN_EVENTBUS_ENABLED); }
899+
898900
public DEFAULT_UI getDefaultUi() {
899901
return DEFAULT_UI.valueOf(getString(ConfVars.ZEPPELIN_DEFAULT_UI).toUpperCase());
900902
}
@@ -1131,7 +1133,8 @@ public enum ConfVars {
11311133
ZEPPELIN_SPARK_ONLY_YARN_CLUSTER("zeppelin.spark.only_yarn_cluster", false),
11321134
ZEPPELIN_SESSION_CHECK_INTERVAL("zeppelin.session.check_interval", 60 * 10 * 1000),
11331135
ZEPPELIN_NOTE_CACHE_THRESHOLD("zeppelin.note.cache.threshold", 50),
1134-
ZEPPELIN_NOTE_FILE_EXCLUDE_FIELDS("zeppelin.note.file.exclude.fields", "");
1136+
ZEPPELIN_NOTE_FILE_EXCLUDE_FIELDS("zeppelin.note.file.exclude.fields", ""),
1137+
ZEPPELIN_EVENTBUS_ENABLED("zeppelin.eventbus.enabled", false);
11351138

11361139
private String varName;
11371140
private Class<?> varClass;

zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@
6464
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
6565
import org.apache.zeppelin.conf.ZeppelinConfiguration.DEFAULT_UI;
6666
import org.apache.zeppelin.display.AngularObjectRegistryListener;
67+
import org.apache.zeppelin.event.EventBus;
68+
import org.apache.zeppelin.event.NoOpEventBus;
69+
import org.apache.zeppelin.event.ZeppelinEventBus;
6770
import org.apache.zeppelin.healthcheck.HealthChecks;
6871
import org.apache.zeppelin.helium.ApplicationEventListener;
6972
import org.apache.zeppelin.helium.Helium;
@@ -178,6 +181,11 @@ protected void configure() {
178181
bind(storage).to(ConfigStorage.class);
179182
bindAsContract(PluginManager.class).in(Singleton.class);
180183
bind(GsonNoteParser.class).to(NoteParser.class).in(Singleton.class);
184+
if (zConf.isEventBusEnabled()) {
185+
bind(ZeppelinEventBus.class).to(EventBus.class).in(Singleton.class);
186+
} else {
187+
bind(NoOpEventBus.class).to(EventBus.class).in(Singleton.class);
188+
}
181189
bindAsContract(InterpreterFactory.class).in(Singleton.class);
182190
bindAsContract(NotebookRepoSync.class).to(NotebookRepo.class).in(Singleton.class);
183191
bindAsContract(Helium.class).in(Singleton.class);

zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java

Lines changed: 55 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
import java.util.concurrent.ExecutorService;
4141
import java.util.concurrent.Executors;
4242
import java.util.concurrent.atomic.AtomicReference;
43+
import io.reactivex.rxjava3.disposables.Disposable;
44+
import jakarta.annotation.PreDestroy;
4345
import jakarta.inject.Inject;
4446
import jakarta.inject.Provider;
4547
import jakarta.websocket.CloseReason;
@@ -62,6 +64,9 @@
6264
import org.apache.zeppelin.display.AngularObjectRegistryListener;
6365
import org.apache.zeppelin.display.GUI;
6466
import org.apache.zeppelin.display.Input;
67+
import org.apache.zeppelin.event.EventBus;
68+
import org.apache.zeppelin.event.NoteEvent;
69+
import org.apache.zeppelin.event.NoteRemoveEvent;
6570
import org.apache.zeppelin.helium.ApplicationEventListener;
6671
import org.apache.zeppelin.helium.HeliumPackage;
6772
import org.apache.zeppelin.interpreter.InterpreterGroup;
@@ -156,6 +161,8 @@ String getKey() {
156161
private AuthorizationService authorizationService;
157162
private Provider<ConfigurationService> configurationServiceProvider;
158163
private Provider<JobManagerService> jobManagerServiceProvider;
164+
private Disposable disposable;
165+
159166

160167
public NotebookServer() {
161168
NotebookServer.self.set(this);
@@ -167,6 +174,26 @@ public void setZeppelinConfiguration(ZeppelinConfiguration zConf) {
167174
this.zConf = zConf;
168175
}
169176

177+
@Inject
178+
public void registerEventBus(EventBus eventBus, ZeppelinConfiguration zConf) {
179+
if (!zConf.isEventBusEnabled()) {
180+
LOGGER.debug("ZeppelinEventBus is disabled");
181+
return;
182+
}
183+
184+
this.disposable = eventBus.observe(NoteEvent.class)
185+
.subscribe(this::handleNoteEvent);
186+
}
187+
188+
@PreDestroy
189+
public void cleanup() {
190+
if (disposable != null && !disposable.isDisposed()) {
191+
disposable.dispose();
192+
}
193+
executorService.shutdown();
194+
}
195+
196+
170197
@Inject
171198
public void setNoteParser(Provider<NoteParser> noteParser) {
172199
this.noteParser = noteParser;
@@ -1876,19 +1903,12 @@ public void onParagraphRemove(Paragraph p) {
18761903

18771904
@Override
18781905
public void onNoteRemove(Note note, AuthenticationInfo subject) {
1879-
try {
1880-
broadcastUpdateNoteJobInfo(note, System.currentTimeMillis() - 5000);
1881-
} catch (IOException e) {
1882-
LOGGER.warn("can not broadcast for job manager: {}", e.getMessage(), e);
1883-
}
1884-
1885-
try {
1886-
getJobManagerService().removeNoteJobInfo(note.getId(), null,
1887-
new JobManagerServiceCallback());
1888-
} catch (IOException e) {
1889-
LOGGER.warn("can not broadcast for job manager: {}", e.getMessage(), e);
1906+
if (zConf.isEventBusEnabled()) {
1907+
LOGGER.debug("ZeppelinEventBus is enabed");
1908+
return;
18901909
}
18911910

1911+
handleNoteRemove(note);
18921912
}
18931913

18941914
@Override
@@ -2330,4 +2350,28 @@ public void onFailure(Exception ex, ServiceContext context) throws IOException {
23302350
}
23312351
}
23322352
}
2353+
2354+
private void handleNoteEvent(NoteEvent event) {
2355+
if (event instanceof NoteRemoveEvent) {
2356+
Note note = event.getNote();
2357+
handleNoteRemove(note);
2358+
} else {
2359+
LOGGER.warn("Unknown event type: {}", event.getClass().getName());
2360+
}
2361+
}
2362+
2363+
private void handleNoteRemove(Note note) {
2364+
try {
2365+
broadcastUpdateNoteJobInfo(note, System.currentTimeMillis() - 5000);
2366+
} catch (IOException e) {
2367+
LOGGER.warn("can not broadcast for job manager: {}", e.getMessage(), e);
2368+
}
2369+
2370+
try {
2371+
getJobManagerService().removeNoteJobInfo(note.getId(), null,
2372+
new JobManagerServiceCallback());
2373+
} catch (IOException e) {
2374+
LOGGER.warn("can not broadcast for job manager: {}", e.getMessage(), e);
2375+
}
2376+
}
23332377
}

zeppelin-server/src/test/java/org/apache/zeppelin/MiniZeppelinServer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ public MiniZeppelinServer(String classname, String zeppelinConfiguration) throws
8989
zConf = ZeppelinConfiguration.load(zeppelinConfiguration);
9090
zConf.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(),
9191
zeppelinHome.getAbsoluteFile().toString());
92+
zConf.setProperty(ConfVars.ZEPPELIN_EVENTBUS_ENABLED.getVarName(), "true");
9293
Optional<File> webWar = getWebWar();
9394
Optional<File> webAngularWar = getWebAngularWar();
9495
if (webWar.isPresent()) {

zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646

4747
import org.apache.commons.lang3.StringUtils;
4848
import org.apache.zeppelin.conf.ZeppelinConfiguration;
49+
import org.apache.zeppelin.event.EventBus;
50+
import org.apache.zeppelin.event.ZeppelinEventBus;
4951
import org.apache.zeppelin.interpreter.Interpreter;
5052
import org.apache.zeppelin.interpreter.Interpreter.FormType;
5153
import org.apache.zeppelin.interpreter.InterpreterFactory;
@@ -102,6 +104,7 @@ void setUp(TestInfo testInfo) throws Exception {
102104
ZeppelinConfiguration zConf = ZeppelinConfiguration.load();
103105
zConf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(),
104106
notebookDir.getAbsolutePath());
107+
zConf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_EVENTBUS_ENABLED.getVarName(), "true");
105108
// enable cron for testNoteUpdate method
106109
if ("testNoteUpdate()".equals(testInfo.getDisplayName())){
107110
confDir = Files.createTempDirectory("confDir").toAbsolutePath().toFile();
@@ -138,6 +141,7 @@ void setUp(TestInfo testInfo) throws Exception {
138141
NoteManager noteManager = new NoteManager(notebookRepo, zConf);
139142
AuthorizationService authorizationService =
140143
new AuthorizationService(noteManager, zConf, storage);
144+
EventBus eventBus = new ZeppelinEventBus();
141145
notebook =
142146
new Notebook(
143147
zConf,
@@ -147,7 +151,7 @@ void setUp(TestInfo testInfo) throws Exception {
147151
mockInterpreterFactory,
148152
mockInterpreterSettingManager,
149153
credentials,
150-
null);
154+
eventBus);
151155
searchService = new LuceneSearch(zConf, notebook);
152156
QuartzSchedulerService schedulerService = new QuartzSchedulerService(zConf, notebook);
153157
notebook.initNotebook();

zeppelin-zengine/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,12 @@
291291
<groupId>org.apache.hadoop</groupId>
292292
<artifactId>hadoop-client-runtime</artifactId>
293293
</dependency>
294+
295+
<dependency>
296+
<groupId>io.reactivex.rxjava3</groupId>
297+
<artifactId>rxjava</artifactId>
298+
<version>3.1.10</version>
299+
</dependency>
294300
</dependencies>
295301

296302
<build>
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package org.apache.zeppelin.event;
2+
3+
import io.reactivex.rxjava3.core.Observable;
4+
5+
public interface EventBus {
6+
7+
void post(Object event);
8+
9+
<T> Observable<T> observe(Class<T> eventType);
10+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package org.apache.zeppelin.event;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
import io.reactivex.rxjava3.core.Observable;
6+
import jakarta.inject.Inject;
7+
8+
public class NoOpEventBus implements EventBus {
9+
10+
private static final Logger LOGGER = LoggerFactory.getLogger(NoOpEventBus.class);
11+
12+
@Inject
13+
public NoOpEventBus() {
14+
LOGGER.info("Starting NoOpEventBus");
15+
}
16+
17+
@Override
18+
public void post(Object event) {
19+
LOGGER.debug("Posting event: {}", event.getClass().getName());
20+
}
21+
22+
@Override
23+
public <T> io.reactivex.rxjava3.core.Observable<T> observe(Class<T> eventType) {
24+
return Observable.empty();
25+
}
26+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package org.apache.zeppelin.event;
2+
3+
import org.apache.zeppelin.notebook.Note;
4+
import org.apache.zeppelin.user.AuthenticationInfo;
5+
6+
public interface NoteEvent {
7+
8+
Note getNote();
9+
10+
AuthenticationInfo getSubject();
11+
}

0 commit comments

Comments
 (0)