diff --git a/SmartApi/smartWebSocketV2.py b/SmartApi/smartWebSocketV2.py index 08a0fb65..7d8e4f2e 100644 --- a/SmartApi/smartWebSocketV2.py +++ b/SmartApi/smartWebSocketV2.py @@ -1,11 +1,10 @@ +import json +import logging +import ssl import struct -import threading import time -import ssl -import json + import websocket -from datetime import datetime, timedelta -from threading import Timer class SmartWebSocketV2(object): @@ -82,28 +81,23 @@ def _sanity_check(self): # return False # return True - def _on_message(self, wsapp, message): - print("message--->", message) - if message != "pong": - parsed_message = self._parse_binary_data(message) - self.on_message(wsapp, parsed_message) - else: - self.on_message(wsapp, message) - def _on_data(self, wsapp, data, data_type, continue_flag): if data_type == 2: parsed_message = self._parse_binary_data(data) self.on_data(wsapp, parsed_message) - else: - self.on_data(wsapp, data) + # else: + # self.on_data(wsapp, data) def _on_open(self, wsapp): if self.RESUBSCRIBE_FLAG: - self.resubscribe() - self.RESUBSCRIBE_FLAG = False # Add this line to prevent resubscription on subsequent reconnects - else: - self.on_open(wsapp) + try: + self.resubscribe() + self.RESUBSCRIBE_FLAG = False # Add this line to prevent resubscription on subsequent reconnects + except Exception as e: + logging.exception("exception while resubscribing") + self.on_error(wsapp, e) + self.on_open(wsapp) def _on_pong(self, wsapp, data): if data == self.HEART_BEAT_MESSAGE: @@ -111,9 +105,9 @@ def _on_pong(self, wsapp, data): formatted_timestamp = time.strftime("%d-%m-%y %H:%M:%S", time.localtime(timestamp)) print(f"In on pong function ==> {data}, Timestamp: {formatted_timestamp}") self.last_pong_timestamp = timestamp - else: - # Handle the received feed data here - self.on_data(wsapp, data) + # else: + # Handle the received feed data here + # self.on_data(wsapp, data) def _on_ping(self, wsapp, data): timestamp = time.time() @@ -123,28 +117,11 @@ def _on_ping(self, wsapp, data): def check_connection_status(self): current_time = time.time() - if self.last_pong_timestamp is not None and current_time - self.last_pong_timestamp > 2*self.HEART_BEAT_MESSAGE: + if self.last_pong_timestamp is not None and current_time - self.last_pong_timestamp > 2 * self.HEART_BEAT_MESSAGE: # Stale connection detected, take appropriate action self.close_connection() self.connect() - def start_ping_timer(self): - def send_ping(): - try: - current_time = datetime.now() - if self.last_pong_timestamp is None or self.last_pong_timestamp < current_time - timedelta(self.HEART_BEAT_MESSAGE): - # print("stale connection detected") - # self.wsapp.close() - self.connect() - else: - self.last_ping_timestamp = time.time() - except Exception as e: - self.wsapp.close() - self.resubscribe() - - ping_timer = Timer(5, send_ping) - ping_timer.start() - def subscribe(self, correlation_id, mode, token_list): """ This Function subscribe the price data for the given token @@ -186,8 +163,12 @@ def subscribe(self, correlation_id, mode, token_list): "tokenList": token_list } } + if mode == 4: + for token in token_list: + if token.get('exchangeType') != 1: + raise ValueError("Invalid ExchangeType: Please check the exchange type and try again") - if self.input_request_dict.get(mode) is None: + if self.input_request_dict.get(mode, None) is None: self.input_request_dict[mode] = {} for token in token_list: @@ -242,33 +223,19 @@ def unsubscribe(self, correlation_id, mode, token_list): tokens: list of string """ try: - total_tokens = sum(len(token["tokens"]) for token in token_list) - quota_limit = 50 - if total_tokens > quota_limit: - raise Exception("Quota exceeded: You can subscribe to a maximum of {} tokens.".format(quota_limit)) - else: - request_data = { - "correlationID": correlation_id, - "action": self.SUBSCRIBE_ACTION, - "params": { - "mode": mode, - "tokenList": token_list - } + request_data = { + "correlationID": correlation_id, + "action": self.UNSUBSCRIBE_ACTION, + "params": { + "mode": mode, + "tokenList": token_list } + } - if self.input_request_dict.get(mode, None) is None: - self.input_request_dict[mode] = {} - - for token in token_list: - if token['exchangeType'] in self.input_request_dict[mode]: - self.input_request_dict[mode][token['exchangeType']].extend(token["tokens"]) - else: - self.input_request_dict[mode][token['exchangeType']] = token["tokens"] - self.wsapp.send(json.dumps(request_data)) - self.RESUBSCRIBE_FLAG = True - + self.input_request_dict.update(request_data) + self.wsapp.send(json.dumps(request_data)) + self.RESUBSCRIBE_FLAG = True except Exception as e: - print("Error:", e) raise e def resubscribe(self): @@ -310,7 +277,6 @@ def connect(self): on_pong=self._on_pong) self.wsapp.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}, ping_interval=self.HEART_BEAT_INTERVAL, ping_payload=self.HEART_BEAT_MESSAGE) - # self.start_ping_timer() except Exception as e: raise e @@ -318,43 +284,35 @@ def close_connection(self): """ Closes the connection """ - self.RESUBSCRIBE_FLAG = False + # self.RESUBSCRIBE_FLAG = False self.DISCONNECT_FLAG = True # self.HB_THREAD_FLAG = False if self.wsapp: self.wsapp.close() - # def run(self): - # while True: - # if not self.HB_THREAD_FLAG: - # break - # self.send_heart_beat() - # time.sleep(self.HEAR_BEAT_INTERVAL) - - def send_heart_beat(self): - try: - self.wsapp.send(self.HEART_BEAT_MESSAGE) - except Exception as e: - raise e - def _on_error(self, wsapp, error): # self.HB_THREAD_FLAG = False + self.on_error(wsapp, error) self.RESUBSCRIBE_FLAG = True if self.current_retry_attempt < self.MAX_RETRY_ATTEMPT: - print("Attempting to resubscribe/reconnect...") self.current_retry_attempt += 1 + sleep_seconds = self.current_retry_attempt * 10 + logging.info("Attempting to resubscribe/reconnect... sleeping for %d ", sleep_seconds) + time.sleep(sleep_seconds) + logging.info("coming back from sleep") try: self.close_connection() self.connect() except Exception as e: - print("Error occurred during resubscribe/reconnect:", str(e)) + logging.exception("Error occurred during resubscribe/reconnect") + else: self.close_connection() - def _on_close(self, wsapp): + def _on_close(self, wsapp, close_status_code, close_msg): # self.HB_THREAD_FLAG = False # print(self.wsapp.close_frame) - self.on_close(wsapp) + self.on_close(wsapp,close_status_code,close_msg) def _parse_binary_data(self, binary_data): parsed_data = { @@ -382,7 +340,8 @@ def _parse_binary_data(self, binary_data): if parsed_data["subscription_mode"] == self.SNAP_QUOTE: parsed_data["last_traded_timestamp"] = self._unpack_data(binary_data, 123, 131, byte_format="q")[0] parsed_data["open_interest"] = self._unpack_data(binary_data, 131, 139, byte_format="q")[0] - parsed_data["open_interest_change_percentage"] = self._unpack_data(binary_data, 139, 147, byte_format="q")[0] + parsed_data["open_interest_change_percentage"] = \ + self._unpack_data(binary_data, 139, 147, byte_format="q")[0] parsed_data["upper_circuit_limit"] = self._unpack_data(binary_data, 347, 355, byte_format="q")[0] parsed_data["lower_circuit_limit"] = self._unpack_data(binary_data, 355, 363, byte_format="q")[0] parsed_data["52_week_high_price"] = self._unpack_data(binary_data, 363, 371, byte_format="q")[0] @@ -395,7 +354,7 @@ def _parse_binary_data(self, binary_data): parsed_data.pop("sequence_number", None) parsed_data.pop("last_traded_price", None) parsed_data.pop("subscription_mode_val", None) - parsed_data["packet_received_time"]=self._unpack_data(binary_data, 35, 43, byte_format="q")[0] + parsed_data["packet_received_time"] = self._unpack_data(binary_data, 35, 43, byte_format="q")[0] depth_data_start_index = 43 depth_20_data = self._parse_depth_20_buy_and_sell_data(binary_data[depth_data_start_index:]) parsed_data["depth_20_buy_data"] = depth_20_data["depth_20_buy_data"] @@ -467,14 +426,16 @@ def _parse_depth_20_buy_and_sell_data(self, binary_data): buy_packet_data = { "quantity": self._unpack_data(binary_data, buy_start_idx, buy_start_idx + 4, byte_format="i")[0], "price": self._unpack_data(binary_data, buy_start_idx + 4, buy_start_idx + 8, byte_format="i")[0], - "num_of_orders": self._unpack_data(binary_data, buy_start_idx + 8, buy_start_idx + 10, byte_format="h")[0], + "num_of_orders": self._unpack_data(binary_data, buy_start_idx + 8, buy_start_idx + 10, byte_format="h")[ + 0], } # Parse sell data sell_packet_data = { "quantity": self._unpack_data(binary_data, sell_start_idx, sell_start_idx + 4, byte_format="i")[0], "price": self._unpack_data(binary_data, sell_start_idx + 4, sell_start_idx + 8, byte_format="i")[0], - "num_of_orders": self._unpack_data(binary_data, sell_start_idx + 8, sell_start_idx + 10, byte_format="h")[0], + "num_of_orders": + self._unpack_data(binary_data, sell_start_idx + 8, sell_start_idx + 10, byte_format="h")[0], } depth_20_buy_data.append(buy_packet_data) @@ -491,11 +452,11 @@ def _parse_depth_20_buy_and_sell_data(self, binary_data): def on_data(self, wsapp, data): pass - def on_close(self, wsapp): + def on_close(self, wsapp,close_status_code,close_msg): pass def on_open(self, wsapp): pass - def on_error(self): + def on_error(self, wsapp, error): pass