|
11 | 11 | import reactor.core.publisher.Flux; |
12 | 12 | import reactor.core.publisher.Mono; |
13 | 13 | import reactor.core.publisher.Sinks; |
| 14 | +import reactor.core.scheduler.Schedulers; |
14 | 15 |
|
15 | 16 | @Slf4j |
16 | 17 | public class ChatWebSocketHandler implements WebSocketHandler { |
@@ -40,15 +41,19 @@ public Mono<Void> handle(WebSocketSession webSocketSession) { |
40 | 41 | Mono<Void> inputMessage = webSocketSession.receive() |
41 | 42 | .flatMap(webSocketMessage -> redisChatMessagePublisher.publishChatMessage(webSocketMessage.getPayloadAsText())) |
42 | 43 | .doOnSubscribe(subscription -> { |
43 | | - long activeUserCount = activeUserCounter.incrementAndGet(); |
44 | | - log.info("User '{}' Connected. Total Active Users: {}", webSocketSession.getId(), activeUserCount); |
45 | | - chatMessageSink.tryEmitNext(new ChatMessage(0, "CONNECTED", "CONNECTED", activeUserCount)); |
| 44 | + Mono.fromRunnable(() -> { |
| 45 | + long activeUserCount = activeUserCounter.incrementAndGet(); |
| 46 | + log.info("User '{}' Connected. Total Active Users: {}", webSocketSession.getId(), activeUserCount); |
| 47 | + chatMessageSink.tryEmitNext(new ChatMessage(0, "CONNECTED", "CONNECTED", activeUserCount)); |
| 48 | + }).subscribeOn(Schedulers.boundedElastic()).subscribe(); |
46 | 49 | }) |
47 | 50 | .doOnError(throwable -> log.error("Error Occurred while sending message to Redis.", throwable)) |
48 | 51 | .doFinally(signalType -> { |
49 | | - long activeUserCount = activeUserCounter.decrementAndGet(); |
50 | | - log.info("User '{}' Disconnected. Total Active Users: {}", webSocketSession.getId(), activeUserCount); |
51 | | - chatMessageSink.tryEmitNext(new ChatMessage(0, "DISCONNECTED", "DISCONNECTED", activeUserCount)); |
| 52 | + Mono.fromRunnable(() -> { |
| 53 | + long activeUserCount = activeUserCounter.decrementAndGet(); |
| 54 | + log.info("User '{}' Disconnected. Total Active Users: {}", webSocketSession.getId(), activeUserCount); |
| 55 | + chatMessageSink.tryEmitNext(new ChatMessage(0, "DISCONNECTED", "DISCONNECTED", activeUserCount)); |
| 56 | + }).subscribeOn(Schedulers.boundedElastic()).subscribe(); |
52 | 57 | }) |
53 | 58 | .then(); |
54 | 59 |
|
|
0 commit comments