Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 93 additions & 31 deletions src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

Expand Down Expand Up @@ -123,6 +124,7 @@ public static class Builder implements Supplier<TimedSemaphore> {
private long period;
private TimeUnit timeUnit;
private int limit;
private boolean fair;

/**
* Constructs a new Builder.
Expand Down Expand Up @@ -179,6 +181,17 @@ public Builder setTimeUnit(final TimeUnit timeUnit) {
this.timeUnit = timeUnit;
return this;
}

/**
* Sets the fairness mode.
*
* @param fair whether to enable fairness.
* @return {@code this} instance.
*/
public Builder setFair(final boolean fair) {
this.fair = fair;
return this;
}
}

/**
Expand Down Expand Up @@ -235,10 +248,17 @@ public static Builder builder() {
/** A flag whether shutdown() was called. */
private boolean shutdown; // @GuardedBy("this")

/** Fairness mode checker */
private final boolean fair;

/** Backing semaphore that enforces blocking and (optionally) FIFO fairness. */
private final Semaphore semaphore;

private TimedSemaphore(final Builder builder) {
Validate.inclusiveBetween(1, Long.MAX_VALUE, builder.period, "Time period must be greater than 0.");
period = builder.period;
unit = builder.timeUnit;
this.fair = builder.fair;
if (builder.service != null) {
executorService = builder.service;
ownExecutor = false;
Expand All @@ -249,7 +269,8 @@ private TimedSemaphore(final Builder builder) {
executorService = stpe;
ownExecutor = true;
}
setLimit(builder.limit);
updateLimit(builder.limit);
semaphore = new Semaphore(this.limit, fair);
}

/**
Expand Down Expand Up @@ -290,29 +311,33 @@ public TimedSemaphore(final ScheduledExecutorService service, final long timePer
* @throws InterruptedException if the thread gets interrupted.
* @throws IllegalStateException if this semaphore is already shut down.
*/
public synchronized void acquire() throws InterruptedException {
public void acquire() throws InterruptedException {
prepareAcquire();
boolean canPass;
do {
canPass = acquirePermit();
if (!canPass) {
wait();
if (getLimit() <= NO_LIMIT) {
synchronized (this) {
acquireCount++;
}
} while (!canPass);
return;
}
if (fairTryAcquire()) {
synchronized (this) {
acquireCount++;
}
return;
}
for (;;) {
synchronized (this) {
if (fairTryAcquire()) {
acquireCount++;
return;
}
this.wait();
}
}
}

/**
* Internal helper method for acquiring a permit. This method checks whether currently a permit can be acquired and - if so - increases the internal
* counter. The return value indicates whether a permit could be acquired. This method must be called with the lock of this object held.
*
* @return a flag whether a permit could be acquired.
*/
private boolean acquirePermit() {
if (getLimit() <= NO_LIMIT || acquireCount < getLimit()) {
acquireCount++;
return true;
}
return false;
private boolean fairTryAcquire() throws InterruptedException {
return fair ? semaphore.tryAcquire(0, TimeUnit.SECONDS) : semaphore.tryAcquire();
}

/**
Expand All @@ -324,7 +349,12 @@ synchronized void endOfPeriod() {
totalAcquireCount += acquireCount;
periodCount++;
acquireCount = 0;
notifyAll();
final int avail = semaphore.availablePermits();
final int toRelease = getLimit() - avail;
if (toRelease > 0) {
semaphore.release(toRelease);
}
this.notifyAll();
}

/**
Expand Down Expand Up @@ -420,14 +450,20 @@ public synchronized boolean isShutdown() {
* object held.
*/
private void prepareAcquire() {
if (isShutdown()) {
throw new IllegalStateException("TimedSemaphore is shut down!");
}
if (task == null) {
task = startTimer();
synchronized (this) {
if (isShutdown()) {
throw new IllegalStateException("TimedSemaphore is shut down!");
}
if (task == null) {
task = startTimer();
}
}
}

private void updateLimit(int value) {
limit = Math.max(0, value);
}

/**
* Sets the limit. This is the number of times the {@link #acquire()} method can be called within the time period specified. If this limit is reached,
* further invocations of {@link #acquire()} will block. Setting the limit to a value &lt;= {@link #NO_LIMIT} will cause the limit to be disabled, i.e. an
Expand All @@ -436,7 +472,15 @@ private void prepareAcquire() {
* @param limit the limit.
*/
public final synchronized void setLimit(final int limit) {
this.limit = limit;
updateLimit(limit);

final int used = Math.max(0, acquireCount);
final int toSeed = Math.max(0, limit - used);

semaphore.drainPermits();
if (toSeed > 0) {
semaphore.release(toSeed);
}
}

/**
Expand Down Expand Up @@ -469,14 +513,32 @@ protected ScheduledFuture<?> startTimer() {

/**
* Tries to acquire a permit from this semaphore. If the limit of this semaphore has not yet been reached, a permit is acquired, and this method returns
* <strong>true</strong>. Otherwise, this method returns immediately with the result <strong>false</strong>.
* <strong>true</strong>. Otherwise, this method returns immediately with the result <strong>false</strong>. Even when this semaphore has been set to use
* a fair ordering policy, a call to tryAcquire() will immediately acquire a permit if one is available, whether or not other threads are currently waiting.
* This "barging" behavior can be useful in certain circumstances, even though it breaks fairness. If you want to honor the fairness setting, then use
* tryAcquire(0, TimeUnit.SECONDS) which is almost equivalent (it also detects interruption).
*
* @return <strong>true</strong> if a permit could be acquired; <strong>false</strong> otherwise.
* @return <strong>true</strong> if a permit could be acquired;
* <strong>false</strong> otherwise.
* @throws IllegalStateException if this semaphore is already shut down.
* @throws InterruptedException if the current thread is interrupted while waiting.
* @since 3.5
*/
public synchronized boolean tryAcquire() {
public boolean tryAcquire() throws InterruptedException {
prepareAcquire();
return acquirePermit();
if (getLimit() <= NO_LIMIT) {
synchronized (this) {
acquireCount++;
}
return true;
}

final boolean ok = fairTryAcquire();
if (ok) {
synchronized (this) {
acquireCount++;
}
}
return ok;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public void run() {
* @param future the future
*/
private void prepareStartTimer(final ScheduledExecutorService service,
final ScheduledFuture<?> future) {
final ScheduledFuture<?> future) {
service.scheduleAtFixedRate((Runnable) EasyMock.anyObject(), EasyMock.eq(PERIOD_MILLIS), EasyMock.eq(PERIOD_MILLIS), EasyMock.eq(UNIT));
EasyMock.expectLastCall().andReturn(future);
}
Expand Down Expand Up @@ -527,4 +527,167 @@ void testTryAcquireAfterShutdown() {
semaphore.shutdown();
assertThrows(IllegalStateException.class, semaphore::tryAcquire);
}

/**
* A thread that records acquisition order by adding its id to a shared queue once acquire() returns.
*/
private static final class FairOrderThread extends Thread {

private final TimedSemaphore semaphore;
private final String id;
private final java.util.concurrent.BlockingQueue<String> order;

FairOrderThread(final TimedSemaphore semaphore, final String id, final java.util.concurrent.BlockingQueue<String> order) {
this.semaphore = semaphore;
this.id = id;
this.order = order;
setName("FairOrderThread-" + id);
}

@Override
public void run() {
try {
semaphore.acquire();
order.add(id);
} catch (final InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
}

/**
* Verifies that changing the limit mid-period resizes the active bucket immediately:
* - Start with limit=3; acquire two permits.
* - Reduce limit to 1 in the same period.
* - Further tryAcquire() must fail until period ends (since acquiredCount >= new limit).
* - After endOfPeriod(), only 1 permit should be available per period.
*/
@Test
void testSetLimitResizesBucketWithinPeriod() throws InterruptedException {
final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class);
final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
prepareStartTimer(service, future);
EasyMock.replay(service, future);

final TimedSemaphore semaphore = TimedSemaphore.builder()
.setService(service)
.setPeriod(PERIOD_MILLIS)
.setTimeUnit(UNIT)
.setLimit(3)
.get();

assertTrue(semaphore.tryAcquire(), "1st acquire should succeed (limit=3)");
assertTrue(semaphore.tryAcquire(), "2nd acquire should succeed (limit=3)");

semaphore.setLimit(1);

assertFalse(semaphore.tryAcquire(), "Further acquires should fail after limit is reduced within the same period");
semaphore.endOfPeriod();
assertTrue(semaphore.tryAcquire(), "Exactly 1 acquire should succeed in the next period with limit=1");
assertFalse(semaphore.tryAcquire(), "Second acquire in the same period must fail with limit=1");

EasyMock.verify(service, future);
}

/**
* Ensures endOfPeriod() actually tops up permits and releases waiters under the new semaphore-backed design.
* - limit=1
* - T1 acquires immediately; T2 blocks.
* - After endOfPeriod(), T2 should proceed promptly.
*/
@Test
void testEndOfPeriodTopUpReleasesBlockedThreads() throws Exception {
final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class);
final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
prepareStartTimer(service, future);
EasyMock.replay(service, future);

final TimedSemaphore semaphore = TimedSemaphore.builder()
.setService(service)
.setPeriod(PERIOD_MILLIS)
.setTimeUnit(UNIT)
.setLimit(1)
.setFair(true)
.get();

final java.util.concurrent.ArrayBlockingQueue<String> order = new java.util.concurrent.ArrayBlockingQueue<>(2);
final FairOrderThread t1 = new FairOrderThread(semaphore, "T1", order);
final FairOrderThread t2 = new FairOrderThread(semaphore, "T2", order);

t1.start();
assertEquals("T1", order.poll(3, TimeUnit.SECONDS), "T1 should acquire first");

t2.start();
semaphore.endOfPeriod();
assertEquals("T2", order.poll(3, TimeUnit.SECONDS), "T2 should acquire after rollover");

t1.join();
t2.join();
EasyMock.verify(service, future);
}

/**
* Confirms NO_LIMIT short-circuits semaphore usage (no blocking even with fairness enabled).
* Mirrors testAcquireNoLimit but uses the builder and setFair(true) to ensure the new path
* coexists with fairness without interacting with the Semaphore.
*/
@Test
void testAcquireNoLimitWithFairnessEnabled() throws InterruptedException {
final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class);
final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
prepareStartTimer(service, future);
EasyMock.replay(service, future);

final TimedSemaphore semaphore = TimedSemaphore.builder()
.setService(service)
.setPeriod(PERIOD_MILLIS)
.setTimeUnit(UNIT)
.setLimit(TimedSemaphore.NO_LIMIT)
.setFair(true)
.get();

final int count = 500;
final CountDownLatch latch = new CountDownLatch(count);
final SemaphoreThread t = new SemaphoreThread(semaphore, latch, count, count);
t.start();
assertTrue(latch.await(5, TimeUnit.SECONDS), "All acquires should complete without blocking under NO_LIMIT");
EasyMock.verify(service, future);
}

/**
* Ensures getAvailablePermits remains consistent after a dynamic limit increase within the same period:
* - Start with limit=1, acquire once.
* - Increase limit to 3; now 2 more should be available in this period.
* - Verify tryAcquire() succeeds exactly two more times.
*/
@Test
void testGetAvailablePermitsAfterLimitIncreaseWithinPeriod() throws InterruptedException {
final ScheduledExecutorService service = EasyMock.createMock(ScheduledExecutorService.class);
final ScheduledFuture<?> future = EasyMock.createMock(ScheduledFuture.class);
prepareStartTimer(service, future);
EasyMock.replay(service, future);

final TimedSemaphore semaphore = TimedSemaphore.builder()
.setService(service)
.setPeriod(PERIOD_MILLIS)
.setTimeUnit(UNIT)
.setLimit(1)
.get();

assertEquals(1, semaphore.getAvailablePermits(), "Initially, 1 permit available");
assertTrue(semaphore.tryAcquire(), "First acquire should succeed");
assertEquals(0, semaphore.getAvailablePermits(), "After 1 acquire with limit=1, none left");

semaphore.setLimit(3);
assertEquals(2, semaphore.getAvailablePermits(), "After increasing limit to 3, two more should be available this period");

assertTrue(semaphore.tryAcquire(), "Second acquire should now succeed");
assertTrue(semaphore.tryAcquire(), "Third acquire should now succeed");
assertFalse(semaphore.tryAcquire(), "Fourth acquire should fail within same period (limit=3)");

EasyMock.verify(service, future);
}



}