@@ -166,12 +166,13 @@ def poll(
166166 start_block : int ,
167167 end_block : int ,
168168 return_first_result : bool = False ,
169+ return_first_result_function : Optional [callable ] = None ,
169170 initial_snapshot : bool = False ,
170171 return_type : str = "df"
171172 ):
172173
173174 return_dict_interface = {"data" : [], "module_name" : output_module , "data_block" : str (start_block ), "error" : None }
174- valid_return_types = ["dict" , "df" ]
175+ valid_return_types = ["dict" , "df" , "csv" ]
175176 results = []
176177 raw_results = defaultdict (lambda : {"data" : list (), "snapshots" : list ()})
177178
@@ -221,16 +222,25 @@ def poll(
221222 if len (parsed ) > 0 :
222223 parsed = [dict (item , ** {'block' :data ["clock" ]["number" ]}) for item in parsed ]
223224 if return_first_result is True :
224- break
225+ if callable (return_first_result_function ):
226+ func_result = return_first_result_function (parsed )
227+ if func_result is True :
228+ break
229+ else :
230+ continue
231+ else :
232+ break
225233 elif int (return_dict_interface ["data_block" ]) + 1 == end_block :
226234 results = return_dict_interface
227235
228236 if return_first_result is True and parsed :
229- return_dict_interface ["data" ] = parsed
230237 if return_type == "dict" :
231- results = return_dict_interface
238+ return_dict_interface [ "data" ] = parsed
232239 if return_type == "df" :
233- results = pd .DataFrame (parsed )
240+ return_dict_interface ["data" ] = pd .DataFrame (parsed )
241+ if return_type == "csv" :
242+ return_dict_interface ["data" ] = pd .DataFrame (parsed ).to_csv (index = False )
243+ results = return_dict_interface
234244 if return_first_result is False and raw_results :
235245 result = SubstreamOutput (module_name = output_module )
236246 data_dict : dict = raw_results .get (output_module )
@@ -242,6 +252,9 @@ def poll(
242252 if return_type == "dict" :
243253 return_dict_interface ["data" ] = results .to_dict ()
244254 results = return_dict_interface
255+ if return_type == "csv" :
256+ return_dict_interface ["data" ] = results .to_csv (index = False )
257+ results = return_dict_interface
245258 except Exception as err :
246259 error_to_pass = err
247260 if isinstance (err , Exception ):
0 commit comments