Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
e489910
fix default string support
zhanglei1949 Mar 16, 2026
038f315
minor fixes
zhanglei1949 Mar 16, 2026
69e6d6f
use a separate file to store validility
zhanglei1949 Mar 16, 2026
e75485a
revert to uint32_t
zhanglei1949 Mar 16, 2026
61ad47c
Merge branch 'main' into fix-str-defaul-val
zhanglei1949 Mar 16, 2026
c788bf9
fix
zhanglei1949 Mar 16, 2026
a4f4c0a
Merge branch 'fix-str-defaul-val' of https://github.com/zhanglei1949/…
zhanglei1949 Mar 16, 2026
8212b34
ensure backward compatibility
zhanglei1949 Mar 16, 2026
cef540c
Update include/neug/utils/mmap_array.h
zhanglei1949 Mar 17, 2026
316c9af
minor fix
zhanglei1949 Mar 17, 2026
d02df26
Merge branch 'fix-str-defaul-val' of https://github.com/zhanglei1949/…
zhanglei1949 Mar 17, 2026
1d2ba36
Merge branch 'main' into fix-str-defaul-val
zhanglei1949 Mar 17, 2026
ba0167a
use mmap_array<uint8_t> to store materialized info
zhanglei1949 Mar 17, 2026
b4c05eb
Merge branch 'fix-str-defaul-val' of https://github.com/zhanglei1949/…
zhanglei1949 Mar 17, 2026
00550ce
store default value in data_ and let multiple items point to it
zhanglei1949 Mar 17, 2026
7dc36f1
fix
zhanglei1949 Mar 17, 2026
f186997
revert unneccessary changes
zhanglei1949 Mar 17, 2026
4b6f3f9
Merge branch 'main' into fix-str-defaul-val
zhanglei1949 Mar 17, 2026
23d7ff5
fix
zhanglei1949 Mar 17, 2026
a07158c
Merge branch 'fix-str-defaul-val' of https://github.com/zhanglei1949/…
zhanglei1949 Mar 17, 2026
54dff3d
Update include/neug/utils/mmap_array.h
zhanglei1949 Mar 17, 2026
47254f1
resolve greptile comments
zhanglei1949 Mar 17, 2026
4d638db
Merge branch 'fix-str-defaul-val' of https://github.com/zhanglei1949/…
zhanglei1949 Mar 17, 2026
46a6b20
only keep default value in schema
zhanglei1949 Mar 18, 2026
cc22bf0
Merge branch 'main' into fix-str-defaul-val
zhanglei1949 Mar 18, 2026
bc63913
remove dummy code
zhanglei1949 Mar 18, 2026
8cdb184
minor fix
zhanglei1949 Mar 18, 2026
8295173
migrate to new offset
zhanglei1949 Mar 18, 2026
e8a5cf5
fix
zhanglei1949 Mar 18, 2026
c34ccf8
minor fix
zhanglei1949 Mar 18, 2026
c9202f9
report error with throw
zhanglei1949 Mar 18, 2026
357c293
minor fix
zhanglei1949 Mar 18, 2026
a25949c
fix
zhanglei1949 Mar 18, 2026
28a76b7
Merge branch 'main' into fix-str-defaul-val
zhanglei1949 Mar 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions include/neug/utils/id_indexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,13 +242,11 @@ class LFIndexer {
void init(const DataTypeId& type,
std::shared_ptr<ExtraTypeInfo> extra_type_info = nullptr) {
keys_ = nullptr;
auto default_value = get_default_value(type);
switch (type) {
#define TYPE_DISPATCHER(enum_val, T) \
case DataTypeId::enum_val: { \
keys_ = std::make_shared<TypedColumn<T>>( \
PropUtils<T>::to_typed(default_value), StorageStrategy::kMem); \
break; \
#define TYPE_DISPATCHER(enum_val, T) \
case DataTypeId::enum_val: { \
keys_ = std::make_shared<TypedColumn<T>>(StorageStrategy::kMem); \
break; \
}
TYPE_DISPATCHER(kInt64, int64_t)
TYPE_DISPATCHER(kInt32, int32_t)
Expand Down
98 changes: 73 additions & 25 deletions include/neug/utils/mmap_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
#include <filesystem>
#include <string>
#include <string_view>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "glog/logging.h"
#include "neug/storages/file_names.h"
#include "neug/utils/exception/exception.h"
#include "neug/utils/file_utils.h"

#ifdef __ia64__
#define ADDR (void*) (0x8000000000000000UL)
Expand Down Expand Up @@ -66,6 +69,9 @@ inline size_t hugepage_round_up(size_t size) { return ROUND_UP(size); }

namespace neug {

template <typename T>
class TypedColumn;

enum class MemoryStrategy {
kSyncToFile,
kMemoryOnly,
Expand Down Expand Up @@ -476,6 +482,7 @@ struct string_item {
template <>
class mmap_array<std::string_view> {
public:
friend class TypedColumn<std::string_view>;
mmap_array() {}
mmap_array(mmap_array&& rhs) : mmap_array() { swap(rhs); }
~mmap_array() {}
Expand All @@ -498,20 +505,17 @@ class mmap_array<std::string_view> {
}

void open_with_hugepages(const std::string& filename) {
is_writable_ = true;
items_.open_with_hugepages(filename + ".items");
data_.open_with_hugepages(filename + ".data");
}

void touch(const std::string& filename) {
items_.touch(filename + ".items");
data_.touch(filename + ".data");
}

void dump(const std::string& filename) {
// Compact before dumping to reclaim unused space
auto plan = prepare_compaction_plan();
size_t effective_size = plan.total_size - plan.reused_size;
bool should_stream =
!data_.is_sync_to_file() && plan.total_size < data_.size();
!data_.is_sync_to_file() && effective_size < data_.size();
if (should_stream) {
stream_compact_and_dump(plan, filename + ".data", filename + ".items");
return;
Expand All @@ -520,6 +524,7 @@ class mmap_array<std::string_view> {
compact();
items_.dump(filename + ".items");
data_.dump(filename + ".data");
reset();
}

void resize(size_t size, size_t data_size) {
Expand All @@ -545,7 +550,8 @@ class mmap_array<std::string_view> {
}

void set(size_t idx, size_t offset, const std::string_view& val) {
items_.set(idx, {offset, static_cast<uint32_t>(val.size())});
items_.set(idx, {static_cast<uint64_t>(offset),
static_cast<uint32_t>(val.size())});
assert(data_.data() + offset + val.size() <= data_.data() + data_.size());
memcpy(data_.data() + offset, val.data(), val.size());
}
Expand All @@ -562,6 +568,7 @@ class mmap_array<std::string_view> {
void swap(mmap_array& rhs) {
items_.swap(rhs.items_);
data_.swap(rhs.data_);
std::swap(is_writable_, rhs.is_writable_);
}

void set_writable(bool is_writable) {
Expand All @@ -579,6 +586,7 @@ class mmap_array<std::string_view> {
is_writable_ = true;
}

private:
// Compact the data buffer by removing unused space and updating offsets
// This is an in-place operation that shifts valid string data forward
// Returns the compacted data size. Note that the reserved size of data buffer
Expand All @@ -590,32 +598,41 @@ class mmap_array<std::string_view> {
return 0;
}
size_t size_before_compact = data_.size();
if (plan.total_size == size_before_compact) {
if (plan.total_size == plan.reused_size + size_before_compact) {
return size_before_compact;
}
size_t effective_size = plan.total_size - plan.reused_size;

std::vector<char> temp_buf(plan.total_size);
std::vector<char> temp_buf(effective_size);
size_t write_offset = 0;
size_t limit_offset = 0;
std::unordered_map<uint64_t, uint64_t> old_offset_to_new;
for (const auto& entry : plan.entries) {
const char* src = data_.data() + entry.offset;
char* dst = temp_buf.data() + write_offset;
limit_offset = std::max(limit_offset,
static_cast<size_t>(entry.offset + entry.length));
memcpy(dst, src, entry.length);
items_.set(entry.index,
{static_cast<uint64_t>(write_offset), entry.length});
if (entry.length > 0) {
auto it = old_offset_to_new.find(entry.offset);
if (it != old_offset_to_new.end()) {
items_.set(entry.index, {it->second, entry.length});
continue;
}
old_offset_to_new.insert({entry.offset, write_offset});
const char* src = data_.data() + entry.offset;
char* dst = temp_buf.data() + write_offset;
limit_offset = std::max(
limit_offset, static_cast<size_t>(entry.offset + entry.length));
memcpy(dst, src, entry.length);
}
items_.set(entry.index, {static_cast<uint64_t>(write_offset),
static_cast<uint32_t>(entry.length)});
write_offset += entry.length;
}
assert(write_offset == plan.total_size);
memcpy(data_.data(), temp_buf.data(), plan.total_size);
assert(write_offset + plan.reused_size == plan.total_size);
memcpy(data_.data(), temp_buf.data(), effective_size);

VLOG(1) << "Compaction completed. New data size: " << plan.total_size
VLOG(1) << "Compaction completed. New data size: " << effective_size
<< ", old data size: " << limit_offset;
return plan.total_size;
return effective_size;
}

private:
struct CompactionPlan {
struct Entry {
size_t index;
Expand All @@ -624,15 +641,25 @@ class mmap_array<std::string_view> {
};
std::vector<Entry> entries;
size_t total_size = 0;
size_t reused_size = 0;
};

CompactionPlan prepare_compaction_plan() const {
CompactionPlan plan;
plan.entries.reserve(items_.size());
std::unordered_set<uint64_t> seen_offsets;
for (size_t i = 0; i < items_.size(); ++i) {
const string_item& item = items_.get(i);
plan.total_size += item.length;
plan.entries.push_back({i, item.offset, item.length});
plan.entries.push_back(
{i, item.offset, static_cast<uint32_t>(item.length)});
if (item.length > 0) {
if (seen_offsets.find(item.offset) != seen_offsets.end()) {
plan.reused_size += item.length;
} else {
seen_offsets.insert(item.offset);
}
}
}
return plan;
}
Expand All @@ -651,24 +678,33 @@ class mmap_array<std::string_view> {
}

size_t write_offset = 0;
std::unordered_map<uint64_t, uint64_t> old_offset_to_new;
for (const auto& entry : plan.entries) {
if (entry.length > 0) {
auto it = old_offset_to_new.find(entry.offset);
if (it != old_offset_to_new.end()) {
items_.set(entry.index, {it->second, entry.length});
continue;
}
old_offset_to_new.insert({entry.offset, write_offset});
const char* src = data_.data() + entry.offset;
if (fwrite(src, 1, entry.length, fout) != entry.length) {
fclose(fout);
std::stringstream ss;
ss << "Failed to fwrite file [ " << data_filename << " ], "
<< strerror(errno);
LOG(ERROR) << ss.str();
THROW_RUNTIME_ERROR(ss.str());
}
}
items_.set(entry.index,
{static_cast<uint64_t>(write_offset), entry.length});
items_.set(entry.index, {static_cast<uint64_t>(write_offset),
static_cast<uint32_t>(entry.length)});
write_offset += entry.length;
}
assert(write_offset == plan.total_size);
assert(write_offset + plan.reused_size == plan.total_size);

if (fflush(fout) != 0) {
fclose(fout);
std::stringstream ss;
ss << "Failed to fflush file [ " << data_filename << " ], "
<< strerror(errno);
Expand All @@ -677,13 +713,15 @@ class mmap_array<std::string_view> {
}
int fd = fileno(fout);
if (fd == -1) {
fclose(fout);
std::stringstream ss;
ss << "Failed to get file descriptor for [ " << data_filename << " ], "
<< strerror(errno);
LOG(ERROR) << ss.str();
THROW_RUNTIME_ERROR(ss.str());
}
if (ftruncate(fd, static_cast<off_t>(size_before_compact)) != 0) {
fclose(fout);
std::stringstream ss;
ss << "Failed to ftruncate file [ " << data_filename << " ], "
<< strerror(errno);
Expand Down Expand Up @@ -716,6 +754,16 @@ class mmap_array<std::string_view> {
items_.dump(items_filename);
}

// Should only be used internally when we are sure the idx is valid
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private methods, only called by friend class TypedColumn<std::string_view>

string_item get_string_item(size_t idx) const { return items_.get(idx); }
void set_string_item(size_t idx, const string_item& item) {
if (!is_writable_) {
THROW_RUNTIME_ERROR(
"Attempt to set_string_item on a read-only mmap_array");
}
items_.set(idx, item);
}

mmap_array<string_item> items_;
mmap_array<char> data_;
bool is_writable_ = true;
Expand Down
Loading
Loading