Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions data_tamer_cpp/include/data_tamer/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ class LogChannel : public std::enable_shared_from_this<LogChannel>
*/
void addDataSink(std::shared_ptr<DataSinkBase> sink);

/**
* @brief removeDataSink remove a sink, i.e. a class collecting our snapshots.
*/
void removeDataSink(std::shared_ptr<DataSinkBase> sink);

/**
* @brief takeSnapshot copies the current value of all your registered values
* and send an instance of Snapshot to all your Sinks.
Expand Down
36 changes: 31 additions & 5 deletions data_tamer_cpp/src/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ struct LogChannel::Pimpl
Schema schema;
bool logging_started = false;

mutable Mutex sinks_mutex;
std::unordered_set<std::shared_ptr<DataSinkBase>> sinks;
};

Expand Down Expand Up @@ -160,7 +161,23 @@ void LogChannel::unregister(const RegistrationID& id)

void LogChannel::addDataSink(std::shared_ptr<DataSinkBase> sink)
{
_p->sinks.insert(sink);
std::lock_guard const lock_sinks(_p->sinks_mutex);

if(!_p->logging_started)
{
_p->sinks.insert(sink);
}
else
{
sink->addChannel(_p->channel_name, _p->schema);
_p->sinks.insert(sink);
}
}

void LogChannel::removeDataSink(std::shared_ptr<DataSinkBase> sink)
{
std::lock_guard const lock(_p->sinks_mutex);
_p->sinks.erase(sink);
}

Schema LogChannel::getSchema() const
Expand All @@ -183,12 +200,16 @@ void LogChannel::addCustomType(const std::string& custom_type_name,
bool LogChannel::takeSnapshot(std::chrono::nanoseconds timestamp)
{
{
std::lock_guard const lock(_p->mutex);

std::lock_guard const lock_sinks(_p->sinks_mutex);
if(_p->sinks.empty())
{
return false;
}
}

{
std::lock_guard const lock(_p->mutex);

// update the _p->snapshot.active_mask if necessary
if(_p->mask_dirty)
{
Expand Down Expand Up @@ -220,6 +241,8 @@ bool LogChannel::takeSnapshot(std::chrono::nanoseconds timestamp)
{
_p->logging_started = true;
_p->snapshot.schema_hash = _p->schema.hash;

std::lock_guard const lock_sinks(_p->sinks_mutex);
for(auto const& sink : _p->sinks)
{
sink->addChannel(_p->channel_name, _p->schema);
Expand All @@ -243,9 +266,12 @@ bool LogChannel::takeSnapshot(std::chrono::nanoseconds timestamp)
}

bool all_pushed = true;
for(auto& sink : _p->sinks)
{
all_pushed &= sink->pushSnapshot(_p->snapshot);
std::lock_guard const lock_sinks(_p->sinks_mutex);
for(auto& sink : _p->sinks)
{
all_pushed &= sink->pushSnapshot(_p->snapshot);
}
}
return all_pushed;
}
Expand Down