4848import org .hypertrace .core .attribute .service .v1 .UpdateMetadataRequest ;
4949import org .hypertrace .core .attribute .service .v1 .UpdateMetadataResponse ;
5050import org .hypertrace .core .attribute .service .validator .AttributeMetadataValidator ;
51+ import org .hypertrace .core .documentstore .CloseableIterator ;
5152import org .hypertrace .core .documentstore .Collection ;
5253import org .hypertrace .core .documentstore .Datastore ;
5354import org .hypertrace .core .documentstore .DatastoreProvider ;
@@ -106,22 +107,8 @@ public AttributeServiceImpl(Config config, PlatformServiceLifecycle platformServ
106107 private Datastore initDataStore (
107108 Config config , PlatformServiceLifecycle platformServiceLifecycle ) {
108109 final Config docStoreConfig = config .getConfig (DOC_STORE_CONFIG_KEY );
109- final String dataStoreType = docStoreConfig .getString (DATA_STORE_TYPE );
110110 final DatastoreConfig datastoreConfig =
111- TypesafeConfigDatastoreConfigExtractor .from (docStoreConfig , DATA_STORE_TYPE )
112- .hostKey (dataStoreType + ".host" )
113- .portKey (dataStoreType + ".port" )
114- .keysForEndpoints (dataStoreType + ".endpoints" , "host" , "port" )
115- .authDatabaseKey (dataStoreType + ".authDb" )
116- .replicaSetKey (dataStoreType + ".replicaSet" )
117- .databaseKey (dataStoreType + ".database" )
118- .usernameKey (dataStoreType + ".user" )
119- .passwordKey (dataStoreType + ".password" )
120- .applicationNameKey ("appName" )
121- .poolMaxConnectionsKey ("maxPoolSize" )
122- .poolConnectionAccessTimeoutKey ("connectionAccessTimeout" )
123- .poolConnectionSurrenderTimeoutKey ("connectionIdleTime" )
124- .extract ();
111+ TypesafeConfigDatastoreConfigExtractor .from (docStoreConfig , DATA_STORE_TYPE ).extract ();
125112
126113 final Datastore datastore = DatastoreProvider .getDatastore (datastoreConfig );
127114 new DocStoreMetricsRegistry (datastore )
@@ -185,10 +172,8 @@ public void updateSourceMetadata(
185172 return ;
186173 }
187174
188- try {
189- // Fetch attributes by FQN
190- Iterator <Document > documents =
191- collection .search (getQueryByTenantIdAndFQN (tenantId .get (), request .getFqn ()));
175+ try (final CloseableIterator <Document > documents =
176+ collection .search (getQueryByTenantIdAndFQN (tenantId .get (), request .getFqn ()))) {
192177 // For each attribute matching the FQN update the source metadata
193178 boolean status =
194179 StreamSupport .stream (Spliterators .spliteratorUnknownSize (documents , 0 ), false )
@@ -243,9 +228,8 @@ public void delete(
243228 return ;
244229 }
245230
246- try {
247- Iterator <Document > documents =
248- collection .search (getQueryForFilter (tenantId .get (), modifiedRequest ));
231+ try (final CloseableIterator <Document > documents =
232+ collection .search (getQueryForFilter (tenantId .get (), modifiedRequest ))) {
249233 boolean status =
250234 StreamSupport .stream (Spliterators .spliteratorUnknownSize (documents , 0 ), false )
251235 .map (Document ::toJson )
@@ -299,10 +283,8 @@ public void deleteSourceMetadata(
299283 return ;
300284 }
301285
302- try {
303- // Fetch attributes by FQN
304- Iterator <Document > documents =
305- collection .search (getQueryByTenantIdAndFQN (tenantId .get (), request .getFqn ()));
286+ try (final CloseableIterator <Document > documents =
287+ collection .search (getQueryByTenantIdAndFQN (tenantId .get (), request .getFqn ()))) {
306288 // For each attribute matching the FQN update the source metadata
307289 boolean status =
308290 StreamSupport .stream (Spliterators .spliteratorUnknownSize (documents , 0 ), false )
@@ -354,8 +336,8 @@ public void findAttributes(
354336 return ;
355337 }
356338
357- try {
358- Iterator < Document > documents = collection .search (getQueryForFilter (tenantId .get (), request ));
339+ try ( final CloseableIterator < Document > documents =
340+ collection .search (getQueryForFilter (tenantId .get (), request ))) {
359341 sendResult (documents , responseObserver );
360342 } catch (Exception e ) {
361343 LOGGER .error ("Error finding attributes with filter:" + request , e );
@@ -371,12 +353,11 @@ public void findAll(Empty request, StreamObserver<AttributeMetadata> responseObs
371353 return ;
372354 }
373355
374- try {
375- // query with filter on Tenant id
376- Query query = new Query ();
377- query .setFilter (getTenantIdInFilter (TenantUtils .getTenantHierarchy (tenantId .get ())));
356+ // query with filter on Tenant id
357+ Query query = new Query ();
358+ query .setFilter (getTenantIdInFilter (TenantUtils .getTenantHierarchy (tenantId .get ())));
378359
379- Iterator <Document > documents = collection .search (query );
360+ try ( final CloseableIterator <Document > documents = collection .search (query )) {
380361 sendResult (documents , responseObserver );
381362 } catch (Exception e ) {
382363 LOGGER .error ("Error finding all attributes" , e );
@@ -393,15 +374,21 @@ public void getAttributes(
393374 return ;
394375 }
395376
396- List <AttributeMetadata > attributes =
397- Streams .stream (collection .search (this .getQueryForFilter (tenantId , request .getFilter ())))
398- .map (converter ::convert )
399- .flatMap (Optional ::stream )
400- .collect (Collectors .toUnmodifiableList ());
377+ try (final CloseableIterator <Document > iterator =
378+ collection .search (this .getQueryForFilter (tenantId , request .getFilter ()))) {
379+ List <AttributeMetadata > attributes =
380+ Streams .stream (iterator )
381+ .map (converter ::convert )
382+ .flatMap (Optional ::stream )
383+ .collect (Collectors .toUnmodifiableList ());
401384
402- responseObserver .onNext (
403- GetAttributesResponse .newBuilder ().addAllAttributes (attributes ).build ());
404- responseObserver .onCompleted ();
385+ responseObserver .onNext (
386+ GetAttributesResponse .newBuilder ().addAllAttributes (attributes ).build ());
387+ responseObserver .onCompleted ();
388+ } catch (Exception e ) {
389+ LOGGER .error ("Error getting attributes" , e );
390+ responseObserver .onError (e );
391+ }
405392 }
406393
407394 @ Override
0 commit comments