diff --git a/CMakeLists.txt b/CMakeLists.txt index f8eb88e..0a7317e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -50,7 +50,9 @@ set(ALL_LIBRARIES ${CMAKE_THREAD_LIBS_INIT} ${EXPAT_LIBRARIES} ${ROCKSDB_LIBRARI #---------------------------------------------------------------------- add_executable(build_tag_lookup build_tag_lookup.cpp) add_executable(add_tags add_tags.cpp) +add_executable(build_changeset_lookup build_changeset_lookup.cpp) +target_link_libraries(build_changeset_lookup ${ALL_LIBRARIES}) target_link_libraries(build_tag_lookup ${ALL_LIBRARIES}) target_link_libraries(add_tags ${ALL_LIBRARIES}) #----------------------------------------------------------------------------- diff --git a/README.md b/README.md index c2f9416..883196b 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,12 @@ make ## Run +First build up a changeset index from an OSM changeset dump (e.g. `http://planet.osm.org/planet/changesets-latest.osm.bz2`). + +``` +build_changeset_lookup INDEX_DIR OSM_CHANGESET_FILE +``` + First build up a historic tag index. 
``` diff --git a/add_tags.cpp b/add_tags.cpp index abea05c..9fca336 100644 --- a/add_tags.cpp +++ b/add_tags.cpp @@ -46,7 +46,7 @@ typedef std::map StringStringMap; typedef std::map VersionTags; typedef std::vector < std::map > TagHistoryArray; -void write_with_history_tags(TagStore* store, const std::string line) { +void write_with_history_tags(osmwayback::TagStore* store, const std::string line) { rapidjson::Document geojson_doc; if(geojson_doc.Parse<0>(line.c_str()).HasParseError()) { std::cerr << "ERROR" << std::endl; @@ -213,7 +213,7 @@ int main(int argc, char* argv[]) { std::string index_dir = argv[1]; std::cout << "init tag dir" << std::endl; - TagStore store(index_dir, false); + osmwayback::TagStore store(index_dir, false); rapidjson::Document doc; for (std::string line; std::getline(std::cin, line);) { diff --git a/build_changeset_lookup.cpp b/build_changeset_lookup.cpp new file mode 100644 index 0000000..5214695 --- /dev/null +++ b/build_changeset_lookup.cpp @@ -0,0 +1,73 @@ +#include // for std::exit +#include // for std::strncmp +#include // for std::cout, std::cerr +#include +#include + +#include +#include +#include +#include + +#include "db.hpp" + +/* Osmium handler that passes every changeset it receives to TagStore::store_changeset(). */ +class ChangesetHandler : public osmium::handler::Handler { + osmwayback::TagStore* m_store; + +public: + explicit ChangesetHandler(osmwayback::TagStore* store) : m_store(store) {} + void changeset(const osmium::Changeset& changeset) { + m_store->store_changeset(changeset); + } +}; + +std::atomic_bool stop_progress{false}; + +/* Body of the progress thread: roughly once per second writes to std::cerr how many changesets were stored since the previous tick; when stop_progress is set it writes the total and the elapsed time, then returns. NOTE(review): stored_changesets_count is a plain unsigned long that the main thread increments while this thread reads it - consider making it std::atomic. */ +void report_progress(const osmwayback::TagStore* store) { + unsigned long last_changesets_count{0}; + auto start = std::chrono::steady_clock::now(); + + while(true) { + if(stop_progress) { + auto end = std::chrono::steady_clock::now(); + auto diff = end - start; + + /* stop_progress is set after osmium::apply() has returned, so the store's counter is final here; last_changesets_count would miss everything stored since the previous tick. */ + std::cerr << "Processed " << store->stored_changesets_count << " changesets in " << std::chrono::duration (diff).count() << " ms" << std::endl; + break; + } + + auto diff_changesets_count = store->stored_changesets_count - 
last_changesets_count; + std::cerr << "Processing " << diff_changesets_count << " changesets/s" << std::endl; + + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + last_changesets_count += diff_changesets_count; + } +} + +/* Usage: build_changeset_lookup INDEX_DIR CHANGESET_FILE. Creates the store in INDEX_DIR, feeds it every changeset read from CHANGESET_FILE while a second thread reports progress, then flushes the store. */ +int main(int argc, char* argv[]) { + if (argc != 3) { + std::cerr << "Usage: " << argv[0] << " INDEX_DIR CHANGESET_FILE" << std::endl; + std::exit(1); + } + + std::string index_dir = argv[1]; + std::string changeset_filename = argv[2]; + + osmwayback::TagStore store(index_dir, true); + ChangesetHandler handler(&store); + + std::thread t_progress(report_progress, &store); + + osmium::io::Reader reader{changeset_filename, osmium::osm_entity_bits::changeset}; + osmium::apply(reader, handler); + /* Close explicitly so that read errors are thrown here instead of being swallowed by the Reader destructor. */ + reader.close(); + + stop_progress = true; + t_progress.join(); + store.flush(); +} diff --git a/build_tag_lookup.cpp b/build_tag_lookup.cpp index 34a2955..2ec9697 100644 --- a/build_tag_lookup.cpp +++ b/build_tag_lookup.cpp @@ -1,11 +1,3 @@ -/* - - Build Tag History Database - - (Based on osmium_pub_names example) - -*/ - #include // for std::exit #include // for std::strncmp #include // for std::cout, std::cerr @@ -20,10 +12,10 @@ #include "db.hpp" class TagStoreHandler : public osmium::handler::Handler { - TagStore* m_store; + osmwayback::TagStore* m_store; public: - TagStoreHandler(TagStore* store) : m_store(store) {} + TagStoreHandler(osmwayback::TagStore* store) : m_store(store) {} long node_count = 0; int way_count = 0; int rel_count = 0; @@ -45,7 +37,7 @@ class TagStoreHandler : public osmium::handler::Handler { std::atomic_bool stop_progress{false}; -void report_progress(const TagStore* store) { +void report_progress(const osmwayback::TagStore* store) { unsigned long last_nodes_count{0}; unsigned long last_ways_count{0}; unsigned long last_relations_count{0}; @@ -85,7 +77,7 @@ int main(int argc, char* argv[]) { std::string index_dir = argv[1]; std::string osm_filename = argv[2]; - TagStore store(index_dir, true); + osmwayback::TagStore store(index_dir, 
false); TagStoreHandler tag_handler(&store); std::thread t_progress(report_progress, &store); diff --git a/db.hpp b/db.hpp index c2a06e6..9bf9fa1 100644 --- a/db.hpp +++ b/db.hpp @@ -18,20 +18,24 @@ #include #include +#include "pbf_encoder.hpp" + +namespace osmwayback { const std::string make_lookup(const int64_t osm_id, const int version){ return std::to_string(osm_id) + "!" + std::to_string(version); } -class TagStore { +class RocksDBStore { /* Base class: owns the RocksDB handle, one column-family handle each for nodes, ways, relations and changesets, and the running counters. */ + protected: + rocksdb::DB* m_db; rocksdb::ColumnFamilyHandle* m_cf_ways; rocksdb::ColumnFamilyHandle* m_cf_nodes; rocksdb::ColumnFamilyHandle* m_cf_relations; + rocksdb::ColumnFamilyHandle* m_cf_changesets; rocksdb::WriteOptions m_write_options; - rocksdb::WriteBatch m_buffer_batch; - void flush_family(const std::string type, rocksdb::ColumnFamilyHandle* cf) { const auto start = std::chrono::steady_clock::now(); std::cerr << "Flushing " << type << std::endl; @@ -62,21 +66,13 @@ class TagStore { uint64_t relation_keys{0}; m_db->GetIntProperty(m_cf_relations, "rocksdb.estimate-num-keys", &relation_keys); std::cerr << "Stored ~" << relation_keys << "/" << stored_relations_count << " relations" << std::endl; - } - -public: - unsigned long empty_objects_count{0}; - unsigned long stored_tags_count{0}; - - unsigned long stored_nodes_count{0}; - unsigned long stored_ways_count{0}; - unsigned long stored_relations_count{0}; - unsigned long stored_objects_count() { - return stored_nodes_count + stored_ways_count + stored_relations_count; + uint64_t changeset_keys{0}; + m_db->GetIntProperty(m_cf_changesets, "rocksdb.estimate-num-keys", &changeset_keys); + std::cerr << "Stored ~" << changeset_keys << "/" << stored_changesets_count << " changesets" << std::endl; } - TagStore(const std::string index_dir, const bool create) { + RocksDBStore (const std::string index_dir, const bool create) { rocksdb::Options db_options; db_options.allow_mmap_writes = false; db_options.max_background_flushes = 4; @@ -104,6 +100,8 @@ class TagStore { s = 
m_db->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), "ways", &m_cf_ways); assert(s.ok()); s = m_db->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), "relations", &m_cf_relations); + assert(s.ok()); /* check the "relations" status before s is overwritten below */ + s = m_db->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), "changesets", &m_cf_changesets); assert(s.ok()); } else { db_options.error_if_exists = false; @@ -118,6 +116,7 @@ class TagStore { column_families.push_back(rocksdb::ColumnFamilyDescriptor( "nodes", rocksdb::ColumnFamilyOptions())); column_families.push_back(rocksdb::ColumnFamilyDescriptor( "ways", rocksdb::ColumnFamilyOptions())); column_families.push_back(rocksdb::ColumnFamilyDescriptor( "relations", rocksdb::ColumnFamilyOptions())); + column_families.push_back(rocksdb::ColumnFamilyDescriptor( "changesets", rocksdb::ColumnFamilyOptions())); std::vector handles; @@ -127,7 +126,31 @@ class TagStore { m_cf_nodes = handles[1]; m_cf_ways = handles[2]; m_cf_relations = handles[3]; + m_cf_changesets = handles[4]; } + + } + + public: + + unsigned long empty_objects_count{0}; + unsigned long stored_tags_count{0}; + + unsigned long stored_nodes_count{0}; + unsigned long stored_ways_count{0}; + unsigned long stored_relations_count{0}; + unsigned long stored_changesets_count{0}; + + unsigned long stored_objects_count() { + return stored_nodes_count + stored_ways_count + stored_relations_count; + } +}; + +class TagStore : public RocksDBStore { + rocksdb::WriteBatch m_buffer_batch; + +public: + TagStore(const std::string index_dir, const bool create) : RocksDBStore(index_dir, create) { } rocksdb::Status get_tags(const int64_t osm_id, const int osm_type, const int version, std::string* json_value) { const auto lookup = make_lookup(osm_id, version); @@ -141,7 +164,52 @@ class TagStore { } } + void lookup_nodes(const osmium::Way& way, const int closed_at) { /* For every node of the way: scans versions 1..999 in the nodes family and prints to std::cout the last version found before the first "@timestamp" later than closed_at (1 if none). */ + const osmium::WayNodeList& node_refs = way.nodes(); + for (const osmium::NodeRef& node_ref : node_refs) { + auto node_id = node_ref.ref(); + + // Find all the versions + int 
node_version{1}; + for(int v = 1; v < 1000; v++) { + std::string node_json; + + auto read_status = m_db->Get(rocksdb::ReadOptions(), m_cf_nodes, make_lookup(node_id, v), &node_json); + + if(read_status.ok()) { + rapidjson::Document node_doc; + if(!node_doc.Parse<0>(node_json.c_str()).HasParseError()) { + if(node_doc.HasMember("@timestamp")) { + auto ts = node_doc["@timestamp"].GetInt(); + if (ts > closed_at) { + break; + } else { + node_version = v; + } + } + } + } + } + + std::cout << "Found real node version " << node_version << std::endl; + } + } + void store_tags(const osmium::Way& way) { + /* Resolve node versions against the close time of this way's changeset. The changesets family holds the PBF written by encode_changeset() (field 2 = closed_at, fixed64), not JSON. */ + std::string changeset_data; + auto read_status = m_db->Get(rocksdb::ReadOptions(), m_cf_changesets, std::to_string(way.changeset()), &changeset_data); + if (read_status.ok()) { + protozero::pbf_reader changeset_message(changeset_data); + while (changeset_message.next()) { + if (changeset_message.tag() == 2) { + lookup_nodes(way, static_cast<int>(changeset_message.get_fixed64())); + break; + } + changeset_message.skip(); + } + } + if(store_tags(way, m_cf_ways)) { stored_ways_count++; } @@ -159,7 +227,26 @@ class TagStore { } } + bool store_changeset(const osmium::Changeset& changeset) { /* Queues the PBF-encoded changeset under its id in the write batch; writes the batch once it holds more than 1000 entries, and flushes the changesets family and prints stats every 1,000,000 changesets. Always returns true. */ + auto lookup = std::to_string(changeset.id()); + auto value = osmwayback::encode_changeset(changeset); + rocksdb::Status stat = m_buffer_batch.Put(m_cf_changesets, lookup, value); + + stored_changesets_count++; + if (m_buffer_batch.Count() > 1000) { + m_db->Write(m_write_options, &m_buffer_batch); + m_buffer_batch.Clear(); + } + + if (stored_changesets_count != 0 && (stored_changesets_count % 1000000) == 0) { + flush_family("changesets", m_cf_changesets); + report_count_stats(); + } + return true; + } + bool store_tags(const osmium::OSMObject& object, rocksdb::ColumnFamilyHandle* cf) { + const auto lookup = make_lookup(object.id(), object.version()); if (object.tags().empty()) { empty_objects_count++; @@ -171,7 +258,7 @@ 
class TagStore { rapidjson::Document::AllocatorType& a = doc.GetAllocator(); - doc.AddMember("@timestamp", object.timestamp().to_iso(), a); //ISO is helpful for debugging, but we should leave it + doc.AddMember("@timestamp", static_cast(object.timestamp().seconds_since_epoch()), a); if (object.deleted()){ doc.AddMember("@deleted", object.deleted(), a); } @@ -228,11 +314,15 @@ class TagStore { flush_family("nodes", m_cf_nodes); flush_family("ways", m_cf_ways); flush_family("relations", m_cf_relations); + flush_family("changesets", m_cf_changesets); compact_family("nodes", m_cf_nodes); compact_family("ways", m_cf_ways); compact_family("relations", m_cf_relations); + compact_family("changesets", m_cf_changesets); report_count_stats(); } }; + +} diff --git a/pbf_encoder.hpp b/pbf_encoder.hpp new file mode 100644 index 0000000..95155f3 --- /dev/null +++ b/pbf_encoder.hpp @@ -0,0 +1,69 @@ +#pragma once + +#include +#include + +#include +#include + +namespace osmwayback { + /* Decoded changeset record. created_at and closed_at are seconds since the epoch, as written by encode_changeset(); tags maps each tag key to its value. */ + struct Changeset { + uint64_t created_at; + uint64_t closed_at; + std::map tags; + }; + + /* Serializes a changeset to PBF: field 1 = created_at and field 2 = closed_at (fixed64, seconds since the epoch), then for every tag two field-17 strings, key followed by value. */ + inline const std::string encode_changeset(const osmium::Changeset& changeset) { + std::string data; + protozero::pbf_writer encoder(data); + + encoder.add_fixed64(1, static_cast(changeset.created_at().seconds_since_epoch())); + encoder.add_fixed64(2, static_cast(changeset.closed_at().seconds_since_epoch())); + + const osmium::TagList& tags = changeset.tags(); + for (const osmium::Tag& tag : tags) { + encoder.add_string(17, tag.key()); + encoder.add_string(17, tag.value()); + } + + return data; + } + + /* Inverse of encode_changeset(): field-17 strings are paired up as key, value, key, value, ...; unknown fields are skipped. */ + inline const Changeset decode_changeset(const std::string& data) { + protozero::pbf_reader message(data); + Changeset changeset{}; + changeset.tags = std::map{}; + + std::string previous_key{}; + bool have_key{false}; + while (message.next()) { + switch (message.tag()) { + case 1: + changeset.created_at = message.get_fixed64(); + break; + case 2: + changeset.closed_at = message.get_fixed64(); + break; + case 17: + /* Field 17 is a length-delimited string, so it must only be read with get_string(). have_key (not previous_key.empty()) tracks the pairing so an empty tag key cannot shift later pairs. */ + 
if (!have_key) { + previous_key = message.get_string(); + have_key = true; + } else { + changeset.tags[previous_key] = message.get_string(); + have_key = false; + } + break; + default: + message.skip(); + } + } + + return changeset; + } +} + +