From 93dfb7f91d224f5fd410e541d0668fb693abca9d Mon Sep 17 00:00:00 2001 From: xsalefter Date: Fri, 31 Oct 2025 07:19:41 +0700 Subject: [PATCH] add initialDelay to SSE scheduler to give server's handshake processing time MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit LogsSseHandler scheduled its drain loop with an initial delay of zero, so the background thread could publish events while the servlet container was still completing the SSE handshake. In that window the channel remains in its blocking phase (asyncStarted could be `true`, but the response isn’t committed yet), and the container raises exception (IllegalStateException: api=BLOCKED in Jetty) when we write. --- .../billing/osgi/bundles/logger/LogsSseHandler.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/osgi-bundles/bundles/logger/src/main/java/org/killbill/billing/osgi/bundles/logger/LogsSseHandler.java b/osgi-bundles/bundles/logger/src/main/java/org/killbill/billing/osgi/bundles/logger/LogsSseHandler.java index 812eee2ff..c2e90d36a 100644 --- a/osgi-bundles/bundles/logger/src/main/java/org/killbill/billing/osgi/bundles/logger/LogsSseHandler.java +++ b/osgi-bundles/bundles/logger/src/main/java/org/killbill/billing/osgi/bundles/logger/LogsSseHandler.java @@ -34,6 +34,9 @@ public class LogsSseHandler implements Sse.Handler, Closeable { + private static final long INITIAL_DELAY_MS = 200; + private static final long PERIOD_MS = 1000; + private final LogEntriesManager logEntriesManager; private final ScheduledExecutorService scheduledExecutorService; @@ -54,7 +57,7 @@ public void handle(final Request req, final Sse sse) { final UUID cacheId = UUID.fromString(sse.id()); logEntriesManager.subscribe(cacheId, lastEventId); - final AtomicReference lastLogId = new AtomicReference(); + final AtomicReference lastLogId = new AtomicReference<>(); final ScheduledFuture future = scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { @@ -71,7 +74,7 @@ public void run() { } } } - }, 0, 1, TimeUnit.SECONDS); + }, INITIAL_DELAY_MS, PERIOD_MS, TimeUnit.MILLISECONDS); sse.onClose(new Throwing.Runnable() { @Override