Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 73 additions & 65 deletions SmartApi/smartWebSocketV2.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,6 @@
"""
Created on Monday Jan 31 2022

@author: Nishant Jain

:copyright: (c) 2022 by Angel One Limited
"""
from __future__ import print_function

import struct
# import threading
# import time
import ssl
import json

Expand All @@ -21,13 +12,16 @@ class SmartWebSocketV2(object):
SmartAPI Web Socket version 2
"""

ROOT_URI = "ws://smartapisocket.angelone.in/smart-stream"
ROOT_URI = "ws://mds.angelone.in/smart-stream"
# ROOT_URI = "ws://smartapisocket.angelone.in/smart-stream"
HEART_BEAT_MESSAGE = "ping"
HEAR_BEAT_INTERVAL = 30
HEAR_BEAT_INTERVAL = 10
HEART_BEAT_TIMEOUT = 3
LITTLE_ENDIAN_BYTE_ORDER = "<"
RESUBSCRIBE_FLAG = False
# HB_THREAD_FLAG = True
MAX_RETRY_ATTEMPT = 1
CLOSE_CONNECTION = False
MAX_ACTIVE_CONNECTIONS = 1000

# Available Actions
SUBSCRIBE_ACTION = 1
Expand All @@ -54,46 +48,56 @@ class SmartWebSocketV2(object):
3: "SNAP_QUOTE"
}

EXCHANGE_VALIDATOR = {1,2,3,4,5,7,13}

wsapp = None
input_request_dict = {}
current_retry_attempt = 0

def __init__(self, auth_token, api_key, client_code, feed_token):
ERROR_MAPPING = {
'E1001' : "Invalid Request Payload.",
'E1002' : "Invalid Request. Subscription Limit Exceeded."
}

def requestValidator(self,request_data):
mode = request_data["params"]['mode']
tokenLists = request_data['params']['tokenList']
if mode not in range(1,4):
print("mode can be either 1,2, or 3.")
return False
for tokenList in tokenLists:
if tokenList['exchangeType'] not in self.EXCHANGE_VALIDATOR:
print("Incorrect exchange.")
return False
tokens = tokenList['tokens']
for token in tokens:
if token is None:
print("Token cannot be empty.")
return False



def __init__(self, client_code, feed_token):
"""
Initialise the SmartWebSocketV2 instance

Parameters
------
auth_token: string
jwt auth token received from Login API
api_key: string
api key from Smart API account
client_code: string
angel one account id
feed_token: string
feed token received from Login API
"""
self.auth_token = auth_token
self.api_key = api_key
self.client_code = client_code
self.feed_token = feed_token
self.no_of_active_connections = 0

if not self._sanity_check():
raise Exception("Provide valid value for all the tokens")

def _sanity_check(self):
if self.feed_token is None:
return False
return True
# if self.auth_token is None or self.api_key is None or self.client_code is None or self.feed_token is None:
# return False
# return True

# def _on_message(self, wsapp, message):
# print("message--->", message)
# if message != "pong":
# parsed_message = self._parse_binary_data(message)
# self.on_message(wsapp, parsed_message)
# else:
# self.on_message(wsapp, message)

def _on_data(self, wsapp, data, data_type, continue_flag):

Expand All @@ -104,10 +108,6 @@ def _on_data(self, wsapp, data, data_type, continue_flag):
self.on_data(wsapp, data)

def _on_open(self, wsapp):
# self.HB_THREAD_FLAG = True
# thread = threading.Thread(target=self.run, args=())
# thread.daemon = True
# thread.start()

if self.RESUBSCRIBE_FLAG:
self.resubscribe()
Expand All @@ -124,7 +124,6 @@ def _on_ping(self, wsapp, data):
def subscribe(self, correlation_id, mode, token_list):
"""
This Function subscribe the price data for the given token

Parameters
------
correlation_id: string
Expand Down Expand Up @@ -164,14 +163,27 @@ def subscribe(self, correlation_id, mode, token_list):
}
}


if self.requestValidator(request_data) == False:
print(self.ERROR_MAPPING['E1001'])





if self.input_request_dict.get(mode, None) is None:
self.input_request_dict[mode] = {}

for token in token_list:
if token['exchangeType'] in self.input_request_dict[mode]:
self.input_request_dict[mode][token['exchangeType']].extend(token["tokens"])
self.no_of_active_connections+=len(token["tokens"])
else:
self.input_request_dict[mode][token['exchangeType']] = token["tokens"]
self.no_of_active_connections+=len(token["tokens"])

if self.no_of_active_connections > self.MAX_ACTIVE_CONNECTIONS:
print(self.ERROR_MAPPING['E1002'])

self.wsapp.send(json.dumps(request_data))
self.RESUBSCRIBE_FLAG = True
Expand All @@ -181,7 +193,6 @@ def subscribe(self, correlation_id, mode, token_list):
def unsubscribe(self, correlation_id, mode, token_list):
"""
This function unsubscribe the data for given token

Parameters
------
correlation_id: string
Expand Down Expand Up @@ -220,8 +231,20 @@ def unsubscribe(self, correlation_id, mode, token_list):
"tokenList": token_list
}
}

self.input_request_dict.update(request_data)
mode = request_data["params"]['mode']
token_list = request_data["params"]["tokenList"]
print(self.input_request_dict)
for tokenList in token_list:
exchangeType = tokenList['exchangeType']
tokens = tokenList['tokens']
i=0
while i<len(tokens):
if tokens[i] not in self.input_request_dict[mode][exchangeType]:
tokens.remove(tokens[i])
else:
i+=1

self.no_of_active_connections = self.no_of_active_connections - len(tokens)
self.input_request_dict.update(request_data)
self.wsapp.send(json.dumps(request_data))
self.RESUBSCRIBE_FLAG = True
Expand Down Expand Up @@ -249,57 +272,44 @@ def resubscribe(self):
except Exception as e:
raise e


def connect(self):
"""
Make the web socket connection with the server
"""
try:
headers = {
"Authorization": self.auth_token,
"x-api-key": self.api_key,
"x-client-code": self.client_code,
"x-feed-token": self.feed_token
}
self.wsapp = websocket.WebSocketApp(self.ROOT_URI, header=headers, on_open=self._on_open,
on_error=self._on_error, on_close=self._on_close, on_data=self._on_data,
on_ping=self._on_ping, on_pong=self._on_pong)
self.wsapp.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}, ping_interval=self.HEAR_BEAT_INTERVAL,
ping_payload=self.HEART_BEAT_MESSAGE)
ping_payload=self.HEART_BEAT_MESSAGE,ping_timeout=self.HEART_BEAT_TIMEOUT)

except Exception as e:
raise e

def close_connection(self):
"""
Closes the connection
"""
print("No of Active Connections are: ",self.no_of_active_connections)
self.CLOSE_CONNECTION = True
self.RESUBSCRIBE_FLAG = False
# self.HB_THREAD_FLAG = False
self.wsapp.close()

# def run(self):
# while True:
# if not self.HB_THREAD_FLAG:
# break
# self.send_heart_beat()
# time.sleep(self.HEAR_BEAT_INTERVAL)

def send_heart_beat(self):
try:
self.wsapp.send(self.HEART_BEAT_MESSAGE)
except Exception as e:
raise e

def _on_error(self, wsapp, error):
self.HB_THREAD_FLAG = False
self.RESUBSCRIBE_FLAG = True
if self.current_retry_attempt < self.MAX_RETRY_ATTEMPT:
print(error)
if self.current_retry_attempt < self.MAX_RETRY_ATTEMPT and not self.CLOSE_CONNECTION:
print("Attempting to resubscribe/reconnect...")
self.current_retry_attempt += 1
self.connect()

def _on_close(self, wsapp):
# self.HB_THREAD_FLAG = False
# print(self.wsapp.close_frame)
self.on_close(wsapp)

def _parse_binary_data(self, binary_data):
Expand Down Expand Up @@ -354,10 +364,11 @@ def _unpack_data(self, binary_data, start, end, byte_format="I"):
def _parse_token_value(binary_packet):
token = ""
for i in range(len(binary_packet)):
if binary_packet[i] == b'\x00':
if chr(binary_packet[i]) == '\x00':
return token
token += binary_packet[i].encode("UTF-8")
token += chr(binary_packet[i])
return token


def _parse_best_5_buy_and_sell_data(self, binary_data):

Expand Down Expand Up @@ -393,9 +404,6 @@ def split_packets(binary_packets):
"best_5_sell_data": best_5_sell_data
}

# def on_message(self, wsapp, message):
# print(message)

def on_data(self, wsapp, data):
pass

Expand All @@ -406,4 +414,4 @@ def on_open(self, wsapp):
pass

def on_error(self):
pass
pass