From 82bdac4b31b58f0a922a1d371e1b3e4ee8799c5b Mon Sep 17 00:00:00 2001 From: Lukas Martinelli Date: Wed, 9 Aug 2017 15:02:51 -0400 Subject: [PATCH 1/5] Build changeset index --- CMakeLists.txt | 2 ++ README.md | 6 ++++ build_changeset_lookup.cpp | 67 ++++++++++++++++++++++++++++++++++++++ build_tag_lookup.cpp | 10 +----- db.hpp | 54 ++++++++++++++++++++++++++++++ 5 files changed, 130 insertions(+), 9 deletions(-) create mode 100644 build_changeset_lookup.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index f8eb88e..0a7317e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -50,7 +50,9 @@ set(ALL_LIBRARIES ${CMAKE_THREAD_LIBS_INIT} ${EXPAT_LIBRARIES} ${ROCKSDB_LIBRARI #---------------------------------------------------------------------- add_executable(build_tag_lookup build_tag_lookup.cpp) add_executable(add_tags add_tags.cpp) +add_executable(build_changeset_lookup build_changeset_lookup.cpp) +target_link_libraries(build_changeset_lookup ${ALL_LIBRARIES}) target_link_libraries(build_tag_lookup ${ALL_LIBRARIES}) target_link_libraries(add_tags ${ALL_LIBRARIES}) #----------------------------------------------------------------------------- diff --git a/README.md b/README.md index c2f9416..883196b 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,12 @@ make ## Run +First build up a changeset index (`http://planet.osm.org/planet/changesets-latest.osm.bz2`). + +``` +build_changeset_lookup INDEX_DIR OSM_CHANGESET_FILE +``` + First build up a historic tag index. ``` diff --git a/build_changeset_lookup.cpp b/build_changeset_lookup.cpp new file mode 100644 index 0000000..93cf59a --- /dev/null +++ b/build_changeset_lookup.cpp @@ -0,0 +1,67 @@ +#include // for std::exit +#include // for std::strncmp +#include // for std::cout, std::cerr +#include +#include + +#include +#include +#include +#include + +#include "db.hpp" + +class ChangesetHandler : public osmium::handler::Handler { + TagStore* m_store; + +public: + ChangesetHandler(TagStore* store) : m_store(store) {} + void changeset(const osmium::Changeset& changeset) { + m_store->store_changeset(changeset); + } +}; + +std::atomic_bool stop_progress{false}; + +void report_progress(const TagStore* store) { + unsigned long last_changesets_count{0}; + auto start = std::chrono::steady_clock::now(); + + while(true) { + if(stop_progress) { + auto end = std::chrono::steady_clock::now(); + auto diff = end - start; + + std::cerr << "Processed " << last_changesets_count << " changesets in " << std::chrono::duration (diff).count() << " ms" << std::endl; + break; + } + + auto diff_changesets_count = store->stored_changesets_count - last_changesets_count; + std::cerr << "Processing " << diff_changesets_count << " changesets/s" << std::endl; + + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + last_changesets_count += diff_changesets_count; + } +} + +int main(int argc, char* argv[]) { + if (argc != 3) { + std::cerr << "Usage: " << argv[0] << " INDEX_DIR CHANGESET_FILE" << std::endl; + std::exit(1); + } + + std::string index_dir = argv[1]; + std::string changeset_filename = argv[2]; + + TagStore store(index_dir, true); + ChangesetHandler handler(&store); + + std::thread t_progress(report_progress, &store); + + osmium::io::Reader reader{changeset_filename, osmium::osm_entity_bits::changeset}; + osmium::apply(reader, handler); + + stop_progress = true; + t_progress.join(); + store.flush(); +} diff --git a/build_tag_lookup.cpp b/build_tag_lookup.cpp index 34a2955..5f9329e 100644 --- a/build_tag_lookup.cpp +++ b/build_tag_lookup.cpp @@ -1,11 +1,3 @@ -/* - - Build Tag History Database - - (Based on osmium_pub_names example) - -*/ - #include // for std::exit #include // for std::strncmp #include // for std::cout, std::cerr @@ -85,7 +77,7 @@ int main(int argc, char* argv[]) { std::string index_dir = argv[1]; std::string osm_filename = argv[2]; - TagStore store(index_dir, true); + TagStore store(index_dir, false); TagStoreHandler tag_handler(&store); std::thread t_progress(report_progress, &store); diff --git a/db.hpp b/db.hpp index c2a06e6..67a8e89 100644 --- a/db.hpp +++ b/db.hpp @@ -28,6 +28,7 @@ class TagStore { rocksdb::ColumnFamilyHandle* m_cf_ways; rocksdb::ColumnFamilyHandle* m_cf_nodes; rocksdb::ColumnFamilyHandle* m_cf_relations; + rocksdb::ColumnFamilyHandle* m_cf_changesets; rocksdb::WriteOptions m_write_options; rocksdb::WriteBatch m_buffer_batch; @@ -62,6 +63,10 @@ class TagStore { uint64_t relation_keys{0}; m_db->GetIntProperty(m_cf_relations, "rocksdb.estimate-num-keys", &relation_keys); std::cerr << "Stored ~" << relation_keys << "/" << stored_relations_count << " relations" << std::endl; + + uint64_t changeset_keys{0}; + m_db->GetIntProperty(m_cf_changesets, "rocksdb.estimate-num-keys", &changeset_keys); + std::cerr << "Stored ~" << changeset_keys << "/" << stored_changesets_count << " changesets" << std::endl; } public: @@ -71,6 +76,7 @@ class TagStore { unsigned long stored_nodes_count{0}; unsigned long stored_ways_count{0}; unsigned long stored_relations_count{0}; + unsigned long stored_changesets_count{0}; unsigned long stored_objects_count() { return stored_nodes_count + stored_ways_count + stored_relations_count; @@ -104,6 +110,7 @@ class TagStore { s = m_db->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), "ways", &m_cf_ways); assert(s.ok()); s = m_db->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), "relations", &m_cf_relations); + s = m_db->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), "changesets", &m_cf_changesets); assert(s.ok()); } else { db_options.error_if_exists = false; @@ -118,6 +125,7 @@ class TagStore { column_families.push_back(rocksdb::ColumnFamilyDescriptor( "nodes", rocksdb::ColumnFamilyOptions())); column_families.push_back(rocksdb::ColumnFamilyDescriptor( "ways", rocksdb::ColumnFamilyOptions())); column_families.push_back(rocksdb::ColumnFamilyDescriptor( "relations", rocksdb::ColumnFamilyOptions())); + column_families.push_back(rocksdb::ColumnFamilyDescriptor( "changesets", rocksdb::ColumnFamilyOptions())); std::vector handles; @@ -127,6 +135,7 @@ class TagStore { m_cf_nodes = handles[1]; m_cf_ways = handles[2]; m_cf_relations = handles[3]; + m_cf_changesets = handles[4]; } } rocksdb::Status get_tags(const int64_t osm_id, const int osm_type, const int version, std::string* json_value) { @@ -159,6 +168,49 @@ class TagStore { } } + bool store_changeset(const osmium::Changeset& changeset) { + rapidjson::Document doc; + doc.SetObject(); + + rapidjson::Document::AllocatorType& a = doc.GetAllocator(); + + doc.AddMember("@created_at", changeset.created_at().to_iso(), a); + doc.AddMember("@closed_at", changeset.closed_at().to_iso(), a); + doc.AddMember("@user", std::string{changeset.user()}, a); + doc.AddMember("@uid", changeset.uid(), a); + doc.AddMember("@num_changes", changeset.num_changes(), a); + doc.AddMember("@num_comments", changeset.num_comments(), a); + + const osmium::TagList& tags = changeset.tags(); + rapidjson::Value changeset_tags(rapidjson::kObjectType); + for (const osmium::Tag& tag : tags) { + rapidjson::Value key(rapidjson::StringRef(tag.key())); + rapidjson::Value value(rapidjson::StringRef(tag.value())); + changeset_tags.AddMember(key, value, a); + } + + doc.AddMember("@tags", changeset_tags, a); + + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + doc.Accept(writer); + + auto lookup = std::to_string(changeset.id()); + rocksdb::Status stat = m_buffer_batch.Put(m_cf_changesets, lookup, buffer.GetString()); + + stored_changesets_count++; + if (m_buffer_batch.Count() > 1000) { + m_db->Write(m_write_options, &m_buffer_batch); + m_buffer_batch.Clear(); + } + + if (stored_changesets_count != 0 && (stored_changesets_count % 1000000) == 0) { + flush_family("changesets", m_cf_changesets); + report_count_stats(); + } + return true; + } + bool store_tags(const osmium::OSMObject& object, rocksdb::ColumnFamilyHandle* cf) { const auto lookup = make_lookup(object.id(), object.version()); if (object.tags().empty()) { @@ -228,10 +280,12 @@ class TagStore { flush_family("nodes", m_cf_nodes); flush_family("ways", m_cf_ways); flush_family("relations", m_cf_relations); + flush_family("changesets", m_cf_changesets); compact_family("nodes", m_cf_nodes); compact_family("ways", m_cf_ways); compact_family("relations", m_cf_relations); + compact_family("changesets", m_cf_changesets); report_count_stats(); } From 11e794c4f4bbcda65d1f07c532f3f507f21727ca Mon Sep 17 00:00:00 2001 From: Lukas Martinelli Date: Wed, 9 Aug 2017 16:03:04 -0400 Subject: [PATCH 2/5] Lookup changset when storing --- db.hpp | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/db.hpp b/db.hpp index 67a8e89..9d17811 100644 --- a/db.hpp +++ b/db.hpp @@ -150,6 +150,7 @@ class TagStore { } } + void store_tags(const osmium::Way& way) { if(store_tags(way, m_cf_ways)) { stored_ways_count++; @@ -212,6 +213,7 @@ class TagStore { } bool store_tags(const osmium::OSMObject& object, rocksdb::ColumnFamilyHandle* cf) { + const auto lookup = make_lookup(object.id(), object.version()); if (object.tags().empty()) { empty_objects_count++; @@ -233,6 +235,20 @@ class TagStore { doc.AddMember("@changeset", object.changeset(), a); doc.AddMember("@version", object.version(), a); + // Add closed at if found + std::string changeset_json; + auto read_status = m_db->Get(rocksdb::ReadOptions(), m_cf_changesets, std::to_string(object.changeset()), &changeset_json); + if (read_status.ok()) { + rapidjson::Document changeset_doc; + std::cout << changeset_json << std::endl; + if(!changeset_doc.Parse<0>(changeset_json.c_str()).HasParseError()) { + if(changeset_doc.HasMember("@closed_at")) { + // doc.AddMember("@closed_at", changeset_doc["@closed_at"], a); + // std::cout << "Found closed at" << changeset_doc["@closed_at"].GetString() << std::endl; + } + } + } + //Ignore trying to store geometries, but if we could scale that, it'd be awesome. const osmium::TagList& tags = object.tags(); From 555d16388385add08ae4f79bdf604e5a093b9092 Mon Sep 17 00:00:00 2001 From: Lukas Martinelli Date: Wed, 9 Aug 2017 16:48:16 -0400 Subject: [PATCH 3/5] Experiment with node lookups --- db.hpp | 64 ++++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 47 insertions(+), 17 deletions(-) diff --git a/db.hpp b/db.hpp index 9d17811..2d186da 100644 --- a/db.hpp +++ b/db.hpp @@ -150,8 +150,52 @@ class TagStore { } } + void lookup_nodes(const osmium::Way& way, const int closed_at) { + const osmium::WayNodeList& node_refs = way.nodes(); + for (const osmium::NodeRef& node_ref : node_refs) { + auto node_id = node_ref.ref(); + + // Find all the versions + int node_version{1}; + for(int v = 1; v < 1000; v++) { + std::string node_json; + + auto read_status = m_db->Get(rocksdb::ReadOptions(), m_cf_nodes, make_lookup(node_id, v), &node_json); + + if(read_status.ok()) { + rapidjson::Document node_doc; + if(!node_doc.Parse<0>(node_json.c_str()).HasParseError()) { + if(node_doc.HasMember("@timestamp")) { + auto ts = node_doc["@timestamp"].GetInt(); + if (ts > closed_at) { + break; + } else { + node_version = v; + } + } + } + } + } + + std::cout << "Found real node version " << node_version << std::endl; + } + } void store_tags(const osmium::Way& way) { + // Add closed at if found + std::string changeset_json; + auto read_status = m_db->Get(rocksdb::ReadOptions(), m_cf_changesets, std::to_string(way.changeset()), &changeset_json); + if (read_status.ok()) { + rapidjson::Document changeset_doc; + std::cout << changeset_json << std::endl; + if(!changeset_doc.Parse<0>(changeset_json.c_str()).HasParseError()) { + if(changeset_doc.HasMember("@closed_at")) { + auto closed_at = changeset_doc["@closed_at"].GetInt(); + lookup_nodes(way, closed_at); + } + } + } + if(store_tags(way, m_cf_ways)) { stored_ways_count++; } @@ -175,8 +219,8 @@ class TagStore { rapidjson::Document::AllocatorType& a = doc.GetAllocator(); - doc.AddMember("@created_at", changeset.created_at().to_iso(), a); - doc.AddMember("@closed_at", changeset.closed_at().to_iso(), a); + doc.AddMember("@created_at", static_cast(changeset.created_at().seconds_since_epoch()), a); + doc.AddMember("@closed_at", static_cast(changeset.closed_at().seconds_since_epoch()), a); doc.AddMember("@user", std::string{changeset.user()}, a); doc.AddMember("@uid", changeset.uid(), a); doc.AddMember("@num_changes", changeset.num_changes(), a); @@ -225,7 +269,7 @@ class TagStore { rapidjson::Document::AllocatorType& a = doc.GetAllocator(); - doc.AddMember("@timestamp", object.timestamp().to_iso(), a); //ISO is helpful for debugging, but we should leave it + doc.AddMember("@timestamp", static_cast(object.timestamp().seconds_since_epoch()), a); if (object.deleted()){ doc.AddMember("@deleted", object.deleted(), a); } @@ -235,20 +279,6 @@ class TagStore { doc.AddMember("@changeset", object.changeset(), a); doc.AddMember("@version", object.version(), a); - // Add closed at if found - std::string changeset_json; - auto read_status = m_db->Get(rocksdb::ReadOptions(), m_cf_changesets, std::to_string(object.changeset()), &changeset_json); - if (read_status.ok()) { - rapidjson::Document changeset_doc; - std::cout << changeset_json << std::endl; - if(!changeset_doc.Parse<0>(changeset_json.c_str()).HasParseError()) { - if(changeset_doc.HasMember("@closed_at")) { - // doc.AddMember("@closed_at", changeset_doc["@closed_at"], a); - // std::cout << "Found closed at" << changeset_doc["@closed_at"].GetString() << std::endl; - } - } - } - //Ignore trying to store geometries, but if we could scale that, it'd be awesome. const osmium::TagList& tags = object.tags(); From 1a8abab3d08e3ad5774cafa95510e54a2543f216 Mon Sep 17 00:00:00 2001 From: Lukas Martinelli Date: Wed, 23 Aug 2017 14:14:54 -0400 Subject: [PATCH 4/5] Encode changesets as pbf --- db.hpp | 30 +++--------------------------- pbf_encoder.hpp | 45 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 27 deletions(-) create mode 100644 pbf_encoder.hpp diff --git a/db.hpp b/db.hpp index 2d186da..22d155d 100644 --- a/db.hpp +++ b/db.hpp @@ -18,6 +18,7 @@ #include #include +#include "pbf_encoder.hpp" const std::string make_lookup(const int64_t osm_id, const int version){ return std::to_string(osm_id) + "!" + std::to_string(version); @@ -214,34 +215,9 @@ class TagStore { } bool store_changeset(const osmium::Changeset& changeset) { - rapidjson::Document doc; - doc.SetObject(); - - rapidjson::Document::AllocatorType& a = doc.GetAllocator(); - - doc.AddMember("@created_at", static_cast(changeset.created_at().seconds_since_epoch()), a); - doc.AddMember("@closed_at", static_cast(changeset.closed_at().seconds_since_epoch()), a); - doc.AddMember("@user", std::string{changeset.user()}, a); - doc.AddMember("@uid", changeset.uid(), a); - doc.AddMember("@num_changes", changeset.num_changes(), a); - doc.AddMember("@num_comments", changeset.num_comments(), a); - - const osmium::TagList& tags = changeset.tags(); - rapidjson::Value changeset_tags(rapidjson::kObjectType); - for (const osmium::Tag& tag : tags) { - rapidjson::Value key(rapidjson::StringRef(tag.key())); - rapidjson::Value value(rapidjson::StringRef(tag.value())); - changeset_tags.AddMember(key, value, a); - } - - doc.AddMember("@tags", changeset_tags, a); - - rapidjson::StringBuffer buffer; - rapidjson::Writer writer(buffer); - doc.Accept(writer); - auto lookup = std::to_string(changeset.id()); - rocksdb::Status stat = m_buffer_batch.Put(m_cf_changesets, lookup, buffer.GetString()); + auto value = osmwayback::encode_changeset(changeset); + rocksdb::Status stat = m_buffer_batch.Put(m_cf_changesets, lookup, value); stored_changesets_count++; if (m_buffer_batch.Count() > 1000) { diff --git a/pbf_encoder.hpp b/pbf_encoder.hpp new file mode 100644 index 0000000..f5f716b --- /dev/null +++ b/pbf_encoder.hpp @@ -0,0 +1,45 @@ +#pragma once + +#include +#include + +#include + +namespace osmwayback { + struct Changeset { + uint64_t created_at; + uint64_t closed_at; + }; + + const std::string encode_changeset(const osmium::Changeset& changeset) { + std::string data; + protozero::pbf_writer encoder(data); + + encoder.add_fixed64(1, static_cast(changeset.created_at().seconds_since_epoch())); + encoder.add_fixed64(2, static_cast(changeset.closed_at().seconds_since_epoch())); + + return data; + } + + const Changeset decode_changeset(std::string data) { + protozero::pbf_reader message(data); + Changeset changeset{}; + + while (message.next()) { + switch (message.tag()) { + case 1: + changeset.created_at = message.get_fixed64(); + break; + case 2: + changeset.closed_at = message.get_fixed64(); + break; + default: + message.skip(); + } + } + + return changeset; + } +} + + From 160580d279a0b3c68fd75dd04acbbbed1a0bda39 Mon Sep 17 00:00:00 2001 From: Lukas Martinelli Date: Thu, 24 Aug 2017 11:47:29 -0400 Subject: [PATCH 5/5] Extract rocks baseclass and pbf decoder --- add_tags.cpp | 4 ++-- build_changeset_lookup.cpp | 8 +++---- build_tag_lookup.cpp | 8 +++---- db.hpp | 48 ++++++++++++++++++++++++-------------- pbf_encoder.hpp | 43 ++++++++++++++++++++++++---------- 5 files changed, 72 insertions(+), 39 deletions(-) diff --git a/add_tags.cpp b/add_tags.cpp index abea05c..9fca336 100644 --- a/add_tags.cpp +++ b/add_tags.cpp @@ -46,7 +46,7 @@ typedef std::map StringStringMap; typedef std::map VersionTags; typedef std::vector < std::map > TagHistoryArray; -void write_with_history_tags(TagStore* store, const std::string line) { +void write_with_history_tags(osmwayback::TagStore* store, const std::string line) { rapidjson::Document geojson_doc; if(geojson_doc.Parse<0>(line.c_str()).HasParseError()) { std::cerr << "ERROR" << std::endl; @@ -213,7 +213,7 @@ int main(int argc, char* argv[]) { std::string index_dir = argv[1]; std::cout << "init tag dir" << std::endl; - TagStore store(index_dir, false); + osmwayback::TagStore store(index_dir, false); rapidjson::Document doc; for (std::string line; std::getline(std::cin, line);) { diff --git a/build_changeset_lookup.cpp b/build_changeset_lookup.cpp index 93cf59a..5214695 100644 --- a/build_changeset_lookup.cpp +++ b/build_changeset_lookup.cpp @@ -12,10 +12,10 @@ #include "db.hpp" class ChangesetHandler : public osmium::handler::Handler { - TagStore* m_store; + osmwayback::TagStore* m_store; public: - ChangesetHandler(TagStore* store) : m_store(store) {} + ChangesetHandler(osmwayback::TagStore* store) : m_store(store) {} void changeset(const osmium::Changeset& changeset) { m_store->store_changeset(changeset); } @@ -23,7 +23,7 @@ class ChangesetHandler : public osmium::handler::Handler { std::atomic_bool stop_progress{false}; -void report_progress(const TagStore* store) { +void report_progress(const osmwayback::TagStore* store) { unsigned long last_changesets_count{0}; auto start = std::chrono::steady_clock::now(); @@ -53,7 +53,7 @@ int main(int argc, char* argv[]) { std::string index_dir = argv[1]; std::string changeset_filename = argv[2]; - TagStore store(index_dir, true); + osmwayback::TagStore store(index_dir, true); ChangesetHandler handler(&store); std::thread t_progress(report_progress, &store); diff --git a/build_tag_lookup.cpp b/build_tag_lookup.cpp index 5f9329e..2ec9697 100644 --- a/build_tag_lookup.cpp +++ b/build_tag_lookup.cpp @@ -12,10 +12,10 @@ #include "db.hpp" class TagStoreHandler : public osmium::handler::Handler { - TagStore* m_store; + osmwayback::TagStore* m_store; public: - TagStoreHandler(TagStore* store) : m_store(store) {} + TagStoreHandler(osmwayback::TagStore* store) : m_store(store) {} long node_count = 0; int way_count = 0; int rel_count = 0; @@ -37,7 +37,7 @@ class TagStoreHandler : public osmium::handler::Handler { std::atomic_bool stop_progress{false}; -void report_progress(const TagStore* store) { +void report_progress(const osmwayback::TagStore* store) { unsigned long last_nodes_count{0}; unsigned long last_ways_count{0}; unsigned long last_relations_count{0}; @@ -77,7 +77,7 @@ int main(int argc, char* argv[]) { std::string index_dir = argv[1]; std::string osm_filename = argv[2]; - TagStore store(index_dir, false); + osmwayback::TagStore store(index_dir, false); TagStoreHandler tag_handler(&store); std::thread t_progress(report_progress, &store); diff --git a/db.hpp b/db.hpp index 22d155d..9bf9fa1 100644 --- a/db.hpp +++ b/db.hpp @@ -20,11 +20,15 @@ #include #include "pbf_encoder.hpp" +namespace osmwayback { + const std::string make_lookup(const int64_t osm_id, const int version){ return std::to_string(osm_id) + "!" + std::to_string(version); } -class TagStore { +class RocksDBStore { + protected: + rocksdb::DB* m_db; rocksdb::ColumnFamilyHandle* m_cf_ways; rocksdb::ColumnFamilyHandle* m_cf_nodes; @@ -32,8 +36,6 @@ class TagStore { rocksdb::ColumnFamilyHandle* m_cf_changesets; rocksdb::WriteOptions m_write_options; - rocksdb::WriteBatch m_buffer_batch; - void flush_family(const std::string type, rocksdb::ColumnFamilyHandle* cf) { const auto start = std::chrono::steady_clock::now(); std::cerr << "Flushing " << type << std::endl; @@ -70,20 +72,7 @@ class TagStore { std::cerr << "Stored ~" << changeset_keys << "/" << stored_changesets_count << " changesets" << std::endl; } -public: - unsigned long empty_objects_count{0}; - unsigned long stored_tags_count{0}; - - unsigned long stored_nodes_count{0}; - unsigned long stored_ways_count{0}; - unsigned long stored_relations_count{0}; - unsigned long stored_changesets_count{0}; - - unsigned long stored_objects_count() { - return stored_nodes_count + stored_ways_count + stored_relations_count; - } - - TagStore(const std::string index_dir, const bool create) { + RocksDBStore (const std::string index_dir, const bool create) { rocksdb::Options db_options; db_options.allow_mmap_writes = false; db_options.max_background_flushes = 4; @@ -138,6 +127,29 @@ class TagStore { m_cf_relations = handles[3]; m_cf_changesets = handles[4]; } + + } + + public: + + unsigned long empty_objects_count{0}; + unsigned long stored_tags_count{0}; + + unsigned long stored_nodes_count{0}; + unsigned long stored_ways_count{0}; + unsigned long stored_relations_count{0}; + unsigned long stored_changesets_count{0}; + + unsigned long stored_objects_count() { + return stored_nodes_count + stored_ways_count + stored_relations_count; + } +}; + +class TagStore : public RocksDBStore { + rocksdb::WriteBatch m_buffer_batch; + +public: + TagStore(const std::string index_dir, const bool create) : RocksDBStore(index_dir, create) { } rocksdb::Status get_tags(const int64_t osm_id, const int osm_type, const int version, std::string* json_value) { const auto lookup = make_lookup(osm_id, version); @@ -312,3 +324,5 @@ class TagStore { report_count_stats(); } }; + +} diff --git a/pbf_encoder.hpp b/pbf_encoder.hpp index f5f716b..95155f3 100644 --- a/pbf_encoder.hpp +++ b/pbf_encoder.hpp @@ -4,11 +4,13 @@ #include #include +#include namespace osmwayback { struct Changeset { uint64_t created_at; uint64_t closed_at; + std::map tags; }; const std::string encode_changeset(const osmium::Changeset& changeset) { @@ -18,25 +20,42 @@ namespace osmwayback { encoder.add_fixed64(1, static_cast(changeset.created_at().seconds_since_epoch())); encoder.add_fixed64(2, static_cast(changeset.closed_at().seconds_since_epoch())); + const osmium::TagList& tags = changeset.tags(); + for (const osmium::Tag& tag : tags) { + encoder.add_string(17, tag.key()); + encoder.add_string(17, tag.value()); + } + return data; } const Changeset decode_changeset(std::string data) { protozero::pbf_reader message(data); Changeset changeset{}; + changeset.tags = std::map{}; - while (message.next()) { - switch (message.tag()) { - case 1: - changeset.created_at = message.get_fixed64(); - break; - case 2: - changeset.closed_at = message.get_fixed64(); - break; - default: - message.skip(); - } - } + std::string previous_key{}; + while (message.next()) { + switch (message.tag()) { + case 1: + changeset.created_at = message.get_fixed64(); + break; + case 2: + changeset.closed_at = message.get_fixed64(); + break; + case 17: + changeset.closed_at = message.get_fixed64(); + if (previous_key.empty()) { + previous_key = message.get_string(); + } else { + changeset.tags[previous_key] = message.get_string(); + previous_key = ""; + } + break; + default: + message.skip(); + } + } return changeset; }