1- import io
21import sys
32import time
43from collections import deque
54from functools import partial
6- from typing import Iterable , Optional
5+ from typing import Any , Callable , Iterable , Optional , TextIO , Union
76
7+ import opensearchpy
88from django .db import models
99from django .db .models import Q , QuerySet
1010from opensearchpy .helpers import bulk , parallel_bulk
1111from opensearchpy .helpers .document import Document as DSLDocument
1212
1313from . import fields
1414from .apps import DODConfig
15+ from .enums import BulkAction , CommandAction
1516from .exceptions import ModelFieldNotMappedError
16- from .management .enums import OpensearchAction
1717from .search import Search
1818from .signals import post_index
1919
20- model_field_class_to_field_class = {
20+ model_field_class_to_field_class : dict [ type [ models . Field ], type [ fields . DODField ]] = {
2121 models .AutoField : fields .IntegerField ,
2222 models .BigAutoField : fields .LongField ,
2323 models .BigIntegerField : fields .LongField ,
4848class Document (DSLDocument ):
4949 """Allow the definition of Opensearch' index using Django `Model`."""
5050
51- _prepared_fields = []
51+ _prepared_fields : list [ tuple [ str , fields . DODField , Callable [[ models . Model ], Any ]]] = []
5252
def __init__(self, related_instance_to_ignore: Any = None, **kwargs: Any) -> None:
    """Build the document and cache its field preparers.

    `related_instance_to_ignore` is kept on the document because it is
    required to remove that instance from related models on deletion.
    Every other keyword argument is forwarded untouched to the parent
    document class.
    """
    super().__init__(**kwargs)
    self._related_instance_to_ignore = related_instance_to_ignore
    # Computed once per document so `prepare()` does not have to rebuild
    # the list of preparers for every object it serialises.
    self._prepared_fields = self.init_prepare()
5959
@classmethod
def search(cls, using: Optional[str] = None, index: Optional[str] = None) -> opensearchpy.Search:
    """Return a `Search` object parametrized with the index information.

    Both arguments may be omitted: `using` is resolved through
    `cls._get_using()` and `index` through `cls._default_index()` before
    being handed to `Search`. The returned search is bound to this document
    class (as its only `doc_type`) and to the Django model declared on
    `cls.django.model`.
    """
    return Search(
        using=cls._get_using(using),
        index=cls._default_index(index),
        doc_type=[cls],
        model=cls.django.model,
    )
6969
70- def get_queryset (
71- self ,
72- filter_ : Optional [Q ] = None ,
73- exclude : Optional [Q ] = None ,
74- count : int = None ,
75- ) -> QuerySet :
70+ def get_queryset (self , filter_ : Optional [Q ] = None , exclude : Optional [Q ] = None , count : int = None ) -> QuerySet :
7671 """Return the queryset that should be indexed by this doc type."""
7772 qs = self .django .model .objects .all ()
7873
@@ -85,7 +80,7 @@ def get_queryset(
8580
8681 return qs
8782
88- def _eta (self , start , done , total ) : # pragma: no cover
83+ def _eta (self , start : float , done : int , total : int ) -> str : # pragma: no cover
8984 if done == 0 :
9085 return "~"
9186 eta = round ((time .time () - start ) / done * (total - done ))
@@ -101,43 +96,44 @@ def get_indexing_queryset(
10196 filter_ : Optional [Q ] = None ,
10297 exclude : Optional [Q ] = None ,
10398 count : int = None ,
104- action : OpensearchAction = OpensearchAction .INDEX ,
105- stdout : io . FileIO = sys .stdout ,
99+ action : CommandAction = CommandAction .INDEX ,
100+ stdout : TextIO = sys .stdout ,
106101 ) -> Iterable :
107102 """Divide the queryset into chunks."""
108103 chunk_size = self .django .queryset_pagination
109104 qs = self .get_queryset (filter_ = filter_ , exclude = exclude , count = count )
110105 qs = qs .order_by ("pk" ) if not qs .query .is_sliced else qs
111- count = qs .count ()
106+ total = qs .count ()
112107 model = self .django .model .__name__
113108 action = action .present_participle .title ()
114109
115110 i = 0
116111 done = 0
117112 start = time .time ()
118113 if verbose :
119- stdout .write (f"{ action } { model } : 0% ({ self ._eta (start , done , count )} )\r " )
120- while done < count :
114+ stdout .write (f"{ action } { model } : 0% ({ self ._eta (start , done , total )} )\r " )
115+ while done < total :
121116 if verbose :
122- stdout .write (f"{ action } { model } : { round (i / count * 100 )} % ({ self ._eta (start , done , count )} )\r " )
117+ stdout .write (f"{ action } { model } : { round (i / total * 100 )} % ({ self ._eta (start , done , total )} )\r " )
123118
124119 for obj in qs [i : i + chunk_size ]:
125120 done += 1
126121 yield obj
127122
128- i = min (i + chunk_size , count )
123+ i = min (i + chunk_size , total )
129124
130125 if verbose :
131- stdout .write (f"{ action } { count } { model } : OK \n " )
126+ stdout .write (f"{ action } { total } { model } : OK \n " )
132127
133- def init_prepare (self ):
128+ def init_prepare (self ) -> list [ tuple [ str , fields . DODField , Callable [[ models . Model ], Any ]]] :
134129 """Initialise the data model preparers once here.
135130
136131 Extracts the preparers from the model and generate a list of callables
137132 to avoid doing that work on every object instance over.
138133 """
139- index_fields = getattr (self , "_fields" , {})
134+ index_fields : dict [ str , fields . DODField ] = getattr (self , "_fields" , {})
140135 preparers = []
136+ fn : Callable [[models .Model ], Any ]
141137 for name , field in iter (index_fields .items ()):
142138 if not isinstance (field , fields .DODField ): # pragma: no cover
143139 continue
@@ -162,13 +158,13 @@ def init_prepare(self):
162158
163159 return preparers
164160
def prepare(self, instance: models.Model) -> dict[str, Any]:
    """Build the opensearch document for `instance` from the defined fields.

    Each entry of `self._prepared_fields` is a `(name, field, preparer)`
    tuple; the preparer is called with `instance` and its result is stored
    under `name`. The field object itself is not needed here.
    """
    document = {}
    for name, _field, preparer in self._prepared_fields:
        document[name] = preparer(instance)
    return document
169165
170166 @classmethod
171- def to_field (cls , field_name , model_field ) :
167+ def to_field (cls , field_name : str , model_field : models . Field ) -> fields . DODField :
172168 """Return the opensearch field instance mapped to the model field class.
173169
174170 This is a good place to hook into if you have more complex
@@ -179,14 +175,16 @@ def to_field(cls, field_name, model_field):
179175 except KeyError : # pragma: no cover
180176 raise ModelFieldNotMappedError (f"Cannot convert model field { field_name } to an Opensearch field!" )
181177
def bulk(
    self, actions: Iterable[dict[str, Any]], using: Optional[str] = None, **kwargs: Any
) -> Union[tuple[int, int], tuple[int, list]]:
    """Execute given actions in bulk and emit the `post_index` signal.

    `actions` is handed to the `bulk` helper together with the connection
    resolved from `using` (which may be omitted) and any extra keyword
    arguments. The helper's response is both sent with the signal and
    returned to the caller.
    """
    response = bulk(client=self._get_connection(using), actions=actions, **kwargs)
    # Send the post index signal.
    # NOTE(review): `actions` reaches the receivers only after the bulk helper
    # has run; if it is a one-shot iterator (e.g. a generator) it has
    # presumably been consumed by then - confirm no receiver iterates it.
    post_index.send(sender=self.__class__, instance=self, actions=actions, response=response)
    return response
188186
189- def parallel_bulk (self , actions , using = None , ** kwargs ) :
187+ def parallel_bulk (self , actions : Iterable [ dict [ str , Any ]], using : str = None , ** kwargs : Any ) -> tuple [ int , list ] :
190188 """Parallel version of `bulk`."""
191189 kwargs .setdefault ("chunk_size" , self .django .queryset_pagination )
192190 bulk_actions = parallel_bulk (client = self ._get_connection (using ), actions = actions , ** kwargs )
@@ -199,7 +197,7 @@ def parallel_bulk(self, actions, using=None, **kwargs):
199197 return 1 , []
200198
201199 @classmethod
202- def generate_id (cls , object_instance ) :
200+ def generate_id (cls , object_instance : models . Model ) -> Any :
203201 """Generate the opensearch's _id from a Django `Model` instance.
204202
205203 The default behavior is to use the Django object's pk (id) as the
@@ -208,47 +206,52 @@ def generate_id(cls, object_instance):
208206 """
209207 return object_instance .pk
210208
211- def _prepare_action (self , object_instance , action ) :
209+ def _prepare_action (self , object_instance : models . Model , action : BulkAction ) -> dict [ str , Any ] :
212210 return {
213211 "_op_type" : action ,
214212 "_index" : self ._index ._name , # noqa
215213 "_id" : self .generate_id (object_instance ),
216214 "_source" if action != "update" else "doc" : (self .prepare (object_instance ) if action != "delete" else None ),
217215 }
218216
219- def _get_actions (self , object_list , action ) :
217+ def _get_actions (self , object_list : Iterable [ models . Model ] , action : BulkAction ) -> Iterable [ dict [ str , Any ]] :
220218 for object_instance in object_list :
221219 if action == "delete" or self .should_index_object (object_instance ):
222220 yield self ._prepare_action (object_instance , action )
223221
224- def _bulk (self , * args , parallel = False , using = None , ** kwargs ):
222+ def _bulk (
223+ self , actions : Iterable [dict [str , Any ]], parallel : bool = False , using : str = None , ** kwargs : Any
224+ ) -> Union [tuple [int , int ], tuple [int , list ]]:
225225 """Allow switching between normal and parallel bulk operation."""
226226 if parallel :
227- return self .parallel_bulk (* args , using = using , ** kwargs )
228- return self .bulk (* args , using = using , ** kwargs )
227+ return self .parallel_bulk (actions , using = using , ** kwargs )
228+ return self .bulk (actions , using = using , ** kwargs )
229229
def should_index_object(self, object_instance: models.Model) -> bool:
    """Tell whether `object_instance` should be indexed.

    This default implementation accepts every object. Override the method
    and return a boolean to decide, object by object, what gets indexed.
    """
    return True
237237
def update(  # type: ignore[override] # noqa
    self,
    thing: Union[models.Model, Iterable[models.Model]],
    action: BulkAction,
    refresh: Optional[bool] = None,
    parallel: Optional[bool] = None,
    using: Optional[str] = None,
    **kwargs: Any,
) -> Union[tuple[int, int], tuple[int, list]]:
    """Update document in OS for a model, iterable of models or queryset.

    `thing` is either a single model instance, which is wrapped in a list,
    or an iterable of instances, which is used as is. One bulk action of
    kind `action` is generated per object and the batch is executed through
    `_bulk()`, whose result is returned.

    When `refresh` is `None` it falls back to the `auto_refresh` attribute
    of `self.Index`, or to `DODConfig.auto_refresh_enabled()` if that
    attribute is absent. When `parallel` is `None` it falls back to
    `DODConfig.parallel_enabled()`. `using` and any extra keyword arguments
    are forwarded to `_bulk()`.
    """
    if refresh is None:
        refresh = getattr(self.Index, "auto_refresh", DODConfig.auto_refresh_enabled())
    if parallel is None:
        parallel = DODConfig.parallel_enabled()

    # Normalise the input so the action generator always receives an iterable.
    object_list = [thing] if isinstance(thing, models.Model) else thing

    return self._bulk(
        self._get_actions(object_list, action), parallel=parallel, refresh=refresh, using=using, **kwargs
    )
0 commit comments