Skip to content

Commit 31929f7

Browse files
committed
FEAT: Add Operation throughput and latency metrics by mbean.
1 parent 132254c commit 31929f7

File tree

8 files changed

+317
-0
lines changed

8 files changed

+317
-0
lines changed

src/main/java/net/spy/memcached/MemcachedConnection.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
import net.spy.memcached.compat.SpyObject;
5151
import net.spy.memcached.compat.log.LoggerFactory;
5252
import net.spy.memcached.internal.ReconnDelay;
53+
import net.spy.memcached.metrics.OpLatencyMonitor;
54+
import net.spy.memcached.metrics.OpThroughputMonitor;
5355
import net.spy.memcached.ops.KeyedOperation;
5456
import net.spy.memcached.ops.MultiOperationCallback;
5557
import net.spy.memcached.ops.Operation;
@@ -990,6 +992,7 @@ private void handleReads(MemcachedNode qa)
990992
throw new IllegalStateException("No read operation.");
991993
}
992994
currentOp.readFromBuffer(rbuf);
995+
OpLatencyMonitor.getInstance().recordLatency(currentOp.getStartTime());
993996
if (currentOp.getState() == OperationState.COMPLETE) {
994997
getLogger().debug("Completed read op: %s and giving the next %d bytes",
995998
currentOp, rbuf.remaining());
@@ -1519,6 +1522,7 @@ public String toString() {
15191522
* @param op
15201523
*/
15211524
public static void opTimedOut(Operation op) {
1525+
OpThroughputMonitor.getInstance().addTimeoutOps();
15221526
MemcachedConnection.setTimeout(op, true);
15231527
}
15241528

@@ -1528,6 +1532,7 @@ public static void opTimedOut(Operation op) {
15281532
* @param ops
15291533
*/
15301534
public static void opsTimedOut(Collection<Operation> ops) {
1535+
OpThroughputMonitor.getInstance().addTimeoutOps(ops.size());
15311536
Collection<String> timedOutNodes = new HashSet<>();
15321537
for (Operation op : ops) {
15331538
try {
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
package net.spy.memcached.metrics;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
import java.util.concurrent.TimeUnit;
6+
import java.util.concurrent.atomic.AtomicInteger;
7+
import java.util.concurrent.atomic.AtomicReference;
8+
import java.util.concurrent.atomic.AtomicReferenceArray;
9+
10+
import net.spy.memcached.ArcusMBeanServer;
11+
12+
public final class OpLatencyMonitor implements OpLatencyMonitorMBean {
13+
private AtomicReferenceArray<Long> latencies;
14+
private AtomicInteger currentIndex;
15+
private AtomicInteger count;
16+
private boolean enabled = true;
17+
18+
// 캐시된 메트릭을 원자적으로 관리
19+
private final AtomicReference<MetricsSnapshot> cachedMetrics =
20+
new AtomicReference<>(new MetricsSnapshot(0, 0, 0, 0, 0, 0));
21+
22+
private static final OpLatencyMonitor INSTANCE = new OpLatencyMonitor();
23+
private static final long CACHE_DURATION = 2000; // 2초 캐시
24+
private static final int WINDOW_SIZE = 10_000;
25+
private static final MetricsSnapshot EMPTY_METRICS = new MetricsSnapshot(0, 0, 0, 0, 0, 0);
26+
27+
private OpLatencyMonitor() {
28+
if (System.getProperty("arcus.mbean", "false").toLowerCase().equals("false")) {
29+
enabled = false;
30+
return;
31+
}
32+
this.latencies = new AtomicReferenceArray<>(WINDOW_SIZE);
33+
this.currentIndex = new AtomicInteger(0);
34+
this.count = new AtomicInteger(0);
35+
36+
for (int i = 0; i < WINDOW_SIZE; i++) {
37+
latencies.set(i, 0L);
38+
}
39+
40+
try {
41+
ArcusMBeanServer mbs = ArcusMBeanServer.getInstance();
42+
mbs.registMBean(this, this.getClass().getPackage().getName()
43+
+ ":type=" + this.getClass().getSimpleName());
44+
} catch (Exception e) {
45+
throw new RuntimeException("Failed to register MBean", e);
46+
}
47+
}
48+
49+
public static OpLatencyMonitor getInstance() {
50+
return INSTANCE;
51+
}
52+
53+
public void recordLatency(long startNanos) {
54+
if (!enabled) {
55+
return;
56+
}
57+
long latencyMicros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startNanos);
58+
int index = currentIndex.getAndUpdate(i -> (i + 1) % WINDOW_SIZE);
59+
latencies.lazySet(index, latencyMicros);
60+
61+
if (count.get() < WINDOW_SIZE) {
62+
count.incrementAndGet();
63+
}
64+
}
65+
66+
// 모든 메트릭을 한 번에 계산하고 캐시하는 메서드
67+
private MetricsSnapshot computeMetrics() {
68+
int currentCount = count.get();
69+
if (currentCount == 0) {
70+
return EMPTY_METRICS;
71+
}
72+
73+
// 현재 데이터를 배열로 복사
74+
List<Long> sortedLatencies = new ArrayList<>(currentCount);
75+
int startIndex = currentIndex.get();
76+
77+
for (int i = 0; i < currentCount; i++) {
78+
int idx = (startIndex - i + WINDOW_SIZE) % WINDOW_SIZE;
79+
long value = latencies.get(idx);
80+
if (value != 0) {
81+
sortedLatencies.add(value);
82+
}
83+
}
84+
85+
if (sortedLatencies.isEmpty()) {
86+
return EMPTY_METRICS;
87+
}
88+
89+
sortedLatencies.sort(Long::compareTo);
90+
91+
// 모든 메트릭을 한 번에 계산
92+
long avg = sortedLatencies.stream().mapToLong(Long::longValue).sum() / currentCount;
93+
long min = sortedLatencies.get(0);
94+
long max = sortedLatencies.get(currentCount - 1);
95+
long p25 = sortedLatencies.get((int) Math.ceil((currentCount * 25.0) / 100.0) - 1);
96+
long p50 = sortedLatencies.get((int) Math.ceil((currentCount * 50.0) / 100.0) - 1);
97+
long p75 = sortedLatencies.get((int) Math.ceil((currentCount * 75.0) / 100.0) - 1);
98+
99+
return new MetricsSnapshot(avg, min, max, p25, p50, p75);
100+
}
101+
102+
// 캐시된 메트릭을 가져오거나 필요시 새로 계산
103+
private MetricsSnapshot getMetricsSnapshot() {
104+
MetricsSnapshot current = cachedMetrics.get();
105+
long now = System.currentTimeMillis();
106+
107+
// 캐시가 유효한지 확인
108+
if (now - current.timestamp < CACHE_DURATION) {
109+
return current;
110+
}
111+
112+
// 새로운 메트릭 계산 및 캐시 업데이트
113+
MetricsSnapshot newMetrics = computeMetrics();
114+
cachedMetrics.set(newMetrics);
115+
return newMetrics;
116+
}
117+
118+
@Override
119+
public long getAverageLatencyMicros() {
120+
return getMetricsSnapshot().avgLatency;
121+
}
122+
123+
@Override
124+
public long getMinLatencyMicros() {
125+
return getMetricsSnapshot().minLatency;
126+
}
127+
128+
@Override
129+
public long getMaxLatencyMicros() {
130+
return getMetricsSnapshot().maxLatency;
131+
}
132+
133+
@Override
134+
public long get25thPercentileLatencyMicros() {
135+
return getMetricsSnapshot().p25Latency;
136+
}
137+
138+
@Override
139+
public long get50thPercentileLatencyMicros() {
140+
return getMetricsSnapshot().p50Latency;
141+
}
142+
143+
@Override
144+
public long get75thPercentileLatencyMicros() {
145+
return getMetricsSnapshot().p75Latency;
146+
}
147+
148+
@Override
149+
public void resetStatistics() {
150+
count.set(0);
151+
currentIndex.set(0);
152+
cachedMetrics.set(EMPTY_METRICS);
153+
}
154+
155+
// 캐시된 메트릭을 저장할 불변 클래스
156+
private final static class MetricsSnapshot {
157+
private final long avgLatency;
158+
private final long minLatency;
159+
private final long maxLatency;
160+
private final long p25Latency;
161+
private final long p50Latency;
162+
private final long p75Latency;
163+
private final long timestamp; // 캐시 생성 시간
164+
165+
private MetricsSnapshot(long avg, long min, long max, long p25, long p50, long p75) {
166+
this.avgLatency = avg;
167+
this.minLatency = min;
168+
this.maxLatency = max;
169+
this.p25Latency = p25;
170+
this.p50Latency = p50;
171+
this.p75Latency = p75;
172+
this.timestamp = System.currentTimeMillis();
173+
}
174+
}
175+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package net.spy.memcached.metrics;
2+
3+
public interface OpLatencyMonitorMBean {
4+
long getAverageLatencyMicros();
5+
6+
long getMaxLatencyMicros();
7+
8+
long getMinLatencyMicros();
9+
10+
long get25thPercentileLatencyMicros();
11+
12+
long get50thPercentileLatencyMicros();
13+
14+
long get75thPercentileLatencyMicros();
15+
16+
void resetStatistics();
17+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package net.spy.memcached.metrics;
2+
3+
import java.util.concurrent.atomic.AtomicLong;
4+
import java.util.concurrent.atomic.LongAdder;
5+
6+
import net.spy.memcached.ArcusMBeanServer;
7+
8+
public final class OpThroughputMonitor implements OpThroughputMonitorMBean {
9+
private LongAdder completeOps;
10+
private LongAdder cancelOps;
11+
private LongAdder timeOutOps;
12+
private AtomicLong lastResetTime;
13+
private boolean enabled = true;
14+
15+
private static final OpThroughputMonitor INSTANCE = new OpThroughputMonitor();
16+
17+
private OpThroughputMonitor() {
18+
if (System.getProperty("arcus.mbean", "false").toLowerCase().equals("false")) {
19+
enabled = false;
20+
return;
21+
}
22+
23+
this.completeOps = new LongAdder();
24+
this.cancelOps = new LongAdder();
25+
this.timeOutOps = new LongAdder();
26+
this.lastResetTime = new AtomicLong(System.currentTimeMillis());
27+
28+
try {
29+
ArcusMBeanServer mbs = ArcusMBeanServer.getInstance();
30+
mbs.registMBean(this, this.getClass().getPackage().getName()
31+
+ ":type=" + this.getClass().getSimpleName());
32+
} catch (Exception e) {
33+
throw new RuntimeException("Failed to register Throughput MBean", e);
34+
}
35+
}
36+
37+
public static OpThroughputMonitor getInstance() {
38+
return INSTANCE;
39+
}
40+
41+
public void addCompleteOps() {
42+
if (!enabled) {
43+
return;
44+
}
45+
completeOps.increment();
46+
}
47+
48+
public void addCancelOps() {
49+
if (!enabled) {
50+
return;
51+
}
52+
cancelOps.increment();
53+
}
54+
55+
public void addTimeoutOps() {
56+
if (!enabled) {
57+
return;
58+
}
59+
timeOutOps.increment();
60+
}
61+
62+
public void addTimeoutOps(int count) {
63+
if (!enabled) {
64+
return;
65+
}
66+
timeOutOps.add(count);
67+
}
68+
69+
@Override
70+
public long getCompletedOps() {
71+
return getThroughput(completeOps);
72+
}
73+
74+
@Override
75+
public long getCanceledOps() {
76+
return getThroughput(cancelOps);
77+
}
78+
79+
@Override
80+
public long getTimeoutOps() {
81+
return getThroughput(timeOutOps);
82+
}
83+
84+
private long getThroughput(LongAdder ops) {
85+
long currentTime = System.currentTimeMillis();
86+
long lastTime = lastResetTime.get();
87+
long countValue = ops.sum();
88+
89+
// 경과 시간 계산 (초 단위)
90+
long elapsedSeconds = (long) ((currentTime - lastTime) / 1000.0);
91+
92+
return countValue / elapsedSeconds;
93+
}
94+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package net.spy.memcached.metrics;
2+
3+
public interface OpThroughputMonitorMBean {
4+
long getCompletedOps();
5+
6+
long getCanceledOps();
7+
8+
long getTimeoutOps();
9+
}

src/main/java/net/spy/memcached/ops/Operation.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,4 +134,8 @@ public interface Operation {
134134
/* ENABLE_MIGRATION end */
135135

136136
APIType getAPIType();
137+
138+
void setStartTime(long startTime);
139+
140+
long getStartTime();
137141
}

src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import net.spy.memcached.MemcachedReplicaGroup;
2727
import net.spy.memcached.RedirectHandler;
2828
import net.spy.memcached.compat.SpyObject;
29+
import net.spy.memcached.metrics.OpThroughputMonitor;
2930
import net.spy.memcached.ops.APIType;
3031
import net.spy.memcached.ops.CancelledOperationStatus;
3132
import net.spy.memcached.ops.OperationCallback;
@@ -58,6 +59,15 @@ public abstract class BaseOperationImpl extends SpyObject {
5859
private OperationType opType = OperationType.UNDEFINED;
5960
private APIType apiType = APIType.UNDEFINED;
6061

62+
private long startTime;
63+
64+
public void setStartTime(long startTime) {
65+
this.startTime = startTime;
66+
}
67+
public long getStartTime() {
68+
return startTime;
69+
}
70+
6171
/* ENABLE_MIGRATION if */
6272
private RedirectHandler redirectHandler = null;
6373
/* ENABLE_MIGRATION end */
@@ -95,6 +105,7 @@ public final OperationException getException() {
95105
public final boolean cancel(String cause) {
96106
if (callbacked.compareAndSet(false, true)) {
97107
cancelled = true;
108+
OpThroughputMonitor.getInstance().addCancelOps();
98109
if (handlingNode != null) {
99110
cause += " @ " + handlingNode.getNodeName();
100111
}
@@ -222,6 +233,7 @@ protected final void transitionState(OperationState newState) {
222233
}
223234
if (state == OperationState.COMPLETE &&
224235
callbacked.compareAndSet(false, true)) {
236+
OpThroughputMonitor.getInstance().addCompleteOps();
225237
callback.complete();
226238
}
227239
}

src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,7 @@ private Operation getNextWritableOp() {
316316
Operation cancelledOp = removeCurrentWriteOp();
317317
assert o == cancelledOp;
318318
} else {
319+
o.setStartTime(System.nanoTime());
319320
o.writing();
320321
readQ.add(o);
321322
return o;

0 commit comments

Comments
 (0)