-
Notifications
You must be signed in to change notification settings - Fork 89
Open
Description
Description:
We are encountering segmentation faults, double-free errors, and epoll-related crashes when using sdbusplus::asio::connection in a multithreaded application that listens for D-Bus signals. The issue only occurs when the signal emission frequency is high (and the thread count is larger). At low frequencies with few threads, the application runs normally.
Each thread in the application creates its own boost::asio::io_context, sdbusplus::asio::connection, and sdbusplus::bus::match_t.
The threads individually call io_context::run() in parallel. The match rule listens to the same D-Bus signal across all threads.
The signal sender emits frequently (~1k+ signals per second).
Source code:
#include <atomic>
#include <cerrno>
#include <chrono>
#include <climits>
#include <csignal>
#include <cstdlib>
#include <exception>
#include <iostream>
#include <memory>
#include <optional>
#include <string>
#include <thread>
#include <vector>

#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <sdbusplus/asio/connection.hpp>
#include <sdbusplus/bus/match.hpp>
// Process-wide run flag: starts out true and is polled by main()'s wait loop.
std::atomic<bool> running{true};

// Signal handler registered in main() for SIGINT. It only clears the run
// flag; the signal number itself is not inspected.
void signal_handler(int /*signum*/)
{
    running.store(false);
}
/**
 * One independent D-Bus signal listener.
 *
 * Each instance owns its own io_context, its own system-bus connection, a
 * match object for the rule passed to start(), and a worker thread that runs
 * the io_context. The destructor stops the io_context and joins the worker.
 *
 * The class still derives from std::enable_shared_from_this for interface
 * compatibility, but no longer uses shared_from_this() internally: the
 * previous version captured a shared_ptr to itself inside the match callback,
 * which is stored in m_match. That formed an ownership cycle
 * (object -> m_match -> callback -> object), so the destructor never ran and
 * the worker threads were never stopped or joined.
 */
class DbusSignalTest : public std::enable_shared_from_this<DbusSignalTest>
{
  private:
    // Declaration order matters: members are destroyed in reverse order, so
    // the match goes before the connection and the connection before the
    // io_context it was constructed with.
    std::shared_ptr<boost::asio::io_context> m_io;
    std::shared_ptr<sdbusplus::asio::connection> m_con;
    std::shared_ptr<sdbusplus::bus::match_t> m_match;
    std::thread m_thread;
    // Keeps io_context::run() from returning while no handlers are pending.
    std::optional<
        boost::asio::executor_work_guard<boost::asio::io_context::executor_type>>
        m_guard;

  public:
    DbusSignalTest() = default;

    /**
     * Opens a new system-bus connection, registers a match for @p rule and
     * starts the worker thread that runs the io_context.
     *
     * Calling start() again while the worker is running is rejected with a
     * message on stderr; previously the second call assigned to a joinable
     * std::thread, which calls std::terminate.
     *
     * @param rule D-Bus match rule string handed to sdbusplus::bus::match_t.
     */
    void start(const std::string& rule)
    {
        if (m_thread.joinable())
        {
            std::cerr << "DbusSignalTest::start() called while already running"
                      << std::endl;
            return;
        }

        m_io = std::make_shared<boost::asio::io_context>();
        m_con = std::make_shared<sdbusplus::asio::connection>(
            *m_io, sdbusplus::bus::new_system());
        m_guard.emplace(boost::asio::make_work_guard(*m_io));

        // The callback touches no members, so it captures nothing. Capturing
        // a shared_ptr to this object here would recreate the ownership cycle
        // described in the class comment.
        m_match = std::make_shared<sdbusplus::bus::match_t>(
            *m_con, rule, [](sdbusplus::message_t& msg) {
                std::string content;
                msg.read(content);
                std::cout << "Signal received: " << content << std::endl;
            });

        // Capturing `this` is safe because the destructor joins the thread
        // before any member is destroyed.
        m_thread = std::thread([this] {
            try
            {
                m_io->run();
            }
            catch (const std::exception& e)
            {
                std::cerr << "Exception in io_context: " << e.what()
                          << std::endl;
            }
            catch (...)
            {
                // A thread entry point must not leak an exception; that
                // would terminate the whole process.
                std::cerr << "Unknown exception in io_context" << std::endl;
            }
        });
    }

    /// Stops the io_context, waits for the worker thread, then releases the
    /// work guard.
    ~DbusSignalTest()
    {
        if (m_io)
        {
            m_io->stop();
        }
        if (m_thread.joinable())
        {
            m_thread.join();
        }
        m_guard.reset();
    }

    DbusSignalTest(const DbusSignalTest&) = delete;
    DbusSignalTest& operator=(const DbusSignalTest&) = delete;

    // NOTE(review): kept for interface compatibility, but only safe before
    // start(). The worker lambda captures `this` and dereferences m_io, so
    // moving a started instance leaves the thread pointing at a moved-from
    // object, and move-assigning onto a started instance overwrites a
    // joinable std::thread (std::terminate). Consider deleting these.
    DbusSignalTest(DbusSignalTest&&) noexcept = default;
    DbusSignalTest& operator=(DbusSignalTest&&) noexcept = default;
};
namespace
{

/**
 * Parses @p text as a positive base-10 thread count.
 *
 * @param text NUL-terminated command-line argument.
 * @return The parsed count, or std::nullopt when the text is not a complete
 *         number, is out of range for long, is not positive, or exceeds
 *         INT_MAX.
 */
std::optional<unsigned int> parseThreadCount(const char* text)
{
    char* end = nullptr;
    // strtol only writes errno on failure, so clear it first; otherwise a
    // stale ERANGE from an earlier call would reject valid input.
    errno = 0;
    const long val = std::strtol(text, &end, 10);
    if (end == text || *end != '\0' || errno == ERANGE || val <= 0 ||
        val > INT_MAX)
    {
        return std::nullopt;
    }
    return static_cast<unsigned int>(val);
}

} // namespace

/**
 * Starts N listeners for the test.DBus.echo TimerSignalEmitTest signal and
 * waits until SIGINT clears the global `running` flag.
 *
 * argv[1], if present, is the requested number of listeners (default 1). It
 * is capped at twice the reported hardware concurrency.
 */
int main(int argc, char* argv[])
{
    std::signal(SIGINT, signal_handler);

    std::string matchRule =
        sdbusplus::bus::match::rules::type::signal() +
        sdbusplus::bus::match::rules::sender("test.DBus.echo") +
        sdbusplus::bus::match::rules::interface("test.DBus.echo") +
        sdbusplus::bus::match::rules::member("TimerSignalEmitTest") +
        sdbusplus::bus::match::rules::path("/test/DBus/echo/server");

    unsigned int threadNum = 1;
    if (argc >= 2)
    {
        if (const auto parsed = parseThreadCount(argv[1]))
        {
            threadNum = *parsed;
        }
        else
        {
            std::cerr << "Invalid thread number. Using default 1.\n";
        }
    }

    // hardware_concurrency() may return 0 when the value cannot be
    // determined. Treat that as one core so the cap never drops to zero and
    // silently disables every listener.
    const unsigned int hwThreads = std::thread::hardware_concurrency();
    const unsigned int maxSupported = (hwThreads == 0 ? 1U : hwThreads) * 2;
    if (threadNum > maxSupported)
    {
        std::cerr << "Thread number resized to " << maxSupported << "\n";
        threadNum = maxSupported;
    }

    std::vector<std::shared_ptr<DbusSignalTest>> instances;
    instances.reserve(threadNum);
    for (unsigned int i = 0; i < threadNum; ++i)
    {
        auto instance = std::make_shared<DbusSignalTest>();
        instance->start(matchRule);
        instances.emplace_back(std::move(instance));
    }

    std::cout << "Signal match running... Press Ctrl+C to stop." << std::endl;
    // Poll the flag that the SIGINT handler clears.
    while (running)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
    std::cout << "Stopping..." << std::endl;

    // Returning destroys `instances`, releasing main's references to the
    // listeners.
    return 0;
}
Metadata
Metadata
Assignees
Labels
No labels