diff --git a/include/websocket.hpp b/include/websocket.hpp index 1ab9cd4..e518698 100644 --- a/include/websocket.hpp +++ b/include/websocket.hpp @@ -38,7 +38,9 @@ struct RequestContent { Callback on_unsubscribe; // params to be passed with the subscription string json params; - bool subscribed = false; + // using promise to ensure this + std::promise subscribed_pr; + std::future subscription_future = subscribed_pr.get_future(); RequestIdType ws_id; RequestContent() = default; @@ -77,7 +79,7 @@ class session : public std::enable_shared_from_this { /// @brief push a function for subscription /// @param req the request to call - void subscribe(const RequestContent &req); + void subscribe(RequestContent *req); /// @brief push for unsubscription /// @param id the id to unsubscribe on @@ -147,7 +149,7 @@ class session : public std::enable_shared_from_this { std::atomic_bool is_connected; // map of subscription id with callback - std::unordered_map callback_map; + std::unordered_map callback_map; std::unordered_map maps_wsid_to_id; std::shared_mutex mutex_for_maps; diff --git a/lib/solana.cpp b/lib/solana.cpp index 835995a..312e042 100644 --- a/lib/solana.cpp +++ b/lib/solana.cpp @@ -1119,9 +1119,9 @@ int WebSocketSubscriber::onAccountChange(const solana::PublicKey &pub_key, json param = {pub_key, {{"encoding", "base64"}, {"commitment", commitment}}}; // create a new request content - RequestContent req(curr_id, "accountSubscribe", "accountUnsubscribe", - account_change_callback, std::move(param), on_subscibe, - on_unsubscribe); + RequestContent *req = new RequestContent( + curr_id, "accountSubscribe", "accountUnsubscribe", + account_change_callback, std::move(param), on_subscibe, on_unsubscribe); // subscribe the new request content sess->subscribe(req); @@ -1129,7 +1129,7 @@ int WebSocketSubscriber::onAccountChange(const solana::PublicKey &pub_key, // increase the curr_id so that it can be used for the next request content curr_id += 2; - return req.id; + return req->id; } /// @brief remove the account change listener for the given id diff --git a/lib/websocket.cpp b/lib/websocket.cpp index a42f5da..e7414a1 100644 --- a/lib/websocket.cpp +++ b/lib/websocket.cpp @@ -18,9 +18,9 @@ RequestContent::RequestContent(RequestIdType id, std::string subscribe_method, /// @return the json that can be used to make subscription json RequestContent::get_subscription_request() const { json req = {{"jsonrpc", "2.0"}, - {"id", id}, - {"method", subscribe_method}, - {"params", params}}; + {"id", this->id}, + {"method", this->subscribe_method}, + {"params", this->params}}; return req; } @@ -31,8 +31,8 @@ json RequestContent::get_unsubscription_request( RequestIdType subscription_id) const { json params = {subscription_id}; json req = {{"jsonrpc", "2.0"}, - {"id", id + 1}, - {"method", unsubscribe_method}, + {"id", this->id + 1}, + {"method", this->unsubscribe_method}, {"params", params}}; return req; } @@ -63,14 +63,14 @@ void session::run(std::string host, std::string port) { /// @brief push a function for subscription /// @param req the request to call -void session::subscribe(const RequestContent &req) { +void session::subscribe(RequestContent *req) { // context for unique_lock { std::unique_lock lk(mutex_for_maps); - callback_map[req.id] = req; + callback_map[req->id] = req; } // get subscription request and then send it to the websocket - ws.write(net::buffer(req.get_subscription_request().dump())); + ws.write(net::buffer(req->get_subscription_request().dump())); } /// @brief push for unsubscription @@ -78,15 +78,33 @@ void session::subscribe(const RequestContent &req) { void session::unsubscribe(RequestIdType id) { // context for shared lock std::string unsubsciption_request = ""; + std::unordered_map::iterator ite; { std::shared_lock lk(mutex_for_maps); - auto ite = callback_map.find(id); - if (ite == callback_map.end()) return; - unsubsciption_request = - ite->second.get_unsubscription_request(ite->second.ws_id).dump(); - maps_wsid_to_id.erase(ite->second.ws_id); + ite = callback_map.find(id); + } + if (ite == callback_map.end()) return; + + // wait for subscription to happen + bool subscription_successful = ite->second->subscription_future.get(); + + // if subscription wasn't successful then just remove the id from callback + if (!subscription_successful) { + { + std::unique_lock lk(mutex_for_maps); + const auto ite = callback_map.find(id); + if (ite != callback_map.end()) { + callback_map.erase(ite); + delete ite->second; + } + } + return; } + + unsubsciption_request = + ite->second->get_unsubscription_request(ite->second->ws_id).dump(); + maps_wsid_to_id.erase(ite->second->ws_id); if (!unsubsciption_request.empty()) { // write it to the websocket ws.write(net::buffer(unsubsciption_request)); @@ -197,10 +215,10 @@ void session::on_read(beast::error_code ec, std::size_t bytes_transferred) { // get the data from the websocket and parse it to json auto res = buffer.data(); json data = json::parse(net::buffers_begin(res), net::buffers_end(res)); - // if data contains field result then it's either subscription or // unsubscription response static const char *result = "result"; + static const char *error = "error"; try { if (data.contains(std::string{result})) { RequestIdType id = data["id"]; @@ -215,8 +233,9 @@ void session::on_read(beast::error_code ec, std::size_t bytes_transferred) { std::unique_lock lk(mutex_for_maps); const auto ite = callback_map.find(id); if (ite != callback_map.end()) { - on_unsubscribe = ite->second.on_unsubscribe; + on_unsubscribe = ite->second->on_unsubscribe; callback_map.erase(ite); + delete ite->second; } } if (on_unsubscribe) { @@ -231,10 +250,10 @@ void session::on_read(beast::error_code ec, std::size_t bytes_transferred) { const auto ite = callback_map.find(id); if (ite != callback_map.end()) { - on_subscribe = ite->second.on_subscribe; - ite->second.subscribed = true; - ite->second.ws_id = data[result]; - maps_wsid_to_id[ite->second.ws_id] = id; + on_subscribe = ite->second->on_subscribe; + ite->second->subscribed_pr.set_value(true); + ite->second->ws_id = data[result]; + maps_wsid_to_id[ite->second->ws_id] = id; } } if (on_subscribe) { @@ -242,6 +261,26 @@ void session::on_read(beast::error_code ec, std::size_t bytes_transferred) { } } } + // In case of erro + else if (data.contains(std::string{error})) { + RequestIdType id = data["id"]; + json er_mess = data["error"]; + // if id is even then error in subscribing else error in unsubscribing + if (id % 2 == 0) { + std::cout << "Some error happened while subscribing" << std::endl; + { + std::unique_lock lk(mutex_for_maps); + + const auto ite = callback_map.find(id); + if (ite != callback_map.end()) { + ite->second->subscribed_pr.set_value(false); + } + } + } else { + std::cout << "Some error happened while unsubscribing" << std::endl; + } + std::cout << er_mess << std::endl; + } // it's a notification process it sccordingly else { call_callback(data); @@ -275,7 +314,7 @@ Callback session::get_callback(RequestIdType request_id) { << std::endl; return nullptr; } - return sub_ite->second.cb; + return sub_ite->second->cb; } /// @brief call the specified callback @@ -307,4 +346,4 @@ void session::on_close(beast::error_code ec) { // If we get here then the connection is closed gracefully return; -} +} \ No newline at end of file