@@ -84,7 +84,6 @@ final class StreamProcessor implements DataSource {
8484 @VisibleForTesting final URI streamUri;
8585 @VisibleForTesting final Duration initialReconnectDelay;
8686 private final DiagnosticAccumulator diagnosticAccumulator;
87- private final EventSourceCreator eventSourceCreator;
8887 private final int threadPriority;
8988 private final DataStoreStatusProvider.StatusListener statusListener;
9089 private volatile EventSource es;
@@ -94,34 +93,9 @@ final class StreamProcessor implements DataSource {
9493
9594 ConnectionErrorHandler connectionErrorHandler = createDefaultConnectionErrorHandler(); // exposed for testing
9695
97- static final class EventSourceParams {
98- final EventHandler handler;
99- final URI streamUri;
100- final Duration initialReconnectDelay;
101- final ConnectionErrorHandler errorHandler;
102- final Headers headers;
103- final HttpConfiguration httpConfig;
104-
105- EventSourceParams(EventHandler handler, URI streamUri, Duration initialReconnectDelay,
106- ConnectionErrorHandler errorHandler, Headers headers, HttpConfiguration httpConfig) {
107- this.handler = handler;
108- this.streamUri = streamUri;
109- this.initialReconnectDelay = initialReconnectDelay;
110- this.errorHandler = errorHandler;
111- this.headers = headers;
112- this.httpConfig = httpConfig;
113- }
114- }
115-
116- @FunctionalInterface
117- static interface EventSourceCreator {
118- EventSource createEventSource(EventSourceParams params);
119- }
120-
12196 StreamProcessor(
12297 HttpConfiguration httpConfig,
12398 DataSourceUpdates dataSourceUpdates,
124- EventSourceCreator eventSourceCreator,
12599 int threadPriority,
126100 DiagnosticAccumulator diagnosticAccumulator,
127101 URI streamUri,
@@ -130,7 +104,6 @@ static interface EventSourceCreator {
130104 this.dataSourceUpdates = dataSourceUpdates;
131105 this.httpConfig = httpConfig;
132106 this.diagnosticAccumulator = diagnosticAccumulator;
133- this.eventSourceCreator = eventSourceCreator != null ? eventSourceCreator : this::defaultEventSourceCreator;
134107 this.threadPriority = threadPriority;
135108 this.streamUri = streamUri;
136109 this.initialReconnectDelay = initialReconnectDelay;
@@ -202,13 +175,26 @@ public Future<Void> start() {
202175 };
203176
204177 EventHandler handler = new StreamEventHandler(initFuture);
205-
206- es = eventSourceCreator.createEventSource(new EventSourceParams(handler,
207- concatenateUriPath(streamUri, STREAM_URI_PATH),
208- initialReconnectDelay,
209- wrappedConnectionErrorHandler,
210- headers,
211- httpConfig));
178+ URI endpointUri = concatenateUriPath(streamUri, STREAM_URI_PATH);
179+
180+ EventSource.Builder builder = new EventSource.Builder(handler, endpointUri)
181+ .threadPriority(threadPriority)
182+ .loggerBaseName(Loggers.DATA_SOURCE_LOGGER_NAME)
183+ .clientBuilderActions(new EventSource.Builder.ClientConfigurer() {
184+ public void configure(OkHttpClient.Builder builder) {
185+ configureHttpClientBuilder(httpConfig, builder);
186+ }
187+ })
188+ .connectionErrorHandler(wrappedConnectionErrorHandler)
189+ .headers(headers)
190+ .reconnectTime(initialReconnectDelay)
191+ .readTimeout(DEAD_CONNECTION_INTERVAL);
192+ // Note that this is not the same read timeout that can be set in LDConfig. We default to a smaller one
193+ // there because we don't expect long delays within any *non*-streaming response that the LD client gets.
194+ // A read timeout on the stream will result in the connection being cycled, so we set this to be slightly
195+ // more than the expected interval between heartbeat signals.
196+
197+ es = builder.build();
212198 esStarted = System.currentTimeMillis();
213199 es.start();
214200 return initFuture;
@@ -356,27 +342,6 @@ public void onError(Throwable throwable) {
356342 }
357343 }
358344
359- private EventSource defaultEventSourceCreator(EventSourceParams params) {
360- EventSource.Builder builder = new EventSource.Builder(params.handler, params.streamUri)
361- .threadPriority(threadPriority)
362- .loggerBaseName(Loggers.DATA_SOURCE_LOGGER_NAME)
363- .clientBuilderActions(new EventSource.Builder.ClientConfigurer() {
364- public void configure(OkHttpClient.Builder builder) {
365- configureHttpClientBuilder(params.httpConfig, builder);
366- }
367- })
368- .connectionErrorHandler(params.errorHandler)
369- .headers(params.headers)
370- .reconnectTime(params.initialReconnectDelay)
371- .readTimeout(DEAD_CONNECTION_INTERVAL);
372- // Note that this is not the same read timeout that can be set in LDConfig. We default to a smaller one
373- // there because we don't expect long delays within any *non*-streaming response that the LD client gets.
374- // A read timeout on the stream will result in the connection being cycled, so we set this to be slightly
375- // more than the expected interval between heartbeat signals.
376-
377- return builder.build();
378- }
379-
380345 private static Map.Entry<DataKind, String> getKindAndKeyFromStreamApiPath(String path) throws StreamInputException {
381346 if (path == null) {
382347 throw new StreamInputException("missing item path");
0 commit comments