Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmake/autogenerated_versions.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

# NOTE: has nothing common with DBMS_TCP_PROTOCOL_VERSION,
# only DBMS_TCP_PROTOCOL_VERSION should be incremented on protocol changes.
SET(VERSION_REVISION 136)
SET(VERSION_REVISION 140)
SET(VERSION_MAJOR 3)
SET(VERSION_MINOR 0)
SET(VERSION_PATCH 5)
Expand Down
2 changes: 1 addition & 1 deletion programs/server/grok-patterns
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
# '60' is a leap second in most time standards and thus is valid.
SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
TIME %{HOUR}:%{MINUTE}(?::%{SECOND})
# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)
DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}
DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}
Expand Down
6 changes: 4 additions & 2 deletions src/AggregateFunctions/IAggregateFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,10 @@ class IAggregateFunction : public std::enable_shared_from_this<IAggregateFunctio
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method negate is not supported for {}", getName());
}

/// Move state from rhs to target place
virtual void move(AggregateDataPtr __restrict place, AggregateDataPtr rhs, Arena * arena) const
/// Copy state from \p rhs into \p place. Behaves like merge(), except that
/// copy() must never modify the \p rhs state, whereas merge() is allowed to
/// (e.g. Streaming::AggregateFunctionDistinct records the merged-from place).
/// The default implementation delegates to merge(); implementations whose
/// merge() mutates \p rhs must override copy() with a non-mutating variant.
virtual void copy(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const
{
    merge(place, rhs, arena);
}
Expand Down
47 changes: 31 additions & 16 deletions src/AggregateFunctions/Streaming/AggregateFunctionDistinct.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ struct AggregateFunctionDistinctSingleNumericData
extra_data_since_last_finalize.emplace_back(vec[row_num]);
}

void merge(const Self & rhs, Arena *)
void merge(const Self & rhs, Arena *, bool move_rhs)
{
/// Deduplicate owned extra data based on rhs, also make sure it doesn't exist in rhs extra data
for (auto it = extra_data_since_last_finalize.begin(); it != extra_data_since_last_finalize.end();)
Expand Down Expand Up @@ -133,10 +133,13 @@ struct AggregateFunctionDistinctSingleNumericData

set.merge(rhs.set);

uintptr_t merged_place = reinterpret_cast<uintptr_t>(&rhs);
auto find_place = std::find(merged_places.begin(), merged_places.end(), merged_place);
if (find_place == merged_places.end())
merged_places.emplace_back(merged_place);
if (move_rhs)
{
uintptr_t merged_place = reinterpret_cast<uintptr_t>(&rhs);
auto find_place = std::find(merged_places.begin(), merged_places.end(), merged_place);
if (find_place == merged_places.end())
merged_places.emplace_back(merged_place);
}
}

void serialize(WriteBuffer & buf) const
Expand Down Expand Up @@ -181,7 +184,7 @@ struct AggregateFunctionDistinctGenericData

bool use_extra_data = false;

void merge(const Self & rhs, Arena * arena)
void merge(const Self & rhs, Arena * arena, bool move_rhs)
{
Set::LookupResult it;
bool inserted;
Expand Down Expand Up @@ -216,10 +219,13 @@ struct AggregateFunctionDistinctGenericData

set.merge(rhs.set);

uintptr_t merged_place = reinterpret_cast<uintptr_t>(&rhs);
auto find_place = std::find(merged_places.begin(), merged_places.end(), merged_place);
if (find_place == merged_places.end())
merged_places.emplace_back(merged_place);
if (move_rhs)
{
uintptr_t merged_place = reinterpret_cast<uintptr_t>(&rhs);
auto find_place = std::find(merged_places.begin(), merged_places.end(), merged_place);
if (find_place == merged_places.end())
merged_places.emplace_back(merged_place);
}
}

void serialize(WriteBuffer & buf) const
Expand Down Expand Up @@ -382,7 +388,7 @@ struct AggregateFunctionDistinctGenericDataWithoutArena
return argument_columns;
}

void merge(const AggregateFunctionDistinctGenericDataWithoutArena & rhs, Arena *)
void merge(const AggregateFunctionDistinctGenericDataWithoutArena & rhs, Arena *, bool move_rhs)
{
/// Deduplicate owned extra data based on rhs, also make sure it doesn't exist in rhs extra data
for (auto next = extra_data_since_last_finalize.begin(); next != extra_data_since_last_finalize.end();)
Expand Down Expand Up @@ -412,10 +418,13 @@ struct AggregateFunctionDistinctGenericDataWithoutArena

set.insert(rhs.set.begin(), rhs.set.end());

uintptr_t merged_place = reinterpret_cast<uintptr_t>(&rhs);
auto find_place = std::find(merged_places.begin(), merged_places.end(), merged_place);
if (find_place == merged_places.end())
merged_places.emplace_back(merged_place);
if (move_rhs)
{
uintptr_t merged_place = reinterpret_cast<uintptr_t>(&rhs);
auto find_place = std::find(merged_places.begin(), merged_places.end(), merged_place);
if (find_place == merged_places.end())
merged_places.emplace_back(merged_place);
}
}

void serialize(WriteBuffer & buf) const
Expand Down Expand Up @@ -488,7 +497,13 @@ class AggregateFunctionDistinct : public IAggregateFunctionDataHelper<Data, Aggr

/// Merge the distinct state of \p rhs into \p place with consuming semantics:
/// move_rhs=true makes the data record the address of \p rhs in its
/// merged_places list, so \p rhs should not be updated independently
/// afterwards. Use copy() when \p rhs must remain unaffected.
/// NOTE: the stale pre-change two-argument call to data().merge() that was
/// duplicated above this line has been removed; only the three-argument
/// form matches the updated data-struct signature.
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
    this->data(place).merge(this->data(rhs), arena, /*move_rhs=*/true);
    nested_func->merge(getNestedPlace(place), getNestedPlace(rhs), arena);
}

/// Non-mutating variant of merge(): combines \p rhs into \p place without
/// registering \p rhs in the data's merged_places list (move_rhs=false),
/// so \p rhs can still be used afterwards.
void copy(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
    this->data(place).merge(this->data(rhs), arena, /*move_rhs=*/false);
    /// Use copy() (not merge()) on the nested state as well: merge() may
    /// mutate the nested \p rhs state, which would violate the copy()
    /// contract. This also matches AggregateFunctionDistinctRetract::copy().
    nested_func->copy(getNestedPlace(place), getNestedPlace(rhs), arena);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ struct AggregateFunctionDistinctRetractGenericData

AggregateFunctionDistinctRetractGenericData() : map(INTERNAL_MAP_SIZE) { }

void merge(const Self & rhs)
void merge(const Self & rhs, bool move_rhs)
{
for (auto next = extra_data_since_last_finalize.begin(); next != extra_data_since_last_finalize.end();)
{
Expand Down Expand Up @@ -68,10 +68,13 @@ struct AggregateFunctionDistinctRetractGenericData

map.merge(rhs.map);

uintptr_t merged_place = reinterpret_cast<uintptr_t>(&rhs);
auto find_place = std::find(merged_places.begin(), merged_places.end(), merged_place);
if (find_place == merged_places.end())
merged_places.emplace_back(merged_place);
if (move_rhs)
{
uintptr_t merged_place = reinterpret_cast<uintptr_t>(&rhs);
auto find_place = std::find(merged_places.begin(), merged_places.end(), merged_place);
if (find_place == merged_places.end())
merged_places.emplace_back(merged_place);
}
}

void serialize(WriteBuffer & buf) const
Expand Down Expand Up @@ -209,10 +212,16 @@ class AggregateFunctionDistinctRetract : public IAggregateFunctionDataHelper<Dat

/// Merge the distinct-retract state of \p rhs into \p place with consuming
/// semantics: move_rhs=true records \p rhs in the data's merged_places list,
/// so \p rhs should not be updated independently afterwards.
/// NOTE: the stale pre-change one-argument call to data().merge() that was
/// duplicated above this line has been removed; only the move_rhs form
/// matches the updated data-struct signature.
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
    this->data(place).merge(this->data(rhs), /*move_rhs=*/true);
    nested_func->merge(getNestedPlace(place), getNestedPlace(rhs), arena);
}

/// Non-mutating variant of merge(): combines \p rhs into \p place without
/// registering \p rhs in the data's merged_places list (move_rhs=false) and
/// copies (rather than merges) the nested state, so \p rhs stays untouched
/// and can still be used afterwards.
void copy(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
    this->data(place).merge(this->data(rhs), /*move_rhs=*/false);
    nested_func->copy(getNestedPlace(place), getNestedPlace(rhs), arena);
}

void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
{
this->data(place).serialize(buf);
Expand Down
10 changes: 10 additions & 0 deletions src/AggregateFunctions/Streaming/AggregateFunctionUniq.h
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,11 @@ class AggregateFunctionUniq final : public IAggregateFunctionDataHelper<Data, Ag
detail::Adder<T, Data>::add(this->data(place), columns, num_args, row_num);
}

/// Streaming retraction hook: applies the row as a negative update to the
/// unique-count state via the per-type Adder, mirroring add() above.
/// Exact retraction semantics depend on detail::Adder<T, Data>::negate —
/// defined outside this view.
void ALWAYS_INLINE negate(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
{
    detail::Adder<T, Data>::negate(this->data(place), columns, num_args, row_num);
}

void ALWAYS_INLINE addBatchSinglePlace(
size_t row_begin,
size_t row_end,
Expand Down Expand Up @@ -520,6 +525,11 @@ class AggregateFunctionUniqVariadic final : public IAggregateFunctionDataHelper<
detail::Adder<T, Data>::add(this->data(place), columns, num_args, row_num);
}

/// Streaming retraction hook for the variadic (multi-argument) uniq variant:
/// applies the row as a negative update via the per-type Adder, mirroring
/// add() above. Exact semantics depend on detail::Adder<T, Data>::negate —
/// defined outside this view.
void negate(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
{
    detail::Adder<T, Data>::negate(this->data(place), columns, num_args, row_num);
}

void addBatchSinglePlace(
size_t row_begin,
size_t row_end,
Expand Down
2 changes: 2 additions & 0 deletions src/Common/HybridHashTable/TimeBucketHybridHashTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ class TimeBucketHybridHashTable
return metrics;
}

/// True when the table holds no keys, judged by approximateCount();
/// inherits that method's estimation accuracy.
bool empty() const
{
    return approximateCount() == 0;
}

size_t approximateCount() const
{
size_t estimated_keys = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/Databases/DatabaseOrdinary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ void DatabaseOrdinary::loadTableFromMetadata(ContextMutablePtr local_context, co
}
catch (const Exception & ex)
{
LOG_INFO(log, "Failed to load table, error={} ddl={}", ex.message(), queryToString(create_query, true));
LOG_ERROR(log, "Failed to load table, error={} ddl={}", ex.message(), queryToString(create_query, true));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#include "MetadataStorageFromPlainObjectStorage.h"
#include <Disks/ObjectStorages/MetadataStorageFromPlainObjectStorage.h>

#include <Disks/IDisk.h>
#include <Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.h>
#include <Disks/ObjectStorages/MetadataStorageFromPlainObjectStorageOperations.h>
#include <Disks/ObjectStorages/StaticDirectoryIterator.h>

#include <Common/filesystemHelpers.h>
#include <IO/Expect404ResponseScope.h>

#include <filesystem>
#include <tuple>
Expand Down
29 changes: 29 additions & 0 deletions src/IO/Expect404ResponseScope.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#include "Expect404ResponseScope.h"

#include <base/defines.h>

namespace DB
{

/// Per-thread count of currently active Expect404ResponseScope instances;
/// non-zero means a 404 response is expected on this thread.
thread_local size_t expected_404_scope_count = 0;

/// Opens a "404 is expected" scope on the current thread and remembers the
/// thread id so the destructor can verify same-thread destruction.
Expect404ResponseScope::Expect404ResponseScope()
    : initial_thread_id(std::this_thread::get_id())
{
    ++expected_404_scope_count;
}

/// Closes the scope. The counter is thread_local, so construction and
/// destruction must happen on the same thread — asserted here in debug
/// builds; decrementing on another thread would corrupt both threads' counts.
Expect404ResponseScope::~Expect404ResponseScope()
{
    // check that instance is destroyed in the same thread
    chassert(initial_thread_id == std::this_thread::get_id());
    chassert(expected_404_scope_count);
    --expected_404_scope_count;
}

/// Returns true when at least one Expect404ResponseScope is alive on the
/// calling thread, i.e. a 404 response should be treated as expected.
bool Expect404ResponseScope::is404Expected()
{
    return expected_404_scope_count != 0;
}

}
23 changes: 23 additions & 0 deletions src/IO/Expect404ResponseScope.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#pragma once

#include <thread>

namespace DB
{

// RAII scope marker: while an instance is alive on the current thread, a
// remote-storage PATH_NOT_FOUND (404) response is considered expected —
// no error logs are written and no error profile events are incremented.
// The underlying counter is thread_local, so the scope must be created and
// destroyed on the same thread (checked in the destructor); it nests safely.
class Expect404ResponseScope
{
public:
    Expect404ResponseScope();
    ~Expect404ResponseScope();

    /// True when at least one scope is active on the calling thread.
    static bool is404Expected();

private:
    // Thread the scope was opened on; used to assert same-thread destruction.
    std::thread::id initial_thread_id;
};

}
49 changes: 49 additions & 0 deletions src/IO/S3/AWSLogger.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
#include <IO/S3/AWSLogger.h>
#include <IO/ReadHelpers.h>
#include <IO/Expect404ResponseScope.h>
#include <Poco/Net/HTTPResponse.h>

#if USE_AWS_MSK_IAM || USE_AWS_S3 /// proton: updated

Expand Down Expand Up @@ -54,13 +57,59 @@ Aws::Utils::Logging::LogLevel AWSLogger::GetLogLevel() const
return Aws::Utils::Logging::LogLevel::Info;
}

namespace
{
/// This function helps to avoid reading the whole str when strlen is called
/// Prefix test that inspects at most strlen(prefix) characters of str, so a
/// long str is never fully scanned (unlike a strlen-based check).
bool startsWith(const char * str, const char * prefix)
{
    for (; *prefix != 0; ++str, ++prefix)
    {
        if (*str != *prefix)
            return false;
    }
    return true;
}

/// Decides whether an AWS SDK log message should be suppressed: returns true
/// only when the current thread expects a 404 (Expect404ResponseScope active)
/// and the message is of the form "HTTP response code: 404...".
bool is404Muted(const char * message)
{
    /// This is the way, how to mute scary logs from `AWSXMLClient::BuildAWSError`
    /// about 404 when 404 is the expected response
    if (!Expect404ResponseScope::is404Expected())
        return false;

    /// prefix_len is computed once thanks to static storage duration.
    static const char * prefix_str = "HTTP response code: ";
    static const size_t prefix_len = strlen(prefix_str);

    if (!startsWith(message, prefix_str))
        return false;

    /// HTTP status codes are exactly three digits; parse only that many
    /// characters so any trailing text after the code is ignored.
    const char * code_str = message + prefix_len;
    size_t code_len = 3;

    // check that strlen(code_str) >= code_len
    for (size_t i = 0; i < code_len; ++i)
        if (!code_str[i])
            return false;

    UInt64 code = 0;
    if (!tryParse<UInt64>(code, code_str, code_len))
        return false;

    return code == Poco::Net::HTTPResponse::HTTP_NOT_FOUND;
}
}

/// printf-style AWS SDK log entry point. Expected-404 messages are muted
/// before delegating to the common implementation.
/// NOTE(review): is404Muted() inspects the raw format string, not the
/// formatted message — adequate for BuildAWSError's messages, but any 404
/// text substituted via varargs would not be muted (see FIXME below).
void AWSLogger::Log(Aws::Utils::Logging::LogLevel log_level, const char * tag, const char * format_str, ...) // NOLINT
{
    if (is404Muted(format_str))
        return;
    callLogImpl(log_level, tag, format_str); /// FIXME. Variadic arguments?
}

/// Stream-based AWS SDK log entry point. Expected-404 messages are muted
/// before delegating to the common implementation.
void AWSLogger::LogStream(Aws::Utils::Logging::LogLevel log_level, const char * tag, const Aws::OStringStream & message_stream)
{
    /// Materialize the stream contents once: the previous code called
    /// message_stream.str() twice, building the full message string twice.
    const auto message = message_stream.str();
    if (is404Muted(message.c_str()))
        return;
    callLogImpl(log_level, tag, message.c_str());
}

Expand Down
5 changes: 3 additions & 2 deletions src/IO/S3/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <aws/core/utils/HashingUtils.h>
#include <aws/core/utils/logging/ErrorMacros.h>

#include <IO/Expect404ResponseScope.h>
#include <IO/S3Common.h>
#include <IO/S3/Requests.h>
#include <IO/S3/PocoHTTPClientFactory.h>
Expand Down Expand Up @@ -400,12 +401,12 @@ Model::UploadPartCopyOutcome Client::UploadPartCopy(const UploadPartCopyRequest

/// Deletes a single object. A 404 from the remote storage is an expected
/// outcome for deletes (object may already be gone), so the request runs
/// under Expect404ResponseScope to suppress error logging for it.
/// NOTE: the stale pre-change return statement that was duplicated above the
/// scoped one (making it unreachable) has been removed.
Model::DeleteObjectOutcome Client::DeleteObject(const DeleteObjectRequest & request) const
{
    return doRequest(request, [this](const Model::DeleteObjectRequest & req) { Expect404ResponseScope scope; return DeleteObject(req); });
}

/// Deletes a batch of objects. A 404 from the remote storage is an expected
/// outcome for deletes, so the request runs under Expect404ResponseScope to
/// suppress error logging for it.
/// NOTE: the stale pre-change return statement that was duplicated above the
/// scoped one (making it unreachable) has been removed.
Model::DeleteObjectsOutcome Client::DeleteObjects(const DeleteObjectsRequest & request) const
{
    return doRequest(request, [this](const Model::DeleteObjectsRequest & req) { Expect404ResponseScope scope; return DeleteObjects(req); });
}

Client::ComposeObjectOutcome Client::ComposeObject(const ComposeObjectRequest & request) const
Expand Down
13 changes: 11 additions & 2 deletions src/IO/S3/PocoHTTPClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <Common/Stopwatch.h>
#include <Common/Throttler.h>
#include <Common/re2.h>
#include <IO/Expect404ResponseScope.h>
#include <IO/HTTPCommon.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
Expand Down Expand Up @@ -504,8 +505,16 @@ void PocoHTTPClient::makeRequestInternalImpl(

int status_code = static_cast<int>(poco_response.getStatus());

if (enable_s3_requests_logging)
LOG_TEST(log, "Response status: {}, {}", status_code, poco_response.getReason());
if (status_code >= SUCCESS_RESPONSE_MIN && status_code <= SUCCESS_RESPONSE_MAX)
{
if (enable_s3_requests_logging)
LOG_TEST(log, "Response status: {}, {}", status_code, poco_response.getReason());
}
else if (Poco::Net::HTTPResponse::HTTP_NOT_FOUND != status_code || !Expect404ResponseScope::is404Expected())
{
/// Error statuses are more important so we show them even if `enable_s3_requests_logging == false`.
LOG_ERROR(log, "Response status: {}, {} HttpMethod={} URI={}", status_code, poco_response.getReason(), request.GetMethod(), request.GetURIString()); /// proton : updates
}

if (poco_response.getStatus() == Poco::Net::HTTPResponse::HTTP_TEMPORARY_REDIRECT)
{
Expand Down
Loading
Loading