From a758407010b894642f45d81d3b9d22c36fcd3494 Mon Sep 17 00:00:00 2001 From: AntiBargu <11840928@qq.com> Date: Mon, 8 Sep 2025 09:35:42 +0800 Subject: [PATCH 1/2] Add Cache --- trpc/util/cache/BUILD | 8 + trpc/util/cache/cache.h | 74 +++++ trpc/util/cache/decorators/BUILD | 43 +++ trpc/util/cache/decorators/fifo_cache.h | 241 +++++++++++++++++ trpc/util/cache/decorators/fifo_cache_test.cc | 186 +++++++++++++ trpc/util/cache/decorators/lru_cache.h | 252 ++++++++++++++++++ trpc/util/cache/decorators/lru_cache_test.cc | 185 +++++++++++++ trpc/util/cache/impl/BUILD | 22 ++ trpc/util/cache/impl/basic_cache.h | 93 +++++++ trpc/util/cache/impl/basic_cache_test.cc | 125 +++++++++ 10 files changed, 1229 insertions(+) create mode 100644 trpc/util/cache/BUILD create mode 100644 trpc/util/cache/cache.h create mode 100644 trpc/util/cache/decorators/BUILD create mode 100644 trpc/util/cache/decorators/fifo_cache.h create mode 100644 trpc/util/cache/decorators/fifo_cache_test.cc create mode 100644 trpc/util/cache/decorators/lru_cache.h create mode 100644 trpc/util/cache/decorators/lru_cache_test.cc create mode 100644 trpc/util/cache/impl/BUILD create mode 100644 trpc/util/cache/impl/basic_cache.h create mode 100644 trpc/util/cache/impl/basic_cache_test.cc diff --git a/trpc/util/cache/BUILD b/trpc/util/cache/BUILD new file mode 100644 index 00000000..84eb18d7 --- /dev/null +++ b/trpc/util/cache/BUILD @@ -0,0 +1,8 @@ +licenses(["notice"]) + +package(default_visibility = ["//visibility:public"]) + +cc_library( + name = "cache", + hdrs = ["cache.h"], +) diff --git a/trpc/util/cache/cache.h b/trpc/util/cache/cache.h new file mode 100644 index 00000000..2404e03c --- /dev/null +++ b/trpc/util/cache/cache.h @@ -0,0 +1,74 @@ +/* + * + * Tencent is pleased to support the open source community by making + * tRPC available. + * + * Copyright (C) 2025 Tencent. + * All rights reserved. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include + +namespace trpc::cache { + +/// @brief Abstract base class for cache implementations. +/// +/// @tparam KeyType Type of the cache keys. +/// @tparam ValueType Type of the cache values. +/// @tparam HashFn Hash function for keys (default: std::hash). +/// @tparam KeyEqual Key equality comparator (default: std::equal_to). +/// @tparam Mutex Mutex type for synchronization (default: std::mutex). +template , + typename KeyEqual = std::equal_to> +class Cache { + public: + Cache() = default; + virtual ~Cache() = default; + + /// @brief Insert or update a key-value pair into the cache (copy semantics). + /// @param key The key to insert/update. + /// @param value The value to associate with the key. + /// @return true if insertion succeeded, false otherwise (e.g., key exists depending on impl). + virtual bool Put(const KeyType& key, const ValueType& value) = 0; + + /// @brief Insert or update a key-value pair into the cache (move semantics). + /// @param key The key to insert/update. + /// @param value The value to associate with the key (will be moved). + /// @return true if insertion succeeded, false otherwise (e.g., key exists depending on impl). + virtual bool Put(const KeyType& key, ValueType&& value) = 0; + + /// @brief Retrieves the value associated with the given key. + /// @param key The key to look up. 
+ /// @return An optional containing the value if the key exists, std::nullopt otherwise. + virtual std::optional Get(const KeyType& key) = 0; + + /// @brief Removes the key-value pair associated with the given key. + /// @param key The key to remove. + /// @return true if the key was found and removed, false otherwise. + virtual bool Remove(const KeyType& key) = 0; + + /// @brief Clear all key-value pairs from the cache. + virtual void Clear() = 0; + + /// @brief Get the number of entries currently in the cache. + /// @return The current size of the cache. + virtual size_t Size() = 0; +}; + +} // namespace trpc::cache diff --git a/trpc/util/cache/decorators/BUILD b/trpc/util/cache/decorators/BUILD new file mode 100644 index 00000000..fc1e6332 --- /dev/null +++ b/trpc/util/cache/decorators/BUILD @@ -0,0 +1,43 @@ +# Description: trpc-cpp. + +licenses(["notice"]) + +package(default_visibility = ["//visibility:public"]) + +cc_library( + name = "fifo_cache", + hdrs = ["fifo_cache.h"], + deps = [ + "//trpc/util/cache", + ], +) + +cc_test( + name = "fifo_cache_test", + srcs = ["fifo_cache_test.cc"], + deps = [ + ":fifo_cache", + "//trpc/util/cache/impl:basic_cache", + "@com_google_googletest//:gtest", + "@com_google_googletest//:gtest_main", + ], +) + +cc_library( + name = "lru_cache", + hdrs = ["lru_cache.h"], + deps = [ + "//trpc/util/cache", + ], +) + +cc_test( + name = "lru_cache_test", + srcs = ["lru_cache_test.cc"], + deps = [ + ":lru_cache", + "//trpc/util/cache/impl:basic_cache", + "@com_google_googletest//:gtest", + "@com_google_googletest//:gtest_main", + ], +) diff --git a/trpc/util/cache/decorators/fifo_cache.h b/trpc/util/cache/decorators/fifo_cache.h new file mode 100644 index 00000000..47d75d25 --- /dev/null +++ b/trpc/util/cache/decorators/fifo_cache.h @@ -0,0 +1,241 @@ +/* + * + * Tencent is pleased to support the open source community by making + * tRPC available. + * + * Copyright (C) 2025 Tencent. + * All rights reserved. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include + +#include "trpc/util/cache/cache.h" + +namespace trpc::cache { + +/// @brief A FIFO (First-In-First-Out) Cache implementation that evicts the oldest entry upon exceeding capacity. +/// This class decorates another Cache implementation (e.g., BasicCache) and adds FIFO eviction logic. +/// Thread-safety is achieved using a mutex to guard all operations. +/// +/// @tparam KeyType Type of the cache keys. +/// @tparam ValueType Type of the cache values. +/// @tparam HashFn Hash function for keys (default: std::hash). +/// @tparam KeyEqual Key equality comparator (default: std::equal_to). +/// @tparam Mutex Mutex type for synchronization (default: std::mutex). +template , + typename KeyEqual = std::equal_to, typename Mutex = std::mutex> +class FIFOCache final : public Cache { + public: + /// @brief Constructs a FIFO Cache with a wrapped cache instance, maximum capacity, and number of shards. + /// @param cache The underlying cache implementation to decorate. + /// @param capacity Maximum number of entries allowed in the cache (default: 1024). + /// @param shards_num Number of shards for concurrent access (default: 32). 
+ explicit FIFOCache(std::unique_ptr> cache, size_t capacity = 1024, + size_t shards_num = 32) + : capacity_(capacity), cache_(std::move(cache)) { + shards_num_ = RoundUpPowerOf2(shards_num); + shards_ = std::make_unique[]>(shards_num_); + // Calculate capacity per shard, ensuring at least 1 per shard + capacity_per_shard_ = (capacity_ + shards_num_ - 1) / shards_num_; + } + + ~FIFOCache() = default; + + // Disable copying + FIFOCache(const FIFOCache&) = delete; + FIFOCache& operator=(const FIFOCache&) = delete; + + // Disable moving + FIFOCache(FIFOCache&& other) = delete; + FIFOCache& operator=(FIFOCache&& other) = delete; + + /// @brief Insert or update a key-value pair into the cache (copy semantics). + /// @param key The key to insert or update. + /// @param value The value to associate with the key. + /// @return true if insertion succeeded, false otherwise (e.g., key exists). + bool Put(const KeyType& key, const ValueType& value) override { + size_t hash_code = HashFn()(key); + size_t idx = GetShardIdx(hash_code); + auto& shard = shards_[idx]; + + { + std::lock_guard lock(shard.mutex); + + // If key exists, update value in underlying cache but don't change FIFO order + if (auto it = shard.keys.find(key); it != shard.keys.end()) { + return cache_->Put(key, value); + } + + if (entries_num >= capacity_) { + // If shard is full, evict the oldest key (front of the queue) + const KeyType& oldest_key = shard.fifo_queue.front(); + shard.keys.erase(oldest_key); + cache_->Remove(oldest_key); + shard.fifo_queue.pop(); + } else { + // Increase key count for new entry + entries_num.fetch_add(1, std::memory_order_relaxed); + } + + // Add new key to the back of the queue + shard.fifo_queue.push(key); + // Just mark existence, no iterator needed for FIFO + shard.keys[key] = true; + } + + return cache_->Put(key, value); + } + + /// @brief Insert or update a key-value pair into the cache (move semantics). + /// @param key The key to insert or update. 
+ /// @param value The value to associate with the key. + /// @return true if insertion succeeded, false otherwise (e.g., key exists). + bool Put(const KeyType& key, ValueType&& value) override { + size_t hash_code = HashFn()(key); + size_t idx = GetShardIdx(hash_code); + auto& shard = shards_[idx]; + + { + std::lock_guard lock(shard.mutex); + + // If key exists, update value in underlying cache but don't change FIFO order + if (auto it = shard.keys.find(key); it != shard.keys.end()) { + return cache_->Put(key, std::forward(value)); + } + if (size_t size = entries_num.load(std::memory_order_acquire); size >= capacity_) { + // If shard is full, evict the oldest key (front of the queue) + const KeyType& oldest_key = shard.fifo_queue.front(); + shard.keys.erase(oldest_key); + cache_->Remove(oldest_key); + shard.fifo_queue.pop(); + } else { + // Increase key count for new entry + entries_num.fetch_add(1, std::memory_order_relaxed); + } + + // Add new key to the back of the queue + shard.fifo_queue.push(key); + // Just mark existence, no iterator needed for FIFO + shard.keys[key] = true; + } + + return cache_->Put(key, std::forward(value)); + } + + /// @brief Retrieves the value associated with the given key. + /// @param key The key to look up. + /// @return An optional containing the value if the key exists, std::nullopt otherwise. + std::optional Get(const KeyType& key) override { + // For FIFO, getting a value doesn't change its position in the queue + return cache_->Get(key); + } + + /// @brief Removes the key-value pair associated with the given key. + /// @param key The key to remove. + /// @return true if the key was found and removed, false otherwise. 
+ bool Remove(const KeyType& key) override { + size_t hash_code = HashFn()(key); + size_t idx = GetShardIdx(hash_code); + auto& shard = shards_[idx]; + + { + std::lock_guard lock(shard.mutex); + + if (auto it = shard.keys.find(key); it == shard.keys.end()) { + return false; + } + + shard.keys.erase(key); + } + entries_num.fetch_sub(1, std::memory_order_relaxed); + + return cache_->Remove(key); + } + + /// @brief Clear all key-value pairs from the cache. + void Clear() override { + for (size_t i = 0; i < shards_num_; ++i) { + auto& shard = shards_[i]; + std::lock_guard lock(shard.mutex); + // Clear FIFO queue and key map + while (!shard.fifo_queue.empty()) { + shard.fifo_queue.pop(); + } + shard.keys.clear(); + } + + entries_num.store(0, std::memory_order_relaxed); + cache_->Clear(); + } + + /// @brief Get the number of entries currently in the cache. + /// @return The current size of the cache. + size_t Size() override { return entries_num.load(std::memory_order_acquire); } + + private: + template + struct alignas(64) Shard { + M mutex; + std::queue fifo_queue; + std::unordered_map keys; + + Shard() = default; + ~Shard() = default; + + // Disable copying + Shard(const Shard&) = delete; + Shard& operator=(const Shard&) = delete; + + // Disable moving + Shard(Shard&&) = delete; + Shard& operator=(Shard&&) = delete; + }; + + size_t GetShardIdx(size_t hash_code) const { return hash_code & (shards_num_ - 1); } + + size_t RoundUpPowerOf2(size_t n) { + bool size_is_power_of_2 = (n >= 1) && ((n & (n - 1)) == 0); + if (!size_is_power_of_2) { + uint64_t tmp = 1; + while (tmp <= n) { + tmp <<= 1; + } + n = tmp; + } + return n; + } + + private: + // Maximum number of entries allowed. + size_t capacity_; + // The underlying cache implementation. + std::unique_ptr> cache_; + // Number of shards (always power of two). + size_t shards_num_; + // Capacity per individual shard. + size_t capacity_per_shard_; + // Atomic counter for the total number of entries. 
+ std::atomic entries_num{0}; + // Array of shards + std::unique_ptr[]> shards_; +}; + +} // namespace trpc::cache diff --git a/trpc/util/cache/decorators/fifo_cache_test.cc b/trpc/util/cache/decorators/fifo_cache_test.cc new file mode 100644 index 00000000..fdbf36a9 --- /dev/null +++ b/trpc/util/cache/decorators/fifo_cache_test.cc @@ -0,0 +1,186 @@ +/* + * + * Tencent is pleased to support the open source community by making + * tRPC available. + * + * Copyright (C) 2025 Tencent. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "trpc/util/cache/decorators/fifo_cache.h" +#include "trpc/util/cache/impl/basic_cache.h" + +#include "gtest/gtest.h" + +namespace trpc::cache::testing { + +/// @brief Test basic Put/Get operations and size tracking. +/// @note Verifies fundamental cache operations including insertion, retrieval, +/// and size maintenance functionality. +TEST(FIFOCacheTest, BasicPutGetSize) { + auto basic_cache = std::make_unique>(); + FIFOCache fifo_cache(std::move(basic_cache)); + + ASSERT_EQ(fifo_cache.Size(), 0); + ASSERT_TRUE(fifo_cache.Put(1, 1)); + ASSERT_EQ(fifo_cache.Get(1), 1); + ASSERT_EQ(fifo_cache.Size(), 1); +} + +/// @brief Test handling of duplicate key insertion. +/// @note Verifies that inserting existing keys updates values correctly +/// without affecting cache size. 
+TEST(FIFOCacheTest, PutDuplicateKey) { + auto basic_cache = std::make_unique>(); + FIFOCache fifo_cache(std::move(basic_cache)); + + ASSERT_TRUE(fifo_cache.Put(1, 1)); + ASSERT_FALSE(fifo_cache.Put(1, 2)); + ASSERT_EQ(fifo_cache.Size(), 1); + ASSERT_EQ(fifo_cache.Get(1), 2); +} + +/// @brief Test removal of existing keys. +/// @note Verifies successful removal of existing entries and subsequent cache state. +TEST(FIFOCacheTest, RemoveExistingKey) { + auto basic_cache = std::make_unique>(); + FIFOCache fifo_cache(std::move(basic_cache)); + + fifo_cache.Put(1, 1); + ASSERT_TRUE(fifo_cache.Remove(1)); + ASSERT_EQ(fifo_cache.Get(1), std::nullopt); + ASSERT_EQ(fifo_cache.Size(), 0); +} + +/// @brief Test removal of non-existent keys. +/// @note Verifies cache behavior when attempting to remove non-existing keys. +TEST(FIFOCacheTest, RemoveNonExistingKey) { + auto basic_cache = std::make_unique>(); + FIFOCache fifo_cache(std::move(basic_cache)); + + ASSERT_FALSE(fifo_cache.Remove(1)); + ASSERT_EQ(fifo_cache.Size(), 0); +} + +/// @brief Test complete cache clearance. +/// @note Verifies that Clear() operation removes all entries and resets cache state. +TEST(FIFOCacheTest, ClearCache) { + auto basic_cache = std::make_unique>(); + FIFOCache fifo_cache(std::move(basic_cache)); + + fifo_cache.Put(1, 1); + fifo_cache.Put(2, 2); + fifo_cache.Clear(); + + ASSERT_EQ(fifo_cache.Size(), 0); + ASSERT_EQ(fifo_cache.Get(1), std::nullopt); + ASSERT_EQ(fifo_cache.Get(2), std::nullopt); +} + +/// @brief Test FIFO eviction policy when capacity is exceeded. +/// @note Verifies that oldest entries are evicted according to FIFO policy when cache reaches capacity. 
+TEST(FIFOCacheTest, EvictOldestKeyWhenFull) { + auto basic_cache = std::make_unique>(); + FIFOCache fifo_cache(std::move(basic_cache), 3, 1); + + // Insert 3 entries + fifo_cache.Put(1, 1); + fifo_cache.Put(2, 2); + fifo_cache.Put(3, 3); + + // Access key 1 - should not change FIFO order + ASSERT_EQ(fifo_cache.Get(1), 1); + + // Insert 4th item - should evict oldest (key 1) + fifo_cache.Put(4, 4); + + // Verify eviction + ASSERT_EQ(fifo_cache.Get(1), std::nullopt); // Oldest key (1) should be evicted + ASSERT_EQ(fifo_cache.Get(2), 2); // Should remain + ASSERT_EQ(fifo_cache.Get(3), 3); // Should remain + ASSERT_EQ(fifo_cache.Get(4), 4); // Newly added should remain + ASSERT_EQ(fifo_cache.Size(), 3); +} + +/// @brief Test concurrent Put operations under high contention. +/// @note Verifies that concurrent operations maintain data consistency. +TEST(FIFOCacheTest, ConcurrentPut) { + constexpr int capacity = 1000; + auto basic_cache = std::make_unique>(); + FIFOCache fifo_cache(std::move(basic_cache), capacity, 32); + + constexpr int threads_num = 8; + constexpr int puts_per_thread = 200; + constexpr int puts_num = threads_num * puts_per_thread; + + std::vector threads; + std::atomic key_gen{0}; + + for (int i = 0; i < threads_num; ++i) { + threads.emplace_back([&]() { + for (int j = 0; j < puts_per_thread; ++j) { + int key = key_gen.fetch_add(1, std::memory_order_relaxed); + fifo_cache.Put(key, key); + } + }); + } + for (auto& t : threads) t.join(); + + int hits{0}, misses{0}; + + for (int key = 0; key < puts_num; ++key) { + if (fifo_cache.Get(key) == std::nullopt) { + ++misses; + } else { + ++hits; + } + } + + ASSERT_EQ(hits, capacity); + ASSERT_EQ(misses, puts_num - capacity); +} + +/// @brief Test concurrent Get operations under high contention. +/// @note Verifies that concurrent operations maintain data consistency. 
+TEST(FIFOCacheTest, ConcurrentGet) { + const int capacity = 1000; + auto basic_cache = std::make_unique>(); + FIFOCache fifo_cache(std::move(basic_cache), capacity); + + for (int key = 0; key < capacity; ++key) { + fifo_cache.Put(key, key); + } + + constexpr int threads_num = 8; + constexpr int gets_per_thread = 200; + std::vector threads; + + for (int i = 0; i < threads_num; ++i) { + threads.emplace_back([&, i]() { + for (int j = 0; j < gets_per_thread; ++j) { + int key = j + i * gets_per_thread; + if (key < capacity) { + ASSERT_EQ(fifo_cache.Get(key), key); + } else { + ASSERT_EQ(fifo_cache.Get(key), std::nullopt); + } + } + }); + } + for (auto& t : threads) t.join(); +} + +} // namespace trpc::cache::testing diff --git a/trpc/util/cache/decorators/lru_cache.h b/trpc/util/cache/decorators/lru_cache.h new file mode 100644 index 00000000..a9b35f49 --- /dev/null +++ b/trpc/util/cache/decorators/lru_cache.h @@ -0,0 +1,252 @@ +/* + * + * Tencent is pleased to support the open source community by making + * tRPC available. + * + * Copyright (C) 2025 Tencent. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include + +#include "trpc/util/cache/cache.h" + +namespace trpc::cache { + +/// @brief A LRU (Least Recently Used) Cache implementation that evicts the least recently used entry upon exceeding +/// capacity. 
+/// This class decorates another Cache implementation (e.g., BasicCache) and adds LRU eviction logic. +/// Thread-safety is achieved using a mutex to guard all operations. +/// +/// @tparam KeyType Type of the cache keys. +/// @tparam ValueType Type of the cache values. +/// @tparam HashFn Hash function for keys (default: std::hash). +/// @tparam KeyEqual Key equality comparator (default: std::equal_to). +/// @tparam Mutex Mutex type for synchronization (default: std::mutex). +template , + typename KeyEqual = std::equal_to, typename Mutex = std::mutex> +class LRUCache final : public Cache { + public: + /// @brief Constructs a LRU Cache with a wrapped cache instance, maximum capacity, and number of shards. + /// @param cache The underlying cache implementation to decorate. + /// @param capacity Maximum number of entries allowed in the cache (default: 1024). + /// @param num_shards Number of shards for concurrent access (default: 32). + explicit LRUCache(std::unique_ptr> cache, size_t capacity = 1024, + size_t shards_num = 32) + : capacity_(capacity), cache_(std::move(cache)) { + shards_num_ = RoundUpPowerOf2(shards_num); + shards_ = std::make_unique[]>(shards_num_); + // Calculate capacity per shard, ensuring at least 1 per shard + capacity_per_shard_ = (capacity_ + shards_num_ - 1) / shards_num_; + } + + ~LRUCache() = default; + + // Disable copying + LRUCache(const LRUCache&) = delete; + LRUCache& operator=(const LRUCache&) = delete; + + // Disable moving + LRUCache(LRUCache&& other) = delete; + LRUCache& operator=(LRUCache&& other) = delete; + + /// @brief Insert or update a key-value pair into the cache (copy semantics). + /// @param key The key to insert or update. + /// @param value The value to associate with the key. + /// @return true if insertion succeeded, false otherwise (e.g., key exists). 
+ bool Put(const KeyType& key, const ValueType& value) override { + size_t hash_code = HashFn()(key); + size_t idx = GetShardIdx(hash_code); + auto& shard = shards_[idx]; + + { + std::lock_guard lock(shard.mutex); + + // If key exists, move it to front and update value in underlying cache. + if (auto it = shard.keys.find(key); it != shard.keys.end()) { + shard.lru_list.splice(shard.lru_list.begin(), shard.lru_list, it->second); + return cache_->Put(key, value); + } + + if (entries_num >= capacity_) { + // If shard is full, evict the oldest key. + const KeyType& oldest_key = shard.lru_list.back(); + shard.keys.erase(oldest_key); + cache_->Remove(oldest_key); + + shard.lru_list.back() = key; + shard.lru_list.splice(shard.lru_list.begin(), shard.lru_list, std::prev(shard.lru_list.end())); + } else { + // Insert new key at front. + shard.lru_list.emplace_front(key); + entries_num.fetch_add(1, std::memory_order_relaxed); + } + shard.keys[key] = shard.lru_list.begin(); + } + + return cache_->Put(key, value); + } + + /// @brief Insert or update a key-value pair into the cache (move semantics). + /// @param key The key to insert or update. + /// @param value The value to associate with the key. + /// @return true if insertion succeeded, false otherwise (e.g., key exists). + bool Put(const KeyType& key, ValueType&& value) override { + size_t hash_code = HashFn()(key); + size_t idx = GetShardIdx(hash_code); + auto& shard = shards_[idx]; + + { + std::lock_guard lock(shard.mutex); + + // If key exists, move it to front and update value in underlying cache. + if (auto it = shard.keys.find(key); it != shard.keys.end()) { + shard.lru_list.splice(shard.lru_list.begin(), shard.lru_list, it->second); + return cache_->Put(key, std::forward(value)); + } + + if (size_t size = entries_num.load(std::memory_order_acquire); size >= capacity_) { + // If shard is full, evict the oldest key. 
+ const KeyType& oldest_key = shard.lru_list.back(); + shard.keys.erase(oldest_key); + cache_->Remove(oldest_key); + + shard.lru_list.back() = key; + shard.lru_list.splice(shard.lru_list.begin(), shard.lru_list, std::prev(shard.lru_list.end())); + } else { + // Insert new key at front. + shard.lru_list.emplace_front(key); + entries_num.fetch_add(1, std::memory_order_relaxed); + } + shard.keys[key] = shard.lru_list.begin(); + } + + return cache_->Put(key, std::forward(value)); + } + + /// @brief Retrieves the value associated with the given key. + /// @param key The key to look up. + /// @return An optional containing the value if the key exists, std::nullopt otherwise. + std::optional Get(const KeyType& key) override { + size_t hash_code = HashFn()(key); + size_t idx = GetShardIdx(hash_code); + auto& shard = shards_[idx]; + + std::lock_guard lock(shard.mutex); + + if (auto it = shard.keys.find(key); it != shard.keys.end()) { + shard.lru_list.splice(shard.lru_list.begin(), shard.lru_list, it->second); + return cache_->Get(key); + } + + return std::nullopt; + } + + /// @brief Removes the key-value pair associated with the given key. + /// @param key The key to remove. + /// @return true if the key was found and removed, false otherwise. + bool Remove(const KeyType& key) override { + size_t hash_code = HashFn()(key); + size_t idx = GetShardIdx(hash_code); + auto& shard = shards_[idx]; + + { + std::lock_guard lock(shard.mutex); + + auto it = shard.keys.find(key); + if (it == shard.keys.end()) { + return false; + } + + shard.lru_list.erase(it->second); + shard.keys.erase(key); + } + entries_num.fetch_sub(1, std::memory_order_relaxed); + + return cache_->Remove(key); + } + + /// @brief Clear all key-value pairs from the cache. 
+ void Clear() override { + for (size_t i = 0; i < shards_num_; ++i) { + auto& shard = shards_[i]; + std::lock_guard lock(shard.mutex); + shard.lru_list.clear(); + shard.keys.clear(); + } + entries_num.store(0, std::memory_order_relaxed); + cache_->Clear(); + } + + /// @brief Get the number of entries currently in the cache. + /// @return The current size of the cache. + size_t Size() override { return entries_num.load(std::memory_order_acquire); } + + private: + template + struct alignas(64) Shard { + M mutex; + std::list lru_list; + std::unordered_map::iterator> keys; + + Shard() = default; + ~Shard() = default; + + // Disable copying + Shard(const Shard&) = delete; + Shard& operator=(const Shard&) = delete; + + // Disable moving + Shard(Shard&&) = delete; + Shard& operator=(Shard&&) = delete; + }; + + size_t GetShardIdx(size_t hash_code) const { return hash_code & (shards_num_ - 1); } + + size_t RoundUpPowerOf2(size_t n) { + bool size_is_power_of_2 = (n >= 1) && ((n & (n - 1)) == 0); + if (!size_is_power_of_2) { + uint64_t tmp = 1; + while (tmp <= n) { + tmp <<= 1; + } + + n = tmp; + } + return n; + } + + private: + // Maximum number of entries allowed. + size_t capacity_; + // The underlying cache implementation. + std::unique_ptr> cache_; + // Number of shards (always power of two). + size_t shards_num_; + // Capacity per individual shard. + size_t capacity_per_shard_; + // Atomic counter for the total number of entries. + std::atomic entries_num{0}; + // Array of shards + std::unique_ptr[]> shards_; +}; + +} // namespace trpc::cache \ No newline at end of file diff --git a/trpc/util/cache/decorators/lru_cache_test.cc b/trpc/util/cache/decorators/lru_cache_test.cc new file mode 100644 index 00000000..2e2e675b --- /dev/null +++ b/trpc/util/cache/decorators/lru_cache_test.cc @@ -0,0 +1,185 @@ +/* + * + * Tencent is pleased to support the open source community by making + * tRPC available. + * + * Copyright (C) 2025 Tencent. + * All rights reserved. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "trpc/util/cache/decorators/lru_cache.h" +#include "trpc/util/cache/impl/basic_cache.h" + +#include "gtest/gtest.h" + +namespace trpc::cache::testing { + +/// @brief Test basic Put/Get operations and size tracking. +/// @note Verifies fundamental cache operations including insertion, retrieval, +/// and size maintenance functionality. +TEST(LRUCacheTest, BasicPutGetSize) { + auto basic_cache = std::make_unique>(); + LRUCache lru_cache(std::move(basic_cache)); + + ASSERT_EQ(lru_cache.Size(), 0); + ASSERT_TRUE(lru_cache.Put(1, 1)); + ASSERT_EQ(lru_cache.Get(1), 1); + ASSERT_EQ(lru_cache.Size(), 1); +} + +/// @brief Test handling of duplicate key insertion. +/// @note Verifies that inserting existing keys updates values correctly +/// without affecting cache size. +TEST(LRUCacheTest, PutDuplicateKey) { + auto basic_cache = std::make_unique>(); + LRUCache lru_cache(std::move(basic_cache)); + + ASSERT_TRUE(lru_cache.Put(1, 1)); + ASSERT_FALSE(lru_cache.Put(1, 2)); + ASSERT_EQ(lru_cache.Size(), 1); + ASSERT_EQ(lru_cache.Get(1), 2); +} + +/// @brief Test removal of existing keys. +/// @note Verifies successful removal of existing entries and subsequent cache state. 
+TEST(LRUCacheTest, RemoveExistingKey) { + auto basic_cache = std::make_unique>(); + LRUCache lru_cache(std::move(basic_cache)); + + lru_cache.Put(1, 1); + ASSERT_TRUE(lru_cache.Remove(1)); + ASSERT_EQ(lru_cache.Get(1), std::nullopt); + ASSERT_EQ(lru_cache.Size(), 0); +} + +/// @brief Test removal of non-existent keys. +/// @note Verifies cache behavior when attempting to remove non-existing keys. +TEST(LRUCacheTest, RemoveNonExistingKey) { + auto basic_cache = std::make_unique>(); + LRUCache lru_cache(std::move(basic_cache)); + + ASSERT_FALSE(lru_cache.Remove(1)); + ASSERT_EQ(lru_cache.Size(), 0); +} + +/// @brief Test complete cache clearance. +/// @note Verifies that Clear() operation removes all entries and resets cache state. +TEST(LRUCacheTest, ClearCache) { + auto basic_cache = std::make_unique>(); + LRUCache lru_cache(std::move(basic_cache)); + + lru_cache.Put(1, 1); + lru_cache.Put(2, 2); + lru_cache.Clear(); + + ASSERT_EQ(lru_cache.Size(), 0); + ASSERT_EQ(lru_cache.Get(1), std::nullopt); + ASSERT_EQ(lru_cache.Get(2), std::nullopt); +} + +/// @brief Test LRU eviction policy when capacity is exceeded. +/// @note Verifies that least recently used entries are evicted according to LRU policy when cache reaches capacity. 
+TEST(LRUCacheTest, EvictLeastRecentlyUsedKeyWhenFull) { + auto basic_cache = std::make_unique>(); + LRUCache lru_cache(std::move(basic_cache), 3, 1); + + // Insert 3 entries + lru_cache.Put(1, 1); + lru_cache.Put(2, 2); + lru_cache.Put(3, 3); + + // Access key 1 to make it most recently used + ASSERT_EQ(lru_cache.Get(1), 1); + + // Insert 4th item - should evict least recently used (key 2) + lru_cache.Put(4, 4); + + // Verify eviction + ASSERT_EQ(lru_cache.Get(2), std::nullopt); // Least recently used (key 2) should be evicted + ASSERT_EQ(lru_cache.Get(1), 1); // Most recently used (key 1) should remain + ASSERT_EQ(lru_cache.Get(3), 3); // Middle used (key 3) should remain + ASSERT_EQ(lru_cache.Get(4), 4); // Newly added (key 4) should remain + ASSERT_EQ(lru_cache.Size(), 3); +} + +/// @brief Test concurrent Put operations under high contention. +/// @note Verifies that concurrent operations maintain data consistency. +TEST(LRUCacheTest, ConcurrentPut) { + constexpr int capacity = 1000; + auto basic_cache = std::make_unique>(); + LRUCache lru_cache(std::move(basic_cache), capacity, 32); + + constexpr int threads_num = 8; + constexpr int puts_per_thread = 200; + constexpr int puts_num = threads_num * puts_per_thread; + std::vector threads; + std::atomic key_gen{0}; + + for (int i = 0; i < threads_num; ++i) { + threads.emplace_back([&]() { + for (int j = 0; j < puts_per_thread; ++j) { + int key = key_gen.fetch_add(1, std::memory_order_relaxed); + lru_cache.Put(key, key); + } + }); + } + for (auto& t : threads) t.join(); + + int hits{0}, misses{0}; + + for (int key = 0; key < puts_num; ++key) { + if (lru_cache.Get(key) == std::nullopt) { + ++misses; + } else { + ++hits; + } + } + + ASSERT_EQ(hits, capacity); + ASSERT_EQ(misses, puts_num - capacity); +} + +/// @brief Test concurrent Get operations under high contention. +/// @note Verifies that concurrent operations maintain data consistency. 
+TEST(LRUCacheTest, ConcurrentGet) { + const int capacity = 1000; + auto basic_cache = std::make_unique>(); + LRUCache lru_cache(std::move(basic_cache), capacity); + + for (int key = 0; key < capacity; ++key) { + lru_cache.Put(key, key); + } + + constexpr int threads_num = 8; + constexpr int gets_per_thread = 200; + std::vector threads; + + for (int i = 0; i < threads_num; ++i) { + threads.emplace_back([&, i]() { + for (int j = 0; j < gets_per_thread; ++j) { + int key = j + i * gets_per_thread; + if (key < capacity) { + ASSERT_EQ(lru_cache.Get(key), key); + } else { + ASSERT_EQ(lru_cache.Get(key), std::nullopt); + } + } + }); + } + for (auto& t : threads) t.join(); +} + +} // namespace trpc::cache::testing diff --git a/trpc/util/cache/impl/BUILD b/trpc/util/cache/impl/BUILD new file mode 100644 index 00000000..91f1cf8a --- /dev/null +++ b/trpc/util/cache/impl/BUILD @@ -0,0 +1,22 @@ +licenses(["notice"]) + +package(default_visibility = ["//visibility:public"]) + +cc_library( + name = "basic_cache", + hdrs = ["basic_cache.h"], + deps = [ + "//trpc/util/cache", + "//trpc/util/concurrency:lightly_concurrent_hashmap", + ], +) + +cc_test( + name = "basic_cache_test", + srcs = ["basic_cache_test.cc"], + deps = [ + ":basic_cache", + "@com_google_googletest//:gtest", + "@com_google_googletest//:gtest_main", + ], +) diff --git a/trpc/util/cache/impl/basic_cache.h b/trpc/util/cache/impl/basic_cache.h new file mode 100644 index 00000000..f9b588ec --- /dev/null +++ b/trpc/util/cache/impl/basic_cache.h @@ -0,0 +1,93 @@ +/* + * + * Tencent is pleased to support the open source community by making + * tRPC available. + * + * Copyright (C) 2025 Tencent. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include "trpc/util/cache/cache.h" +#include "trpc/util/concurrency/lightly_concurrent_hashmap.h" + +namespace trpc::cache { + +/// @brief Basic thread-safe cache implementation using LightlyConcurrentHashMap. +/// +/// @tparam KeyType Type of the cache keys. +/// @tparam ValueType Type of the cache values. +/// @tparam HashFn Hash function for keys (default: std::hash). +/// @tparam KeyEqual Key equality comparator (default: std::equal_to). +template , + typename KeyEqual = std::equal_to> +class BasicCache final : public Cache { + public: + BasicCache() = default; + ~BasicCache() = default; + + BasicCache(const BasicCache&) = delete; + BasicCache& operator=(const BasicCache&) = delete; + + BasicCache(BasicCache&& other) noexcept = default; + BasicCache& operator=(BasicCache&& other) noexcept = default; + + /// @brief Insert or update a key-value pair into the cache (copy semantics). + /// @param key The key to insert or update. + /// @param value The value to associate with the key. + /// @return true if insertion succeeded, false otherwise (e.g., key exists). + bool Put(const KeyType& key, const ValueType& value) override { return cache_map_.InsertOrAssign(key, value); } + + /// @brief Insert or update a key-value pair into the cache (move semantics). + /// @param key The key to insert or update. + /// @param value The value to associate with the key. + /// @return true if insertion succeeded, false otherwise (e.g., key exists). 
+ bool Put(const KeyType& key, ValueType&& value) override { + return cache_map_.InsertOrAssign(key, std::forward(value)); + } + + /// @brief Retrieves the value associated with the given key. + /// @param key The key to look up. + /// @return An optional containing the value if the key exists, std::nullopt otherwise. + std::optional Get(const KeyType& key) override { + ValueType value; + + if (cache_map_.Get(key, value)) { + return value; + } + + return std::nullopt; + } + + /// @brief Removes the key-value pair associated with the given key. + /// @param key The key to remove. + /// @return true if the key was found and removed, false otherwise. + bool Remove(const KeyType& key) override { return cache_map_.Erase(key); } + + /// @brief Clear all key-value pairs from the cache. + void Clear() override { cache_map_.Clear(); } + + /// @brief Get the number of entries currently in the cache. + /// @return The current size of the cache. + size_t Size() override { return cache_map_.Size(); } + + private: + /// @brief The underlying concurrent hash map that stores the actual key-value data. + concurrency::LightlyConcurrentHashMap cache_map_; +}; + +} // namespace trpc::cache diff --git a/trpc/util/cache/impl/basic_cache_test.cc b/trpc/util/cache/impl/basic_cache_test.cc new file mode 100644 index 00000000..bffbc04d --- /dev/null +++ b/trpc/util/cache/impl/basic_cache_test.cc @@ -0,0 +1,125 @@ +/* + * + * Tencent is pleased to support the open source community by making + * tRPC available. + * + * Copyright (C) 2025 Tencent. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "trpc/util/cache/impl/basic_cache.h" +#include "gtest/gtest.h" + +namespace trpc::cache::testing { + +/// @brief Test basic Put/Get operations and size tracking. +/// @note Verifies fundamental cache operations including insertion, retrieval, +/// and size maintenance functionality. +TEST(BasicCacheTest, BasicPutGetSize) { + BasicCache basic_cache; + + ASSERT_EQ(basic_cache.Size(), 0); + ASSERT_TRUE(basic_cache.Put(1, 1)); + ASSERT_EQ(basic_cache.Get(1), 1); + ASSERT_EQ(basic_cache.Size(), 1); + + // Test updating existing key + ASSERT_FALSE(basic_cache.Put(1, 2)); + ASSERT_EQ(basic_cache.Get(1), 2); + ASSERT_EQ(basic_cache.Size(), 1); // Size should not change on update +} + +/// @brief Test handling of duplicate key insertion. +/// @note Verifies that inserting existing keys updates values correctly +/// without affecting cache size. +TEST(BasicCacheTest, PutDuplicateKey) { + BasicCache basic_cache; + + ASSERT_TRUE(basic_cache.Put(1, 1)); + ASSERT_FALSE(basic_cache.Put(1, 2)); + ASSERT_EQ(basic_cache.Get(1), 2); + ASSERT_EQ(basic_cache.Size(), 1); +} + +/// @brief Test removal of existing keys. +/// @note Verifies successful removal of existing items and subsequent cache state. +TEST(BasicCacheTest, RemoveExistingKey) { + BasicCache basic_cache; + + basic_cache.Put(1, 1); + ASSERT_TRUE(basic_cache.Remove(1)); + ASSERT_EQ(basic_cache.Get(1), std::nullopt); + ASSERT_EQ(basic_cache.Size(), 0); +} + +/// @brief Test removal of non-existent keys. +/// @note Verifies cache behavior when attempting to remove non-existing keys. 
+TEST(BasicCacheTest, RemoveNonExistingKey) { + BasicCache basic_cache; + + ASSERT_FALSE(basic_cache.Remove(1)); + ASSERT_EQ(basic_cache.Size(), 0); +} + +/// @brief Test complete cache clearance. +/// @note Verifies that Clear() operation removes all items and resets cache state. +TEST(BasicCacheTest, ClearCache) { + BasicCache basic_cache; + + basic_cache.Put(1, 1); + basic_cache.Put(2, 2); + basic_cache.Clear(); + + ASSERT_EQ(basic_cache.Size(), 0); + ASSERT_EQ(basic_cache.Get(1), std::nullopt); + ASSERT_EQ(basic_cache.Get(2), std::nullopt); +} + +/// @brief Test concurrent Put operations under high contention. +/// @note Verifies that concurrent operations maintain data consistency. +TEST(BasicCacheTest, ConcurrentPut) { + BasicCache basic_cache; + constexpr int threads_num = 8; + constexpr int puts_per_thread = 200; + constexpr int puts_num = threads_num * puts_per_thread; + std::vector threads; + std::atomic key_gen{0}; + + for (int i = 0; i < threads_num; ++i) { + threads.emplace_back([&]() { + for (int j = 0; j < puts_per_thread; ++j) { + int key = key_gen.fetch_add(1, std::memory_order_relaxed); + basic_cache.Put(key, key); + } + }); + } + for (auto& t : threads) t.join(); + + int hits{0}, misses{0}; + + for (int key = 0; key < puts_num; ++key) { + if (basic_cache.Get(key) == std::nullopt) { + ++misses; + } else { + ++hits; + } + } + + ASSERT_EQ(hits, puts_num); + ASSERT_EQ(misses, 0); +} + +} // namespace trpc::cache::testing From 82055f5402d95a876a839e2af6d5a9d935e1d22d Mon Sep 17 00:00:00 2001 From: AntiBargu <11840928@qq.com> Date: Tue, 9 Sep 2025 12:49:38 +0800 Subject: [PATCH 2/2] Add Scheduled Cache --- trpc/util/cache/decorators/BUILD | 19 ++ trpc/util/cache/decorators/scheduled_cache.h | 248 ++++++++++++++++++ .../cache/decorators/scheduled_cache_test.cc | 199 ++++++++++++++ 3 files changed, 466 insertions(+) create mode 100644 trpc/util/cache/decorators/scheduled_cache.h create mode 100644 trpc/util/cache/decorators/scheduled_cache_test.cc diff 
--git a/trpc/util/cache/decorators/BUILD b/trpc/util/cache/decorators/BUILD index fc1e6332..1f92d860 100644 --- a/trpc/util/cache/decorators/BUILD +++ b/trpc/util/cache/decorators/BUILD @@ -41,3 +41,22 @@ cc_test( "@com_google_googletest//:gtest_main", ], ) + +cc_library( + name = "scheduled_cache", + hdrs = ["scheduled_cache.h"], + deps = [ + "//trpc/util/cache", + ], +) + +cc_test( + name = "scheduled_cache_test", + srcs = ["scheduled_cache_test.cc"], + deps = [ + ":scheduled_cache", + "//trpc/util/cache/impl:basic_cache", + "@com_google_googletest//:gtest", + "@com_google_googletest//:gtest_main", + ], +) \ No newline at end of file diff --git a/trpc/util/cache/decorators/scheduled_cache.h b/trpc/util/cache/decorators/scheduled_cache.h new file mode 100644 index 00000000..7334c295 --- /dev/null +++ b/trpc/util/cache/decorators/scheduled_cache.h @@ -0,0 +1,248 @@ +/* + * + * Tencent is pleased to support the open source community by making + * tRPC available. + * + * Copyright (C) 2025 Tencent. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "trpc/util/cache/cache.h" + +namespace trpc::cache { + +/// @brief A TTL (Time-To-Live) Cache implementation that automatically evicts entries after expiration. +/// This class decorates another Cache implementation and adds TTL expiration logic. 
+/// Uses sharded mutexes for concurrent access and lazy expiration cleanup. +/// +/// @tparam KeyType Type of the cache keys. +/// @tparam ValueType Type of the cache values. +/// @tparam HashFn Hash function for keys (default: std::hash). +/// @tparam KeyEqual Key equality comparator (default: std::equal_to). +/// @tparam Mutex Mutex type for synchronization (default: std::mutex). +template , + typename KeyEqual = std::equal_to, typename Mutex = std::mutex> +class ScheduledCache final : public Cache { + public: + /// @brief Constructs a TTL Cache with a wrapped cache instance and number of shards. + /// @param cache The underlying cache implementation to decorate. + /// @param default_ttl Default TTL duration for entries in milliseconds (default: 60000ms). + /// @param shards_num Number of shards for concurrent access (default: 32). + explicit ScheduledCache(std::unique_ptr> cache, + std::chrono::milliseconds default_ttl = std::chrono::milliseconds(60000), + size_t shards_num = 32) + : default_ttl_(default_ttl), cache_(std::move(cache)) { + shards_num_ = RoundUpPowerOf2(shards_num); + shards_ = std::make_unique[]>(shards_num_); + } + + ~ScheduledCache() = default; + + // Disable copying + ScheduledCache(const ScheduledCache&) = delete; + ScheduledCache& operator=(const ScheduledCache&) = delete; + + // Disable moving + ScheduledCache(ScheduledCache&& other) = delete; + ScheduledCache& operator=(ScheduledCache&& other) = delete; + + /// @brief Insert or update a key-value pair with default TTL (copy semantics). + /// @param key The key to insert or update. + /// @param value The value to associate with the key. + /// @return true if insertion succeeded, false otherwise. + bool Put(const KeyType& key, const ValueType& value) override { return Put(key, value, default_ttl_); } + + /// @brief Insert or update a key-value pair with default TTL (move semantics). + /// @param key The key to insert or update. 
+  /// @param value The value to associate with the key (will be moved).
+  /// @return true if insertion succeeded, false otherwise.
+  bool Put(const KeyType& key, ValueType&& value) override {
+    // `value` is a named rvalue reference; std::move (not std::forward) hands it on as an rvalue.
+    return Put(key, std::move(value), default_ttl_);
+  }
+
+  /// @brief Insert or update a key-value pair with custom TTL (copy semantics).
+  /// @param key The key to insert or update.
+  /// @param value The value to associate with the key.
+  /// @param ttl Custom TTL duration for this entry.
+  /// @return true if insertion succeeded, false otherwise.
+  bool Put(const KeyType& key, const ValueType& value, std::chrono::milliseconds ttl) {
+    TouchExpiry(key, ttl);
+    return cache_->Put(key, value);
+  }
+
+  /// @brief Insert or update a key-value pair with custom TTL (move semantics).
+  /// @param key The key to insert or update.
+  /// @param value The value to associate with the key (will be moved).
+  /// @param ttl Custom TTL duration for this entry.
+  /// @return true if insertion succeeded, false otherwise.
+  bool Put(const KeyType& key, ValueType&& value, std::chrono::milliseconds ttl) {
+    TouchExpiry(key, ttl);
+    return cache_->Put(key, std::move(value));
+  }
+
+  /// @brief Retrieves the value if the key exists and hasn't expired.
+  /// @param key The key to look up.
+  /// @return An optional containing the value if the key exists and is valid, std::nullopt otherwise.
+  std::optional<ValueType> Get(const KeyType& key) override {
+    auto& shard = shards_[GetShardIdx(HashFn()(key))];
+
+    {
+      std::lock_guard<Mutex> lock(shard.mutex);
+
+      if (auto it = shard.expiry_list.find(key); it != shard.expiry_list.end()) {
+        if (it->second > std::chrono::steady_clock::now()) {
+          return cache_->Get(key);
+        }
+        // Key has expired, remove it (lazy expiration)
+        shard.expiry_list.erase(it);
+        cache_->Remove(key);
+        entries_num.fetch_sub(1, std::memory_order_relaxed);
+      }
+    }
+
+    return std::nullopt;
+  }
+
+  /// @brief Removes the key-value pair if it exists.
+  /// @param key The key to remove.
+  /// @return true if the key was found and removed, false otherwise.
+  bool Remove(const KeyType& key) override {
+    auto& shard = shards_[GetShardIdx(HashFn()(key))];
+
+    {
+      std::lock_guard<Mutex> lock(shard.mutex);
+
+      auto it = shard.expiry_list.find(key);
+      if (it == shard.expiry_list.end()) {
+        return false;
+      }
+      // Erase through the iterator we already hold instead of a second by-key lookup.
+      shard.expiry_list.erase(it);
+    }
+    entries_num.fetch_sub(1, std::memory_order_relaxed);
+
+    return cache_->Remove(key);
+  }
+
+  /// @brief Clear all entries from the cache.
+  void Clear() override {
+    for (size_t i = 0; i < shards_num_; ++i) {
+      auto& shard = shards_[i];
+      std::lock_guard<Mutex> lock(shard.mutex);
+      shard.expiry_list.clear();
+    }
+    entries_num.store(0, std::memory_order_relaxed);
+    cache_->Clear();
+  }
+
+  /// @brief Get the number of entries currently in the cache.
+  /// @return The current size of the cache (approximate under concurrency;
+  ///         expired-but-not-yet-collected entries are still counted).
+  size_t Size() override {
+    // All writers use relaxed stores, so an acquire load adds no ordering guarantee;
+    // relaxed is sufficient for this monotonic counter.
+    return entries_num.load(std::memory_order_relaxed);
+  }
+
+ private:
+  template <typename M>
+  struct alignas(64) Shard {  // 64-byte alignment to avoid false sharing between shards
+    M mutex;
+    std::unordered_map<KeyType, std::chrono::steady_clock::time_point, HashFn, KeyEqual> expiry_list;
+
+    Shard() = default;
+    ~Shard() = default;
+
+    // Disable copying
+    Shard(const Shard&) = delete;
+    Shard& operator=(const Shard&) = delete;
+
+    // Disable moving
+    Shard(Shard&&) = delete;
+    Shard& operator=(Shard&&) = delete;
+  };
+
+  /// @brief Records (or refreshes) the expiry deadline for `key`; shared by both TTL Put overloads.
+  /// An existing-but-expired entry is dropped first so the caller's subsequent
+  /// cache_->Put behaves as a fresh insert; the entry count stays unchanged in that case.
+  void TouchExpiry(const KeyType& key, std::chrono::milliseconds ttl) {
+    auto& shard = shards_[GetShardIdx(HashFn()(key))];
+    auto expire_time = std::chrono::steady_clock::now() + ttl;
+
+    std::lock_guard<Mutex> lock(shard.mutex);
+
+    if (auto it = shard.expiry_list.find(key); it != shard.expiry_list.end()) {
+      if (it->second <= std::chrono::steady_clock::now()) {
+        shard.expiry_list.erase(it);
+        cache_->Remove(key);
+      }
+    } else {
+      entries_num.fetch_add(1, std::memory_order_relaxed);
+    }
+    shard.expiry_list[key] = expire_time;
+  }
+
+  size_t GetShardIdx(size_t hash_code) const { return hash_code & (shards_num_ - 1); }
+
+  /// @brief Rounds `n` up to the next power of two (returns 1 for n == 0).
+  size_t RoundUpPowerOf2(size_t n) {
+    bool size_is_power_of_2 = (n >= 1) && ((n & (n - 1)) == 0);
+    if (!size_is_power_of_2) {
+      uint64_t tmp = 1;
+      while (tmp <= n) {
+        tmp <<= 1;
+      }
+
+      n = tmp;
+    }
+    return n;
+  }
+
+ private:
+  /// Default TTL duration for entries in milliseconds.
+  std::chrono::milliseconds default_ttl_;
+  // The underlying cache implementation.
+  std::unique_ptr<Cache<KeyType, ValueType, HashFn, KeyEqual>> cache_;
+  // Number of shards (always power of two).
+  size_t shards_num_;
+  // Atomic counter for the total number of entries.
+  std::atomic<size_t> entries_num{0};
+  // Array of shards.
+ std::unique_ptr[]> shards_; +}; + +} // namespace trpc::cache \ No newline at end of file diff --git a/trpc/util/cache/decorators/scheduled_cache_test.cc b/trpc/util/cache/decorators/scheduled_cache_test.cc new file mode 100644 index 00000000..73884fde --- /dev/null +++ b/trpc/util/cache/decorators/scheduled_cache_test.cc @@ -0,0 +1,199 @@ +/* + * + * Tencent is pleased to support the open source community by making + * tRPC available. + * + * Copyright (C) 2025 Tencent. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "trpc/util/cache/decorators/scheduled_cache.h" +#include "trpc/util/cache/impl/basic_cache.h" + +#include "gtest/gtest.h" + +namespace trpc::cache::testing { + +/// @brief Test basic Put/Get operations and size tracking. +/// @note Verifies fundamental cache operations including insertion, retrieval, +/// and size maintenance functionality. +TEST(ScheduledCacheTest, BasicPutGetWithTTL) { + auto basic_cache = std::make_unique>(); + ScheduledCache scheduled_cache(std::move(basic_cache), std::chrono::milliseconds(100)); + + ASSERT_EQ(scheduled_cache.Size(), 0); + ASSERT_TRUE(scheduled_cache.Put(1, 1)); + ASSERT_EQ(scheduled_cache.Get(1), 1); + ASSERT_EQ(scheduled_cache.Size(), 1); +} + +/// @brief Test handling of duplicate key insertion. +/// @note Verifies that inserting existing keys updates values correctly +/// without affecting cache size. 
+TEST(ScheduledCacheTest, PutDuplicateKey) {
+  auto basic_cache = std::make_unique<BasicCache<int, int>>();
+  ScheduledCache<int, int> scheduled_cache(std::move(basic_cache), std::chrono::milliseconds(100));
+
+  ASSERT_TRUE(scheduled_cache.Put(1, 1));
+  ASSERT_FALSE(scheduled_cache.Put(1, 2));
+  ASSERT_EQ(scheduled_cache.Size(), 1);
+  ASSERT_EQ(scheduled_cache.Get(1), 2);
+}
+
+/// @brief Test removal of existing keys.
+/// @note Verifies successful removal of existing entries and subsequent cache state.
+TEST(ScheduledCacheTest, RemoveExistingKey) {
+  auto basic_cache = std::make_unique<BasicCache<int, int>>();
+  ScheduledCache<int, int> scheduled_cache(std::move(basic_cache));
+
+  scheduled_cache.Put(1, 1);
+  ASSERT_TRUE(scheduled_cache.Remove(1));
+  ASSERT_EQ(scheduled_cache.Get(1), std::nullopt);
+  ASSERT_EQ(scheduled_cache.Size(), 0);
+}
+
+/// @brief Test removal of non-existent keys.
+/// @note Verifies cache behavior when attempting to remove non-existing keys.
+TEST(ScheduledCacheTest, RemoveNonExistingKey) {
+  auto basic_cache = std::make_unique<BasicCache<int, int>>();
+  ScheduledCache<int, int> scheduled_cache(std::move(basic_cache));
+
+  ASSERT_FALSE(scheduled_cache.Remove(1));
+  ASSERT_EQ(scheduled_cache.Size(), 0);
+}
+
+/// @brief Test complete cache clearance.
+/// @note Verifies that Clear() operation removes all entries and resets cache state.
+TEST(ScheduledCacheTest, ClearCache) {
+  auto basic_cache = std::make_unique<BasicCache<int, int>>();
+  ScheduledCache<int, int> scheduled_cache(std::move(basic_cache));
+
+  scheduled_cache.Put(1, 1);
+  scheduled_cache.Put(2, 2);
+  scheduled_cache.Clear();
+
+  ASSERT_EQ(scheduled_cache.Size(), 0);
+  ASSERT_EQ(scheduled_cache.Get(1), std::nullopt);
+  ASSERT_EQ(scheduled_cache.Get(2), std::nullopt);
+}
+
+/// @brief Test automatic expiration of entries based on TTL.
+/// @note Verifies that entries are automatically evicted after their TTL expires,
+///       and cache size is updated accordingly.
+TEST(ScheduledCacheTest, EntryExpiration) {
+  auto basic_cache = std::make_unique<BasicCache<int, int>>();
+  ScheduledCache<int, int> scheduled_cache(std::move(basic_cache), std::chrono::milliseconds(50));
+
+  ASSERT_TRUE(scheduled_cache.Put(1, 1));
+  ASSERT_EQ(scheduled_cache.Get(1), 1);
+
+  // Wait for entry to expire
+  std::this_thread::sleep_for(std::chrono::milliseconds(60));
+  ASSERT_EQ(scheduled_cache.Get(1), std::nullopt);
+  ASSERT_EQ(scheduled_cache.Size(), 0);
+}
+
+/// @brief Test custom TTL per entry.
+/// @note Verifies that individual entries can have custom TTL values,
+///       and expiration is handled correctly for each.
+TEST(ScheduledCacheTest, CustomTTLPerEntry) {
+  auto basic_cache = std::make_unique<BasicCache<int, int>>();
+  ScheduledCache<int, int> scheduled_cache(std::move(basic_cache), std::chrono::milliseconds(1000));
+
+  // Short TTL
+  ASSERT_TRUE(scheduled_cache.Put(1, 1, std::chrono::milliseconds(50)));
+  // Long TTL
+  ASSERT_TRUE(scheduled_cache.Put(2, 2, std::chrono::milliseconds(200)));
+
+  std::this_thread::sleep_for(std::chrono::milliseconds(60));
+  ASSERT_EQ(scheduled_cache.Get(1), std::nullopt);  // Should be expired
+  ASSERT_EQ(scheduled_cache.Get(2), 2);             // Should still be valid
+
+  std::this_thread::sleep_for(std::chrono::milliseconds(150));
+  ASSERT_EQ(scheduled_cache.Get(2), std::nullopt);  // Should be expired
+}
+
+/// @brief Test concurrent Put operations with TTL.
+/// @note Verifies that concurrent Put operations maintain data consistency
+///       and TTL expiration under high contention.
+ +TEST(ScheduledCacheTest, ConcurrentPut) { + auto basic_cache = std::make_unique>(); + ScheduledCache scheduled_cache(std::move(basic_cache), std::chrono::milliseconds(1000), 32); + + constexpr int threads_num = 8; + constexpr int puts_per_thread = 200; + constexpr int puts_num = threads_num * puts_per_thread; + std::vector threads; + std::atomic key_gen{0}; + + for (int i = 0; i < threads_num; ++i) { + threads.emplace_back([&]() { + for (int j = 0; j < puts_per_thread; ++j) { + int key = key_gen.fetch_add(1, std::memory_order_relaxed); + scheduled_cache.Put(key, key); + } + }); + } + for (auto& t : threads) t.join(); + + int hits{0}, misses{0}; + for (int key = 0; key < puts_num; ++key) { + if (scheduled_cache.Get(key) == std::nullopt) { + ++misses; + } else { + ++hits; + } + } + + ASSERT_EQ(hits, threads_num * puts_per_thread); + ASSERT_EQ(misses, 0); +} + +/// @brief Test concurrent Get operations with expiration. +/// @note Verifies that concurrent Get operations handle expiration correctly, +/// with some gets succeeding before expiration and some failing after. +TEST(ScheduledCacheTest, ConcurrentGetWithExpiration) { + auto basic_cache = std::make_unique>(); + ScheduledCache scheduled_cache(std::move(basic_cache), std::chrono::milliseconds(50), 32); + + for (int key = 0; key < 100; ++key) { + scheduled_cache.Put(key, key); + } + + constexpr int threads_num = 4; + constexpr int gets_per_thread = 25; + std::vector threads; + std::atomic valid_gets{0}; + + for (int i = 0; i < threads_num; ++i) { + threads.emplace_back([&, i]() { + for (int j = 0; j < gets_per_thread; ++j) { + int key = j + i * gets_per_thread; + if (scheduled_cache.Get(key)) { + valid_gets.fetch_add(1, std::memory_order_relaxed); + } + } + }); + } + for (auto& t : threads) t.join(); + + // Some gets should succeed (before expiration), some should fail (after expiration) + ASSERT_GT(valid_gets.load(), 0); + ASSERT_LE(valid_gets.load(), 100); +} + +} // namespace trpc::cache::testing