Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Client/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ class Connection : public IServerConnection
format_settings = settings;
}

/// Returns a copy of the `host` member.
String getHost()
{
    return host;
}
/// Returns the `port` member.
UInt16 getPort()
{
    return port;
}

private:
String host;
UInt16 port;
Expand Down
2 changes: 2 additions & 0 deletions src/Client/HedgedConnections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,8 @@ Packet HedgedConnections::receivePacketFromReplica(const ReplicaLocation & repli
{
ReplicaState & replica = offset_states[replica_location.offset].replicas[replica_location.index];
Packet packet = std::move(last_received_packet);
/// Remember which replica connection produced this packet; getLastPacketConnection() returns it.
last_packet_connection = replica.connection;

switch (packet.type)
{
case Protocol::Server::Data:
Expand Down
5 changes: 5 additions & 0 deletions src/Client/HedgedConnections.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ class HedgedConnections : public IConnections

UInt64 receivePacketTypeUnlocked(AsyncCallback async_callback) override;

/// Returns the connection that received the last packet (nullptr until one has been recorded).
Connection * getLastPacketConnection() override
{
    return last_packet_connection;
}

void disconnect() override;

void sendCancel() override;
Expand Down Expand Up @@ -199,6 +201,9 @@ class HedgedConnections : public IConnections
/// to resume its packet receiver when a new packet is needed.
std::optional<ReplicaLocation> replica_with_last_received_packet;

/// Connection that received last packet.
Connection * last_packet_connection = nullptr;

Packet last_received_packet;

Epoll epoll;
Expand Down
2 changes: 2 additions & 0 deletions src/Client/IConnections.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class IConnections : boost::noncopyable

virtual UInt64 receivePacketTypeUnlocked(AsyncCallback async_callback) = 0;

/// Returns the connection associated with the most recently received packet.
/// NOTE(review): implementations may return nullptr before any packet is received — confirm per implementation.
virtual Connection * getLastPacketConnection() = 0;

/// Break all active connections.
virtual void disconnect() = 0;

Expand Down
2 changes: 2 additions & 0 deletions src/Client/MultiplexedConnections.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class MultiplexedConnections final : public IConnections

Packet receivePacket() override;

/// Reports `current_connection` as the connection of the last packet.
Connection * getLastPacketConnection() override
{
    return current_connection;
}

void disconnect() override;

void sendCancel() override;
Expand Down
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,9 @@ Ignore absence of file if it does not exist when reading certain keys.
Possible values:
- 1 — `SELECT` returns empty result.
- 0 — `SELECT` throws an exception.
)", 0) \
DECLARE(Bool, object_storage_stable_cluster_task_distribution, false, R"(
Use stable task distribution for object storage cluster table functions in order to better utilize filesystem cache.
)", 0) \
DECLARE(Bool, hdfs_ignore_file_doesnt_exist, false, R"(
Ignore absence of file if it does not exist when reading certain keys.
Expand Down
3 changes: 2 additions & 1 deletion src/QueryPipeline/RemoteQueryExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,8 @@ void RemoteQueryExecutor::processReadTaskRequest()
throw Exception(ErrorCodes::LOGICAL_ERROR, "Distributed task iterator is not initialized");

ProfileEvents::increment(ProfileEvents::ReadTaskRequestsReceived);
auto response = (*extension->task_iterator)();

auto response = (*extension->task_iterator)(connections->getLastPacketConnection());
connections->sendReadTaskResponse(response);
}

Expand Down
2 changes: 1 addition & 1 deletion src/QueryPipeline/RemoteQueryExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class RemoteQueryExecutorReadContext;
class ParallelReplicasReadingCoordinator;

/// This is the same type as StorageS3Source::IteratorWrapper
using TaskIterator = std::function<String()>;
using TaskIterator = std::function<String(Connection*)>;

/// This class allows one to launch queries on remote replicas of one shard and get results
class RemoteQueryExecutor
Expand Down
32 changes: 25 additions & 7 deletions src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
#include <Storages/ObjectStorage/Utils.h>
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
#include <Storages/extractTableFunctionArgumentsFromSelectQuery.h>
#include <Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h>


namespace DB
{
namespace Setting
{
extern const SettingsBool use_hive_partitioning;
extern const SettingsBool object_storage_stable_cluster_task_distribution;
}

namespace ErrorCodes
Expand Down Expand Up @@ -118,14 +120,30 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false,
local_context, predicate, virtual_columns, nullptr, local_context->getFileProgressCallback());

auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String
auto cluster = getCluster(local_context);

if (local_context->getSettingsRef()[Setting::object_storage_stable_cluster_task_distribution])
{
auto object_info = iterator->next(0);
if (object_info)
return object_info->getPath();
return "";
});
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, cluster);

auto callback = std::make_shared<TaskIterator>(
[task_distributor](Connection * connection) mutable -> String {
return task_distributor->getNextTask(connection);
});

return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
}
else
{
auto callback = std::make_shared<TaskIterator>(
[iterator](Connection *) mutable -> String {
if (auto object_info = iterator->next(0))
return object_info->getPath();
return "";
});

return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
#include "StorageObjectStorageStableTaskDistributor.h"
#include <Common/SipHash.h>

namespace DB
{

/// Takes ownership of the object iterator and builds the connection-key <-> replica-index
/// mapping from the addresses of `cluster`.
StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
    std::shared_ptr<IObjectIterator> iterator_,
    const ClusterPtr & cluster)
    : iterator(std::move(iterator_)), iterator_exhausted(false), total_files_processed(0)
{
    // Populates connection_key_to_replica / replica_to_connection_key and sets total_replicas,
    // which is what the log line below reports.
    initializeConnectionMapping(cluster);

    LOG_INFO(log, "Initialized StorageObjectStorageStableTaskDistributor to distribute files across {} unique replicas", total_replicas);
}

/// Rebuilds the two lookup tables that tie a "host:port" connection key to a replica index.
/// Every distinct address across all shards gets the next free index, in iteration order;
/// an address that repeats is only registered the first time it is seen.
/// Also refreshes `total_replicas` to the number of distinct keys.
void StorageObjectStorageStableTaskDistributor::initializeConnectionMapping(const ClusterPtr & cluster)
{
    connection_key_to_replica.clear();
    replica_to_connection_key.clear();

    const auto & shards = cluster->getShardsAddresses();

    size_t shard_idx = 0;
    for (const auto & shard_addresses : shards)
    {
        for (const auto & address : shard_addresses)
        {
            String connection_key = address.host_name + ":" + std::to_string(address.port);

            const bool already_registered
                = connection_key_to_replica.find(connection_key) != connection_key_to_replica.end();
            if (already_registered)
                continue;

            // The next index is simply the number of keys registered so far.
            Int32 replica_idx = static_cast<Int32>(replica_to_connection_key.size());
            connection_key_to_replica[connection_key] = replica_idx;
            replica_to_connection_key.push_back(connection_key);

            LOG_TRACE(log, "Discovered shard {} replica with connection key {} (replica_idx: {})", shard_idx, connection_key, replica_idx);
        }

        ++shard_idx;
    }

    total_replicas = static_cast<Int32>(replica_to_connection_key.size());

    LOG_INFO(log, "Mapping connections to {} unique replicas", total_replicas);
}

/// Returns the path of the next file the caller should process, or an empty string when
/// nothing is left to hand out.
///
/// `connection` identifies the requesting replica by "host:port". A null connection, or one
/// whose key is not in connection_key_to_replica, is treated as key "default" / replica -1.
///
/// Lookup order:
///   1. files already queued for this connection key;
///   2. files pulled from the iterator until one hashes to this replica;
///   3. any file still waiting in `unprocessed_files`.
String StorageObjectStorageStableTaskDistributor::getNextTask(Connection * connection)
{
    String connection_key = "default";
    Int32 replica_idx = -1;

    if (connection)
    {
        connection_key = connection->getHost() + ":" + std::to_string(connection->getPort());
        auto it = connection_key_to_replica.find(connection_key);
        if (it != connection_key_to_replica.end())
            replica_idx = it->second;
    }

    LOG_TRACE(
        log,
        "Received a new connection ({}, replica_idx: {}) looking for a file",
        connection_key,
        replica_idx
    );

    // 1. Check pre-queued files first
    String file = getPreQueuedFile(connection_key);
    if (!file.empty())
        return file;

    // 2. Try to find a matching file from the iterator
    file = getMatchingFileFromIterator(connection_key, replica_idx);
    if (!file.empty())
        return file;

    // `unprocessed_files` is modified under `mutex` by the other methods, so it must be
    // inspected under the same lock. The lock is released before sleeping.
    bool has_unprocessed_files = false;
    {
        std::lock_guard lock(mutex);
        has_unprocessed_files = !unprocessed_files.empty();
    }

    if (has_unprocessed_files)
    {
        // Prevent initiator from stealing jobs from other replicas:
        // pause briefly so the replicas these files were queued for can claim them first.
        sleepForMilliseconds(50);
    }

    // 3. Process unprocessed files if iterator is exhausted
    return getAnyUnprocessedFile(connection_key);
}

/// Maps a file path to a replica index in [0, total_replicas) using sipHash64 of the path.
/// Returns 0 when `total_replicas` is zero or negative.
Int32 StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path, Int32 total_replicas)
{
    const bool has_replicas = total_replicas > 0;
    if (!has_replicas)
        return 0;

    const UInt64 path_hash = sipHash64(file_path);
    const UInt64 bucket = path_hash % total_replicas;
    return static_cast<Int32>(bucket);
}

/// Pops files queued for `connection_key` (most recently queued first) until one is found
/// that is still present in `unprocessed_files`; that file is removed from the set, counted
/// as processed and returned. Returns an empty string when the queue runs dry.
/// Runs entirely under `mutex`.
String StorageObjectStorageStableTaskDistributor::getPreQueuedFile(const String & connection_key)
{
    std::lock_guard lock(mutex);
    auto & queued = connection_to_files[connection_key];

    while (!queued.empty())
    {
        String candidate = queued.back();
        queued.pop_back();

        // A queued entry that is no longer in the set was already handed out elsewhere.
        auto pending = unprocessed_files.find(candidate);
        if (pending == unprocessed_files.end())
            continue;

        unprocessed_files.erase(pending);
        ++total_files_processed;

        LOG_TRACE(log, "Assigning pre-queued file {} to connection {} (processed: {})", candidate, connection_key, total_files_processed);

        return candidate;
    }

    return "";
}

/// Pulls files from the iterator until one hashes to `replica_idx`, and returns its path.
/// Every non-matching file seen on the way is recorded in `unprocessed_files` and queued
/// for the connection key of the replica it hashes to.
/// Returns an empty string once the iterator is exhausted without a match.
///
/// Each loop iteration runs as one critical section under `mutex`, so `iterator_exhausted`
/// is read and written under the same lock and a fetched file is never left unrecorded
/// between lock releases. The lock is dropped between iterations.
String StorageObjectStorageStableTaskDistributor::getMatchingFileFromIterator(const String & connection_key, Int32 replica_idx)
{
    while (true)
    {
        std::lock_guard lock(mutex);

        if (iterator_exhausted)
            break;

        ObjectInfoPtr object_info = iterator->next(0);
        if (!object_info)
        {
            iterator_exhausted = true;
            break;
        }

        String file_path = object_info->getPath();
        Int32 file_replica_idx = getReplicaForFile(file_path, total_replicas);

        if (file_replica_idx == replica_idx)
        {
            total_files_processed++;

            LOG_TRACE(
                log,
                "Found file {} to connection {} (processed: {})",
                file_path,
                connection_key,
                total_files_processed
            );

            return file_path;
        }

        // Queue file for its assigned replica
        unprocessed_files.insert(file_path);

        // getReplicaForFile returns 0 when total_replicas <= 0; this check keeps us from
        // indexing an empty replica_to_connection_key in that case.
        if (file_replica_idx < total_replicas)
        {
            const String & target_connection_key = replica_to_connection_key[file_replica_idx];
            connection_to_files[target_connection_key].push_back(file_path);
        }
    }

    return "";
}

/// Hands out an arbitrary file from `unprocessed_files` (whatever begin() points at),
/// regardless of which replica it was queued for. The file is removed from the set and
/// counted as processed. Returns an empty string when the set is empty.
/// Runs entirely under `mutex`.
String StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(const String & connection_key)
{
    std::lock_guard lock(mutex);

    if (unprocessed_files.empty())
        return "";

    auto first = unprocessed_files.begin();
    String chosen_file = *first;
    unprocessed_files.erase(first);
    ++total_files_processed;

    LOG_TRACE(log, "Iterator exhausted. Assigning unprocessed file {} to connection {} (processed: {})", chosen_file, connection_key, total_files_processed);

    return chosen_file;
}

}
Loading
Loading