88
99import opensearchpy
1010from django .core .exceptions import FieldError
11- from django .core .management import BaseCommand
11+ from django .core .management import BaseCommand , CommandError
1212from django .db .models import Q
13+ from opensearchpy import OpenSearch
14+ from opensearchpy .connection .connections import connections
1315
1416from ...apps import DODConfig
1517from ...enums import CommandAction
1618from ...registries import registry
1719from ..types import Values , parse
1820
1921
22+ def connection (using : str = "default" ) -> opensearchpy .OpenSearch :
23+ """Return the OpenSearch connection for the given alias."""
24+ try :
25+ return connections .get_connection (using )
26+ except KeyError :
27+ raise CommandError (
28+ f"No OpenSearch connection found for alias '{ using } ', known connections are: { list (connections ._kwargs .keys ())} "
29+ )
30+
31+
2032class Command (BaseCommand ):
2133 """Manage indices and documents."""
2234
@@ -46,20 +58,20 @@ def wrap(value: str) -> tuple[str, Values]:
4658 f"manage.py index: error: invalid filter: '{ value } ' (filter must be formatted as "
4759 f"'[Field Lookups]=[value]')\n " ,
4860 )
49- exit ( 1 )
61+ raise CommandError
5062 return lookup , v # noqa
5163
5264 return wrap
5365
54- def __list_index (self , ** options : Any ) -> None : # noqa pragma: no cover
66+ def __list_index (self , using : OpenSearch , ** options : Any ) -> None : # noqa pragma: no cover
5567 """List all known index and indicate whether they are created or not."""
5668 indices = registry .get_indices ()
5769 result = defaultdict (list )
5870 for index in indices :
5971 module = index ._doc_types [0 ].__module__ .split ("." )[- 2 ] # noqa
60- exists = index .exists ()
72+ exists = index .exists (using = using )
6173 checkbox = f"[{ 'X' if exists else ' ' } ]"
62- count = f" ({ index .search ().count ()} documents)" if exists else ""
74+ count = f" ({ index .search (using = using ).count ()} documents)" if exists else ""
6375 result [module ].append (f"{ checkbox } { index ._name } { count } " )
6476 for app , indice_names in result .items ():
6577 self .stdout .write (self .style .MIGRATE_LABEL (app ))
@@ -72,6 +84,7 @@ def _manage_index(
7284 force : bool ,
7385 verbosity : int ,
7486 ignore_error : bool ,
87+ using : OpenSearch ,
7588 ** options : Any ,
7689 ) -> None :
7790 """Manage the creation and deletion of indices."""
@@ -85,7 +98,7 @@ def _manage_index(
8598 unknown = set (indices ) - set (known_name )
8699 if unknown :
87100 self .stderr .write (f"Unknown indices '{ list (unknown )} ', choices are: '{ known_name } '" )
88- exit ( 1 )
101+ raise CommandError
89102
90103 # Only keep given indices
91104 given_indices = [i for i in known if i ._name in indices ]
@@ -107,7 +120,7 @@ def _manage_index(
107120 self .stdout .write ("" )
108121 break
109122 elif p .lower () in ["no" , "n" ]:
110- exit ( 1 )
123+ raise CommandError
111124
112125 pp = action .present_participle .title ()
113126 for index in given_indices :
@@ -119,24 +132,24 @@ def _manage_index(
119132 self .stdout .flush ()
120133 try :
121134 if action == CommandAction .CREATE :
122- index .create ()
135+ index .create (using = using )
123136 elif action == CommandAction .DELETE :
124- index .delete ()
137+ index .delete (using = using )
125138 elif action == CommandAction .UPDATE :
126- index .put_mapping (body = index .to_dict ()["mappings" ])
139+ index .put_mapping (using = using , body = index .to_dict ()["mappings" ])
127140 else :
128141 try :
129- index .delete ()
142+ index .delete (using = using )
130143 except opensearchpy .exceptions .NotFoundError :
131144 pass
132- index .create ()
145+ index .create (using = using )
133146 except opensearchpy .exceptions .TransportError as e :
134147 if verbosity or not ignore_error :
135148 error = self .style .ERROR (f"Error: { e .error } - { e .info } " )
136149 self .stderr .write (f"{ pp } index '{ index ._name } '...\n { error } " ) # noqa
137150 if not ignore_error :
138151 self .stderr .write ("exiting..." )
139- exit ( 1 )
152+ raise CommandError
140153 else :
141154 if verbosity :
142155 self .stdout .write (f"{ pp } index '{ index ._name } '... { self .style .SUCCESS ('OK' )} " ) # noqa
@@ -153,6 +166,8 @@ def _manage_document(
153166 count : bool ,
154167 refresh : bool ,
155168 missing : bool ,
169+ using : OpenSearch ,
170+ database : str ,
156171 ** options : Any ,
157172 ) -> None :
158173 """Manage the creation and deletion of indices."""
@@ -168,19 +183,19 @@ def _manage_document(
168183 unknown = set (indices ) - set (known_name )
169184 if unknown :
170185 self .stderr .write (f"Unknown indices '{ list (unknown )} ', choices are: '{ known_name } '" )
171- exit ( 1 )
186+ raise CommandError
172187
173188 # Only keep given indices
174189 given_indices = list (filter (lambda i : i ._name in indices , known )) # type: ignore[arg-type]
175190 else :
176191 given_indices = list (known )
177192
178193 # Ensure every indices needed are created
179- not_created = [i ._name for i in given_indices if not i .exists ()] # noqa
194+ not_created = [i ._name for i in given_indices if not i .exists (using = using )] # noqa
180195 if not_created :
181196 self .stderr .write (f"The following indices are not created : { not_created } " )
182197 self .stderr .write ("Use 'python3 manage.py opensearch list' to list indices' state." )
183- exit ( 1 )
198+ raise CommandError
184199
185200 # Check field, preparing to display expected actions
186201 s = f"The following documents will be { action .past } :"
@@ -189,17 +204,17 @@ def _manage_document(
189204 # Handle --missing
190205 exclude_ = exclude
191206 if missing and action == CommandAction .INDEX :
192- q = Q (pk__in = [h .meta .id for h in index .search ().extra (stored_fields = []).scan ()])
207+ q = Q (pk__in = [h .meta .id for h in index .search (using = using ).extra (stored_fields = []).scan ()])
193208 exclude_ = exclude_ & q if exclude_ is not None else q
194209
195210 document = index ._doc_types [0 ]() # noqa
196211 try :
197212 kwargs_list .append ({"filter_" : filter_ , "exclude" : exclude_ , "count" : count })
198- qs = document .get_queryset (filter_ = filter_ , exclude = exclude_ , count = count ).count ()
213+ qs = document .get_queryset (filter_ = filter_ , exclude = exclude_ , count = count , alias = database ).count ()
199214 except FieldError as e :
200215 model = index ._doc_types [0 ].django .model .__name__ # noqa
201216 self .stderr .write (f"Error while filtering on '{ model } ' (from index '{ index ._name } '):\n { e } '" ) # noqa
202- exit ( 1 )
217+ raise CommandError
203218 else :
204219 s += f"\n \t - { qs } { document .django .model .__name__ } ."
205220
@@ -215,14 +230,16 @@ def _manage_document(
215230 self .stdout .write ("\n " )
216231 break
217232 elif p .lower () in ["no" , "n" ]:
218- exit ( 1 )
233+ raise CommandError
219234
220235 result = "\n "
221236 for index , kwargs in zip (given_indices , kwargs_list ):
222237 document = index ._doc_types [0 ]() # noqa
223- qs = document .get_indexing_queryset (stdout = self .stdout ._out , verbose = verbosity , action = action , ** kwargs )
238+ qs = document .get_indexing_queryset (
239+ stdout = self .stdout ._out , verbose = verbosity , action = action , alias = database , ** kwargs
240+ )
224241 success , errors = document .update (
225- qs , parallel = parallel , refresh = refresh , action = action , raise_on_error = False
242+ qs , parallel = parallel , refresh = refresh , action = action , raise_on_error = False , using = using
226243 )
227244
228245 success_str = self .style .SUCCESS (success ) if success else success
@@ -256,6 +273,12 @@ def add_arguments(self, parser: argparse.ArgumentParser) -> None:
256273 description = "Show all available indices (and their state) for the current project." ,
257274 )
258275 subparser .set_defaults (func = self .__list_index )
276+ subparser .add_argument (
277+ "--using" ,
278+ type = connection ,
279+ default = connection ("default" ),
280+ help = "Alias of the OpenSearch connection to use. Default to 'default'." ,
281+ )
259282
260283 # 'manage' subcommand
261284 subparser = subparsers .add_parser (
@@ -264,6 +287,12 @@ def add_arguments(self, parser: argparse.ArgumentParser) -> None:
264287 description = "Manage the creation an deletion of indices." ,
265288 )
266289 subparser .set_defaults (func = self ._manage_index )
290+ subparser .add_argument (
291+ "--using" ,
292+ type = connection ,
293+ default = connection ("default" ),
294+ help = "Alias of the OpenSearch connection to use. Default to 'default'." ,
295+ )
267296 subparser .add_argument (
268297 "action" ,
269298 type = str ,
@@ -308,6 +337,13 @@ def add_arguments(self, parser: argparse.ArgumentParser) -> None:
308337 CommandAction .UPDATE .value ,
309338 ],
310339 )
340+ subparser .add_argument (
341+ "--using" ,
342+ type = connection ,
343+ default = connection ("default" ),
344+ help = "Alias of the OpenSearch connection to use. Default to 'default'." ,
345+ )
346+ subparser .add_argument ("-d" , "--database" , default = None , dest = "database" , help = "Nominates a database." )
311347 subparser .add_argument (
312348 "-f" ,
313349 "--filters" ,
@@ -383,6 +419,6 @@ def handle(self, *args: Any, **options: Any) -> None:
383419 if "func" not in options : # pragma: no cover
384420 self .stderr .write (self .usage )
385421 self .stderr .write (f"manage.py opensearch: error: no subcommand provided." )
386- exit ( 1 )
422+ raise CommandError
387423
388424 options ["func" ](** options )
0 commit comments