#pragma once #include "http_client.h" #include "collection_manager.h" #include "collection.h" #include "string_utils.h" #include #include #include #include struct ReplicationEvent { std::string type; std::string key; std::string value; ReplicationEvent(const std::string& type, const uint32_t collection_id, const std::string& key, const std::string& value): type(type), key(key), value(value) { } }; class IterateBatchHandler: public rocksdb::WriteBatch::Handler { private: HttpServer* server; public: IterateBatchHandler(HttpServer* server): server(server) { } void Put(const rocksdb::Slice& key, const rocksdb::Slice& value) { CollectionManager & collection_manager = CollectionManager::get_instance(); std::vector parts; StringUtils::split(key.ToString(), parts, "_"); std::string val = value.ToString() + "\n"; if(parts.size() >= 2 && parts[0] == Collection::COLLECTION_NEXT_SEQ_PREFIX) { // nothing to do here as this is called only when a new collection is created and it's always "0" } if(parts.size() == 1 && parts[0] == CollectionManager::NEXT_COLLECTION_ID_KEY) { ReplicationEvent* replication_event = new ReplicationEvent("UPDATE_NEXT_COLLECTION_ID", 0, key.ToString(), value.ToString()); server->send_message(REPLICATION_EVENT_MSG, replication_event); } if(parts.size() >= 2 && parts[0] == Collection::COLLECTION_META_PREFIX) { ReplicationEvent* replication_event = new ReplicationEvent("ADD_COLLECTION_META", 0, key.ToString(), value.ToString()); server->send_message(REPLICATION_EVENT_MSG, replication_event); } if(parts.size() == 3 && parts[1] == Collection::SEQ_ID_PREFIX) { ReplicationEvent* replication_event = new ReplicationEvent("ADD_DOCUMENT", std::stoi(parts[0]), key.ToString(), value.ToString()); server->send_message(REPLICATION_EVENT_MSG, replication_event); } } void Delete(const rocksdb::Slice& key) { std::vector parts; StringUtils::split(key.ToString(), parts, "_"); if(parts.size() == 3 && parts[1] == Collection::DOC_ID_PREFIX) { ReplicationEvent* replication_event = new ReplicationEvent("REMOVE_DOCUMENT", 0, key.ToString(), ""); server->send_message(REPLICATION_EVENT_MSG, replication_event); } if(parts.size() >= 2 && parts[0] == Collection::COLLECTION_META_PREFIX) { ReplicationEvent* replication_event = new ReplicationEvent("DROP_COLLECTION", 0, key.ToString(), ""); server->send_message(REPLICATION_EVENT_MSG, replication_event); } } void Merge(const rocksdb::Slice& key, const rocksdb::Slice& value) { std::vector parts; StringUtils::split(key.ToString(), parts, "_"); if(parts.size() >= 2 && parts[0] == Collection::COLLECTION_NEXT_SEQ_PREFIX) { ReplicationEvent* replication_event = new ReplicationEvent("INCR_COLLECTION_NEXT_SEQ", 0, key.ToString(), value.ToString()); server->send_message(REPLICATION_EVENT_MSG, replication_event); } } }; class Replicator { public: static void start(HttpServer* server, const std::string& master_host_port, Store& store) { while(true) { IterateBatchHandler handler(server); uint64_t latest_seq_num = store.get_latest_seq_number(); std::cout << "latest_seq_num: " << latest_seq_num << std::endl; HttpClient client(master_host_port+"/replication/updates?seq_number="+std::to_string(latest_seq_num+1)); std::string json_response; long status_code = client.get_reponse(json_response); if(status_code == 200) { nlohmann::json json_content = nlohmann::json::parse(json_response); nlohmann::json updates = json_content["updates"]; for (nlohmann::json::iterator update = updates.begin(); update != updates.end(); ++update) { const std::string update_decoded = StringUtils::base64_decode(*update); rocksdb::WriteBatch write_batch(update_decoded); write_batch.Iterate(&handler); } for (nlohmann::json::iterator update = updates.begin(); update != updates.end(); ++update) { const std::string update_decoded = StringUtils::base64_decode(*update); rocksdb::WriteBatch write_batch(update_decoded); store._get_db_unsafe()->Write(rocksdb::WriteOptions(), &write_batch); } } std::this_thread::sleep_for(std::chrono::milliseconds(3000)); } } static void on_replication_event(void *data) { ReplicationEvent* replication_event = static_cast(data); if(replication_event->type == "UPDATE_NEXT_COLLECTION_ID") { CollectionManager & collection_manager = CollectionManager::get_instance(); collection_manager.set_next_collection_id(std::stoi(replication_event->value)); } if(replication_event->type == "ADD_COLLECTION_META") { nlohmann::json collection_meta; try { collection_meta = nlohmann::json::parse(replication_event->value); } catch(...) { std::cerr << "Failed to parse collection meta JSON." << std::endl; std::cerr << "Replication event value: " << replication_event->value << std::endl; delete replication_event; exit(1); } CollectionManager & collection_manager = CollectionManager::get_instance(); Collection* collection = collection_manager.init_collection(replication_event->value, 0); collection_manager.add_to_collections(collection); } if(replication_event->type == "ADD_DOCUMENT") { CollectionManager & collection_manager = CollectionManager::get_instance(); std::vector parts; StringUtils::split(replication_event->key, parts, "_"); // collection_id, seq_id_prefix, seq_id Collection* collection = collection_manager.get_collection_with_id(std::stoi(parts[0])); nlohmann::json document = nlohmann::json::parse(replication_event->value); // last 4 bytes of the key would be the serialized version of the sequence id std::string serialized_seq_id = replication_event->key.substr(replication_event->key.length() - 4); uint32_t seq_id = Collection::deserialize_seq_id_key(serialized_seq_id); collection->index_in_memory(document, seq_id); } if(replication_event->type == "INCR_COLLECTION_NEXT_SEQ") { CollectionManager & collection_manager = CollectionManager::get_instance(); const std::string & collection_name = replication_event->key.substr(strlen(Collection::COLLECTION_NEXT_SEQ_PREFIX)+1); Collection* collection = collection_manager.get_collection(collection_name); collection->increment_next_seq_id_field(); } if(replication_event->type == "REMOVE_DOCUMENT") { std::vector parts; StringUtils::split(replication_event->key, parts, "_"); // collection_id, doc_id_prefix, doc_id CollectionManager & collection_manager = CollectionManager::get_instance(); Collection* collection = collection_manager.get_collection_with_id(std::stoi(parts[0])); collection->remove(parts[2], false); } if(replication_event->type == "DROP_COLLECTION") { CollectionManager & collection_manager = CollectionManager::get_instance(); // _ const std::string & collection_name = replication_event->key.substr(strlen(Collection::COLLECTION_META_PREFIX)+1); collection_manager.drop_collection(collection_name, false); } delete replication_event; } };