From 171fa5994a53bf9ce2b76edec5b230fdab2dfd47 Mon Sep 17 00:00:00 2001 From: Talha Sohail Date: Wed, 22 Oct 2025 22:59:27 +1100 Subject: [PATCH 1/3] FIFO TimedSemaphore --- .../lang3/concurrent/TimedSemaphore.java | 115 ++++++++++++------ 1 file changed, 78 insertions(+), 37 deletions(-) diff --git a/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java b/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java index 941f403997f..62b59378681 100644 --- a/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java +++ b/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java @@ -17,10 +17,7 @@ package org.apache.commons.lang3.concurrent; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.function.Supplier; import org.apache.commons.lang3.Validate; @@ -123,12 +120,12 @@ public static class Builder implements Supplier { private long period; private TimeUnit timeUnit; private int limit; + private boolean fair = false; /** * Constructs a new Builder. */ public Builder() { - // empty } @Override @@ -179,6 +176,17 @@ public Builder setTimeUnit(final TimeUnit timeUnit) { this.timeUnit = timeUnit; return this; } + + /** + * ADDED: Enables or disables FIFO fairness + * + * @param fair whether to enable fairness + * @return {@code this} instance. + */ + public Builder setFair(final boolean fair) { + this.fair = fair; + return this; + } } /** @@ -190,6 +198,7 @@ public Builder setTimeUnit(final TimeUnit timeUnit) { /** Constant for the thread pool size for the executor. */ private static final int THREAD_POOL_SIZE = 1; + /** * Constructs a new Builder. * @@ -235,10 +244,18 @@ public static Builder builder() { /** A flag whether shutdown() was called. */ private boolean shutdown; // @GuardedBy("this") + /** Fairness mode checker */ + private final boolean fair; + + /** Backing semaphore that enforces blocking and (optionally) FIFO fairness. */ + private final Semaphore semaphore; + + private TimedSemaphore(final Builder builder) { Validate.inclusiveBetween(1, Long.MAX_VALUE, builder.period, "Time period must be greater than 0."); period = builder.period; unit = builder.timeUnit; + fair = builder.fair; if (builder.service != null) { executorService = builder.service; ownExecutor = false; @@ -250,6 +267,7 @@ private TimedSemaphore(final Builder builder) { ownExecutor = true; } setLimit(builder.limit); + semaphore = new Semaphore(Math.max(0, this.limit), fair); } /** @@ -264,6 +282,7 @@ private TimedSemaphore(final Builder builder) { @Deprecated public TimedSemaphore(final long timePeriod, final TimeUnit timeUnit, final int limit) { this(null, timePeriod, timeUnit, limit); + } /** @@ -290,41 +309,48 @@ public TimedSemaphore(final ScheduledExecutorService service, final long timePer * @throws InterruptedException if the thread gets interrupted. * @throws IllegalStateException if this semaphore is already shut down. */ - public synchronized void acquire() throws InterruptedException { + public void acquire() throws InterruptedException { prepareAcquire(); - boolean canPass; - do { - canPass = acquirePermit(); - if (!canPass) { - wait(); + if (getLimit() <= NO_LIMIT) { + synchronized (this) { + acquireCount++; + } + return; + } + if (semaphore.tryAcquire()) { + synchronized (this) { + acquireCount++; + } + return; + } + for (;;) { + synchronized (this) { + if (semaphore.tryAcquire()) { + acquireCount++; + return; + } + this.wait(); } - } while (!canPass); - } - - /** - * Internal helper method for acquiring a permit. This method checks whether currently a permit can be acquired and - if so - increases the internal - * counter. The return value indicates whether a permit could be acquired. This method must be called with the lock of this object held. - * - * @return a flag whether a permit could be acquired. - */ - private boolean acquirePermit() { - if (getLimit() <= NO_LIMIT || acquireCount < getLimit()) { - acquireCount++; - return true; } - return false; } /** * The current time period is finished. This method is called by the timer used internally to monitor the time period. It resets the counter and releases * the threads waiting for this barrier. */ - synchronized void endOfPeriod() { + void endOfPeriod() { lastCallsPerPeriod = acquireCount; totalAcquireCount += acquireCount; periodCount++; acquireCount = 0; - notifyAll(); + final int avail = semaphore.availablePermits(); + final int toRelease = Math.max(0, getLimit() - avail); + if (toRelease > 0) { + semaphore.release(toRelease); + } + synchronized (this) { + this.notifyAll(); + } } /** @@ -420,14 +446,15 @@ public synchronized boolean isShutdown() { * object held. */ private void prepareAcquire() { - if (isShutdown()) { - throw new IllegalStateException("TimedSemaphore is shut down!"); - } - if (task == null) { - task = startTimer(); + synchronized (this) { + if (isShutdown()) { + throw new IllegalStateException("TimedSemaphore is shut down!"); + } + if (task == null) { + task = startTimer(); + } } } - /** * Sets the limit. This is the number of times the {@link #acquire()} method can be called within the time period specified. If this limit is reached, * further invocations of {@link #acquire()} will block. Setting the limit to a value <= {@link #NO_LIMIT} will cause the limit to be disabled, i.e. an @@ -437,6 +464,13 @@ private void prepareAcquire() { */ public final synchronized void setLimit(final int limit) { this.limit = limit; + if (semaphore != null) { + final int target = Math.max(0, limit); + final int drained = semaphore.drainPermits(); + if (target > 0) { + semaphore.release(target); + } + } } /** @@ -446,8 +480,6 @@ public final synchronized void setLimit(final int limit) { public synchronized void shutdown() { if (!shutdown) { if (ownExecutor) { - // if the executor was created by this instance, it has - // to be shutdown getExecutorService().shutdownNow(); } if (task != null) { @@ -475,8 +507,17 @@ protected ScheduledFuture startTimer() { * @throws IllegalStateException if this semaphore is already shut down. * @since 3.5 */ - public synchronized boolean tryAcquire() { + public boolean tryAcquire() { prepareAcquire(); - return acquirePermit(); + if (getLimit() <= NO_LIMIT) { + synchronized (this) { acquireCount++; } + return true; + } + + final boolean ok = semaphore.tryAcquire(); + if (ok) { + synchronized (this) { acquireCount++; } + } + return ok; } } From f87f15693ff4851c4ebeed56b4b728e28aa3f47d Mon Sep 17 00:00:00 2001 From: Talha Sohail Date: Sun, 26 Oct 2025 19:32:08 +1100 Subject: [PATCH 2/3] TimedSemaphore adjustments + tests --- .../lang3/concurrent/TimedSemaphore.java | 19 +- .../lang3/concurrent/TimedSemaphoreTest.java | 215 ++++++++++++++++++ 2 files changed, 229 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java b/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java index 62b59378681..463a8e81e19 100644 --- a/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java +++ b/src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java @@ -17,7 +17,11 @@ package org.apache.commons.lang3.concurrent; -import java.util.concurrent.*; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.function.Supplier; import org.apache.commons.lang3.Validate; @@ -126,6 +130,7 @@ public static class Builder implements Supplier { * Constructs a new Builder. */ public Builder() { + // empty } @Override @@ -255,7 +260,7 @@ private TimedSemaphore(final Builder builder) { Validate.inclusiveBetween(1, Long.MAX_VALUE, builder.period, "Time period must be greater than 0."); period = builder.period; unit = builder.timeUnit; - fair = builder.fair; + this.fair = builder.fair; if (builder.service != null) { executorService = builder.service; ownExecutor = false; @@ -464,11 +469,15 @@ private void prepareAcquire() { */ public final synchronized void setLimit(final int limit) { this.limit = limit; + if (semaphore != null) { final int target = Math.max(0, limit); - final int drained = semaphore.drainPermits(); - if (target > 0) { - semaphore.release(target); + final int used = Math.max(0, acquireCount); + final int toSeed = Math.max(0, target - used); + + semaphore.drainPermits(); + if (toSeed > 0) { + semaphore.release(toSeed); } } } diff --git a/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java b/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java index 6cd28442651..f78eca6e27a 100644 --- a/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java +++ b/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java @@ -527,4 +527,219 @@ void testTryAcquireAfterShutdown() { semaphore.shutdown(); assertThrows(IllegalStateException.class, semaphore::tryAcquire); } + + /** + * A thread that records acquisition order by adding its id to a shared queue once acquire() returns. + */ + private static final class FairOrderThread extends Thread { + + private final TimedSemaphore semaphore; + private final String id; + private final java.util.concurrent.BlockingQueue order; + + FairOrderThread(final TimedSemaphore semaphore, final String id, final java.util.concurrent.BlockingQueue order) { + this.semaphore = semaphore; + this.id = id; + this.order = order; + setName("FairOrderThread-" + id); + } + + @Override + public void run() { + try { + semaphore.acquire(); + order.add(id); + } catch (final InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + } + } + + /** + * Verifies FIFO fairness when enabled: with limit=1 and manual period rollovers, + * threads that arrive earlier should acquire earlier across periods. + * + * Strategy: + * - Build semaphore with fair=true and limit=1. + * - Start T1; it should acquire immediately (first window). + * - Start T2 and T3; both will block until we call endOfPeriod() twice. + * - Check the order is T1, then T2, then T3 (arrival order). + */ + @Test + void testFairnessFIFOOrdering_acrossPeriods() throws Exception { + final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class); + final ScheduledFuture future = EasyMock.createMock(ScheduledFuture.class); + prepareStartTimer(service, future); + EasyMock.replay(service, future); + + final TimedSemaphore semaphore = TimedSemaphore.builder() + .setService(service) + .setPeriod(PERIOD_MILLIS) + .setTimeUnit(UNIT) + .setLimit(1) + .setFair(true) + .get(); + + final java.util.concurrent.ArrayBlockingQueue order = new java.util.concurrent.ArrayBlockingQueue<>(3); + + final FairOrderThread t1 = new FairOrderThread(semaphore, "T1", order); + t1.start(); + final String first = order.poll(3, TimeUnit.SECONDS); + assertEquals("T1", first, "First acquirer should be T1"); + + final FairOrderThread t2 = new FairOrderThread(semaphore, "T2", order); + final FairOrderThread t3 = new FairOrderThread(semaphore, "T3", order); + t2.start(); + ThreadUtils.sleepQuietly(Duration.ofMillis(10)); + t3.start(); + + semaphore.endOfPeriod(); + final String second = order.poll(3, TimeUnit.SECONDS); + assertEquals("T2", second, "Second acquirer should be T2 under FIFO fairness"); + + semaphore.endOfPeriod(); + final String third = order.poll(3, TimeUnit.SECONDS); + assertEquals("T3", third, "Third acquirer should be T3 under FIFO fairness"); + + t1.join(); + t2.join(); + t3.join(); + EasyMock.verify(service, future); + } + + /** + * Verifies that changing the limit mid-period resizes the active bucket immediately: + * - Start with limit=3; acquire two permits. + * - Reduce limit to 1 in the same period. + * - Further tryAcquire() must fail until period ends (since acquiredCount >= new limit). + * - After endOfPeriod(), only 1 permit should be available per period. + */ + @Test + void testSetLimitResizesBucketWithinPeriod() throws InterruptedException { + final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class); + final ScheduledFuture future = EasyMock.createMock(ScheduledFuture.class); + prepareStartTimer(service, future); + EasyMock.replay(service, future); + + final TimedSemaphore semaphore = TimedSemaphore.builder() + .setService(service) + .setPeriod(PERIOD_MILLIS) + .setTimeUnit(UNIT) + .setLimit(3) + .get(); + + assertTrue(semaphore.tryAcquire(), "1st acquire should succeed (limit=3)"); + assertTrue(semaphore.tryAcquire(), "2nd acquire should succeed (limit=3)"); + + semaphore.setLimit(1); + + assertFalse(semaphore.tryAcquire(), "Further acquires should fail after limit is reduced within the same period"); + semaphore.endOfPeriod(); + assertTrue(semaphore.tryAcquire(), "Exactly 1 acquire should succeed in the next period with limit=1"); + assertFalse(semaphore.tryAcquire(), "Second acquire in the same period must fail with limit=1"); + + EasyMock.verify(service, future); + } + + /** + * Ensures endOfPeriod() actually tops up permits and releases waiters under the new semaphore-backed design. + * - limit=1 + * - T1 acquires immediately; T2 blocks. + * - After endOfPeriod(), T2 should proceed promptly. + */ + @Test + void testEndOfPeriodTopUpReleasesBlockedThreads() throws Exception { + final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class); + final ScheduledFuture future = EasyMock.createMock(ScheduledFuture.class); + prepareStartTimer(service, future); + EasyMock.replay(service, future); + + final TimedSemaphore semaphore = TimedSemaphore.builder() + .setService(service) + .setPeriod(PERIOD_MILLIS) + .setTimeUnit(UNIT) + .setLimit(1) + .setFair(true) + .get(); + + final java.util.concurrent.ArrayBlockingQueue order = new java.util.concurrent.ArrayBlockingQueue<>(2); + final FairOrderThread t1 = new FairOrderThread(semaphore, "T1", order); + final FairOrderThread t2 = new FairOrderThread(semaphore, "T2", order); + + t1.start(); + assertEquals("T1", order.poll(3, TimeUnit.SECONDS), "T1 should acquire first"); + + t2.start(); + semaphore.endOfPeriod(); + assertEquals("T2", order.poll(3, TimeUnit.SECONDS), "T2 should acquire after rollover"); + + t1.join(); + t2.join(); + EasyMock.verify(service, future); + } + + /** + * Confirms NO_LIMIT short-circuits semaphore usage (no blocking even with fairness enabled). + * Mirrors testAcquireNoLimit but uses the builder and setFair(true) to ensure the new path + * coexists with fairness without interacting with the Semaphore. + */ + @Test + void testAcquireNoLimitWithFairnessEnabled() throws InterruptedException { + final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class); + final ScheduledFuture future = EasyMock.createMock(ScheduledFuture.class); + prepareStartTimer(service, future); + EasyMock.replay(service, future); + + final TimedSemaphore semaphore = TimedSemaphore.builder() + .setService(service) + .setPeriod(PERIOD_MILLIS) + .setTimeUnit(UNIT) + .setLimit(TimedSemaphore.NO_LIMIT) + .setFair(true) + .get(); + + final int count = 500; + final CountDownLatch latch = new CountDownLatch(count); + final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count); + t.start(); + assertTrue(latch.await(5, TimeUnit.SECONDS), "All acquires should complete without blocking under NO_LIMIT"); + EasyMock.verify(service, future); + } + + /** + * Ensures getAvailablePermits remains consistent after a dynamic limit increase within the same period: + * - Start with limit=1, acquire once. + * - Increase limit to 3; now 2 more should be available in this period. + * - Verify tryAcquire() succeeds exactly two more times. + */ + @Test + void testGetAvailablePermitsAfterLimitIncreaseWithinPeriod() throws InterruptedException { + final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class); + final ScheduledFuture future = EasyMock.createMock(ScheduledFuture.class); + prepareStartTimer(service, future); + EasyMock.replay(service, future); + + final TimedSemaphore semaphore = TimedSemaphore.builder() + .setService(service) + .setPeriod(PERIOD_MILLIS) + .setTimeUnit(UNIT) + .setLimit(1) + .get(); + + assertEquals(1, semaphore.getAvailablePermits(), "Initially, 1 permit available"); + assertTrue(semaphore.tryAcquire(), "First acquire should succeed"); + assertEquals(0, semaphore.getAvailablePermits(), "After 1 acquire with limit=1, none left"); + + semaphore.setLimit(3); + assertEquals(2, semaphore.getAvailablePermits(), "After increasing limit to 3, two more should be available this period"); + + assertTrue(semaphore.tryAcquire(), "Second acquire should now succeed"); + assertTrue(semaphore.tryAcquire(), "Third acquire should now succeed"); + assertFalse(semaphore.tryAcquire(), "Fourth acquire should fail within same period (limit=3)"); + + EasyMock.verify(service, future); + } + + + } From c9dbf1baa5e702163d90a953723c23b969f190db Mon Sep 17 00:00:00 2001 From: Talha Sohail Date: Sun, 26 Oct 2025 22:46:20 +1100 Subject: [PATCH 3/3] TimedSemaphore adjustments --- .../org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java b/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java index f78eca6e27a..c62d186a195 100644 --- a/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java +++ b/src/test/java/org/apache/commons/lang3/concurrent/TimedSemaphoreTest.java @@ -202,7 +202,7 @@ public void run() { * @param future the future */ private void prepareStartTimer(final ScheduledExecutorService service, - final ScheduledFuture future) { + final ScheduledFuture future) { service.scheduleAtFixedRate((Runnable) EasyMock.anyObject(), EasyMock.eq(PERIOD_MILLIS), EasyMock.eq(PERIOD_MILLIS), EasyMock.eq(UNIT)); EasyMock.expectLastCall().andReturn(future); }