Skip to content

Commit 6371f51

Browse files
committed
누출 버킷 알고리즘 구현해보기
1 parent 59d756c commit 6371f51

File tree

2 files changed

+82
-0
lines changed

2 files changed

+82
-0
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package io.github.gunkim.ratelimiter.bucket;
2+
3+
public interface Bucket {
4+
void request(Runnable request);
5+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package io.github.gunkim.ratelimiter.bucket;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
6+
import java.util.Queue;
7+
import java.util.concurrent.ConcurrentLinkedQueue;
8+
import java.util.concurrent.Executors;
9+
import java.util.concurrent.ScheduledExecutorService;
10+
import java.util.concurrent.TimeUnit;
11+
12+
/**
13+
* ## 처리 제한 알고리즘, 누출 버킷
14+
* - 요청이 들어오면 Queue에 요청을 적재 (보통 Queue 사이즈는 버킷 사이즈)
15+
* - Queue가 가득 찼다면 요청은 Drop (대기시키도록 구현도 고려해볼 순 있음
16+
* - 일정 시간마다 일정량을 처리
17+
* <p>
18+
* 고정된 처리율로 안정적인 트래픽 처리엔 좋지만, 트래픽이 몰리게 되면 요청이 버려질 수 있음.
19+
*/
20+
public class LeakyBucket implements AutoCloseable, Bucket {
21+
private static final Logger logger = LoggerFactory.getLogger(LeakyBucket.class);
22+
23+
private final Queue<Runnable> queue;
24+
private final int bucketSize;
25+
private final int batchSize;
26+
27+
private final ScheduledExecutorService executorService;
28+
29+
public LeakyBucket(int bucketSize, long scheduleInterval, int batchSize) {
30+
this.bucketSize = bucketSize;
31+
this.batchSize = batchSize;
32+
33+
this.queue = new ConcurrentLinkedQueue<>();
34+
35+
this.executorService = Executors.newSingleThreadScheduledExecutor();
36+
this.executorService.scheduleAtFixedRate(this::processRequest, scheduleInterval, scheduleInterval, TimeUnit.MILLISECONDS);
37+
}
38+
39+
@Override
40+
public void request(Runnable request) {
41+
if (isFull()) {
42+
logger.trace("bucket is full!");
43+
return;
44+
}
45+
queue.add(request);
46+
}
47+
48+
private boolean isFull() {
49+
return queue.size() == bucketSize - 1;
50+
}
51+
52+
private void processRequest() {
53+
int queueSize = queue.size();
54+
55+
logger.debug("전체 {}개 중 {}개 처리", queueSize, this.batchSize);
56+
for (int i = 0; i < batchSize; i++) {
57+
queue.poll().run();
58+
}
59+
}
60+
61+
public void shutdown() {
62+
executorService.shutdown();
63+
try {
64+
if (!executorService.awaitTermination(1, TimeUnit.SECONDS)) {
65+
executorService.shutdownNow();
66+
}
67+
} catch (InterruptedException ex) {
68+
executorService.shutdownNow();
69+
Thread.currentThread().interrupt();
70+
}
71+
}
72+
73+
@Override
74+
public void close() {
75+
shutdown();
76+
}
77+
}

0 commit comments

Comments
 (0)