From 3275ea877db358d2b679d15e0efa5e0682b3fbaa Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Fri, 3 Aug 2018 07:01:44 +0530 Subject: [PATCH] Refactor - move more implementation code away from headers. --- include/collection_manager.h | 2 +- include/replicator.h | 212 +---------------- include/store.h | 1 + include/typesense_server_utils.h | 25 ++ src/collection_manager.cpp | 2 +- src/main/typesense_server.cpp | 2 +- src/replicator.cpp | 214 ++++++++++++++++++ .../typesense_server_utils.cpp | 15 +- test/store_test.cpp | 8 +- 9 files changed, 258 insertions(+), 223 deletions(-) create mode 100644 include/typesense_server_utils.h create mode 100644 src/replicator.cpp rename include/typesense_server.h => src/typesense_server_utils.cpp (95%) diff --git a/include/collection_manager.h b/include/collection_manager.h index 3c8e504a..5011cb41 100644 --- a/include/collection_manager.h +++ b/include/collection_manager.h @@ -78,7 +78,7 @@ public: // symlinks Option resolve_symlink(const std::string & symlink_name); - spp::sparse_hash_map get_symlinks(); + spp::sparse_hash_map & get_symlinks(); bool upsert_symlink(const std::string & symlink_name, const std::string & collection_name); diff --git a/include/replicator.h b/include/replicator.h index 6a5bb94c..7b6f27bf 100644 --- a/include/replicator.h +++ b/include/replicator.h @@ -1,14 +1,8 @@ #pragma once -#include "http_client.h" -#include "collection_manager.h" -#include "collection.h" -#include "string_utils.h" -#include -#include #include -#include -#include "logger.h" +#include "http_server.h" +#include "store.h" static constexpr const char* REPLICATION_EVENT_MSG = "replication_event"; @@ -31,208 +25,16 @@ public: } - void Put(const rocksdb::Slice& key, const rocksdb::Slice& value) { - std::vector parts; - StringUtils::split(key.ToString(), parts, "_"); + void Put(const rocksdb::Slice& key, const rocksdb::Slice& value); - std::string val = value.ToString() + "\n"; + void Delete(const rocksdb::Slice& key); - if(parts.size() >= 2 && parts[0] == Collection::COLLECTION_NEXT_SEQ_PREFIX) { - // nothing to do here as this is called only when a new collection is created and it's always "0" - } - - if(parts.size() == 1 && parts[0] == CollectionManager::NEXT_COLLECTION_ID_KEY) { - ReplicationEvent* replication_event = new ReplicationEvent("UPDATE_NEXT_COLLECTION_ID", 0, - key.ToString(), value.ToString()); - server->send_message(REPLICATION_EVENT_MSG, replication_event); - } - - if(parts.size() >= 2 && parts[0] == Collection::COLLECTION_META_PREFIX) { - ReplicationEvent* replication_event = new ReplicationEvent("ADD_COLLECTION_META", 0, - key.ToString(), value.ToString()); - server->send_message(REPLICATION_EVENT_MSG, replication_event); - } - - if(parts.size() == 3 && parts[1] == Collection::SEQ_ID_PREFIX) { - ReplicationEvent* replication_event = new ReplicationEvent("ADD_DOCUMENT", std::stoi(parts[0]), - key.ToString(), value.ToString()); - server->send_message(REPLICATION_EVENT_MSG, replication_event); - } - - if(parts.size() >= 2 && parts[0] == CollectionManager::SYMLINK_PREFIX) { - ReplicationEvent* replication_event = new ReplicationEvent("ADD_SYMLINK", 0, - key.ToString(), value.ToString()); - server->send_message(REPLICATION_EVENT_MSG, replication_event); - } - } - - void Delete(const rocksdb::Slice& key) { - std::vector parts; - StringUtils::split(key.ToString(), parts, "_"); - - if(parts.size() == 3 && parts[1] == Collection::DOC_ID_PREFIX) { - ReplicationEvent* replication_event = new ReplicationEvent("REMOVE_DOCUMENT", 0, key.ToString(), ""); - server->send_message(REPLICATION_EVENT_MSG, replication_event); - } - - if(parts.size() >= 2 && parts[0] == Collection::COLLECTION_META_PREFIX) { - ReplicationEvent* replication_event = new ReplicationEvent("DROP_COLLECTION", 0, key.ToString(), ""); - server->send_message(REPLICATION_EVENT_MSG, replication_event); - } - - if(parts.size() >= 2 && parts[0] == CollectionManager::SYMLINK_PREFIX) { - ReplicationEvent* replication_event = new ReplicationEvent("REMOVE_SYMLINK", 0, - key.ToString(), ""); - server->send_message(REPLICATION_EVENT_MSG, replication_event); - } - - } - - void Merge(const rocksdb::Slice& key, const rocksdb::Slice& value) { - std::vector parts; - StringUtils::split(key.ToString(), parts, "_"); - - if(parts.size() >= 2 && parts[0] == Collection::COLLECTION_NEXT_SEQ_PREFIX) { - ReplicationEvent* replication_event = new ReplicationEvent("INCR_COLLECTION_NEXT_SEQ", 0, - key.ToString(), value.ToString()); - server->send_message(REPLICATION_EVENT_MSG, replication_event); - } - } + void Merge(const rocksdb::Slice& key, const rocksdb::Slice& value); }; class Replicator { public: - static void start(HttpServer* server, const std::string master_host_port, const std::string api_key, Store& store) { - size_t total_runs = 0; + static void start(HttpServer* server, const std::string master_host_port, const std::string api_key, Store& store); - while(true) { - IterateBatchHandler handler(server); - uint64_t latest_seq_num = store.get_latest_seq_number(); - - if(total_runs++ % 20 == 0) { - // roughly every 60 seconds - LOG(INFO) << "Replica's latest sequence number: " << latest_seq_num; - } - - HttpClient client( - master_host_port+"/replication/updates?seq_number="+std::to_string(latest_seq_num+1), api_key - ); - - std::string json_response; - long status_code = client.get_reponse(json_response); - - if(status_code == 200) { - nlohmann::json json_content = nlohmann::json::parse(json_response); - nlohmann::json updates = json_content["updates"]; - - // first write to memory - for (nlohmann::json::iterator update = updates.begin(); update != updates.end(); ++update) { - const std::string update_decoded = StringUtils::base64_decode(*update); - rocksdb::WriteBatch write_batch(update_decoded); - write_batch.Iterate(&handler); - } - - // now write to store - for (nlohmann::json::iterator update = updates.begin(); update != updates.end(); ++update) { - const std::string update_decoded = StringUtils::base64_decode(*update); - rocksdb::WriteBatch write_batch(update_decoded); - store._get_db_unsafe()->Write(rocksdb::WriteOptions(), &write_batch); - } - - if(updates.size() > 0) { - LOG(INFO) << "Replica has consumed " << latest_seq_num+updates.size() << "/" - << json_content["latest_seq_num"] << " updates from master."; - } - - } else { - LOG(ERR) << "Replication error while fetching records from master, status_code=" << status_code; - - if(status_code != 0) { - LOG(ERR) << json_response; - } - } - - std::this_thread::sleep_for(std::chrono::milliseconds(3000)); - } - } - - static void on_replication_event(void *data) { - ReplicationEvent* replication_event = static_cast(data); - - if(replication_event->type == "UPDATE_NEXT_COLLECTION_ID") { - CollectionManager & collection_manager = CollectionManager::get_instance(); - collection_manager.set_next_collection_id(std::stoi(replication_event->value)); - } - - if(replication_event->type == "ADD_COLLECTION_META") { - nlohmann::json collection_meta; - try { - collection_meta = nlohmann::json::parse(replication_event->value); - } catch(...) { - LOG(ERR) << "Failed to parse collection meta JSON."; - LOG(ERR) << "Replication event value: " << replication_event->value; - delete replication_event; - exit(1); - } - - CollectionManager & collection_manager = CollectionManager::get_instance(); - Collection* collection = collection_manager.init_collection(collection_meta, 0); - collection_manager.add_to_collections(collection); - } - - if(replication_event->type == "ADD_DOCUMENT") { - CollectionManager & collection_manager = CollectionManager::get_instance(); - std::vector parts; - StringUtils::split(replication_event->key, parts, "_"); // collection_id, seq_id_prefix, seq_id - Collection* collection = collection_manager.get_collection_with_id(std::stoi(parts[0])); - nlohmann::json document = nlohmann::json::parse(replication_event->value); - - // last 4 bytes of the key would be the serialized version of the sequence id - std::string serialized_seq_id = replication_event->key.substr(replication_event->key.length() - 4); - uint32_t seq_id = StringUtils::deserialize_uint32_t(serialized_seq_id); - collection->index_in_memory(document, seq_id); - } - - if(replication_event->type == "ADD_SYMLINK") { - CollectionManager & collection_manager = CollectionManager::get_instance(); - std::vector parts; - std::string symlink_prefix_key = std::string(CollectionManager::SYMLINK_PREFIX) + "_"; - StringUtils::split(replication_event->key, parts, symlink_prefix_key); // symlink_prefix, symlink_name - std::string & symlink_name = parts[0]; - collection_manager.upsert_symlink(symlink_name, replication_event->value); - } - - if(replication_event->type == "INCR_COLLECTION_NEXT_SEQ") { - CollectionManager & collection_manager = CollectionManager::get_instance(); - const std::string & collection_name = replication_event->key.substr(strlen(Collection::COLLECTION_NEXT_SEQ_PREFIX)+1); - Collection* collection = collection_manager.get_collection(collection_name); - collection->increment_next_seq_id_field(); - } - - if(replication_event->type == "REMOVE_DOCUMENT") { - std::vector parts; - StringUtils::split(replication_event->key, parts, "_"); // collection_id, doc_id_prefix, doc_id - CollectionManager & collection_manager = CollectionManager::get_instance(); - Collection* collection = collection_manager.get_collection_with_id(std::stoi(parts[0])); - collection->remove(parts[2], false); - } - - if(replication_event->type == "REMOVE_SYMLINK") { - CollectionManager & collection_manager = CollectionManager::get_instance(); - std::vector parts; - std::string symlink_prefix_key = std::string(CollectionManager::SYMLINK_PREFIX) + "_"; - StringUtils::split(replication_event->key, parts, symlink_prefix_key); // symlink_prefix, symlink_name - std::string & symlink_name = parts[0]; - collection_manager.delete_symlink(symlink_name); - } - - if(replication_event->type == "DROP_COLLECTION") { - CollectionManager & collection_manager = CollectionManager::get_instance(); - // _ - const std::string & collection_name = replication_event->key.substr(strlen(Collection::COLLECTION_META_PREFIX)+1); - collection_manager.drop_collection(collection_name, false); - } - - delete replication_event; - } + static void on_replication_event(void *data); }; diff --git a/include/store.h b/include/store.h index 809fe50e..fe58fded 100644 --- a/include/store.h +++ b/include/store.h @@ -164,6 +164,7 @@ public: rocksdb::Status status = db->GetUpdatesSince(seq_number, &iter); if(!status.ok()) { + LOG(ERR) << "Error while fetching updates for replication: " << status.ToString(); std::ostringstream error; error << "Unable to fetch updates. " << "Master's latest sequence number is " << local_latest_seq_num; return Option*>(400, error.str()); diff --git a/include/typesense_server_utils.h b/include/typesense_server_utils.h new file mode 100644 index 00000000..13a962dd --- /dev/null +++ b/include/typesense_server_utils.h @@ -0,0 +1,25 @@ +#pragma once + +#include +#include +#include +#include "logger.h" +#include "store.h" +#include "collection_manager.h" +#include "http_server.h" +#include "replicator.h" +#include +#include +#include + +extern HttpServer* server; + +void catch_interrupt(int sig); + +bool directory_exists(const std::string & dir_path); + +void init_cmdline_options(cmdline::parser & options, int argc, char **argv); + +int init_logger(cmdline::parser & options, std::unique_ptr & log_worker); + +int run_server(cmdline::parser & options, void (*master_server_routes)(), void (*replica_server_routes)()); \ No newline at end of file diff --git a/src/collection_manager.cpp b/src/collection_manager.cpp index 4bafcbb6..f49fc241 100644 --- a/src/collection_manager.cpp +++ b/src/collection_manager.cpp @@ -276,7 +276,7 @@ void CollectionManager::set_next_collection_id(uint32_t next_id) { next_collection_id = next_id; } -spp::sparse_hash_map CollectionManager::get_symlinks() { +spp::sparse_hash_map & CollectionManager::get_symlinks() { return collection_symlinks; } diff --git a/src/main/typesense_server.cpp b/src/main/typesense_server.cpp index 10bf456a..e0b5ae79 100644 --- a/src/main/typesense_server.cpp +++ b/src/main/typesense_server.cpp @@ -1,5 +1,5 @@ #include "core_api.h" -#include "typesense_server.h" +#include "typesense_server_utils.h" void master_server_routes() { // collection management diff --git a/src/replicator.cpp b/src/replicator.cpp new file mode 100644 index 00000000..16bdaafe --- /dev/null +++ b/src/replicator.cpp @@ -0,0 +1,214 @@ +#include "replicator.h" +#include +#include "http_client.h" +#include "collection_manager.h" +#include "collection.h" +#include "string_utils.h" +#include +#include +#include "logger.h" + + +void IterateBatchHandler::Put(const rocksdb::Slice& key, const rocksdb::Slice& value) { + std::vector parts; + StringUtils::split(key.ToString(), parts, "_"); + + std::string val = value.ToString() + "\n"; + + if(parts.size() >= 2 && parts[0] == Collection::COLLECTION_NEXT_SEQ_PREFIX) { + // nothing to do here as this is called only when a new collection is created and it's always "0" + } + + if(parts.size() == 1 && parts[0] == CollectionManager::NEXT_COLLECTION_ID_KEY) { + ReplicationEvent* replication_event = new ReplicationEvent("UPDATE_NEXT_COLLECTION_ID", 0, + key.ToString(), value.ToString()); + server->send_message(REPLICATION_EVENT_MSG, replication_event); + } + + if(parts.size() >= 2 && parts[0] == Collection::COLLECTION_META_PREFIX) { + ReplicationEvent* replication_event = new ReplicationEvent("ADD_COLLECTION_META", 0, + key.ToString(), value.ToString()); + server->send_message(REPLICATION_EVENT_MSG, replication_event); + } + + if(parts.size() == 3 && parts[1] == Collection::SEQ_ID_PREFIX) { + ReplicationEvent* replication_event = new ReplicationEvent("ADD_DOCUMENT", std::stoi(parts[0]), + key.ToString(), value.ToString()); + server->send_message(REPLICATION_EVENT_MSG, replication_event); + } + + if(parts.size() >= 2 && parts[0] == CollectionManager::SYMLINK_PREFIX) { + ReplicationEvent* replication_event = new ReplicationEvent("ADD_SYMLINK", 0, + key.ToString(), value.ToString()); + server->send_message(REPLICATION_EVENT_MSG, replication_event); + } +} + +void IterateBatchHandler::Delete(const rocksdb::Slice& key) { + std::vector parts; + StringUtils::split(key.ToString(), parts, "_"); + + if(parts.size() == 3 && parts[1] == Collection::DOC_ID_PREFIX) { + ReplicationEvent* replication_event = new ReplicationEvent("REMOVE_DOCUMENT", 0, key.ToString(), ""); + server->send_message(REPLICATION_EVENT_MSG, replication_event); + } + + if(parts.size() >= 2 && parts[0] == Collection::COLLECTION_META_PREFIX) { + ReplicationEvent* replication_event = new ReplicationEvent("DROP_COLLECTION", 0, key.ToString(), ""); + server->send_message(REPLICATION_EVENT_MSG, replication_event); + } + + if(parts.size() >= 2 && parts[0] == CollectionManager::SYMLINK_PREFIX) { + ReplicationEvent* replication_event = new ReplicationEvent("REMOVE_SYMLINK", 0, + key.ToString(), ""); + server->send_message(REPLICATION_EVENT_MSG, replication_event); + } + +} + +void IterateBatchHandler::Merge(const rocksdb::Slice& key, const rocksdb::Slice& value) { + std::vector parts; + StringUtils::split(key.ToString(), parts, "_"); + + if(parts.size() >= 2 && parts[0] == Collection::COLLECTION_NEXT_SEQ_PREFIX) { + ReplicationEvent* replication_event = new ReplicationEvent("INCR_COLLECTION_NEXT_SEQ", 0, + key.ToString(), value.ToString()); + server->send_message(REPLICATION_EVENT_MSG, replication_event); + } +} + +void Replicator::start(HttpServer* server, const std::string master_host_port, const std::string api_key, Store& store) { + size_t total_runs = 0; + + while(true) { + IterateBatchHandler handler(server); + uint64_t latest_seq_num = store.get_latest_seq_number(); + + if(total_runs++ % 20 == 0) { + // roughly every 60 seconds + LOG(INFO) << "Replica's latest sequence number: " << latest_seq_num; + } + + HttpClient client( + master_host_port+"/replication/updates?seq_number="+std::to_string(latest_seq_num+1), api_key + ); + + std::string json_response; + long status_code = client.get_reponse(json_response); + + if(status_code == 200) { + nlohmann::json json_content = nlohmann::json::parse(json_response); + nlohmann::json updates = json_content["updates"]; + + // first write to memory + for (nlohmann::json::iterator update = updates.begin(); update != updates.end(); ++update) { + const std::string update_decoded = StringUtils::base64_decode(*update); + rocksdb::WriteBatch write_batch(update_decoded); + write_batch.Iterate(&handler); + } + + // now write to store + for (nlohmann::json::iterator update = updates.begin(); update != updates.end(); ++update) { + const std::string update_decoded = StringUtils::base64_decode(*update); + rocksdb::WriteBatch write_batch(update_decoded); + store._get_db_unsafe()->Write(rocksdb::WriteOptions(), &write_batch); + } + + if(updates.size() > 0) { + LOG(INFO) << "Replica has consumed " << store.get_latest_seq_number() << "/" + << json_content["latest_seq_num"] << " updates from master."; + } + + } else { + LOG(ERR) << "Replication error while fetching records from master, status_code=" << status_code + << ", replica's latest sequence number: " << latest_seq_num; + + if(status_code != 0) { + LOG(ERR) << json_response; + } + } + + std::this_thread::sleep_for(std::chrono::milliseconds(3000)); + } +} + +void Replicator::on_replication_event(void *data) { + ReplicationEvent* replication_event = static_cast(data); + + if(replication_event->type == "UPDATE_NEXT_COLLECTION_ID") { + CollectionManager & collection_manager = CollectionManager::get_instance(); + collection_manager.set_next_collection_id(std::stoi(replication_event->value)); + } + + if(replication_event->type == "ADD_COLLECTION_META") { + nlohmann::json collection_meta; + try { + collection_meta = nlohmann::json::parse(replication_event->value); + } catch(...) { + LOG(ERR) << "Failed to parse collection meta JSON."; + LOG(ERR) << "Replication event value: " << replication_event->value; + delete replication_event; + exit(1); + } + + CollectionManager & collection_manager = CollectionManager::get_instance(); + Collection* collection = collection_manager.init_collection(collection_meta, 0); + collection_manager.add_to_collections(collection); + } + + if(replication_event->type == "ADD_DOCUMENT") { + CollectionManager & collection_manager = CollectionManager::get_instance(); + std::vector parts; + StringUtils::split(replication_event->key, parts, "_"); // collection_id, seq_id_prefix, seq_id + Collection* collection = collection_manager.get_collection_with_id(std::stoi(parts[0])); + nlohmann::json document = nlohmann::json::parse(replication_event->value); + + // last 4 bytes of the key would be the serialized version of the sequence id + std::string serialized_seq_id = replication_event->key.substr(replication_event->key.length() - 4); + uint32_t seq_id = StringUtils::deserialize_uint32_t(serialized_seq_id); + collection->index_in_memory(document, seq_id); + } + + if(replication_event->type == "ADD_SYMLINK") { + CollectionManager & collection_manager = CollectionManager::get_instance(); + std::vector parts; + std::string symlink_prefix_key = std::string(CollectionManager::SYMLINK_PREFIX) + "_"; + StringUtils::split(replication_event->key, parts, symlink_prefix_key); // symlink_prefix, symlink_name + std::string & symlink_name = parts[0]; + spp::sparse_hash_map & simlinks = collection_manager.get_symlinks(); + simlinks[symlink_name] = replication_event->value; + } + + if(replication_event->type == "INCR_COLLECTION_NEXT_SEQ") { + CollectionManager & collection_manager = CollectionManager::get_instance(); + const std::string & collection_name = replication_event->key.substr(strlen(Collection::COLLECTION_NEXT_SEQ_PREFIX)+1); + Collection* collection = collection_manager.get_collection(collection_name); + collection->increment_next_seq_id_field(); + } + + if(replication_event->type == "REMOVE_DOCUMENT") { + std::vector parts; + StringUtils::split(replication_event->key, parts, "_"); // collection_id, doc_id_prefix, doc_id + CollectionManager & collection_manager = CollectionManager::get_instance(); + Collection* collection = collection_manager.get_collection_with_id(std::stoi(parts[0])); + collection->remove(parts[2], false); + } + + if(replication_event->type == "REMOVE_SYMLINK") { + CollectionManager & collection_manager = CollectionManager::get_instance(); + std::vector parts; + std::string symlink_prefix_key = std::string(CollectionManager::SYMLINK_PREFIX) + "_"; + StringUtils::split(replication_event->key, parts, symlink_prefix_key); // symlink_prefix, symlink_name + std::string & symlink_name = parts[0]; + collection_manager.delete_symlink(symlink_name); + } + + if(replication_event->type == "DROP_COLLECTION") { + CollectionManager & collection_manager = CollectionManager::get_instance(); + // _ + const std::string & collection_name = replication_event->key.substr(strlen(Collection::COLLECTION_META_PREFIX)+1); + collection_manager.drop_collection(collection_name, false); + } + + delete replication_event; +} diff --git a/include/typesense_server.h b/src/typesense_server_utils.cpp similarity index 95% rename from include/typesense_server.h rename to src/typesense_server_utils.cpp index 8bcd356b..6ba769a9 100644 --- a/include/typesense_server.h +++ b/src/typesense_server_utils.cpp @@ -1,16 +1,5 @@ -#pragma once - -#include -#include -#include -#include "logger.h" -#include "store.h" -#include "collection_manager.h" -#include "http_server.h" -#include "replicator.h" -#include -#include -#include +#include "core_api.h" +#include "typesense_server_utils.h" HttpServer* server; diff --git a/test/store_test.cpp b/test/store_test.cpp index 7e96e9ce..9dbb7528 100644 --- a/test/store_test.cpp +++ b/test/store_test.cpp @@ -26,8 +26,8 @@ TEST(StoreTest, GetUpdatesSince) { ASSERT_EQ(0, primary_store.get_latest_seq_number()); delete updates_op.get(); - // querying for a seq_num > 0 on a fresh store - updates_op = primary_store.get_updates_since(10, 10); + // querying for a seq_num > 1 on a fresh store + updates_op = primary_store.get_updates_since(2, 10); ASSERT_FALSE(updates_op.ok()); ASSERT_EQ("Unable to fetch updates. Master's latest sequence number is 0", updates_op.error()); @@ -53,6 +53,10 @@ TEST(StoreTest, GetUpdatesSince) { ASSERT_EQ(3, updates_op.get()->size()); delete updates_op.get(); + updates_op = primary_store.get_updates_since(1, 10); + ASSERT_EQ(3, updates_op.get()->size()); + delete updates_op.get(); + updates_op = primary_store.get_updates_since(3, 10); ASSERT_EQ(1, updates_op.get()->size()); delete updates_op.get();