Skip to content

Commit be0abe4

Browse files
committed
chore(rules): use worker pool to perform Rule execution
1 parent e1239ff commit be0abe4

File tree

1 file changed

+52
-52
lines changed

1 file changed

+52
-52
lines changed

src/main/java/io/cryostat/rules/RuleService.java

Lines changed: 52 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.List;
2222
import java.util.Objects;
2323
import java.util.concurrent.BlockingQueue;
24-
import java.util.concurrent.ExecutionException;
2524
import java.util.concurrent.ExecutorService;
2625
import java.util.concurrent.Executors;
2726
import java.util.concurrent.PriorityBlockingQueue;
@@ -78,72 +77,73 @@ public class RuleService {
7877
new PriorityBlockingQueue<>(255, Comparator.comparing(t -> t.attempts.get()));
7978
private final ExecutorService activator = Executors.newSingleThreadExecutor();
8079

81-
void onStart(@Observes StartupEvent ev) throws InterruptedException, ExecutionException {
80+
void onStart(@Observes StartupEvent ev) {
8281
logger.trace("RuleService started");
83-
activator
84-
.submit(
85-
() -> {
86-
for (Rule rule : enabledRules()) {
87-
try {
88-
QuarkusTransaction.requiringNew()
89-
.run(() -> applyRuleToMatchingTargets(rule));
90-
} catch (Exception e) {
91-
logger.error(e);
92-
}
93-
}
94-
})
95-
.get();
82+
activator.submit(
83+
() -> {
84+
for (Rule rule : enabledRules()) {
85+
try {
86+
QuarkusTransaction.requiringNew()
87+
.run(() -> applyRuleToMatchingTargets(rule));
88+
} catch (Exception e) {
89+
logger.error(e);
90+
}
91+
}
92+
});
9693
activator.submit(
9794
() -> {
9895
while (!activator.isShutdown()) {
9996
ActivationAttempt attempt = null;
10097
try {
10198
attempt = activations.take();
102-
logger.tracev(
103-
"Attempting to activate rule \"{0}\" for target {1} -"
104-
+ " attempt #{2}",
105-
attempt.ruleId, attempt.targetId, attempt.attempts);
106-
bus.request(RuleExecutor.class.getName(), attempt)
107-
.await()
108-
.atMost(connectionFailedTimeout);
109-
logger.tracev(
110-
"Activated rule \"{0}\" for target {1}",
111-
attempt.ruleId, attempt.targetId);
11299
} catch (InterruptedException ie) {
113100
logger.trace(ie);
114101
break;
115-
} catch (Exception e) {
116-
if (attempt != null) {
117-
final ActivationAttempt fAttempt = attempt;
118-
int count = attempt.incrementAndGet();
119-
int delay = (int) Math.pow(2, count);
120-
TimeUnit unit = TimeUnit.SECONDS;
121-
int limit = 5;
122-
if (count < limit) {
123-
logger.debugv(
124-
"Rule \"{0}\" activation attempt #{1} for target"
125-
+ " {2} failed, rescheduling in {3}{4} ...",
126-
attempt.ruleId,
127-
count - 1,
128-
attempt.targetId,
129-
delay,
130-
unit);
131-
Infrastructure.getDefaultWorkerPool()
132-
.schedule(() -> activations.add(fAttempt), delay, unit);
133-
} else {
134-
logger.errorv(
135-
"Rule \"{0}\" activation attempt #{1} failed for"
136-
+ " target {2} - limit ({3}) reached! Will not"
137-
+ " retry...",
138-
attempt.ruleId, count, attempt.targetId, limit);
139-
}
140-
}
141-
logger.error(e);
142102
}
103+
final ActivationAttempt fAttempt = attempt;
104+
Infrastructure.getDefaultWorkerPool()
105+
.submit(() -> fireAttemptExecution(fAttempt));
143106
}
144107
});
145108
}
146109

110+
private void fireAttemptExecution(ActivationAttempt fAttempt) {
111+
try {
112+
logger.tracev(
113+
"Attempting to activate rule \"{0}\" for" + " target {1} - attempt #{2}",
114+
fAttempt.ruleId, fAttempt.targetId, fAttempt.attempts);
115+
bus.request(RuleExecutor.class.getName(), fAttempt)
116+
.await()
117+
.atMost(connectionFailedTimeout);
118+
logger.tracev(
119+
"Activated rule \"{0}\" for target {1}", fAttempt.ruleId, fAttempt.targetId);
120+
} catch (Exception e) {
121+
if (fAttempt != null) {
122+
int count = fAttempt.incrementAndGet();
123+
int delay = (int) Math.pow(2, count);
124+
TimeUnit unit = TimeUnit.SECONDS;
125+
int limit = 5;
126+
if (count < limit) {
127+
logger.debugv(
128+
"Rule \"{0}\" activation attempt"
129+
+ " #{1} for target {2} failed,"
130+
+ " rescheduling in {3}{4} ...",
131+
fAttempt.ruleId, count - 1, fAttempt.targetId, delay, unit);
132+
Infrastructure.getDefaultWorkerPool()
133+
.schedule(() -> activations.add(fAttempt), delay, unit);
134+
} else {
135+
logger.errorv(
136+
"Rule \"{0}\" activation attempt"
137+
+ " #{1} failed for target {2}"
138+
+ " - limit ({3}) reached! Will"
139+
+ " not retry...",
140+
fAttempt.ruleId, count, fAttempt.targetId, limit);
141+
}
142+
}
143+
logger.error(e);
144+
}
145+
}
146+
147147
void onStop(@Observes ShutdownEvent evt) throws SchedulerException {
148148
activator.shutdown();
149149
activations.clear();

0 commit comments

Comments
 (0)