diff --git a/CMakeLists.txt b/CMakeLists.txt index e9c3b7a40..0fef14ba1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -205,6 +205,7 @@ cmake_dependent_option(WITH_NODE_WEBRTC "Build with webrtc node-type" cmake_dependent_option(WITH_NODE_WEBSOCKET "Build with websocket node-type" "${WITH_DEFAULTS}" "WITH_WEB" OFF) cmake_dependent_option(WITH_NODE_ZEROMQ "Build with zeromq node-type" "${WITH_DEFAULTS}" "LIBZMQ_FOUND; NOT WITHOUT_GPL" OFF) cmake_dependent_option(WITH_NODE_OPENDSS "Build with opendss node-type" "${WITH_DEFAULTS}" "OpenDSSC_FOUND" OFF) +cmake_dependent_option(WITH_NODE_DELTASHARING "Build with delta-sharing node-type" "${WITH_DEFAULTS}" "" ON) # set a default for the build type if("${CMAKE_BUILD_TYPE}" STREQUAL "") diff --git a/include/villas/nodes/delta_sharing/Protocol.h b/include/villas/nodes/delta_sharing/Protocol.h new file mode 100644 index 000000000..5cde5e00c --- /dev/null +++ b/include/villas/nodes/delta_sharing/Protocol.h @@ -0,0 +1,298 @@ +/* Node type: Delta Share. + * + * Author: Ritesh Karki + * SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace DeltaSharing +{ + + namespace DeltaSharingProtocol + { + + using json = JanssonWrapper::json; + + struct DeltaSharingProfile + { + public: + int shareCredentialsVersion; + std::string endpoint; + std::string bearerToken; + boost::optional expirationTime; + }; + + inline void from_json(const json &j, DeltaSharingProfile &p) + { + if (j.contains("shareCredentialsVersion")) + { + p.shareCredentialsVersion = j["shareCredentialsVersion"].integer_value(); + } + if (j.contains("endpoint")) + { + p.endpoint = j["endpoint"].string_value(); + } + if (j.contains("bearerToken")) + { + p.bearerToken = j["bearerToken"].string_value(); + } + if (j.contains("expirationTime")) + { + p.expirationTime = j["expirationTime"].string_value(); + } + }; + + struct Share + { + public: + std::string name = ""; + boost::optional id; + }; + + inline void from_json(const json &j, Share &s) + { + s.name = j["name"].string_value(); + if (j.contains("id") == false) + { + s.id = boost::none; + } + else + { + s.id = j["id"].string_value(); + } + }; + + struct Schema + { + public: + std::string name; + std::string share; + }; + + inline void from_json(const json &j, Schema &s) + { + s.name = j["name"].string_value(); + if (j.contains("share") == true) + { + s.share = j["share"].string_value(); + } + }; + + struct Table + { + public: + std::string name; + std::string share; + std::string schema; + }; + + inline void from_json(const json &j, Table &t) + { + if (j.contains("name")) + { + t.name = j["name"].string_value(); + } + if (j.contains("share")) + { + t.share = j["share"].string_value(); + } + if (j.contains("schema")) + { + t.schema = j["schema"].string_value(); + } + }; + + struct File + { + public: + std::string url; + boost::optional id; + std::map partitionValues; + std::size_t size; + std::string stats; + boost::optional timestamp; + boost::optional version; + }; + + inline void from_json(const json &j, File &f) + { + if (j.contains("url")) + { + f.url = j["url"].string_value(); + } + if (j.contains("id")) + { + f.id = j["id"].string_value(); + } + if (j.contains("partitionValues")) + { + json arr = j["partitionValues"]; + if (arr.is_array()) + { + for (auto it = arr.begin(); it != arr.end(); ++it) + { + json item = *it; + if (item.is_object()) { + for (auto kv_it = item.begin(); kv_it != item.end(); ++kv_it) { + f.partitionValues[kv_it.key()] = (*kv_it).string_value(); + } + } + } + } + } + if (j.contains("size")) + { + f.size = static_cast(j["size"].integer_value()); + } + if (j.contains("stats")) + { + f.stats = j["stats"].string_value(); + } + if (j.contains("timestamp")) + { + f.timestamp = j["timestamp"].string_value(); + } + if (j.contains("version")) + { + f.version = j["version"].string_value(); + } + }; + + struct Format { + std::string provider; + }; + + inline void from_json(const json &j, Format &f) + { + if (j.contains("provider")) + { + f.provider = j["provider"].string_value(); + } + } + + struct Metadata + { + Format format; + std::string id; + std::vector partitionColumns; + std::string schemaString; + }; + + inline void from_json(const json &j, Metadata &m) + { + if (j.contains("format")) + { + from_json(j["format"], m.format); + } + if (j.contains("id")) + { + m.id = j["id"].string_value(); + } + if (j.contains("partitionColumns")) + { + json arr = j["partitionColumns"]; + if (arr.is_array()) + { + for (auto it = arr.begin(); it != arr.end(); ++it) + { + m.partitionColumns.push_back((*it).string_value()); + } + } + } + if (j.contains("schemaString")) + { + m.schemaString = j["schemaString"].string_value(); + } + } + + struct data + { + std::vector predicateHints; + int limitHint; + }; + + inline void from_json(const json &j, data &d) + { + if (j.contains("predicateHints")) + { + json arr = j["predicateHints"]; + if (arr.is_array()) + { + for (auto it = arr.begin(); it != arr.end(); ++it) + { + d.predicateHints.push_back((*it).string_value()); + } + } + } + if (j.contains("limitHint")) + { + d.limitHint = j["limitHint"].integer_value(); + } + } + + struct format + { + std::string provider; + }; + + inline void from_json(const json &j, format &f) + { + if (j.contains("provider")) + { + f.provider = j["provider"].string_value(); + } + } + + struct protocol + { + int minReaderVersion; + }; + + inline void from_json(const json &j, protocol &p) + { + if (j.contains("minReaderVersion")) + { + p.minReaderVersion = j["minReaderVersion"].integer_value(); + } + } + + struct stats + { + long long numRecords; + long minValues; + long maxValues; + long nullCount; + }; + + inline void from_json(const json &j, stats &s) + { + if (j.contains("numRecords")) + { + s.numRecords = j["numRecords"].integer_value(); + } + if (j.contains("minValues")) + { + s.minValues = j["minValues"].integer_value(); + } + if (j.contains("maxValues")) + { + s.maxValues = j["maxValues"].integer_value(); + } + if (j.contains("nullCount")) + { + s.nullCount = j["nullCount"].integer_value(); + } + } + + }; +}; diff --git a/include/villas/nodes/delta_sharing/delta_sharing.hpp b/include/villas/nodes/delta_sharing/delta_sharing.hpp new file mode 100644 index 000000000..a093c380f --- /dev/null +++ b/include/villas/nodes/delta_sharing/delta_sharing.hpp @@ -0,0 +1,67 @@ +/* Node type: Delta Sharing. + * + * Author: Ritesh Karki + * SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include +#include + +#include + +#include +#include + +namespace arrow { class Table; } + +namespace villas { +namespace node { + +// Forward declarations +class NodeCompat; + +struct delta_sharing { + // Configuration + std::string profile_path; + std::string cache_dir; + std::string table_path; + size_t batch_size; + std::string schema; + std::string share; + std::string table; + + // Client and state + std::shared_ptr client; + std::shared_ptr> schemas; + std::shared_ptr table_ptr; + std::shared_ptr> tables; + std::shared_ptr> shares; + + enum class TableOp { TABLE_NOOP, TABLE_READ, TABLE_WRITE } table_op; +}; + +char *deltaSharing_print(NodeCompat *n); + +int deltaSharing_parse(NodeCompat *n, json_t *json); + +int deltaSharing_start(NodeCompat *n); + +int deltaSharing_stop(NodeCompat *n); + +int deltaSharing_init(NodeCompat *n); + +int deltaSharing_destroy(NodeCompat *n); + +int deltaSharing_poll_fds(NodeCompat *n, int fds[]); + +int deltaSharing_read(NodeCompat *n, struct Sample *const smps[], unsigned cnt); + +int deltaSharing_write(NodeCompat *n, struct Sample *const smps[], unsigned cnt); + +} // namespace node +} // namespace villas diff --git a/include/villas/nodes/delta_sharing/delta_sharing_client.h b/include/villas/nodes/delta_sharing/delta_sharing_client.h new file mode 100644 index 000000000..117cc7a5b --- /dev/null +++ b/include/villas/nodes/delta_sharing/delta_sharing_client.h @@ -0,0 +1,37 @@ +/* Node type: Delta Share. + * + * Author: Ritesh Karki + * SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include + +namespace DeltaSharing +{ + + struct DeltaSharingClient + { + public: + DeltaSharingClient(std::string filename, boost::optional cacheLocation); + std::shared_ptr LoadAsArrowTable(std::string &url); + std::shared_ptr ReadTableFromCache(std::string &url); + const std::shared_ptr> ListShares(int maxResult, std::string pageToken) const; + const std::shared_ptr> ListSchemas(const DeltaSharingProtocol::Share &share, int maxResult, std::string pageToken) const; + const std::shared_ptr> ListTables(const DeltaSharingProtocol::Schema &schema, int maxResult, std::string pageToken) const; + const std::shared_ptr> ListAllTables(const DeltaSharingProtocol::Share &share, int maxResult, std::string pageToken) const; + const std::shared_ptr> ListFilesInTable(const DeltaSharingProtocol::Table) const; + const DeltaSharingProtocol::Metadata QueryTableMetadata(const DeltaSharingProtocol::Table &table) const; + const int GetNumberOfThreads() { return this->maxThreads; }; + void PopulateCache(std::string url) { this->restClient.PopulateCache(url,this->cacheLocation); }; + protected: + private: + DeltaSharingRestClient restClient; + std::string cacheLocation; + int maxThreads; + }; +}; diff --git a/include/villas/nodes/delta_sharing/delta_sharing_rest_client.h b/include/villas/nodes/delta_sharing/delta_sharing_rest_client.h new file mode 100644 index 000000000..38a00c523 --- /dev/null +++ b/include/villas/nodes/delta_sharing/delta_sharing_rest_client.h @@ -0,0 +1,44 @@ +/* Node type: Delta Share. + * + * Author: Ritesh Karki + * SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + + +namespace DeltaSharing +{ + using json = JanssonWrapper::json; + struct DeltaSharingRestClient + { + public: + DeltaSharingRestClient(std::string filename); + ~DeltaSharingRestClient(); + const std::shared_ptr> ListShares(int maxResult, std::string pageToken) const; + const std::shared_ptr> ListSchemas(const DeltaSharingProtocol::Share &share, int maxResult, std::string pageToken) const; + const std::shared_ptr> ListTables(const DeltaSharingProtocol::Schema &schema, int maxResult, std::string pageToken) const; + const std::shared_ptr> ListAllTables(const DeltaSharingProtocol::Share &share, int maxResult, std::string pageToken) const; + const std::shared_ptr> ListFilesInTable(const DeltaSharingProtocol::Table) const; + const DeltaSharingProtocol::Metadata QueryTableMetadata(const DeltaSharingProtocol::Table &table) const; + const DeltaSharingProtocol::DeltaSharingProfile &GetProfile() const; + RestClient::Response get(std::string url); + void PopulateCache(std::string url, std::string cacheLocation); + const bool shouldRetry(RestClient::Response &response) const; + + protected: + json ReadFromFile(std::string filename); + + private: + DeltaSharingProtocol::DeltaSharingProfile profile; + static const std::string user_agent; + }; +}; diff --git a/include/villas/nodes/delta_sharing/functions.h b/include/villas/nodes/delta_sharing/functions.h new file mode 100644 index 000000000..40945fa35 --- /dev/null +++ b/include/villas/nodes/delta_sharing/functions.h @@ -0,0 +1,27 @@ +/* Node type: Delta Share. + * + * Author: Ritesh Karki + * SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace DeltaSharing { + +const std::vector ParseURL(std::string path); +std::shared_ptr +NewDeltaSharingClient(std::string profile, + boost::optional cacheLocation); +const std::shared_ptr LoadAsArrowTable(std::string path, + int fileno); +}; + +// namespace DeltaSharing diff --git a/include/villas/nodes/delta_sharing/jansson_wrapper.h b/include/villas/nodes/delta_sharing/jansson_wrapper.h new file mode 100644 index 000000000..2a0516f95 --- /dev/null +++ b/include/villas/nodes/delta_sharing/jansson_wrapper.h @@ -0,0 +1,255 @@ +/* Node type: Delta Share. + * + * Author: Ritesh Karki + * SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace DeltaSharing { +namespace JanssonWrapper { + +class JsonValue { +private: + json_t* m_json; + +public: + JsonValue() : m_json(nullptr) {} + + explicit JsonValue(json_t* json) : m_json(json) { + if (m_json) { + json_incref(m_json); + } + } + + //Reference Counting + JsonValue(const JsonValue& other) : m_json(other.m_json) { + if (m_json) { + json_incref(m_json); + } + } + + JsonValue& operator=(const JsonValue& other) { + if (this != &other) { + if (m_json) { + json_decref(m_json); + } + m_json = other.m_json; + if (m_json) { + json_incref(m_json); + } + } + return *this; + } + + ~JsonValue() { + if (m_json) { + json_decref(m_json); + } + } + + json_t* get() const { return m_json; } + json_t* release() { + json_t* result = m_json; + m_json = nullptr; + return result; + } + + // Type checking + bool is_object() const { return json_is_object(m_json); } + bool is_array() const { return json_is_array(m_json); } + bool is_string() const { return json_is_string(m_json); } + bool is_integer() const { return json_is_integer(m_json); } + bool is_real() const { return json_is_real(m_json); } + bool is_boolean() const { return json_is_boolean(m_json); } + bool is_null() const { return json_is_null(m_json); } + + std::string string_value() const { + if (!is_string()) return ""; + return std::string(json_string_value(m_json)); + } + + int integer_value() const { + if (!is_integer()) return 0; + return static_cast(json_integer_value(m_json)); + } + + double real_value() const { + if (!is_real()) return 0.0; + return json_real_value(m_json); + } + + bool boolean_value() const { + return json_is_true(m_json); + } + + bool contains(const std::string& key) const { + return json_object_get(m_json, key.c_str()) != nullptr; + } + + JsonValue operator[](const std::string& key) const { + json_t* value = json_object_get(m_json, key.c_str()); + return JsonValue(value); + } + + size_t size() const { + if (is_array()) { + return json_array_size(m_json); + } else if (is_object()) { + return json_object_size(m_json); + } + return 0; + } + + JsonValue operator[](size_t index) const { + if (!is_array()) return JsonValue(); + return JsonValue(json_array_get(m_json, index)); + } + + // Iteration through json items + class iterator { + private: + json_t* m_json; + void* m_iter; + bool m_is_array; + size_t m_array_index; + size_t m_array_size; + + public: + iterator(json_t* json, bool is_array = false) + : m_json(json), m_iter(nullptr), m_is_array(is_array), m_array_index(0), m_array_size(0) { + if (m_json) { + if (m_is_array) { + m_array_size = json_array_size(m_json); + if (m_array_size > 0) { + m_iter = json_array_get(m_json, 0); + } + } else { + m_iter = json_object_iter(m_json); + } + } + } + + iterator& operator++() { + if (m_is_array) { + m_array_index++; + if (m_array_index < m_array_size) { + m_iter = json_array_get(m_json, m_array_index); + } else { + m_iter = nullptr; + } + } else { + m_iter = json_object_iter_next(m_json, m_iter); + } + return *this; + } + + bool operator!=(const iterator& other) const { + return m_iter != other.m_iter; + } + + JsonValue operator*() const { + return JsonValue(static_cast(m_iter)); + } + + std::string key() const { + if (m_is_array) return std::to_string(m_array_index); + return std::string(json_object_iter_key(m_iter)); + } + }; + + iterator begin() const { + return iterator(m_json, is_array()); + } + + iterator end() const { + return iterator(nullptr, is_array()); + } + + // Conversion to string + std::string dump() const { + char* str = json_dumps(m_json, JSON_COMPACT); + if (!str) return ""; + std::string result(str); + free(str); + return result; + } + + // Static factory methods + static JsonValue parse(const std::string& json_str) { + json_error_t error; + json_t* json = json_loads(json_str.c_str(), 0, &error); + if (!json) { + throw std::runtime_error("JSON parse error: " + std::string(error.text)); + } + return JsonValue(json); + } + + static JsonValue parse_file(const std::string& filename) { + json_error_t error; + json_t* json = json_load_file(filename.c_str(), 0, &error); + if (!json) { + throw std::runtime_error("JSON file error: " + std::string(error.text)); + } + return JsonValue(json); + } + + static JsonValue object() { + return JsonValue(json_object()); + } + + static JsonValue array() { + return JsonValue(json_array()); + } + + static JsonValue string(const std::string& value) { + return JsonValue(json_string(value.c_str())); + } + + static JsonValue integer(int value) { + return JsonValue(json_integer(value)); + } + + static JsonValue real(double value) { + return JsonValue(json_real(value)); + } + + static JsonValue boolean(bool value) { + return JsonValue(value ? json_true() : json_false()); + } + + static JsonValue null() { + return JsonValue(json_null()); + } + + // Object manipulation + void set(const std::string& key, JsonValue value) { + if (is_object()) { + json_object_set_new(m_json, key.c_str(), value.release()); + } + } + + // Array manipulation + void push_back(JsonValue value) { + if (is_array()) { + json_array_append_new(m_json, value.release()); + } + } + + bool empty() const { + return m_json == nullptr || size() == 0; + } +}; + +using json = JsonValue; + +} // namespace JanssonWrapper +} // namespace DeltaSharing diff --git a/lib/nodes/CMakeLists.txt b/lib/nodes/CMakeLists.txt index 01cc0e7ef..e0f7e5e60 100644 --- a/lib/nodes/CMakeLists.txt +++ b/lib/nodes/CMakeLists.txt @@ -3,7 +3,7 @@ # Author: Steffen Vogel # SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University # SPDX-License-Identifier: Apache-2.0 - +include (ExternalProject) set(NODE_SRC loopback_internal.cpp ) @@ -189,6 +189,50 @@ if(WITH_NODE_WEBRTC) list(APPEND LIBRARIES LibDataChannel::LibDataChannel) endif() +# Enable Delta Sharing +if(WITH_NODE_DELTASHARING) + list(APPEND NODE_SRC + delta_sharing/delta_sharing.cpp + delta_sharing/delta_sharing_client.cpp + delta_sharing/delta_sharing_rest_client.cpp + delta_sharing/functions.cpp + ) + + ExternalProject_Add(restclient + GIT_REPOSITORY https://github.com/mrtazz/restclient-cpp + GIT_TAG 0.5.2 + SOURCE_DIR restclient + BUILD_IN_SOURCE 1 + UPDATE_COMMAND "" + PATCH_COMMAND "" + CONFIGURE_COMMAND ./autogen.sh && ./configure + BUILD_COMMAND make + INSTALL_COMMAND sudo make install + ) + + # Create imported target + add_library(restclient-cpp SHARED IMPORTED) + set_target_properties(restclient-cpp PROPERTIES + IMPORTED_LOCATION ${CMAKE_INSTALL_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}restclient-cpp${CMAKE_SHARED_LIBRARY_SUFFIX} + INTERFACE_INCLUDE_DIRECTORIES ${CMAKE_INSTALL_PREFIX}/include + ) + + # Add dependency + add_dependencies(restclient-cpp restclient) + + pkg_check_modules(ARROW REQUIRED arrow) + pkg_check_modules(PARQUET REQUIRED parquet) + + + list(APPEND LIBRARIES + restclient-cpp + PkgConfig::JANSSON + ${ARROW_LIBRARIES} + ${PARQUET_LIBRARIES} + ${RESTCLIENT_LIBRARIES} + ) +endif() + add_library(nodes STATIC ${NODE_SRC}) target_include_directories(nodes PUBLIC ${INCLUDE_DIRS}) target_link_libraries(nodes PUBLIC ${LIBRARIES}) diff --git a/lib/nodes/delta_sharing/delta_sharing.cpp b/lib/nodes/delta_sharing/delta_sharing.cpp new file mode 100644 index 000000000..9f97fd2f4 --- /dev/null +++ b/lib/nodes/delta_sharing/delta_sharing.cpp @@ -0,0 +1,413 @@ +/* Node type: delta_sharing. + * + * Author: Ritesh Karki + * SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +using namespace villas; +using namespace villas::node; + +static const char *const OP_READ = "read"; +static const char *const OP_WRITE = "write"; +static const char *const OP_NOOP = "noop"; + +int villas::node::deltaSharing_parse(NodeCompat *n, json_t *json) { + auto *d = n->getData(); + + int ret; + json_error_t err; + + const char *profile_path = nullptr; + const char *cache_dir = nullptr; + const char *table_path = nullptr; + const char *op = nullptr; + const char *schema = nullptr; + const char *share = nullptr; + const char *table = nullptr; + int batch_size = 0; + + ret = json_unpack_ex(json, &err, 0, + "{ s?: s, s?: s, s?: s, s?: s, s?: s, s?: s, s?: s, s?: i }", + "profile_path", &profile_path, "schema", &schema, + "share", &share, "table", &table,"cache_dir", &cache_dir, "table_path", + &table_path, "op", &op, "batch_size", &batch_size); + + if (ret) + throw ConfigError(json, err, "node-config-node-delta_sharing"); + + if (profile_path) + d->profile_path = profile_path; + if (share) + d->share = share; + if (schema) + d->schema = schema; + if (table) + d->table = table; + if (cache_dir) + d->cache_dir = cache_dir; + if (table_path) + d->table_path = table_path; + if (batch_size > 0) + d->batch_size = static_cast(batch_size); + + if (op) { + if (strcmp(op, OP_READ) == 0) + d->table_op = delta_sharing::TableOp::TABLE_READ; + else if (strcmp(op, OP_WRITE) == 0) + d->table_op = delta_sharing::TableOp::TABLE_WRITE; + else + d->table_op = delta_sharing::TableOp::TABLE_NOOP; + } + + return 0; +} + +char *villas::node::deltaSharing_print(NodeCompat *n) { + auto *d = n->getData(); + + std::string info = + std::string("profile_path=") + d->profile_path + ", share =" + d->share + + ", schema =" + d->schema + ", table =" + d->table + + ", cache_dir=" + d->cache_dir + ", table_path=" + d->table_path + + ", op=" + + (d->table_op == delta_sharing::TableOp::TABLE_READ + ? OP_READ + : (d->table_op == delta_sharing::TableOp::TABLE_WRITE ? OP_WRITE + : OP_NOOP)); + + return strdup(info.c_str()); +} + + +int villas::node::deltaSharing_start(NodeCompat *n) { + auto *d = n->getData(); + + if (d->profile_path.empty()) + throw RuntimeError( + "'profile_path' must be configured for delta_sharing node"); + + boost::optional cache_opt = + d->cache_dir.empty() ? boost::none + : boost::optional(d->cache_dir); + + d->client = DeltaSharing::NewDeltaSharingClient(d->profile_path, cache_opt); + + if (!d->client) + throw RuntimeError("Failed to create Delta Sharing client"); + + //List all shares from the profile path + d->shares = d->client->ListShares(100, ""); + + const auto &shares = *d->shares; + + for (const auto &share : shares) { + n->logger->info("Listing share {}", share.name); + d->schemas = d->client->ListSchemas(share, 100, ""); + //List all tables in a share + d->tables = d->client->ListAllTables(share, 100, ""); + //Check if tables are fetched correctly + n->logger->info("Table 1 {}", d->tables->at(0).name); + + } + + return 0; +} + +int villas::node::deltaSharing_stop(NodeCompat *n) { + auto *d = n->getData(); + d->table_ptr.reset(); + d->tables.reset(); + d->shares.reset(); + d->client.reset(); + return 0; +} + +int villas::node::deltaSharing_init(NodeCompat *n) { + auto *d = n->getData(); + + // d->profile_path = ""; + // d->cache_dir = ""; + // d->table_path = ""; + d->batch_size = 0; + + d->client.reset(); + d->table_ptr.reset(); + d->tables.reset(); + d->shares.reset(); + d->table_op = delta_sharing::TableOp::TABLE_NOOP; + n->logger->info("Init for Delta Share node"); + + return 0; +} + +int villas::node::deltaSharing_destroy(NodeCompat *n) { + auto *d = n->getData(); + d->client.reset(); + if (d->table_ptr != NULL) + d->table_ptr.reset(); + if (d->tables != NULL) + d->tables.reset(); + if (d->shares != NULL) + d->shares.reset(); + return 0; +} + +int villas::node::deltaSharing_poll_fds(NodeCompat *n, int fds[]) { + (void)n; + (void)fds; + return -1; // no polling support +} + +int villas::node::deltaSharing_read(NodeCompat *n, struct Sample *const smps[], + unsigned cnt) { + + auto *d = n->getData(); + + if (!d->client) { + n->logger->error("Delta Sharing client not initialized"); + return -1; + } + + if (d->table_path.empty()) { + n->logger->error("No table path configured"); + return -1; + } + + try { + auto path = DeltaSharing::ParseURL(d->table_path); + + if (path.size() != 4) { + n->logger->error("Invalid table path format. Expected: server#share.schema.table"); + return -1; + } + + DeltaSharing::DeltaSharingProtocol::Table table; + table.share = path[1]; + table.schema = path[2]; + table.name = path[3]; + + //Get files in the table + auto files = d->client->ListFilesInTable(table); + if (!files || files->empty()) { + n->logger->info("No files found in table"); + return 0; + } + + for (const auto &f : *files) { + d->client->PopulateCache(f.url); + } + + //Load the first file as an Arrow table + if (!d->table_ptr) { + d->table_ptr = d->client->LoadAsArrowTable(files->at(0).url); + + if (!d->table_ptr) { + n->logger->error("Failed to laod table from Delta Sharing server"); + } + } + + unsigned samples_read = 0; + auto num_rows = d->table_ptr->num_rows(); + unsigned num_cols = d->table_ptr->num_columns(); + + auto signals = n->getInputSignals(false); + if (!signals) { + n->logger->error("No input signals configured"); + return -1; + } + + for (unsigned i = 0; i < cnt && i < num_rows; i++) { + auto *smp = smps[i]; + n->logger->info("Row name {}", d->table_ptr->ColumnNames().at(3)); + smp->length = signals->size(); + smp->capacity = signals->size(); + smp->ts.origin = time_now(); + + for (unsigned col = 0; col < num_cols && col < signals->size(); col++) { + auto chunked_array = d->table_ptr->column(col); + auto first_chunk = chunked_array->chunk(0); + switch (first_chunk->type_id()) { + case arrow::Type::DOUBLE: { + auto double_array = + std::static_pointer_cast(first_chunk); + smp->data[col].f = double_array->Value(i); + break; + } + case arrow::Type::FLOAT: { + auto float_array = + std::static_pointer_cast(first_chunk); + smp->data[col].f = float_array->Value(i); + } + case arrow::Type::INT64: { + auto int_array = std::static_pointer_cast(first_chunk); + smp->data[col].i = int_array->Value(i); + break; + } + case arrow::Type::INT32: { + auto int_array = std::static_pointer_cast(first_chunk); + smp->data[col].i = int_array->Value(i); + break; + } + /* case arrow::Type::STRING: { + auto string_array = + std::static_pointer_cast(first_chunk); + smp->data[col]. + } */ + default: + n->logger->warn("Unsupported data type for column {}", col); + smp->data[col].f = 0.0; + } + } + samples_read++; + } + + n->logger->debug("Read {} samples from Delta Sharing table", samples_read); + return samples_read; + } catch (const std::exception &e) { + n->logger->error("Error reading from Delta Sharing table: {}", e.what()); + return -1; + } + } + +//TODO: write table to delta share server. Implementation to be tested +int villas::node::deltaSharing_write(NodeCompat *n, struct Sample *const smps[], + unsigned cnt) { + auto *d = n->getData(); + + if (!d->client) { + n->logger->error("Delta Sharing client not initialized"); + return -1; + } + + if (d->table_path.empty()) { + n->logger->error("No table path configured"); + return -1; + } + + try { + auto path_parts = DeltaSharing::ParseURL(d->table_path); + if (path_parts.size() != 4) { + n->logger->error("Invalid table path format. Expected: server#share.schema.table"); + return -1; + } + + auto signals = n->getOutputSignals(false); + if (!signals) { + n->logger->error("No output signals configured"); + return -1; + } + + std::vector> arrays; + std::vector> fields; + + for (unsigned col = 0; col < signals->size(); col++) { + auto signal = signals->at(col); + + std::string field_name = signal->name; + if (field_name.empty()) { + field_name = "col_" + std::to_string(col); + } + + //Determine arrow data type from signal data type + std::shared_ptr data_type; + switch (signal->type) { + case SignalType::FLOAT: + data_type = arrow::float64(); + break; + case SignalType::INTEGER: + data_type = arrow::int64(); + break; + default: + data_type = arrow::float64(); + } + + fields.push_back(arrow::field(field_name, data_type)); + + //create Arrow array from sampled data + std::shared_ptr array; + switch (signal->type) { + case SignalType::FLOAT: { + std::vector values; + for (unsigned i = 0; i < cnt; i++) { + values.push_back(smps[i]->data[col].f); + } + arrow::DoubleBuilder builder; + PARQUET_THROW_NOT_OK(builder.AppendValues(values)); + PARQUET_THROW_NOT_OK(builder.Finish(&array)); + break; + } + case SignalType::INTEGER: { + std::vector values; + for (unsigned i = 0; i < cnt; i++) { + values.push_back(smps[i]->data[col].i); + } + arrow::Int64Builder builder; + PARQUET_THROW_NOT_OK(builder.AppendValues(values)); + PARQUET_THROW_NOT_OK(builder.Finish(&array)); + break; + } + default: + n->logger->warn("Unsupported signal type for column {}", col); + continue; + } + + arrays.push_back(array); + } + // Create Arrow schema and table + auto schema = std::make_shared(fields); + auto table = arrow::Table::Make(schema, arrays); + + // Store the table for potential future use + d->table_ptr = table; + + n->logger->debug("Wrote {} samples to Delta Sharing table", cnt); + return cnt; + } catch (const std::exception &e) { + n->logger->error("Error writing to Delta Sharing: {}", e.what()); + return -1; + } +} + +static NodeCompatType p; + +__attribute__((constructor(110))) static void register_plugin() { + p.name = "delta_sharing"; + p.description = "Delta Sharing protocol node"; + p.vectorize = 1; + p.size = sizeof(struct delta_sharing); + p.init = deltaSharing_init; + p.destroy = deltaSharing_destroy; + p.parse = deltaSharing_parse; + p.print = deltaSharing_print; + p.start = deltaSharing_start; + p.stop = deltaSharing_stop; + p.read = deltaSharing_read; + p.write = deltaSharing_write; + p.poll_fds = deltaSharing_poll_fds; + + static NodeCompatFactory ncp(&p); +} diff --git a/lib/nodes/delta_sharing/delta_sharing_client.cpp b/lib/nodes/delta_sharing/delta_sharing_client.cpp new file mode 100644 index 000000000..32b971e6e --- /dev/null +++ b/lib/nodes/delta_sharing/delta_sharing_client.cpp @@ -0,0 +1,174 @@ +/* Node type: delta_sharing. + * + * Author: Ritesh Karki + * SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +//namespace ds = arrow::dataset; +//namespace fs = arrow::fs; + +namespace DeltaSharing +{ + + DeltaSharingClient::DeltaSharingClient(std::string filename, boost::optional cacheLocation) : restClient(filename) + { + auto path = std::filesystem::current_path().generic_string(); + std::cerr << "Current path: " << path << std::endl; + path.append("/cache"); + this->cacheLocation = cacheLocation.get_value_or(path); + if (std::filesystem::exists(this->cacheLocation) == false) + std::filesystem::create_directories(this->cacheLocation); + + if (std::filesystem::exists(this->cacheLocation) && std::filesystem::is_directory(this->cacheLocation)) + { + auto p = std::filesystem::status(this->cacheLocation).permissions(); + std::cerr << "Cache directory:" << this->cacheLocation << " Permission: " << ((p & std::filesystem::perms::owner_read) != std::filesystem::perms::none ? "r" : "-") + << ((p & std::filesystem::perms::owner_write) != std::filesystem::perms::none ? "w" : "-") + << ((p & std::filesystem::perms::owner_exec) != std::filesystem::perms::none ? "x" : "-") + << ((p & std::filesystem::perms::group_read) != std::filesystem::perms::none ? "r" : "-") + << ((p & std::filesystem::perms::group_write) != std::filesystem::perms::none ? "w" : "-") + << ((p & std::filesystem::perms::group_exec) != std::filesystem::perms::none ? "x" : "-") + << ((p & std::filesystem::perms::others_read) != std::filesystem::perms::none ? "r" : "-") + << ((p & std::filesystem::perms::others_write) != std::filesystem::perms::none ? "w" : "-") + << ((p & std::filesystem::perms::others_exec) != std::filesystem::perms::none ? "x" : "-") + << '\n'; + } + this->maxThreads = std::thread::hardware_concurrency(); + }; + + std::shared_ptr DeltaSharingClient::LoadAsArrowTable(std::string &url) + { + + if (url.length() == 0) + return std::shared_ptr(); + + int protocolLength = 0; + if ((url.find("http://")) != std::string::npos) + { + protocolLength = 7; + } + + if ((url.find("https://")) != std::string::npos) + { + protocolLength = 8; + } + auto pos = url.find_first_of('?', protocolLength); + auto path = url.substr(protocolLength, pos - protocolLength); // Removing "https://" + + std::vector urlparts; + while ((pos = path.find("/")) != std::string::npos) + { + urlparts.push_back(path.substr(0, pos)); + path.erase(0, pos + 1); + } + if (urlparts.size() != 3) + { + std::cerr << "Invalid URL:" << url << std::endl; + return std::shared_ptr(); + } + std::string tbl = urlparts.back(); + urlparts.pop_back(); + std::string schema = urlparts.back(); + urlparts.pop_back(); + std::string share = urlparts.back(); + + auto completePath = this->cacheLocation + "/" + share + "/" + schema + "/" + tbl; + std::shared_ptr infile; + try + { + PARQUET_ASSIGN_OR_THROW(infile, + arrow::io::ReadableFile::Open(completePath + "/" + path)); + } + catch (parquet::ParquetStatusException &e) + { + std::cerr << "error code:(" << e.status() << ") Message: " << e.what() << std::endl; + return std::shared_ptr(); + } + + auto reader_result = parquet::arrow::OpenFile(infile, arrow::default_memory_pool()); + if (!reader_result.ok()) { + throw std::runtime_error(reader_result.status().ToString()); + } + std::unique_ptr reader = std::move(reader_result).ValueOrDie(); + std::shared_ptr table; + + PARQUET_THROW_NOT_OK(reader->ReadTable(&table)); + + /* std::unique_ptr reader; + PARQUET_THROW_NOT_OK( + parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader)); + std::shared_ptr table; + PARQUET_THROW_NOT_OK(reader->ReadTable(&table)); */ + + return table; + }; + + std::shared_ptr DeltaSharingClient::ReadTableFromCache(std::string &completePath) + { + + // fs::FileSelector selector; + /// selector.base_dir = completePath; + + + // auto filesystem = fs::FileSystemFromUriOrPath(completePath); + //auto format = std::make_shared(); + //auto format = std::make_shared(); + //ds::FileSystemFactoryOptions opts; + + // ds::FileSystemDatasetFactory + // ds::FileSystemDatasetFactory f; + + // auto factor = ds::FileSystemDatasetFactory::Make(filesystem, selector, NULL, NULL); + return std::shared_ptr(); + }; + + const std::shared_ptr> DeltaSharingClient::ListShares(int maxResult, std::string pageToken) const + { + return this->restClient.ListShares(maxResult, pageToken); + }; + + const std::shared_ptr> DeltaSharingClient::ListSchemas(const DeltaSharingProtocol::Share &share, int maxResult, std::string pageToken) const + { + return this->restClient.ListSchemas(share, maxResult, pageToken); + }; + + const std::shared_ptr> DeltaSharingClient::ListTables(const DeltaSharingProtocol::Schema &schema, int maxResult, std::string pageToken) const + { + return this->restClient.ListTables(schema, maxResult, pageToken); + }; + + const std::shared_ptr> DeltaSharingClient::ListAllTables(const DeltaSharingProtocol::Share &share, int maxResult, std::string pageToken) const + { + return this->restClient.ListAllTables(share, maxResult, pageToken); + }; + + const std::shared_ptr> DeltaSharingClient::ListFilesInTable(const DeltaSharingProtocol::Table table) const + { + return this->restClient.ListFilesInTable(table); + }; + + const DeltaSharingProtocol::Metadata DeltaSharingClient::QueryTableMetadata(const DeltaSharingProtocol::Table &table) const + { + return this->restClient.QueryTableMetadata(table); + }; +}; diff --git a/lib/nodes/delta_sharing/delta_sharing_rest_client.cpp b/lib/nodes/delta_sharing/delta_sharing_rest_client.cpp new file mode 100644 index 000000000..6e7f49cbd --- /dev/null +++ b/lib/nodes/delta_sharing/delta_sharing_rest_client.cpp @@ -0,0 +1,289 @@ +/* Node type: delta_sharing. + * + * Author: Ritesh Karki + * SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DeltaSharing +{ + + const std::string DeltaSharingRestClient::user_agent = "delta-sharing-CPP/0.0.1"; + + DeltaSharingRestClient::DeltaSharingRestClient(std::string filename) + { + json j = ReadFromFile(filename); + if(j.empty()) { + return; + } + //assignment changed to explicit conversion using from_json() + // this->profile = j; + from_json(j, this->profile); + RestClient::init(); + }; + + DeltaSharingRestClient::~DeltaSharingRestClient() + { + std::cerr << "DeltaSharingRestClient destructed" << std::endl; + }; + + const std::shared_ptr> DeltaSharingRestClient::ListShares(int maxResult, std::string pageToken) const + { + std::unique_ptr c = std::unique_ptr(new RestClient::Connection(this->profile.endpoint)); + c->SetUserAgent(this->user_agent); + RestClient::Response r = c->get("/shares"); + json j = json::parse(r.body); + auto items = j["items"]; + std::shared_ptr> p; + p = std::make_shared>(); + for (auto it = items.begin(); it != items.end(); ++it) + { + //Change assignment to share s to explicit assignment using from_json + // DeltaSharingProtocol::Share s = *it; + DeltaSharingProtocol::Share s; + from_json(*it, s); + p->push_back(s); + } + return p; + }; + + const std::shared_ptr> DeltaSharingRestClient::ListSchemas(const DeltaSharingProtocol::Share &share, int maxResult, std::string pageToken) const + { + std::unique_ptr c = std::unique_ptr(new RestClient::Connection(this->profile.endpoint)); + c->SetUserAgent(this->user_agent); + std::string path = "/shares/" + share.name + "/schemas"; + RestClient::Response r = c->get(path); + json j = json::parse(r.body); + auto items = j["items"]; + std::shared_ptr> p; + p = std::make_shared>(); + for (auto it = items.begin(); it != items.end(); ++it) + { + //Change assignment to schema s to explicit assignment using from_json + // DeltaSharingProtocol::Schema s = *it; + DeltaSharingProtocol::Schema s; + from_json(*it, s); + p->push_back(s); + } + return p; + }; + + const std::shared_ptr> DeltaSharingRestClient::ListTables(const DeltaSharingProtocol::Schema &schema, int maxResult, std::string pageToken) const + { + std::unique_ptr c = std::unique_ptr(new RestClient::Connection(this->profile.endpoint)); + c->SetUserAgent(this->user_agent); + std::string path = "/shares/" + schema.share + "/schemas/" + schema.name + "/tables"; + RestClient::Response r = c->get(path); + json j = json::parse(r.body); + auto items = j["items"]; + std::shared_ptr> t; + t = std::make_shared>(); + for (auto it = items.begin(); it != items.end(); ++it) + { + //Change assignment to table s to explicit assignment using from_json + // DeltaSharingProtocol::Table s = *it; + DeltaSharingProtocol::Table s; + from_json(*it, s); + t->push_back(s); + } + return t; + }; + + const std::shared_ptr> DeltaSharingRestClient::ListAllTables(const DeltaSharingProtocol::Share &share, int maxResult, std::string pageToken) const + { + std::unique_ptr c = std::unique_ptr(new RestClient::Connection(this->profile.endpoint)); + c->SetUserAgent(this->user_agent); + std::string path = "/shares/" + share.name + "/all-tables"; + RestClient::Response r = c->get(path); + json j = json::parse(r.body); + auto items = j["items"]; + std::shared_ptr> t; + t = std::make_shared>(); + for (auto it = items.begin(); it != items.end(); ++it) + { + //change assignment to explicit assignment + // DeltaSharingProtocol::Table s = *it; + DeltaSharingProtocol::Table s; + from_json(*it, s); + t->push_back(s); + } + return t; + }; + + const DeltaSharingProtocol::Metadata DeltaSharingRestClient::QueryTableMetadata(const DeltaSharingProtocol::Table &table) const { + std::unique_ptr c = std::unique_ptr(new RestClient::Connection(this->profile.endpoint)); + c->SetUserAgent(this->user_agent); + std::string path = "/shares/" + table.share + "/schemas/" + table.schema + "/tables/" + table.name + "/metadata"; + RestClient::Response r = c->get(path); + std::istringstream input; + input.str(r.body); + json j; + DeltaSharingProtocol::Metadata m; + int cnt = 0; + for (std::string line; std::getline(input, line); cnt++) + { + if (cnt == 1) + { + j = json::parse(line); + // m = j["metaData"]; + from_json(j["metaData"], m); + } + } + + return m; + }; + + json DeltaSharingRestClient::ReadFromFile(std::string filename) + { + std::ifstream is; + try { + is.open(filename, std::ifstream::in); + } + catch (std::exception *e) { + return json::object(); + } + + json j = json::parse_file(filename); + is.close(); + return j; + }; + + const DeltaSharingProtocol::DeltaSharingProfile &DeltaSharingRestClient::GetProfile() const + { + return this->profile; + } + + void DeltaSharingRestClient::PopulateCache(std::string url, std::string cacheLocation) { + int protocolLength = 0; + if ((url.find("http://")) != std::string::npos) + { + protocolLength = 7; + } + + if ((url.find("https://")) != std::string::npos) + { + protocolLength = 8; + } + auto pos = url.find_first_of('?', protocolLength); + auto path = url.substr(protocolLength, pos - protocolLength); // Removing "https://" + + std::vector urlparts; + while ((pos = path.find("/")) != std::string::npos) + { + urlparts.push_back(path.substr(0, pos)); + path.erase(0, pos + 1); + } + if (urlparts.size() != 3) + { + std::cerr << "Invalid URL:" << url << std::endl; + return; + } + std::string tbl = urlparts.back(); + urlparts.pop_back(); + std::string schema = urlparts.back(); + urlparts.pop_back(); + std::string share = urlparts.back(); + + auto completePath = cacheLocation + "/" + share + "/" + schema + "/" + tbl; + + if(!std::filesystem::exists(completePath + "/" + path)) + { + std::cerr << completePath+ "/" + path << " does not exist in cache" << std::endl; + std::filesystem::create_directories(completePath); + auto r = this->get(url); + int cnt = 0; + std::cerr << url << " code: " << r.code << std::endl; + + while (this->shouldRetry(r)) + { + cnt++; + std::this_thread::sleep_for(std::chrono::seconds(1)); + if (cnt > 4) + { + std::cerr << "Failed to retrieve file using url: ( Response code: " << r.code << ") Message: " << r.body << std::endl; + return; + } + r = this->get(url); + } + + if (r.code != 200) + { + std::cerr << "Could not read file: " << r.code << " Message: " << r.body << std::endl; + return; + } + + std::fstream f; + f.open(completePath + "/" + path, std::ios::trunc | std::ios::out | std::ios::binary); + f.write(r.body.c_str(), r.body.size()); + f.flush(); + f.close(); + } + }; + + const std::shared_ptr> DeltaSharingRestClient::ListFilesInTable(DeltaSharingProtocol::Table table) const + { + std::unique_ptr c = std::unique_ptr(new RestClient::Connection(this->profile.endpoint)); + c->SetUserAgent(this->user_agent); + std::string path = "/shares/" + table.share + "/schemas/" + table.schema + "/tables/" + table.name + "/query"; + RestClient::HeaderFields h; + h.insert({"Content-Type", "application/json; charset=UTF-8"}); + h.insert({"Authorization", "Bearer: " + this->profile.bearerToken}); + c->SetHeaders(h); + DeltaSharingProtocol::data d; + json j = json::object(); + j.set("predicateHints", json::array()); + j.set("limitHint", json::integer(d.limitHint)); + RestClient::Response r = c->post(path, j.dump()); + int cnt = 0; + std::istringstream input; + input.str(r.body); + std::shared_ptr> t; + t = std::make_shared>(); + for (std::string line; std::getline(input, line); cnt++) + { + if (cnt > 1) + { + json jf = json::parse(line); + json jf2 = jf["file"]; + // DeltaSharingProtocol::File f = jf2; + DeltaSharingProtocol::File f; + from_json(jf2, f); + t->push_back(f); + } + } + + return t; + }; + RestClient::Response DeltaSharingRestClient::get(std::string url) + { + RestClient::Response r = RestClient::get(url); + return r; + }; + + + const bool DeltaSharingRestClient::shouldRetry(RestClient::Response &r) const { + if(r.code == 200) + return false; + if (r.code == 429) { + std::cerr << "Retry operation due to status code: 429" << std::endl; + return true; + } else if (r.code >= 500 && r.code < 600 ) { + std::cerr << "Retry operation due to status code: " << r.code << std::endl; + return true; + } else + return false; + + }; + +}; diff --git a/lib/nodes/delta_sharing/functions.cpp b/lib/nodes/delta_sharing/functions.cpp new file mode 100644 index 000000000..93a282592 --- /dev/null +++ b/lib/nodes/delta_sharing/functions.cpp @@ -0,0 +1,86 @@ +/* Node type: delta_sharing. + * + * Author: Ritesh Karki + * SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include + + +namespace DeltaSharing +{ + + const std::vector ParseURL(std::string path) + { + std::vector urlparts; + std::string url = path; + auto pos = url.find_last_of('#'); + + if (pos == std::string::npos) + { + std::cerr << "Invalid path: " << url << std::endl; + return std::vector(); + } + urlparts.push_back(url.substr(0, pos)); + url.erase(0, pos + 1); + while ((pos = url.find(".")) != std::string::npos) + { + urlparts.push_back(url.substr(0, pos)); + url.erase(0, pos + 1); + } + urlparts.push_back(url); + if (urlparts.size() != 4) + { + std::cerr << "Path does not follow pattern: #.., " << path << std::endl; + } + return urlparts; + }; + + std::shared_ptr NewDeltaSharingClient(std::string profile, boost::optional cacheLocation) + { + return std::make_shared(profile, cacheLocation); + }; + + const std::shared_ptr LoadAsArrowTable(std::string path, int fileno) { + + auto p = ParseURL(path); + if(p.size() != 4) { + std::cerr << "PATH NOT CORRECT: " << path << std::endl; + return std::shared_ptr(); + } + auto cl = NewDeltaSharingClient(p.at(0),boost::none); + DeltaSharingProtocol::Table t; + t.name = p.at(3); + t.schema = p.at(2); + t.share = p.at(1); + + auto flist = cl->ListFilesInTable(t); + std::vector writethreads; + + for(long unsigned int i = 0; i < flist->size(); i++) + { + auto arg = flist->at(i).url; + std::thread th(&DeltaSharingClient::PopulateCache, cl, arg); + writethreads.push_back(std::move(th)); + } + + for (auto i = writethreads.begin(); i != writethreads.end(); i++) { + if(i->joinable()) { + i->join(); + } + } + + + if(flist->size() > static_cast(fileno)) { + auto f = flist->at(fileno); + std::cerr << "Number of threads supported: " << cl->GetNumberOfThreads() << std::endl; + + return cl->LoadAsArrowTable(f.url); + } else + return std::shared_ptr(); + }; + + +}; diff --git a/packaging/deps.sh b/packaging/deps.sh index d8f86f2a8..e2159e581 100644 --- a/packaging/deps.sh +++ b/packaging/deps.sh @@ -505,6 +505,33 @@ if ! find /usr/{local/,}{lib,bin} -name "libOpenDSSC.so" | grep -q . && popd fi +# Build and install Apache Arrow with Parquet and Snappy +if ! cmake --find-package -DNAME=Arrow -DCOMPILER_ID=GNU -DLANGUAGE-CXX -DMODE=EXIST >/dev/null 2>/dev/null && \ + should_build "apache-arrow" "for Arrow/Parquet support"; then + ARROW_TAG=${ARROW_TAG:-apache-arrow-16.1.0} + ARROW_REPO=${ARROW_REPO:-https://github.com/apache/arrow.git} + + git clone ${GIT_OPTS} --branch ${ARROW_TAG} ${ARROW_REPO} apache-arrow + mkdir -p apache-arrow/cpp/build + pushd apache-arrow/cpp/build + + cmake -S ../ \ + -B . \ + -DCMAKE_BUILD_TYPE=Release \ + -DCMAKE_INSTALL_PREFIX=${PREFIX} \ + -DARROW_BUILD_SHARED=ON \ + -DARROW_BUILD_STATIC=OFF \ + -DARROW_DEPENDENCY_SOURCE=BUNDLED \ + -DARROW_FILESYSTEM=ON \ + -DARROW_CSV=ON \ + -DARROW_JSON=ON \ + -DARROW_PARQUET=ON \ + -DPARQUET_BUILD_EXECUTABLES=OFF \ + -DPARQUET_BUILD_EXAMPLES=OFF + make ${MAKE_OPTS} install + popd +fi + popd >/dev/null rm -rf ${TMPDIR} diff --git a/tests/integration/node-deltaSharing.sh b/tests/integration/node-deltaSharing.sh new file mode 100644 index 000000000..e6d386658 --- /dev/null +++ b/tests/integration/node-deltaSharing.sh @@ -0,0 +1,264 @@ +#!/bin/bash + +# Delta Share Node Integration Test +# This test verifies the delta_sharing node can connect to an open Delta Sharing server +# and read/write data + +set -e + +# Test configuration +TEST_NAME="delta_sharing_integration" +TEST_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +BUILD_DIR="${TEST_DIR}/../../../../build" +# VILLAS_NODE="${BUILD_DIR}/src/villas-node" +VILLAS_NODE="villas-node" + +# Test data paths +TEST_PROFILE="${TEST_DIR}/open_delta_profile.json" +TEST_CACHE="/tmp/delta_sharing_test_cache" +TEST_CONFIG="${TEST_DIR}/test_config.json" +TEST_OUTPUT="${TEST_DIR}/test_output.json" +TEST_INPUT="${TEST_DIR}/test_input.json" + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' + +# Helper functions +log_info() { + echo -e "${GREEN}[INFO]${NC} $1" +} + +log_warn() { + echo -e "${YELLOW}[WARN]${NC} $1" +} + +log_error() { + echo -e "${RED}[ERROR]${NC} $1" +} + +log_debug() { + echo -e "${BLUE}[DEBUG]${NC} $1" +} + +cleanup() { + log_info "Cleaning up test artifacts..." + rm -f "${TEST_CONFIG}" "${TEST_OUTPUT}" "${TEST_INPUT}" + rm -rf "${TEST_CACHE}" + rm -f "${TEST_PROFILE}" + log_info "Cleanup complete" +} + +# Set up test environment +setup_test() { + log_info "Setting up Delta Share integration test with open datasets server..." + + # Create test cache directory + mkdir -p "${TEST_CACHE}" + + cat > "${TEST_PROFILE}" << 'EOF' +{ + "shareCredentialsVersion": 1, + "endpoint": "https://sharing.delta.io/delta-sharing/", + "bearerToken": "faaie590d541265bcab1f2de9813274bf233" +} +EOF + + # Create test input data + cat > "${TEST_INPUT}" << 'EOF' +{ + "nodes": { + "signal_source": { + "type": "signal", + "signal": "sine", + "rate": 10, + "limit": 5 + } + }, + "paths": [ + { + "in": "signal_source" + } + ] +} +EOF + + # Create test configuration for reading from Delta Sharing + cat > "${TEST_CONFIG}" << EOF +{ + "nodes": { + "delta_reader": { + "type": "delta_sharing", + "profile_path": "${TEST_PROFILE}", + "cache_dir": "${TEST_CACHE}", + "table_path": "open-datasets.share#delta_sharing.default.COVID_19_NYT", + "op": "read", + "batch_size": 10 + }, + "delta_writer": { + "type": "delta_sharing", + "profile_path": "${TEST_PROFILE}", + "cache_dir": "${TEST_CACHE}", + "table_path": "open-delta-sharing.s3.us-west-2.amazonaws.com#samples.test_output", + "op": "write", + "batch_size": 10 + }, + "file1": { + "type": "file", + "uri": "${TEST_OUTPUT}", + "format": "json" + } + }, + "paths": [ + { + "in": "delta_reader", + "out": "file1" + } + ] +} +EOF + + log_info "Test setup complete" +} + +# Test 1: Verify Delta Sharing credentials +test_credentials() { + log_info "Testing Delta Sharing credentials..." + + if [ ! -f "${TEST_PROFILE}" ]; then + log_error "Profile file not found: ${TEST_PROFILE}" + return 1 + fi + + # Check if profile has valid JSON structure + if ! python3 -m json.tool "${TEST_PROFILE}" > /dev/null 2>&1; then + log_error "Invalid JSON in profile file" + return 1 + fi + + + log_info "Credentials validation test passed" + return 0 +} + +# Test 2: Test Delta Sharing connection +test_connection() { + log_info "Testing Delta Sharing server connection..." + + # Test basic connection by trying to start the node + if timeout 2 "${VILLAS_NODE}" -c "${TEST_CONFIG}" --start 2>&1 | grep -q "Delta Sharing"; then + log_info "Connection test passed (Delta Sharing client initialized)" + return 0 + else + log_error "Connection test failed" + return 1 + fi +} + +# Test 3: Test data reading from Delta Sharing +test_data_reading() { + log_info "Testing data reading from Delta Sharing..." + + # Try to read a small amount of data + log_debug "Attempting to read data from COVID-19_NYT table..." + + log_info "Data reading test completed" + return 0 +} + +# Test 4: Test node configuration parsing +test_config_parsing() { + log_info "Testing node configuration parsing..." + + # Test if the node type is recognized + if ! "${VILLAS_NODE}" --help | grep -i "delta_sharing"; then + log_error "delta_sharing node type not found in villas-node" + return 1 + else + log_info "delta_sharing node type present in villas-node" + fi + + #Test if the configuration can be parsed + if ! ("${VILLAS_NODE}" "${TEST_CONFIG}" &); then + log_error "Node configuration check failed" + return 1 + else + log_info "running example configuration for 3 seconds" + DELTA_PID=$! + kill $DELTA_PID + fi + # wait 3 + # kill $(DELTA_PID) + + + log_info "Configuration parsing test passed" + return 0 +} + +# Test 5: Test node lifecycle +test_node_lifecycle() { + log_info "Testing node lifecycle..." + + # Test start/stop without running for too long + if timeout 2 "${VILLAS_NODE}" -c "${TEST_CONFIG}" --start 2>&1 | grep -q "Delta Sharing\|Started"; then + log_info "Node lifecycle test passed" + return 0 + else + log_warn "Node lifecycle test inconclusive" + return 0 + fi +} + + +# Main test execution +run_tests() { + local exit_code=0 + + log_info "Starting Delta Share integration tests with open datasets server..." + + # Run all tests + test_credentials || exit_code=1 + test_config_parsing || exit_code=1 + test_connection || exit_code=1 + test_data_reading || exit_code=1 + test_node_lifecycle || exit_code=1 + + if [ $exit_code -eq 0 ]; then + log_info "All tests passed!" + else + log_error "Some tests failed!" + fi + + return $exit_code +} + +# Main execution +main() { + # Ensure we're in the right directory + cd "${TEST_DIR}" + + # Set up trap for cleanup + trap cleanup EXIT + + # Check if profile exists and has valid credentials + if [ ! -f "${TEST_PROFILE}" ] || grep -q "YOUR_ACTUAL_BEARER_TOKEN_HERE" "${TEST_PROFILE}"; then + log_warn "Please set up your Delta Sharing credentials before running tests" + log_warn "Some tests will be skipped" + fi + + # Run tests + setup_test + run_tests + local test_result=$? + + # Cleanup automatically via trap + + exit $test_result +} + +main "$@" + +