Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ cmake_dependent_option(WITH_NODE_WEBRTC "Build with webrtc node-type"
cmake_dependent_option(WITH_NODE_WEBSOCKET "Build with websocket node-type" "${WITH_DEFAULTS}" "WITH_WEB" OFF)
cmake_dependent_option(WITH_NODE_ZEROMQ "Build with zeromq node-type" "${WITH_DEFAULTS}" "LIBZMQ_FOUND; NOT WITHOUT_GPL" OFF)
cmake_dependent_option(WITH_NODE_OPENDSS "Build with opendss node-type" "${WITH_DEFAULTS}" "OpenDSSC_FOUND" OFF)
cmake_dependent_option(WITH_NODE_DELTASHARING "Build with delta-sharing node-type" "${WITH_DEFAULTS}" "" ON)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please fix the indentation, so that all options are aligned?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are not checking the presence of any of the dependencies used by this node-type: arrow,...


# set a default for the build type
if("${CMAKE_BUILD_TYPE}" STREQUAL "")
Expand Down
298 changes: 298 additions & 0 deletions include/villas/nodes/delta_sharing/Protocol.h
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is C++ code, please use the .hpp extension.

Also, please use snake case capitalization for file names.

Original file line number Diff line number Diff line change
@@ -0,0 +1,298 @@
/* Node type: Delta Share.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the correct name of this protocol?

  • Delta Sharing
  • or Delta Share?

*
* Author: Ritesh Karki <ritesh.karki@rwth-aachen.de>
* SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <iostream>
#include <villas/nodes/delta_sharing/jansson_wrapper.h>
#include <map>
#include <string>
#include <vector>
#include <boost/optional.hpp>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not use Boost in this project. Please limit yourself to the C++20 standard library (which also provides an optional type).

#include <boost/optional/optional_io.hpp>

namespace DeltaSharing
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please format this code with clang-format. There is also a script to do so in the tools folder, or you can use pre-commit.

{

namespace DeltaSharingProtocol
{

using json = JanssonWrapper::json;

struct DeltaSharingProfile
{
public:
int shareCredentialsVersion;
std::string endpoint;
std::string bearerToken;
boost::optional<std::string> expirationTime;
};

inline void from_json(const json &j, DeltaSharingProfile &p)
{
if (j.contains("shareCredentialsVersion"))
{
p.shareCredentialsVersion = j["shareCredentialsVersion"].integer_value();
}
if (j.contains("endpoint"))
{
p.endpoint = j["endpoint"].string_value();
}
if (j.contains("bearerToken"))
{
p.bearerToken = j["bearerToken"].string_value();
}
if (j.contains("expirationTime"))
{
p.expirationTime = j["expirationTime"].string_value();
}
};

struct Share
{
public:
std::string name = "";
boost::optional<std::string> id;
};

inline void from_json(const json &j, Share &s)
{
s.name = j["name"].string_value();
if (j.contains("id") == false)
{
s.id = boost::none;
}
else
{
s.id = j["id"].string_value();
}
};

struct Schema
{
public:
std::string name;
std::string share;
};

inline void from_json(const json &j, Schema &s)
{
s.name = j["name"].string_value();
if (j.contains("share") == true)
{
s.share = j["share"].string_value();
}
};

struct Table
{
public:
std::string name;
std::string share;
std::string schema;
};

inline void from_json(const json &j, Table &t)
{
if (j.contains("name"))
{
t.name = j["name"].string_value();
}
if (j.contains("share"))
{
t.share = j["share"].string_value();
}
if (j.contains("schema"))
{
t.schema = j["schema"].string_value();
}
};

struct File
{
public:
std::string url;
boost::optional<std::string> id;
std::map<std::string, std::string> partitionValues;
std::size_t size;
std::string stats;
boost::optional<std::string> timestamp;
boost::optional<std::string> version;
};

inline void from_json(const json &j, File &f)
{
if (j.contains("url"))
{
f.url = j["url"].string_value();
}
if (j.contains("id"))
{
f.id = j["id"].string_value();
}
if (j.contains("partitionValues"))
{
json arr = j["partitionValues"];
if (arr.is_array())
{
for (auto it = arr.begin(); it != arr.end(); ++it)
{
json item = *it;
if (item.is_object()) {
for (auto kv_it = item.begin(); kv_it != item.end(); ++kv_it) {
f.partitionValues[kv_it.key()] = (*kv_it).string_value();
}
}
}
}
}
if (j.contains("size"))
{
f.size = static_cast<std::size_t>(j["size"].integer_value());
}
if (j.contains("stats"))
{
f.stats = j["stats"].string_value();
}
if (j.contains("timestamp"))
{
f.timestamp = j["timestamp"].string_value();
}
if (j.contains("version"))
{
f.version = j["version"].string_value();
}
};

struct Format {
std::string provider;
};

inline void from_json(const json &j, Format &f)
{
if (j.contains("provider"))
{
f.provider = j["provider"].string_value();
}
}

struct Metadata
{
Format format;
std::string id;
std::vector<std::string> partitionColumns;
std::string schemaString;
};

inline void from_json(const json &j, Metadata &m)
{
if (j.contains("format"))
{
from_json(j["format"], m.format);
}
if (j.contains("id"))
{
m.id = j["id"].string_value();
}
if (j.contains("partitionColumns"))
{
json arr = j["partitionColumns"];
if (arr.is_array())
{
for (auto it = arr.begin(); it != arr.end(); ++it)
{
m.partitionColumns.push_back((*it).string_value());
}
}
}
if (j.contains("schemaString"))
{
m.schemaString = j["schemaString"].string_value();
}
}

struct data
{
std::vector<std::string> predicateHints;
int limitHint;
};

inline void from_json(const json &j, data &d)
{
if (j.contains("predicateHints"))
{
json arr = j["predicateHints"];
if (arr.is_array())
{
for (auto it = arr.begin(); it != arr.end(); ++it)
{
d.predicateHints.push_back((*it).string_value());
}
}
}
if (j.contains("limitHint"))
{
d.limitHint = j["limitHint"].integer_value();
}
}

struct format
{
std::string provider;
};

inline void from_json(const json &j, format &f)
{
if (j.contains("provider"))
{
f.provider = j["provider"].string_value();
}
}

struct protocol
{
int minReaderVersion;
};

inline void from_json(const json &j, protocol &p)
{
if (j.contains("minReaderVersion"))
{
p.minReaderVersion = j["minReaderVersion"].integer_value();
}
}

struct stats
{
long long numRecords;
long minValues;
long maxValues;
long nullCount;
};

inline void from_json(const json &j, stats &s)
{
if (j.contains("numRecords"))
{
s.numRecords = j["numRecords"].integer_value();
}
if (j.contains("minValues"))
{
s.minValues = j["minValues"].integer_value();
}
if (j.contains("maxValues"))
{
s.maxValues = j["maxValues"].integer_value();
}
if (j.contains("nullCount"))
{
s.nullCount = j["nullCount"].integer_value();
}
}

};
};
67 changes: 67 additions & 0 deletions include/villas/nodes/delta_sharing/delta_sharing.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/* Node type: Delta Sharing.
*
* Author: Ritesh Karki <ritesh.karki@rwth-aachen.de>
* SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <cstddef>
#include <memory>
#include <string>
#include <vector>

#include <jansson.h>

#include <villas/nodes/delta_sharing/Protocol.h>
#include <villas/nodes/delta_sharing/delta_sharing_client.h>

namespace arrow { class Table; }

namespace villas {
namespace node {

// Forward declarations
class NodeCompat;

struct delta_sharing {
// Configuration
std::string profile_path;
std::string cache_dir;
std::string table_path;
size_t batch_size;
std::string schema;
std::string share;
std::string table;

// Client and state
std::shared_ptr<DeltaSharing::DeltaSharingClient> client;
std::shared_ptr<std::vector<DeltaSharing::DeltaSharingProtocol::Schema>> schemas;
std::shared_ptr<arrow::Table> table_ptr;
std::shared_ptr<std::vector<DeltaSharing::DeltaSharingProtocol::Table>> tables;
std::shared_ptr<std::vector<DeltaSharing::DeltaSharingProtocol::Share>> shares;

enum class TableOp { TABLE_NOOP, TABLE_READ, TABLE_WRITE } table_op;
};

char *deltaSharing_print(NodeCompat *n);

int deltaSharing_parse(NodeCompat *n, json_t *json);

int deltaSharing_start(NodeCompat *n);

int deltaSharing_stop(NodeCompat *n);

int deltaSharing_init(NodeCompat *n);

int deltaSharing_destroy(NodeCompat *n);

int deltaSharing_poll_fds(NodeCompat *n, int fds[]);

int deltaSharing_read(NodeCompat *n, struct Sample *const smps[], unsigned cnt);

int deltaSharing_write(NodeCompat *n, struct Sample *const smps[], unsigned cnt);

} // namespace node
} // namespace villas
Loading