Skip to content

Commit 2de36db

Browse files
committed
Merge branch 'router-events'
2 parents 1698c20 + fb5eaeb commit 2de36db

File tree

12 files changed

+681
-37
lines changed

12 files changed

+681
-37
lines changed

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/routing/v1/AbstractRoutingV1Operations.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.cloudfoundry.reactor.util.AbstractReactorOperations;
2222
import org.springframework.web.util.UriComponentsBuilder;
2323
import reactor.core.publisher.Mono;
24+
import reactor.ipc.netty.http.client.HttpClientResponse;
2425

2526
import java.util.function.Function;
2627

@@ -34,6 +35,10 @@ protected final <T> Mono<T> get(Class<T> responseType, Function<UriComponentsBui
3435
return doGet(responseType, uriTransformer, outbound -> outbound, inbound -> inbound);
3536
}
3637

38+
protected final Mono<HttpClientResponse> get(Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer) {
39+
return doGet(uriTransformer, outbound -> outbound, inbound -> inbound);
40+
}
41+
3742
protected final <T> Mono<T> post(Object request, Class<T> responseType, Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer) {
3843
return doPost(request, responseType, uriTransformer, outbound -> outbound, inbound -> inbound);
3944
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
/*
2+
* Copyright 2013-2017 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.cloudfoundry.reactor.routing.v1.tcproutes;
18+
19+
import io.netty.buffer.ByteBuf;
20+
import io.netty.channel.ChannelHandlerContext;
21+
import io.netty.channel.ChannelInboundHandlerAdapter;
22+
import io.netty.handler.codec.http.DefaultHttpContent;
23+
24+
import java.nio.charset.Charset;
25+
26+
final class EventStreamDecoderChannelHandler extends ChannelInboundHandlerAdapter {
27+
28+
static final String DELIMITER = "DELIMITER";
29+
30+
private static final char[] COLON = new char[]{':', ' '};
31+
32+
private static final char[] CRLF = new char[]{'\r', '\n'};
33+
34+
private static final Charset UTF8 = Charset.forName("UTF-8");
35+
36+
private String characters;
37+
38+
private int colonPosition;
39+
40+
private int crlfPosition;
41+
42+
private ServerSentEvent.Builder event;
43+
44+
private int nameEndPosition;
45+
46+
private int nameStartPosition;
47+
48+
private int position;
49+
50+
private Stage stage;
51+
52+
private int valueEndPosition;
53+
54+
private int valueStartPosition;
55+
56+
EventStreamDecoderChannelHandler() {
57+
reset();
58+
}
59+
60+
@Override
61+
public void channelRead(ChannelHandlerContext context, Object message) throws Exception {
62+
if (!(message instanceof DefaultHttpContent)) {
63+
super.channelRead(context, message);
64+
return;
65+
}
66+
67+
ByteBuf byteBuf = ((DefaultHttpContent) message).content();
68+
this.characters = this.characters != null ? this.characters + byteBuf.toString(UTF8) : byteBuf.toString(UTF8);
69+
byteBuf.release();
70+
71+
while (this.position < this.characters.length()) {
72+
char c = this.characters.charAt(this.position);
73+
74+
switch (this.stage) {
75+
case COLON:
76+
colon(c);
77+
break;
78+
case COMMENT:
79+
comment(c);
80+
break;
81+
case CRLF:
82+
crlf(context, c);
83+
break;
84+
case NAME:
85+
name(c);
86+
break;
87+
case VALUE:
88+
value(c);
89+
break;
90+
}
91+
}
92+
93+
if (Stage.CRLF == this.stage) {
94+
crlf(context, '\0');
95+
}
96+
97+
if (Stage.NAME == this.stage) {
98+
reset();
99+
}
100+
}
101+
102+
private void colon(char c) {
103+
if (this.colonPosition < COLON.length) {
104+
if (COLON[this.colonPosition] == c) {
105+
this.colonPosition++;
106+
this.position++;
107+
} else {
108+
this.valueStartPosition = this.position;
109+
this.stage = Stage.VALUE;
110+
this.position++;
111+
}
112+
} else {
113+
this.valueStartPosition = this.position;
114+
this.stage = Stage.VALUE;
115+
this.position++;
116+
}
117+
}
118+
119+
private void comment(char c) {
120+
if (CRLF[0] == c) {
121+
this.nameStartPosition = this.position;
122+
this.nameEndPosition = this.position;
123+
this.valueStartPosition = this.position;
124+
this.valueEndPosition = this.position;
125+
this.stage = Stage.CRLF;
126+
this.crlfPosition = 1;
127+
this.position++;
128+
} else if (CRLF[1] == c) {
129+
this.nameStartPosition = this.position;
130+
this.nameEndPosition = this.position;
131+
this.valueStartPosition = this.position;
132+
this.valueEndPosition = this.position;
133+
this.stage = Stage.CRLF;
134+
this.crlfPosition = 2;
135+
this.position++;
136+
} else {
137+
this.position++;
138+
}
139+
}
140+
141+
private void crlf(ChannelHandlerContext context, char c) {
142+
if (this.crlfPosition < CRLF.length) {
143+
if (CRLF[this.crlfPosition] == c) {
144+
this.crlfPosition++;
145+
this.position++;
146+
} else {
147+
send(context);
148+
}
149+
} else {
150+
send(context);
151+
}
152+
}
153+
154+
private void name(char c) {
155+
if (this.nameStartPosition == this.position) {
156+
if (COLON[0] == c) {
157+
this.stage = Stage.COMMENT;
158+
this.position++;
159+
} else if (CRLF[0] == c) {
160+
this.nameEndPosition = this.position;
161+
this.valueStartPosition = this.position;
162+
this.valueEndPosition = this.position;
163+
this.stage = Stage.CRLF;
164+
this.crlfPosition = 1;
165+
this.position++;
166+
} else if (CRLF[1] == c) {
167+
this.nameEndPosition = this.position;
168+
this.valueStartPosition = this.position;
169+
this.valueEndPosition = this.position;
170+
this.stage = Stage.CRLF;
171+
this.crlfPosition = 2;
172+
this.position++;
173+
} else {
174+
this.position++;
175+
}
176+
} else if (COLON[0] == c) {
177+
this.nameEndPosition = this.position;
178+
this.stage = Stage.COLON;
179+
this.colonPosition = 1;
180+
this.position++;
181+
} else if (CRLF[0] == c) {
182+
this.nameEndPosition = this.position;
183+
this.valueStartPosition = this.position;
184+
this.valueEndPosition = this.position;
185+
this.stage = Stage.CRLF;
186+
this.crlfPosition = 1;
187+
this.position++;
188+
} else if (CRLF[1] == c) {
189+
this.nameEndPosition = this.position;
190+
this.valueStartPosition = this.position;
191+
this.valueEndPosition = this.position;
192+
this.stage = Stage.CRLF;
193+
this.crlfPosition = 2;
194+
this.position++;
195+
} else {
196+
this.position++;
197+
}
198+
}
199+
200+
private void reset() {
201+
this.characters = null;
202+
this.event = null;
203+
this.position = 0;
204+
this.nameStartPosition = 0;
205+
this.valueEndPosition = 0;
206+
this.stage = Stage.NAME;
207+
}
208+
209+
private void send(ChannelHandlerContext context) {
210+
if (this.nameStartPosition == this.valueEndPosition) {
211+
if (this.event != null) {
212+
context.fireChannelRead(this.event.build());
213+
this.event = null;
214+
}
215+
} else {
216+
String name = this.characters.substring(this.nameStartPosition, this.nameEndPosition);
217+
String value = this.characters.substring(this.valueStartPosition, this.valueEndPosition);
218+
219+
if ("id".equals(name)) {
220+
this.event = this.event != null ? this.event.id(value) : ServerSentEvent.builder().id(value);
221+
} else if ("event".equals(name)) {
222+
this.event = this.event != null ? this.event.eventType(value) : ServerSentEvent.builder().eventType(value);
223+
} else if ("data".equals(name)) {
224+
if (this.event != null) {
225+
ServerSentEvent event = this.event.build();
226+
String data = event.getData() == null ? value : String.format("%s\n%s", event.getData(), value);
227+
228+
this.event = ServerSentEvent.builder()
229+
.id(event.getId())
230+
.eventType(event.getEventType())
231+
.data(data)
232+
.retry(event.getRetry());
233+
} else {
234+
this.event = ServerSentEvent.builder().data(value);
235+
}
236+
} else if ("retry".equals(name)) {
237+
this.event = this.event != null ? this.event.retry(Integer.parseInt(value)) : ServerSentEvent.builder().retry(Integer.parseInt(value));
238+
}
239+
}
240+
241+
this.nameStartPosition = this.position;
242+
this.valueEndPosition = this.position;
243+
this.stage = Stage.NAME;
244+
}
245+
246+
private void value(char c) {
247+
if (CRLF[0] == c) {
248+
this.valueEndPosition = this.position;
249+
this.stage = Stage.CRLF;
250+
this.crlfPosition = 1;
251+
this.position++;
252+
} else if (CRLF[1] == c) {
253+
this.valueEndPosition = this.position;
254+
this.stage = Stage.CRLF;
255+
this.crlfPosition = 2;
256+
this.position++;
257+
} else {
258+
this.position++;
259+
}
260+
}
261+
262+
private enum Stage {
263+
264+
COLON,
265+
266+
COMMENT,
267+
268+
CRLF,
269+
270+
NAME,
271+
272+
VALUE
273+
274+
}
275+
276+
}

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/routing/v1/tcproutes/ReactorTcpRoutes.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,25 @@
2222
import org.cloudfoundry.routing.v1.tcproutes.CreateTcpRoutesRequest;
2323
import org.cloudfoundry.routing.v1.tcproutes.CreateTcpRoutesResponse;
2424
import org.cloudfoundry.routing.v1.tcproutes.DeleteTcpRoutesRequest;
25+
import org.cloudfoundry.routing.v1.tcproutes.EventType;
26+
import org.cloudfoundry.routing.v1.tcproutes.EventsRequest;
2527
import org.cloudfoundry.routing.v1.tcproutes.ListTcpRoutesRequest;
2628
import org.cloudfoundry.routing.v1.tcproutes.ListTcpRoutesResponse;
29+
import org.cloudfoundry.routing.v1.tcproutes.TcpRouteEvent;
2730
import org.cloudfoundry.routing.v1.tcproutes.TcpRoutes;
31+
import reactor.core.Exceptions;
32+
import reactor.core.publisher.Flux;
2833
import reactor.core.publisher.Mono;
2934

35+
import java.io.IOException;
36+
3037
/**
3138
* The Reactor-based implementation of {@link TcpRoutes}
3239
*/
3340
public class ReactorTcpRoutes extends AbstractRoutingV1Operations implements TcpRoutes {
3441

42+
private final ConnectionContext connectionContext;
43+
3544
/**
3645
* Creates an instance
3746
*
@@ -41,6 +50,7 @@ public class ReactorTcpRoutes extends AbstractRoutingV1Operations implements Tcp
4150
*/
4251
public ReactorTcpRoutes(ConnectionContext connectionContext, Mono<String> root, TokenProvider tokenProvider) {
4352
super(connectionContext, root, tokenProvider);
53+
this.connectionContext = connectionContext;
4454
}
4555

4656
@Override
@@ -53,6 +63,22 @@ public Mono<Void> delete(DeleteTcpRoutesRequest request) {
5363
return post(request, Void.class, builder -> builder.pathSegment("routing", "v1", "tcp_routes", "delete"));
5464
}
5565

66+
@Override
67+
public Flux<TcpRouteEvent> events(EventsRequest request) {
68+
return get(builder -> builder.pathSegment("routing", "v1", "tcp_routes", "events"))
69+
.flatMap(inbound -> inbound.addHandler(new EventStreamDecoderChannelHandler()).receiveObject())
70+
.cast(ServerSentEvent.class)
71+
.map(event -> {
72+
try {
73+
return this.connectionContext.getObjectMapper().readValue(event.getData(), TcpRouteEvent.Builder.class)
74+
.eventType(EventType.from(event.getEventType()))
75+
.build();
76+
} catch (IOException e) {
77+
throw Exceptions.propagate(e);
78+
}
79+
});
80+
}
81+
5682
@Override
5783
public Mono<ListTcpRoutesResponse> list(ListTcpRoutesRequest request) {
5884
return get(ListTcpRoutesResponse.class, builder -> builder.pathSegment("routing", "v1", "tcp_routes"));
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2013-2017 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.cloudfoundry.reactor.routing.v1.tcproutes;
18+
19+
import org.cloudfoundry.Nullable;
20+
import org.immutables.value.Value;
21+
22+
@Value.Immutable
23+
abstract class _ServerSentEvent {
24+
25+
@Nullable
26+
abstract String getData();
27+
28+
@Nullable
29+
abstract String getEventType();
30+
31+
@Nullable
32+
abstract String getId();
33+
34+
@Nullable
35+
abstract Integer getRetry();
36+
37+
}

0 commit comments

Comments
 (0)