Skip to content

Commit 54d5340

Browse files
authored
perf(rules): use worker pool to perform Rule execution (#1147)
1 parent ccea852 commit 54d5340

File tree

1 file changed

+52
-52
lines changed

1 file changed

+52
-52
lines changed

src/main/java/io/cryostat/rules/RuleService.java

Lines changed: 52 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.List;
2222
import java.util.Objects;
2323
import java.util.concurrent.BlockingQueue;
24-
import java.util.concurrent.ExecutionException;
2524
import java.util.concurrent.ExecutorService;
2625
import java.util.concurrent.Executors;
2726
import java.util.concurrent.PriorityBlockingQueue;
@@ -77,73 +76,74 @@ public class RuleService {
7776
private final BlockingQueue<ActivationAttempt> activations =
7877
new PriorityBlockingQueue<>(255, Comparator.comparing(t -> t.attempts.get()));
7978
private final ExecutorService activator = Executors.newSingleThreadExecutor();
79+
private final ExecutorService workers = Executors.newVirtualThreadPerTaskExecutor();
8080

81-
void onStart(@Observes StartupEvent ev) throws InterruptedException, ExecutionException {
81+
void onStart(@Observes StartupEvent ev) {
8282
logger.trace("RuleService started");
83-
activator
84-
.submit(
85-
() -> {
86-
for (Rule rule : enabledRules()) {
87-
try {
88-
QuarkusTransaction.requiringNew()
89-
.run(() -> applyRuleToMatchingTargets(rule));
90-
} catch (Exception e) {
91-
logger.error(e);
92-
}
93-
}
94-
})
95-
.get();
83+
activator.submit(
84+
() -> {
85+
for (Rule rule : enabledRules()) {
86+
try {
87+
QuarkusTransaction.requiringNew()
88+
.run(() -> applyRuleToMatchingTargets(rule));
89+
} catch (Exception e) {
90+
logger.error(e);
91+
}
92+
}
93+
});
9694
activator.submit(
9795
() -> {
9896
while (!activator.isShutdown()) {
9997
ActivationAttempt attempt = null;
10098
try {
10199
attempt = activations.take();
102-
logger.tracev(
103-
"Attempting to activate rule \"{0}\" for target {1} -"
104-
+ " attempt #{2}",
105-
attempt.ruleId, attempt.targetId, attempt.attempts);
106-
bus.request(RuleExecutor.class.getName(), attempt)
107-
.await()
108-
.atMost(connectionFailedTimeout);
109-
logger.tracev(
110-
"Activated rule \"{0}\" for target {1}",
111-
attempt.ruleId, attempt.targetId);
112100
} catch (InterruptedException ie) {
113101
logger.trace(ie);
114102
break;
115-
} catch (Exception e) {
116-
if (attempt != null) {
117-
final ActivationAttempt fAttempt = attempt;
118-
int count = attempt.incrementAndGet();
119-
int delay = (int) Math.pow(2, count);
120-
TimeUnit unit = TimeUnit.SECONDS;
121-
int limit = 5;
122-
if (count < limit) {
123-
logger.debugv(
124-
"Rule \"{0}\" activation attempt #{1} for target"
125-
+ " {2} failed, rescheduling in {3}{4} ...",
126-
attempt.ruleId,
127-
count - 1,
128-
attempt.targetId,
129-
delay,
130-
unit);
131-
Infrastructure.getDefaultWorkerPool()
132-
.schedule(() -> activations.add(fAttempt), delay, unit);
133-
} else {
134-
logger.errorv(
135-
"Rule \"{0}\" activation attempt #{1} failed for"
136-
+ " target {2} - limit ({3}) reached! Will not"
137-
+ " retry...",
138-
attempt.ruleId, count, attempt.targetId, limit);
139-
}
140-
}
141-
logger.error(e);
142103
}
104+
final ActivationAttempt fAttempt = attempt;
105+
workers.submit(() -> fireAttemptExecution(fAttempt));
143106
}
144107
});
145108
}
146109

110+
private void fireAttemptExecution(ActivationAttempt fAttempt) {
111+
try {
112+
logger.tracev(
113+
"Attempting to activate rule \"{0}\" for" + " target {1} - attempt #{2}",
114+
fAttempt.ruleId, fAttempt.targetId, fAttempt.attempts);
115+
bus.request(RuleExecutor.class.getName(), fAttempt)
116+
.await()
117+
.atMost(connectionFailedTimeout);
118+
logger.tracev(
119+
"Activated rule \"{0}\" for target {1}", fAttempt.ruleId, fAttempt.targetId);
120+
} catch (Exception e) {
121+
if (fAttempt != null) {
122+
int count = fAttempt.incrementAndGet();
123+
int delay = (int) Math.pow(2, count);
124+
TimeUnit unit = TimeUnit.SECONDS;
125+
int limit = 5;
126+
if (count < limit) {
127+
logger.debugv(
128+
"Rule \"{0}\" activation attempt"
129+
+ " #{1} for target {2} failed,"
130+
+ " rescheduling in {3}{4} ...",
131+
fAttempt.ruleId, count - 1, fAttempt.targetId, delay, unit);
132+
Infrastructure.getDefaultWorkerPool()
133+
.schedule(() -> activations.add(fAttempt), delay, unit);
134+
} else {
135+
logger.errorv(
136+
"Rule \"{0}\" activation attempt"
137+
+ " #{1} failed for target {2}"
138+
+ " - limit ({3}) reached! Will"
139+
+ " not retry...",
140+
fAttempt.ruleId, count, fAttempt.targetId, limit);
141+
}
142+
}
143+
logger.error(e);
144+
}
145+
}
146+
147147
void onStop(@Observes ShutdownEvent evt) throws SchedulerException {
148148
activator.shutdown();
149149
activations.clear();

0 commit comments

Comments
 (0)