diff --git a/.gitignore b/.gitignore index b4a29500a..c793d5897 100644 --- a/.gitignore +++ b/.gitignore @@ -179,3 +179,6 @@ miner_objects/miner_dashboard/*.tsbuildinfo # macOS files .DS_Store + +#vim +*.swp diff --git a/data_generator/base_data_service.py b/data_generator/base_data_service.py index 12b60786a..d9d555bc9 100644 --- a/data_generator/base_data_service.py +++ b/data_generator/base_data_service.py @@ -371,7 +371,7 @@ async def handle_msg(self, msg): def instantiate_not_pickleable_objects(self): raise NotImplementedError - def get_closes_websocket(self, trade_pairs: List[TradePair], trade_pair_to_last_order_time_ms) -> dict[str: PriceSource]: + def get_closes_websocket(self, trade_pairs: List[TradePair], time_ms) -> dict[str: PriceSource]: events = {} for trade_pair in trade_pairs: symbol = trade_pair.trade_pair @@ -379,14 +379,13 @@ def get_closes_websocket(self, trade_pairs: List[TradePair], trade_pair_to_last_ continue # Get the closest aligned event - time_ms = trade_pair_to_last_order_time_ms[trade_pair] symbol = trade_pair.trade_pair latest_event = self.trade_pair_to_recent_events[symbol].get_closest_event(time_ms) events[trade_pair] = latest_event return events - def get_closes_rest(self, trade_pairs: List[TradePair]) -> dict[str: float]: + def get_closes_rest(self, trade_pairs: List[TradePair], time_ms) -> dict[str: float]: pass def get_websocket_lag_for_trade_pair_s(self, tp: str, now_ms: int) -> float | None: diff --git a/data_generator/polygon_data_service.py b/data_generator/polygon_data_service.py index 9bdbb389c..dad45db69 100644 --- a/data_generator/polygon_data_service.py +++ b/data_generator/polygon_data_service.py @@ -392,12 +392,12 @@ def symbol_to_trade_pair(self, symbol: str): raise ValueError(f"Unknown symbol: {symbol}") return tp - def get_closes_rest(self, pairs: List[TradePair]) -> dict: + def get_closes_rest(self, trade_pairs: List[TradePair], time_ms, live=True) -> dict: all_trade_pair_closes = {} # Multi-threaded fetching of REST data over all requested trade pairs. Max parallelism is 5. with ThreadPoolExecutor(max_workers=5) as executor: # Dictionary to keep track of futures - future_to_trade_pair = {executor.submit(self.get_close_rest, p): p for p in pairs} + future_to_trade_pair = {executor.submit(self.get_close_rest, p, time_ms): p for p in trade_pairs} for future in as_completed(future_to_trade_pair): tp = future_to_trade_pair[future] @@ -931,7 +931,7 @@ def get_currency_conversion(self, trade_pair: TradePair=None, base: str=None, qu #if tp != TradePair.GBPUSD: # continue - print('getting close for', tp.trade_pair_id, ':', polygon_data_provider.get_close_rest(tp)) + print('getting close for', tp.trade_pair_id, ':', polygon_data_provider.get_close_rest(tp, TimeUtil.now_in_millis())) time.sleep(100000) @@ -951,4 +951,4 @@ def get_currency_conversion(self, trade_pair: TradePair=None, base: str=None, qu aggs.append(a) assert 0, aggs - """ \ No newline at end of file + """ diff --git a/data_generator/tiingo_data_service.py b/data_generator/tiingo_data_service.py index 5da4cc1b6..3db31c4b8 100644 --- a/data_generator/tiingo_data_service.py +++ b/data_generator/tiingo_data_service.py @@ -71,7 +71,7 @@ async def connect(self, handle_msg): # Get price data synchronously but don't block the event loop loop = asyncio.get_event_loop() price_sources = await loop.run_in_executor( - None, self._svc.get_closes_rest, trade_pairs_to_query, False + None, self._svc.get_closes_rest, trade_pairs_to_query, current_time * 1000 ) # Process each price source @@ -330,27 +330,27 @@ def symbol_to_trade_pair(self, symbol: str): raise ValueError(f"Unknown symbol: {symbol}") return tp - def get_closes_rest(self, pairs: List[TradePair], verbose=False) -> dict[TradePair: PriceSource]: - tp_equities = [tp for tp in pairs if tp.trade_pair_category == TradePairCategory.EQUITIES] - tp_crypto = [tp for tp in pairs if tp.trade_pair_category == TradePairCategory.CRYPTO] - tp_forex = [tp for tp in pairs if tp.trade_pair_category == TradePairCategory.FOREX] + def get_closes_rest(self, trade_pairs: List[TradePair], time_ms, live=True, verbose=False) -> dict[TradePair: PriceSource]: + tp_equities = [tp for tp in trade_pairs if tp.trade_pair_category == TradePairCategory.EQUITIES] + tp_crypto = [tp for tp in trade_pairs if tp.trade_pair_category == TradePairCategory.CRYPTO] + tp_forex = [tp for tp in trade_pairs if tp.trade_pair_category == TradePairCategory.FOREX] # Jobs to parallelize jobs = [] if tp_equities: - jobs.append((self.get_closes_equities, tp_equities, verbose)) + jobs.append((self.get_closes_equities, tp_equities, time_ms, live, verbose)) if tp_crypto: - jobs.append((self.get_closes_crypto, tp_crypto, verbose)) + jobs.append((self.get_closes_crypto, tp_crypto, time_ms, live, verbose)) if tp_forex: - jobs.append((self.get_closes_forex, tp_forex, verbose)) + jobs.append((self.get_closes_forex, tp_forex, time_ms, live, verbose)) tp_to_price = {} if len(jobs) == 0: return tp_to_price elif len(jobs) == 1: - func, tp_list, verbose = jobs[0] - return func(tp_list, verbose) + func, tp_list, target_time_ms, live_flag, verbose_flag = jobs[0] + return func(tp_list, target_time_ms, live_flag, verbose_flag) # Use ThreadPoolExecutor for parallelization if there are multiple jobs with ThreadPoolExecutor() as executor: @@ -366,8 +366,8 @@ def get_closes_rest(self, pairs: List[TradePair], verbose=False) -> dict[TradePa return tp_to_price @exception_handler_decorator() - def get_closes_equities(self, trade_pairs: List[TradePair], verbose=False, target_time_ms=None) -> dict[TradePair: PriceSource]: - if target_time_ms: + def get_closes_equities(self, trade_pairs: List[TradePair], target_time_ms: int, live: bool, verbose=False) -> dict[TradePair, PriceSource]: + if not live: raise Exception('TODO') tp_to_price = {} if not trade_pairs: @@ -432,10 +432,10 @@ def target_ms_to_start_end_formatted(self, target_time_ms): return start_day_formatted, end_day_formatted @exception_handler_decorator() - def get_closes_forex(self, trade_pairs: List[TradePair], verbose=False, target_time_ms=None) -> dict: + def get_closes_forex(self, trade_pairs: List[TradePair], target_time_ms: int, live: bool, verbose=False) -> dict: def tickers_to_tiingo_forex_url(tickers: List[str]) -> str: - if target_time_ms: + if not live: start_day_formatted, end_day_formatted = self.target_ms_to_start_end_formatted(target_time_ms) return f"https://api.tiingo.com/tiingo/fx/prices?tickers={','.join(tickers)}&startDate={start_day_formatted}&endDate={end_day_formatted}&resampleFreq=1min&token={self.config['api_key']}" return f"https://api.tiingo.com/tiingo/fx/top?tickers={','.join(tickers)}&token={self.config['api_key']}" @@ -457,7 +457,7 @@ def tickers_to_tiingo_forex_url(tickers: List[str]) -> str: lowest_delta = float('inf') for x in requestResponse.json(): tp = TradePair.get_latest_trade_pair_from_trade_pair_id(x['ticker'].upper()) - if target_time_ms: + if not live: # Rows look like {'close': 148.636, 'date': '2025-03-21T00:00:00.000Z', 'high': 148.6575, 'low': 148.5975, 'open': 148.6245, 'ticker': 'usdjpy'} attempting_previous_close = not self.is_market_open(tp, time_ms=target_time_ms) data_time_ms = TimeUtil.parse_iso_to_ms(x['date']) @@ -523,117 +523,75 @@ def tickers_to_tiingo_forex_url(tickers: List[str]) -> str: return tp_to_price @exception_handler_decorator() - def get_closes_crypto(self, trade_pairs: List[TradePair], verbose=False, target_time_ms=None) -> dict: - tp_to_price = {} - if not trade_pairs: - return tp_to_price - assert all(tp.trade_pair_category == TradePairCategory.CRYPTO for tp in trade_pairs), trade_pairs + def get_closes_crypto(self, trade_pairs: List[TradePair], target_time_ms: int, live: bool, verbose=False) -> dict: def tickers_to_crypto_url(tickers: List[str]) -> str: - if target_time_ms: + if not live: # YYYY-MM-DD format. start_day_formatted, end_day_formatted = self.target_ms_to_start_end_formatted(target_time_ms) # "https://api.tiingo.com/tiingo/crypto/prices?tickers=btcusd&startDate=2019-01-02&resampleFreq=5min&token=ffb55f7fdd167d4b8047539e6b62d82b92b25f91" return f"https://api.tiingo.com/tiingo/crypto/prices?tickers={','.join(tickers)}&startDate={start_day_formatted}&endDate={end_day_formatted}&resampleFreq=1min&token={self.config['api_key']}&exchanges={TIINGO_COINBASE_EXCHANGE_STR.upper()}" - return f"https://api.tiingo.com/tiingo/crypto/top?tickers={','.join(tickers)}&token={self.config['api_key']}&exchanges={TIINGO_COINBASE_EXCHANGE_STR.upper()}" + return f"https://api.tiingo.com/tiingo/crypto/prices?tickers={','.join(tickers)}&token={self.config['api_key']}&exchanges={TIINGO_COINBASE_EXCHANGE_STR.upper()}" + + tp_to_price = {} + if not trade_pairs: + return tp_to_price + + assert all(tp.trade_pair_category == TradePairCategory.CRYPTO for tp in trade_pairs), trade_pairs url = tickers_to_crypto_url([self.trade_pair_to_tiingo_ticker(x) for x in trade_pairs]) if verbose: print('hitting url', url) requestResponse = requests.get(url, headers={'Content-Type': 'application/json'}, timeout=5) + if requestResponse.status_code != 200: + return {} - if requestResponse.status_code == 200: - response_data = requestResponse.json() + response_data = requestResponse.json() + timespan_ms = self.timespan_to_ms['minute'] - if target_time_ms: - # Historical data has a different structure - the items are in data[0]['priceData'] - if not response_data or len(response_data) == 0: - return tp_to_price - for crypto_data in response_data: - ticker = crypto_data['ticker'] - - # Skip if no price data available - if not crypto_data.get('priceData') or len(crypto_data['priceData']) == 0: - continue + for crypto_data in response_data: + ticker = crypto_data['ticker'] + price_data = crypto_data.get('priceData', None) + if not price_data: + continue - # Find the closest price data point to target_time_ms - price_data = sorted(crypto_data['priceData'], - key=lambda x: TimeUtil.parse_iso_to_ms(x['date'])) - - closest_data = min(price_data, - key=lambda x: abs(TimeUtil.parse_iso_to_ms(x['date']) - target_time_ms)) - - data_time_ms = TimeUtil.parse_iso_to_ms(closest_data['date']) - price = float(closest_data['close']) - bid_price = ask_price = 0 # Bid/ask not provided in historical data - - tp = TradePair.get_latest_trade_pair_from_trade_pair_id(ticker.upper()) - source_name = f'{TIINGO_PROVIDER_NAME}_{TIINGO_COINBASE_EXCHANGE_STR}_historical' - exchange = TIINGO_COINBASE_EXCHANGE_STR - - # Create PriceSource - tp_to_price[tp] = PriceSource( - source=source_name, - timespan_ms=self.timespan_to_ms['minute'], - open=float(closest_data['open']), - close=price, - vwap=price, - high=float(closest_data['high']), - low=float(closest_data['low']), - start_ms=data_time_ms, - websocket=False, - lag_ms=target_time_ms - data_time_ms, - bid=bid_price, - ask=ask_price - ) - - if verbose: - self.log_price_info(tp, tp_to_price[tp], target_time_ms, data_time_ms, - closest_data['date'], price, exchange, closest_data) - else: - now_ms = TimeUtil.now_in_millis() - # Current data format (top endpoint) - for crypto_data in response_data: - ticker = crypto_data['ticker'] - if len(crypto_data['topOfBookData']) != 1: - print('Tiingo unexpected data', crypto_data) - continue + # Find the closest price data point to target_time_ms + # data time is start time, add timespan to match close price time + closest_data = min(price_data, + key=lambda x: abs(TimeUtil.parse_iso_to_ms(x['date']) + timespan_ms - target_time_ms)) + + data_time_ms = TimeUtil.parse_iso_to_ms(closest_data["date"]) + timespan_ms + price_close = float(closest_data['close']) + bid_price = ask_price = 0 # Bid/ask not provided in historical data + + tp = TradePair.get_latest_trade_pair_from_trade_pair_id(ticker.upper()) + source_name = f'{TIINGO_PROVIDER_NAME}_{TIINGO_COINBASE_EXCHANGE_STR}' + exchange = TIINGO_COINBASE_EXCHANGE_STR + + # Create PriceSource + tp_to_price[tp] = PriceSource( + source=source_name, + timespan_ms=timespan_ms, + open=float(closest_data['open']), + close=float(closest_data['close']), + vwap=price_close, + high=price_close, + low=float(closest_data['low']), + start_ms=data_time_ms, + websocket=False, + lag_ms=target_time_ms - data_time_ms, + bid=bid_price, + ask=ask_price, + ) - book_data = crypto_data['topOfBookData'][0] - - # Determine the data source and timestamp - data_time_ms, price, exchange, bid_price, ask_price = self.get_best_crypto_price_info( - book_data, now_ms, TIINGO_COINBASE_EXCHANGE_STR - ) - - # Create trade pair - tp = TradePair.get_latest_trade_pair_from_trade_pair_id(ticker.upper()) - price = float(price) - source_name = f'{TIINGO_PROVIDER_NAME}_{exchange}_rest' - - # Create PriceSource - tp_to_price[tp] = PriceSource( - source=source_name, - timespan_ms=self.timespan_to_ms['minute'], - open=price, - close=price, - vwap=price, - high=price, - low=price, - start_ms=data_time_ms, - websocket=False, - lag_ms=now_ms - data_time_ms, - bid=bid_price, - ask=ask_price - ) - - if verbose: - self.log_price_info(tp, tp_to_price[tp], now_ms, data_time_ms, - book_data['quoteTimestamp'], price, exchange, book_data) + if verbose: + self.log_price_info(tp, tp_to_price[tp], target_time_ms, data_time_ms, + closest_data["date"], price_close, exchange, closest_data) return tp_to_price + # Previously used for deprecated tiingo top-of-book rest endpoint - not used anymore (06/24/2025) def get_best_crypto_price_info(self, book_data, now_ms, preferred_exchange): """Helper function to determine the best price info from book data""" data_time_exchange_ms = TimeUtil.parse_iso_to_ms(book_data['lastSaleTimestamp']) @@ -684,14 +642,14 @@ def log_price_info(self, tp, price_source, now_ms, data_time_ms, timestamp, pric def get_close_rest( self, trade_pair: TradePair, - attempting_prev_close: bool = False, - target_time_ms: int | None = None) -> PriceSource | None: + timestamp_ms: int, + live=True) -> PriceSource | None: if trade_pair.trade_pair_category == TradePairCategory.EQUITIES: - ans = self.get_closes_equities([trade_pair], target_time_ms=target_time_ms).get(trade_pair) + ans = self.get_closes_equities([trade_pair], timestamp_ms, live).get(trade_pair) elif trade_pair.trade_pair_category == TradePairCategory.CRYPTO: - ans = self.get_closes_crypto([trade_pair], target_time_ms=target_time_ms).get(trade_pair) + ans = self.get_closes_crypto([trade_pair], timestamp_ms, live).get(trade_pair) elif trade_pair.trade_pair_category == TradePairCategory.FOREX: - ans = self.get_closes_forex([trade_pair], target_time_ms=target_time_ms).get(trade_pair) + ans = self.get_closes_forex([trade_pair], timestamp_ms, live).get(trade_pair) else: raise ValueError(f"Unknown trade pair category {trade_pair}") @@ -722,8 +680,9 @@ def get_websocket_event(self, trade_pair: TradePair) -> PriceSource | None: if __name__ == "__main__": secrets = ValiUtils.get_secrets() tds = TiingoDataService(api_key=secrets['tiingo_apikey'], disable_ws=True) - ps = tds.get_close_rest(TradePair.TAOUSD, target_time_ms=None) + ps = tds.get_close_rest(TradePair.TAOUSD, timestamp_ms=None, live=True) print('@@@@@', ps) + target_timestamp_ms = 1715288502999 time.sleep(10000) for trade_pair in TradePair: if not trade_pair.is_forex: @@ -731,14 +690,13 @@ def get_websocket_event(self, trade_pair: TradePair) -> PriceSource | None: # Get rest data if trade_pair.is_indices: continue - ps = tds.get_close_rest(trade_pair, target_time_ms=None) + ps = tds.get_close_rest(trade_pair, target_timestamp_ms, live=False) if ps: print(f"Got {ps} for {trade_pair}") else: print(f"No data for {trade_pair}") time.sleep(100000) #assert 0 - target_timestamp_ms = 1715288502999 client = TiingoClient({'api_key': secrets['tiingo_apikey']}) crypto_price = client.get_crypto_top_of_book(['BTCUSD']) @@ -746,7 +704,7 @@ def get_websocket_event(self, trade_pair: TradePair) -> PriceSource | None: # forex_price = client.get_(ticker='USDJPY')# startDate='2021-01-01', endDate='2021-01-02', frequency='daily') #tds = TiingoDataService(secrets['tiingo_apikey'], disable_ws=True) - tp_to_prices = tds.get_closes_rest([TradePair.BTCUSD, TradePair.USDJPY, TradePair.NVDA], verbose=True) + tp_to_prices = tds.get_closes_rest([TradePair.BTCUSD, TradePair.USDJPY, TradePair.NVDA], target_timestamp_ms, live=False) assert 0, {x.trade_pair_id: y for x, y in tp_to_prices.items()} diff --git a/meta/meta.json b/meta/meta.json index d7d16e0ec..69fe21da8 100644 --- a/meta/meta.json +++ b/meta/meta.json @@ -1,3 +1,3 @@ { - "subnet_version": "7.1.2" + "subnet_version": "7.1.3" } diff --git a/tests/shared_objects/mock_classes.py b/tests/shared_objects/mock_classes.py index 6d443bf92..a25249d79 100644 --- a/tests/shared_objects/mock_classes.py +++ b/tests/shared_objects/mock_classes.py @@ -60,12 +60,13 @@ def __init__(self, secrets, disable_ws): super().__init__(secrets=secrets, disable_ws=disable_ws) self.polygon_data_service = MockPolygonDataService(api_key=secrets["polygon_apikey"], disable_ws=disable_ws) - def get_sorted_price_sources_for_trade_pair(self, trade_pair, processed_ms): - return [PriceSource(open=1, high=1, close=1, low=1, bid=1, ask=1)] - def get_close_at_date(self, trade_pair, timestamp_ms, order=None, verbose=True): return PriceSource(open=1, high=1, close=1, low=1, bid=1, ask=1) + def get_sorted_price_sources_for_trade_pair(self, trade_pair, time_ms=None, live=True): + return [PriceSource(open=1, high=1, close=1, low=1, bid=1, ask=1)] + + class MockPolygonDataService(PolygonDataService): def __init__(self, api_key, disable_ws=True): super().__init__(api_key, disable_ws=disable_ws) diff --git a/vali_objects/utils/live_price_fetcher.py b/vali_objects/utils/live_price_fetcher.py index 58b581151..268ce9650 100644 --- a/vali_objects/utils/live_price_fetcher.py +++ b/vali_objects/utils/live_price_fetcher.py @@ -41,6 +41,9 @@ def sorted_valid_price_sources(self, price_events: List[PriceSource | None], cur if not valid_events: return None + if not current_time_ms: + current_time_ms = TimeUtil.now_in_millis() + best_event = PriceSource.get_winning_event(valid_events, current_time_ms) if not best_event: return None @@ -50,10 +53,7 @@ def sorted_valid_price_sources(self, price_events: List[PriceSource | None], cur return PriceSource.non_null_events_sorted(valid_events, current_time_ms) - def dual_rest_get( - self, - trade_pairs: List[TradePair] - ) -> Tuple[Dict[TradePair, PriceSource], Dict[TradePair, PriceSource]]: + def dual_rest_get(self, trade_pairs: List[TradePair], time_ms, live) -> Tuple[Dict[TradePair, PriceSource], Dict[TradePair, PriceSource]]: """ Fetch REST closes from both Polygon and Tiingo in parallel, using ThreadPoolExecutor to run both calls concurrently. @@ -62,8 +62,8 @@ def dual_rest_get( tiingo_results = {} with ThreadPoolExecutor(max_workers=2) as executor: # Submit both REST calls to the executor - poly_fut = executor.submit(self.polygon_data_service.get_closes_rest, trade_pairs) - tiingo_fut = executor.submit(self.tiingo_data_service.get_closes_rest, trade_pairs) + poly_fut = executor.submit(self.polygon_data_service.get_closes_rest, trade_pairs, time_ms, live) + tiingo_fut = executor.submit(self.tiingo_data_service.get_closes_rest, trade_pairs, time_ms, live) try: # Wait for both futures to complete with a 10s timeout @@ -87,37 +87,31 @@ def get_latest_price(self, trade_pair: TradePair, time_ms=None) -> Tuple[float, Gets the latest price for a single trade pair by utilizing WebSocket and possibly REST data sources. Tries to get the price as close to time_ms as possible. """ - if not time_ms: - time_ms = TimeUtil.now_in_millis() price_sources = self.get_sorted_price_sources_for_trade_pair(trade_pair, time_ms) winning_event = PriceSource.get_winning_event(price_sources, time_ms) return winning_event.parse_best_best_price_legacy(time_ms), price_sources - def get_sorted_price_sources_for_trade_pair(self, trade_pair: TradePair, time_ms:int) -> List[PriceSource] | None: - temp = self.get_tp_to_sorted_price_sources([trade_pair], {trade_pair: time_ms}) + def get_sorted_price_sources_for_trade_pair(self, trade_pair: TradePair, time_ms: int, live=True) -> List[PriceSource] | None: + temp = self.get_tp_to_sorted_price_sources([trade_pair], time_ms, live) return temp.get(trade_pair) - def get_tp_to_sorted_price_sources(self, trade_pairs: List[TradePair], - trade_pair_to_last_order_time_ms: Dict[TradePair, int] = None) -> Dict[TradePair, List[PriceSource]]: + def get_tp_to_sorted_price_sources(self, trade_pairs: List[TradePair], time_ms: int, live=True) -> Dict[TradePair, List[PriceSource]]: """ Retrieves the latest prices for multiple trade pairs, leveraging both WebSocket and REST APIs as needed. """ - if not trade_pair_to_last_order_time_ms: - current_time_ms = TimeUtil.now_in_millis() - trade_pair_to_last_order_time_ms = {tp: current_time_ms for tp in trade_pairs} - websocket_prices_polygon = self.polygon_data_service.get_closes_websocket(trade_pairs=trade_pairs, - trade_pair_to_last_order_time_ms=trade_pair_to_last_order_time_ms) - websocket_prices_tiingo_data = self.tiingo_data_service.get_closes_websocket(trade_pairs=trade_pairs, - trade_pair_to_last_order_time_ms=trade_pair_to_last_order_time_ms) + if not time_ms: + time_ms = TimeUtil.now_in_millis() + + websocket_prices_polygon = self.polygon_data_service.get_closes_websocket(trade_pairs, time_ms) + websocket_prices_tiingo_data = self.tiingo_data_service.get_closes_websocket(trade_pairs, time_ms) trade_pairs_needing_rest_data = [] results = {} # Initial check using WebSocket data for trade_pair in trade_pairs: - current_time_ms = trade_pair_to_last_order_time_ms[trade_pair] events = [websocket_prices_polygon.get(trade_pair), websocket_prices_tiingo_data.get(trade_pair)] - sources = self.sorted_valid_price_sources(events, current_time_ms, filter_recent_only=True) + sources = self.sorted_valid_price_sources(events, time_ms, filter_recent_only=True) if sources: results[trade_pair] = sources else: @@ -127,16 +121,15 @@ def get_tp_to_sorted_price_sources(self, trade_pairs: List[TradePair], if not trade_pairs_needing_rest_data: return results - rest_prices_polygon, rest_prices_tiingo_data = self.dual_rest_get(trade_pairs_needing_rest_data) + rest_prices_polygon, rest_prices_tiingo_data = self.dual_rest_get(trade_pairs_needing_rest_data, time_ms, live) for trade_pair in trade_pairs_needing_rest_data: - current_time_ms = trade_pair_to_last_order_time_ms[trade_pair] sources = self.sorted_valid_price_sources([ websocket_prices_polygon.get(trade_pair), websocket_prices_tiingo_data.get(trade_pair), rest_prices_polygon.get(trade_pair), rest_prices_tiingo_data.get(trade_pair) - ], current_time_ms, filter_recent_only=False) + ], time_ms, filter_recent_only=False) results[trade_pair] = sources return results @@ -288,12 +281,11 @@ def get_close_at_date(self, trade_pair, timestamp_ms, order=None, verbose=True): f"Fell back to Polygon get_date_minute_fallback for price of {trade_pair.trade_pair} at {TimeUtil.timestamp_ms_to_eastern_time_str(timestamp_ms)}, price_source: {price_source}") if price_source is None: - price_source = self.tiingo_data_service.get_close_rest(trade_pair=trade_pair, target_time_ms=timestamp_ms) + price_source = self.tiingo_data_service.get_close_rest(trade_pair=trade_pair, timestamp_ms=timestamp_ms, live=False) if verbose and price_source is not None: bt.logging.warning( f"Fell back to Tiingo get_date for price of {trade_pair.trade_pair} at {TimeUtil.timestamp_ms_to_eastern_time_str(timestamp_ms)}, ms: {timestamp_ms}") - """ if price is None: price, time_delta = self.polygon_data_service.get_close_in_past_hour_fallback(trade_pair=trade_pair, diff --git a/vali_objects/utils/mdd_checker.py b/vali_objects/utils/mdd_checker.py index 188781850..84280bad0 100644 --- a/vali_objects/utils/mdd_checker.py +++ b/vali_objects/utils/mdd_checker.py @@ -62,7 +62,7 @@ def get_sorted_price_sources(self, hotkey_positions) -> Dict[TradePair, List[Pri required_trade_pairs_for_candles.add(tp) now = TimeUtil.now_in_millis() - trade_pair_to_price_sources = self.live_price_fetcher.get_tp_to_sorted_price_sources(list(required_trade_pairs_for_candles)) + trade_pair_to_price_sources = self.live_price_fetcher.get_tp_to_sorted_price_sources(list(required_trade_pairs_for_candles), now) #bt.logging.info(f"Got candle data for {len(candle_data)} {candle_data}") for tp, sources in trade_pair_to_price_sources.items(): if sources and any(x and not x.websocket for x in sources): diff --git a/vali_objects/utils/position_manager.py b/vali_objects/utils/position_manager.py index 0ecdb445c..be0d43ad7 100644 --- a/vali_objects/utils/position_manager.py +++ b/vali_objects/utils/position_manager.py @@ -764,7 +764,7 @@ def close_open_orders_for_suspended_trade_pairs(self): if position.is_closed_position: continue if position.trade_pair in tps_to_eliminate: - price_sources = self.live_price_fetcher.get_sorted_price_sources_for_trade_pair(position.trade_pair, TARGET_MS) + price_sources = self.live_price_fetcher.get_sorted_price_sources_for_trade_pair(position.trade_pair, time_ms=TARGET_MS, live=False) live_price = price_sources[0].parse_appropriate_price(TARGET_MS, position.trade_pair.is_forex, OrderType.FLAT, position) flat_order = Order(price=live_price, price_sources=price_sources, diff --git a/vali_objects/utils/price_slippage_model.py b/vali_objects/utils/price_slippage_model.py index b798df31a..9b6d92d81 100644 --- a/vali_objects/utils/price_slippage_model.py +++ b/vali_objects/utils/price_slippage_model.py @@ -218,7 +218,7 @@ def get_features(cls, trade_pairs: list[TradePair], processed_ms: int, adv_lookb try: bars_df = cls.get_bars_with_features(trade_pair, processed_ms, adv_lookback_window, calc_vol_window) row_selected = bars_df.iloc[-1] - annualized_volatility = row_selected['annualized_vol'] + annualized_volatility = row_selected['annualized_vol'] # recalculate slippage false avg_daily_volume = row_selected[f'adv_last_{adv_lookback_window}_days'] tp_to_vol[trade_pair.trade_pair_id] = annualized_volatility @@ -305,12 +305,13 @@ def update_historical_slippage(self, positions_at_t_f): break bt.logging.info(f"updating order attributes {o}") + bid = o.bid ask = o.ask if self.fetch_slippage_data: - price_sources = self.live_price_fetcher.get_sorted_price_sources_for_trade_pair(trade_pair=o.trade_pair, time_ms=o.processed_ms) + price_sources = self.live_price_fetcher.get_sorted_price_sources_for_trade_pair(trade_pair=o.trade_pair, time_ms=o.processed_ms, live=False) if not price_sources: raise ValueError( f"Ignoring order for [{hk}] due to no live prices being found for trade_pair [{o.trade_pair}]. Please try again.") diff --git a/vali_objects/vali_dataclasses/price_source.py b/vali_objects/vali_dataclasses/price_source.py index 4a8267111..90d4dfcfc 100644 --- a/vali_objects/vali_dataclasses/price_source.py +++ b/vali_objects/vali_dataclasses/price_source.py @@ -4,6 +4,7 @@ import bittensor as bt from typing import Optional from pydantic import BaseModel +from time_util.time_util import TimeUtil from vali_objects.enums.order_type_enum import OrderType @@ -55,7 +56,9 @@ def end_ms(self): def get_start_time_ms(self): return self.start_ms - def time_delta_from_now_ms(self, now_ms: int) -> int: + def time_delta_from_now_ms(self, now_ms:int = None) -> int: + if not now_ms: + now_ms = TimeUtil.now_in_millis() if self.websocket: return abs(now_ms - self.start_ms) else: @@ -63,6 +66,8 @@ def time_delta_from_now_ms(self, now_ms: int) -> int: abs(now_ms - self.end_ms)) def parse_best_best_price_legacy(self, now_ms: int): + if not now_ms: + now_ms = TimeUtil.now_in_millis() if self.websocket: return self.open else: diff --git a/vali_objects/vali_dataclasses/recent_event_tracker.py b/vali_objects/vali_dataclasses/recent_event_tracker.py index f4d3155e0..d0be0882f 100644 --- a/vali_objects/vali_dataclasses/recent_event_tracker.py +++ b/vali_objects/vali_dataclasses/recent_event_tracker.py @@ -80,7 +80,11 @@ def get_closest_event(self, timestamp_ms): #print(f"Looking for event at {TimeUtil.millis_to_formatted_date_str(timestamp_ms)}") if self.count_events() == 0: return None + # Find the event closest to the given timestamp + if not timestamp_ms: + timestamp_ms = TimeUtil.now_in_millis() + idx = self.events.bisect_left((timestamp_ms,)) if idx == 0: return self.events[0][1]