From 0fefa4c9bb67c4edcdf0c7d0a7c3baf4abe4d0e4 Mon Sep 17 00:00:00 2001 From: agosh01 Date: Thu, 15 Aug 2024 12:00:07 -0400 Subject: [PATCH 1/4] Adding subscriptions client header --- .../up-cpp/client/usubscription/v3/Consumer.h | 217 ++++++++++++++++++ 1 file changed, 217 insertions(+) create mode 100644 include/up-cpp/client/usubscription/v3/Consumer.h diff --git a/include/up-cpp/client/usubscription/v3/Consumer.h b/include/up-cpp/client/usubscription/v3/Consumer.h new file mode 100644 index 000000000..8f4964d63 --- /dev/null +++ b/include/up-cpp/client/usubscription/v3/Consumer.h @@ -0,0 +1,217 @@ +// SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: Apache-2.0 + +#ifndef UP_CPP_CLIENT_Consumer_H +#define UP_CPP_CLIENT_Consumer_H + +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace uprotocol::client::usubscription::v3 { +using namespace uprotocol::core::usubscription::v3; +using namespace uprotocol::utils; + +/** + * @struct ConsumerOptions + * @brief Additional details for uSubscription service. + * + * Each member represents an optional parameter for the uSubscription service. + */ +struct ConsumerOptions { + /// Permission level of the subscription request + std::optional permission_level; + /// TAP token for access. + std::optional token; + /// Expiration time of the subscription. + std::optional when_expire; + /// Sample period for the subscription messages in milliseconds. + std::optional sample_period_ms; + /// Details of the subscriber. + std::optional subscriber_details; + /// Details of the subscription. + std::optional subscription_details; +}; + + +/// @struct uSubscriptionUUriBuilder +/// @brief Structure to build uSubscription request URIs. +/// +/// This structure is used to build URIs for uSubscription service. It uses the +/// service options from uSubscription proto to set the authority name, ue_id, ue_version_major, and +/// the notification topic resource ID in the URI. +struct uSubscriptionUUriBuilder { +private: + /// URI for the uSubscription service + v1::UUri uri_; + /// Resource ID of the notification topic + uint32_t sink_resource_id_; + +public: + /// @brief Constructor for uSubscriptionUUriBuilder. + uSubscriptionUUriBuilder() { + // Get the service descriptor + const google::protobuf::ServiceDescriptor* service = + uSubscription::descriptor(); + const auto& service_options = service->options(); + + // Get the service options + const auto& service_name = + service_options.GetExtension(uprotocol::service_name); + const auto& service_version_major = + service_options.GetExtension(uprotocol::service_version_major); + const auto& service_id = + service_options.GetExtension(uprotocol::service_id); + const auto& notification_topic = + service_options.GetExtension(uprotocol::notification_topic, 0); + + // Set the values in the URI + uri_.set_authority_name(service_name); + uri_.set_ue_id(service_id); + uri_.set_ue_version_major(service_version_major); + sink_resource_id_ = notification_topic.id(); + } + + /// @brief Get the URI with a specific resource ID. + /// + /// @param resource_id The resource ID to set in the URI. + /// + /// @return The URI with the specified resource ID. + v1::UUri getServiceUriWithResourceId(uint32_t resource_id) const { + v1::UUri uri = uri_; // Copy the base URI + uri.set_resource_id(resource_id); + return uri; + } + + /// @brief Get the notification URI. + /// + /// @return The notification URI. + v1::UUri getNotificationUri() const { + v1::UUri uri = uri_; // Copy the base URI + uri.set_resource_id(sink_resource_id_); + return uri; + } +}; + +/// @brief Interface for uEntities to create subscriptions. +/// +/// Like all L3 client APIs, the Consumer is a wrapper on top of the +/// L2 Communication APIs and USubscription service. +struct Consumer { + using ConsumerOrStatus = + utils::Expected, v1::UStatus>; + using ListenCallback = transport::UTransport::ListenCallback; + using ListenHandle = transport::UTransport::ListenHandle; + using SubscriptionResponse = core::usubscription::v3::SubscriptionResponse; + + /// @brief Create a subscription + /// + /// @param transport Transport to register with. + /// @param subscription_topic Topic to subscribe to. + /// @param callback Function that is called when publish message is received. + /// @param priority Priority of the subscription request. + /// @param subscribe_request_ttl Time to live for the subscription request. + /// @param consumer_options Additional details for uSubscription service. + [[nodiscard]] static ConsumerOrStatus create( + std::shared_ptr transport, + const v1::UUri subscription_topic, ListenCallback&& callback, + v1::UPriority priority, + std::chrono::milliseconds subscription_request_ttl, + ConsumerOptions consumer_options); + + /// @brief Unsubscribe from the topic and call uSubscription service to close the subscription. + /// + /// @param priority Priority of the unsubscribe request. + /// @param request_ttl Time to live for the unsubscribe request. + void unsubscribe(v1::UPriority priority, + std::chrono::milliseconds request_ttl); + + /// @brief getter for subscription update + /// + /// @return subscription update + Update getSubscriptionUpdate() const { return subscription_update_; } + + /// @brief Destructor + ~Consumer() = default; + + /// This section for test code only delete later + +protected: + /// @brief Constructor + /// + /// @param transport Transport to register with. + /// @param subscriber_details Additional details about the subscriber. + Consumer(std::shared_ptr transport, + const v1::UUri subscription_topic, + ConsumerOptions consumer_options = {}); + +private: + // Transport + std::shared_ptr transport_; + + // Topic to subscribe to + const v1::UUri subscription_topic_; + // Additional details about uSubscription service + ConsumerOptions consumer_options_; + + // URI info about the uSubscription service + uSubscriptionUUriBuilder uSubscriptionUUriBuilder_; + + // Subscription updates + std::unique_ptr noficationSinkHandle_; + Update subscription_update_; + + // RPC request + std::unique_ptr rpc_client_; + communication::RpcClient::InvokeHandle rpc_handle_; + SubscriptionResponse subscription_response_; + + // L2 Subscriber details + std::unique_ptr subscriber_; + + // Allow the protected constructor for this class to be used in make_unique + // inside of create() + friend std::unique_ptr + std::make_unique, + const uprotocol::v1::UUri, + uprotocol::client::usubscription::v3::ConsumerOptions>( + std::shared_ptr&&, + const uprotocol::v1::UUri&&, + uprotocol::client::usubscription::v3::ConsumerOptions&&); + + /// @brief Build SubscriptionRequest for subscription request + SubscriptionRequest buildSubscriptionRequest(); + + /// @brief Build UnsubscriptionRequest for unsubscription request + UnsubscribeRequest buildUnsubscriptionRequest(); + + /// @brief Create a notification sink to receive subscription updates + v1::UStatus createNotificationSink(); + + /// @brief Subscribe to the topic + /// + /// @param topic Topic to subscribe to. + /// @param subscription_request_ttl Time to live for the subscription request. + /// @param callback Function that is called when a published message is received. + v1::UStatus subscribe(v1::UPriority priority, + std::chrono::milliseconds subscription_request_ttl, + ListenCallback&& callback); +}; + +} // namespace uprotocol::client::usubscription::v3 + +#endif // UP_CPP_CLIENT_Consumer_H \ No newline at end of file From b0b456fa9b2c7576d7683de56f6af210ecb0394a Mon Sep 17 00:00:00 2001 From: agosh01 Date: Mon, 19 Aug 2024 14:18:35 -0400 Subject: [PATCH 2/4] Adding utils to build request inputs --- include/up-cpp/utils/ProtoConverter.h | 60 +++++++++++++++++++ src/utils/ProtoConverter.cpp | 86 +++++++++++++++++++++++++++ 2 files changed, 146 insertions(+) create mode 100644 include/up-cpp/utils/ProtoConverter.h create mode 100644 src/utils/ProtoConverter.cpp diff --git a/include/up-cpp/utils/ProtoConverter.h b/include/up-cpp/utils/ProtoConverter.h new file mode 100644 index 000000000..f08bfe56f --- /dev/null +++ b/include/up-cpp/utils/ProtoConverter.h @@ -0,0 +1,60 @@ +#ifndef PROTO_CONVERTER_H +#define PROTO_CONVERTER_H + +#include +#include + +#include +#include + +namespace uprotocol::utils { +using namespace uprotocol::core::usubscription::v3; + +struct ProtoConverter { + /// @brief Converts std::chrono::time_point to google::protobuf::Timestamp + /// + /// @param tp the time point to convert + /// @return the converted google::protobuf::Timestamp + static google::protobuf::Timestamp ConvertToProtoTimestamp( + const std::chrono::system_clock::time_point& tp); + + /// @brief Builds a SubscriberInfo from the given parameters + /// + /// @param entity_uri the UUri of the entity subscribing + /// @param subscriber_details the details of the subscriber + /// @return the built SubscriberInfo + static SubscriberInfo BuildSubscriberInfo( + const v1::UUri& entity_uri, + std::optional subscriber_details); + + /// @brief Builds a SubscribeAttributes from the given parameters + /// + /// @param when_expire the optional time point when the subscription expires + /// @param subscription_details the details of the subscription + /// @param sample_period_ms the optional sample period in milliseconds + /// @return the built SubscribeAttributes + static SubscribeAttributes BuildSubscribeAttributes( + std::optional when_expire, + std::optional subscription_details, + std::optional sample_period_ms); + + /// @brief Builds a SubscriptionRequest from the given parameters + /// + /// @param subscription_topic the UUri of the topic to subscribe to + /// @param subscriber_info the SubscriberInfo of the subscriber + /// @param attributes the SubscribeAttributes for the subscription + /// @return the built SubscriptionRequest + static SubscriptionRequest BuildSubscriptionRequest( + const v1::UUri& subscription_topic, SubscriberInfo& subscriber_info, + std::optional attributes = {}); + + /// @brief Builds a UnsubscribeRequest from the given parameters + /// + /// @param subscription_topic the UUri of the topic to unsubscribe from + /// @param subscriber_info the SubscriberInfo of the subscriber + /// @return the built UnsubscribeRequest + static UnsubscribeRequest BuildUnSubscribeRequest( + const v1::UUri& subscription_topic, SubscriberInfo& subscriber_info); +}; +}; // namespace uprotocol::utils +#endif // PROTO_CONVERTER_H diff --git a/src/utils/ProtoConverter.cpp b/src/utils/ProtoConverter.cpp new file mode 100644 index 000000000..b776e96b4 --- /dev/null +++ b/src/utils/ProtoConverter.cpp @@ -0,0 +1,86 @@ +#include "up-cpp/utils/ProtoConverter.h" + +namespace uprotocol::utils { +google::protobuf::Timestamp ProtoConverter::ConvertToProtoTimestamp( + const std::chrono::system_clock::time_point& tp) { + google::protobuf::Timestamp timestamp; + auto duration = tp.time_since_epoch(); + auto seconds = + std::chrono::duration_cast(duration).count(); + auto nanoseconds = + std::chrono::duration_cast(duration).count() % + 1000000000ll; + + timestamp.set_seconds(seconds); + timestamp.set_nanos(static_cast(nanoseconds)); + + return timestamp; +} + +// SubscriberInfo builder +SubscriberInfo ProtoConverter::BuildSubscriberInfo( + const v1::UUri& entity_uri, + std::optional subscriber_details) { + SubscriberInfo subscriber_info; + + // Create a new instance of UUri and copy the contents from entity_uri + *subscriber_info.mutable_uri() = entity_uri; + + if (subscriber_details.has_value()) { + subscriber_info.add_details()->CopyFrom(subscriber_details.value()); + } + + return subscriber_info; +} + +// SubscribeAttributes builder +SubscribeAttributes ProtoConverter::BuildSubscribeAttributes( + std::optional when_expire, + std::optional subscription_details, + std::optional sample_period_ms) { + SubscribeAttributes attributes; + + if (when_expire.has_value()) { + *attributes.mutable_expire() = + ConvertToProtoTimestamp(when_expire.value()); + } + + if (subscription_details.has_value()) { + attributes.add_details()->CopyFrom(subscription_details.value()); + } + + if (sample_period_ms.has_value()) { + attributes.set_sample_period_ms( + static_cast(sample_period_ms.value().count())); + } + + return attributes; +} + +// SubscriptionRequest builder +SubscriptionRequest ProtoConverter::BuildSubscriptionRequest( + const v1::UUri& subscription_topic, SubscriberInfo& subscriber_info, + std::optional attributes) { + SubscriptionRequest subscription_request; + *subscription_request.mutable_topic() = subscription_topic; + *subscription_request.mutable_subscriber() = subscriber_info; + + // Use mutable attributes if provided + if (attributes.has_value()) { + *subscription_request.mutable_attributes() = + std::move(attributes.value()); + } + + return subscription_request; +} + +UnsubscribeRequest ProtoConverter::BuildUnSubscribeRequest( + const v1::UUri& subscription_topic, SubscriberInfo& subscriber_info) { + UnsubscribeRequest unsubscribe_request; + *unsubscribe_request.mutable_topic() = subscription_topic; + *unsubscribe_request.mutable_subscriber() = subscriber_info; + + return unsubscribe_request; +} + +} // namespace uprotocol::utils From 93f19a9fbecbcd27a866e0f6b8fae027e56ae65e Mon Sep 17 00:00:00 2001 From: agosh01 Date: Mon, 19 Aug 2024 14:19:21 -0400 Subject: [PATCH 3/4] Adding implimentation for consumer --- src/client/usubscription/v3/Consumer.cpp | 165 +++++++++++++++++++++++ 1 file changed, 165 insertions(+) create mode 100644 src/client/usubscription/v3/Consumer.cpp diff --git a/src/client/usubscription/v3/Consumer.cpp b/src/client/usubscription/v3/Consumer.cpp new file mode 100644 index 000000000..e0b484298 --- /dev/null +++ b/src/client/usubscription/v3/Consumer.cpp @@ -0,0 +1,165 @@ +// SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: Apache-2.0 + +#include + +namespace uprotocol::client::usubscription::v3 { + +Consumer::Consumer(std::shared_ptr transport, + const v1::UUri subscription_topic, + ConsumerOptions consumer_options) + : transport_(transport), + subscription_topic_(subscription_topic), + consumer_options_(consumer_options) { + // Initialize uSubscriptionUUriBuilder_ + uSubscriptionUUriBuilder_ = uSubscriptionUUriBuilder(); + rpc_client_ = nullptr; +} + +[[nodiscard]] Consumer::ConsumerOrStatus Consumer::create( + std::shared_ptr transport, + const v1::UUri subscription_topic, ListenCallback&& callback, + v1::UPriority priority, std::chrono::milliseconds subscription_request_ttl, + ConsumerOptions consumer_options) { + auto consumer = std::make_unique( + std::forward>(transport), + std::forward(subscription_topic), + std::forward(consumer_options)); + + // Attempt to connect create notification sink for updates. + auto status = consumer->createNotificationSink(); + if (status.code() == v1::UCode::OK) { + status = consumer->subscribe(priority, subscription_request_ttl, + std::move(callback)); + if (status.code() == v1::UCode::OK) { + return consumer; + } else { + return uprotocol::utils::Unexpected(std::move(status)); + } + } else { + // If connection fails, return the error status. + return uprotocol::utils::Unexpected(std::move(status)); + } +} + +v1::UStatus Consumer::createNotificationSink() { + auto notification_sink_callback = [this](const v1::UMessage& update) { + if (update.has_payload()) { + Update data; + if (data.ParseFromString(update.payload())) { + if (data.topic().SerializeAsString() == + subscription_topic_.SerializeAsString()) { + subscription_update_ = std::move(data); + } + } + } + }; + + auto notification_topic = uSubscriptionUUriBuilder_.getNotificationUri(); + + auto result = communication::NotificationSink::create( + transport_, std::move(notification_sink_callback), + std::move(notification_topic)); + + if (result.has_value()) { + noficationSinkHandle_ = std::move(result).value(); + v1::UStatus status; + status.set_code(v1::UCode::OK); + return status; + } else { + return result.error(); + } +} + +SubscriptionRequest Consumer::buildSubscriptionRequest() { + auto subscriber_info = ProtoConverter::BuildSubscriberInfo( + transport_->getEntityUri(), consumer_options_.subscriber_details); + auto attributes = ProtoConverter::BuildSubscribeAttributes( + consumer_options_.when_expire, consumer_options_.subscription_details, + consumer_options_.sample_period_ms); + + auto subscription_request = ProtoConverter::BuildSubscriptionRequest( + subscription_topic_, subscriber_info, attributes); + return subscription_request; +} + +v1::UStatus Consumer::subscribe( + v1::UPriority priority, std::chrono::milliseconds subscription_request_ttl, + ListenCallback&& callback) { + rpc_client_ = std::make_unique( + transport_, + std::move(uSubscriptionUUriBuilder_.getServiceUriWithResourceId(1)), + priority, subscription_request_ttl); + + auto onResponse = [this](auto maybeResponse) { + if (maybeResponse.has_value() && maybeResponse.value().has_payload()) { + SubscriptionResponse response; + if (response.ParseFromString(maybeResponse.value().payload())) { + if (response.topic().SerializeAsString() == + subscription_topic_.SerializeAsString()) { + subscription_response_ = response; + } + } + } + }; + + SubscriptionRequest subscriptionRequest = buildSubscriptionRequest(); + auto payload = datamodel::builder::Payload(std::move(subscriptionRequest)); + + rpc_handle_ = + rpc_client_->invokeMethod(std::move(payload), std::move(onResponse)); + + // Create a L2 subscription + auto result = communication::Subscriber::subscribe( + transport_, subscription_topic_, std::move(callback)); + + if (result.has_value()) { + subscriber_ = std::move(result).value(); + v1::UStatus status; + status.set_code(v1::UCode::OK); + return status; + } else { + return result.error(); + } +} + +UnsubscribeRequest Consumer::buildUnsubscriptionRequest() { + auto subscriber_info = ProtoConverter::BuildSubscriberInfo( + transport_->getEntityUri(), consumer_options_.subscriber_details); + + auto unsubscribe_request = ProtoConverter::BuildUnSubscribeRequest( + subscription_topic_, subscriber_info); + return unsubscribe_request; +} + +void Consumer::unsubscribe(v1::UPriority priority, + std::chrono::milliseconds request_ttl) { + rpc_client_ = std::make_unique( + transport_, + std::move(uSubscriptionUUriBuilder_.getServiceUriWithResourceId(2)), + priority, request_ttl); + + auto onResponse = [this](auto maybeResponse) { + if (!maybeResponse.has_value()) { + // Do something as this means sucessfully unsubscribed. + } + }; + + UnsubscribeRequest unsubscribeRequest = buildUnsubscriptionRequest(); + auto payload = datamodel::builder::Payload(std::move(unsubscribeRequest)); + + rpc_handle_ = + rpc_client_->invokeMethod(std::move(payload), std::move(onResponse)); + + subscriber_.reset(); +} + +} // namespace uprotocol::client::usubscription::v3 \ No newline at end of file From 18e9c029259731892833ab1d3db27da196549d93 Mon Sep 17 00:00:00 2001 From: agosh01 Date: Mon, 19 Aug 2024 14:19:59 -0400 Subject: [PATCH 4/4] Adding subset of unit tests --- test/CMakeLists.txt | 3 + .../client/usubscription/v3/ConsumerTest.cpp | 202 ++++++++++++++++++ 2 files changed, 205 insertions(+) create mode 100644 test/coverage/client/usubscription/v3/ConsumerTest.cpp diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 0b04d4786..bd7e6d67f 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -81,6 +81,9 @@ add_coverage_test("SubscriberTest" coverage/communication/SubscriberTest.cpp) add_coverage_test("NotificationSinkTest" coverage/communication/NotificationSinkTest.cpp) add_coverage_test("NotificationSourceTest" coverage/communication/NotificationSourceTest.cpp) +# client +add_coverage_test("ConsumerTest" coverage/client/usubscription/v3/ConsumerTest.cpp) + ########################## EXTRAS ############################################# add_extra_test("PublisherSubscriberTest" extra/PublisherSubscriberTest.cpp) add_extra_test("NotificationTest" extra/NotificationTest.cpp) diff --git a/test/coverage/client/usubscription/v3/ConsumerTest.cpp b/test/coverage/client/usubscription/v3/ConsumerTest.cpp new file mode 100644 index 000000000..17ba73169 --- /dev/null +++ b/test/coverage/client/usubscription/v3/ConsumerTest.cpp @@ -0,0 +1,202 @@ +// SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: Apache-2.0 + +#include +#include +#include +#include + +#include +#include + +#include "UTransportMock.h" + +namespace { +using MsgDiff = google::protobuf::util::MessageDifferencer; + +void someCallBack(const uprotocol::v1::UMessage& message) { + // Print the message + std::cout << message.DebugString() << std::endl; +} + +class ConsumerTest : public testing::Test { +protected: + // Run once per TEST_F. + // Used to set up clean environments per test. + std::shared_ptr mockTransportClient_; + std::shared_ptr mockTransportServer_; + uprotocol::v1::UUri client_uuri; + uprotocol::v1::UUri server_uuri; + uprotocol::v1::UUri subcription_uuri; + + void SetUp() override { + // Create a generic transport uri + client_uuri.set_authority_name("random_string"); + client_uuri.set_ue_id(0x18000); + client_uuri.set_ue_version_major(3); + client_uuri.set_resource_id(0); + + // Set up a transport + mockTransportClient_ = + std::make_shared(client_uuri); + + // Craete server default uri and set up a transport + server_uuri.set_authority_name("core.usubscription"); + server_uuri.set_ue_id(0); + server_uuri.set_ue_version_major(3); + server_uuri.set_resource_id(0); + + mockTransportServer_ = + std::make_shared(server_uuri); + + // Create a generic subscription uri + subcription_uuri.set_authority_name("10.0.0.2"); + subcription_uuri.set_ue_id(0x18000); + subcription_uuri.set_ue_version_major(3); + subcription_uuri.set_resource_id(0x8000); + }; + void TearDown() override {} + + // Run once per execution of the test application. + // Used for setup of all tests. Has access to this instance. + ConsumerTest() = default; + ~ConsumerTest() = default; + + void buildDefaultSourceURI(); + void buildValidNotificationURI(); + void buildInValidNotificationURI(); + + // Run once per execution of the test application. + // Used only for global setup outside of tests. + static void SetUpTestSuite() {} + static void TearDownTestSuite() {} +}; + +// Negative test case with no source filter +TEST_F(ConsumerTest, ConstructorTestSuccess) { + auto subcriptionCallback = someCallBack; + auto subscribe_request_ttl = std::chrono::milliseconds(1000); + auto priority = uprotocol::v1::UPriority::UPRIORITY_CS4; + + auto options = uprotocol::client::usubscription::v3::ConsumerOptions(); + + auto consumerOrSatus = + uprotocol::client::usubscription::v3::Consumer::create( + mockTransportClient_, subcription_uuri, + std::move(subcriptionCallback), priority, + std::move(subscribe_request_ttl), options); + + // Ensure that the consumer creation was successful + ASSERT_TRUE(consumerOrSatus.has_value()); + + // Obtain a pointer to the created consumer instance + auto& consumerPtr = consumerOrSatus.value(); + + // Verify that the consumer pointer is not null, indicating successful + // creation + ASSERT_NE(consumerPtr, nullptr); +} + +TEST_F(ConsumerTest, SubscribeTestSuccess) { + auto subcriptionCallback = someCallBack; + auto subscribe_request_ttl = std::chrono::milliseconds(1000); + auto priority = uprotocol::v1::UPriority::UPRIORITY_CS4; + + auto options = uprotocol::client::usubscription::v3::ConsumerOptions(); + + auto consumerOrSatus = + uprotocol::client::usubscription::v3::Consumer::create( + mockTransportClient_, subcription_uuri, + std::move(subcriptionCallback), priority, + std::move(subscribe_request_ttl), options); + + // Ensure that the consumer creation was successful + ASSERT_TRUE(consumerOrSatus.has_value()); + + // Obtain a pointer to the created consumer instance + auto& consumerPtr = consumerOrSatus.value(); + + // Verify that the consumer pointer is not null, indicating successful + // creation + ASSERT_NE(consumerPtr, nullptr); + + // Create notification source sink uri to match resource id of sink + auto notification_uuri = server_uuri; + notification_uuri.set_resource_id(0x8000); + + // set format UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY + auto format = + uprotocol::v1::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY; + + auto norificationSource = uprotocol::communication::NotificationSource( + mockTransportServer_, std::move(notification_uuri), + std::move(client_uuri), format); + // Build payload + const std::string data = "test"; + auto payload = uprotocol::datamodel::builder::Payload(data, format); + + norificationSource.notify(std::move(payload)); + + // Check send count + EXPECT_TRUE(mockTransportServer_->send_count_ == 1); + EXPECT_TRUE(mockTransportClient_->send_count_ == 1); +} + +TEST_F(ConsumerTest, UnsubscribeTestSuccess) { + auto subcriptionCallback = someCallBack; + auto subscribe_request_ttl = std::chrono::milliseconds(1000); + auto priority = uprotocol::v1::UPriority::UPRIORITY_CS4; + + auto options = uprotocol::client::usubscription::v3::ConsumerOptions(); + + auto consumerOrSatus = + uprotocol::client::usubscription::v3::Consumer::create( + mockTransportClient_, subcription_uuri, + std::move(subcriptionCallback), priority, + std::move(subscribe_request_ttl), options); + + // Ensure that the consumer creation was successful + ASSERT_TRUE(consumerOrSatus.has_value()); + + // Obtain a pointer to the created consumer instance + auto& consumerPtr = consumerOrSatus.value(); + + // Verify that the consumer pointer is not null, indicating successful + // creation + ASSERT_NE(consumerPtr, nullptr); + + // Create notification source sink uri to match resource id of sink + auto notification_uuri = server_uuri; + notification_uuri.set_resource_id(0x8000); + + // set format UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY + auto format = + uprotocol::v1::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY; + + auto norificationSource = uprotocol::communication::NotificationSource( + mockTransportServer_, std::move(notification_uuri), + std::move(client_uuri), format); + // Build payload + const std::string data = "test"; + auto payload = uprotocol::datamodel::builder::Payload(data, format); + + norificationSource.notify(std::move(payload)); + + // Check send count + EXPECT_TRUE(mockTransportServer_->send_count_ == 1); + EXPECT_TRUE(mockTransportClient_->send_count_ == 1); + + consumerPtr->unsubscribe(priority, subscribe_request_ttl); + + EXPECT_TRUE(mockTransportClient_->send_count_ == 2); +} + +} // namespace