Refactor - move more implementation code away from headers.

Kishore Nallan committed on 2018-08-03 07:01:44 +05:30
commit 3275ea877d (parent ba68cd0ac5)
9 changed files with 258 additions and 223 deletions
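The pattern throughout is the standard declaration/definition split: member functions that were previously defined inline in the headers are reduced to declarations, and their bodies move into .cpp translation units (for example, the replication handlers now live in src/replicator.cpp). A minimal sketch of that split, using a hypothetical Widget class that is illustrative only and not part of this commit:

// widget.h -- the header keeps only the interface
#pragma once
#include <string>

class Widget {
public:
    // declaration only; the body now lives in widget.cpp
    std::string describe() const;

private:
    std::string name_ = "widget";
};

// widget.cpp -- the single translation unit that owns the definition
#include "widget.h"

std::string Widget::describe() const {
    return "Widget: " + name_;
}

Besides shrinking the headers, this keeps rebuilds cheaper: editing a function body in widget.cpp no longer forces every file that includes widget.h to recompile.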


@@ -78,7 +78,7 @@ public:
// symlinks
Option<std::string> resolve_symlink(const std::string & symlink_name);
spp::sparse_hash_map<std::string, std::string> get_symlinks();
spp::sparse_hash_map<std::string, std::string> & get_symlinks();
bool upsert_symlink(const std::string & symlink_name, const std::string & collection_name);
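Note that get_symlinks() now returns a reference to the internal map rather than a copy, and the ADD_SYMLINK handler in the new src/replicator.cpp writes into the returned map directly. A rough sketch of the by-value vs. by-reference distinction, using std::unordered_map as a hypothetical stand-in for spp::sparse_hash_map:

#include <string>
#include <unordered_map>

// Illustrative only; Registry stands in for CollectionManager.
class Registry {
public:
    // Returning by value hands back a snapshot; writes to it never reach links_.
    std::unordered_map<std::string, std::string> links_by_value() const { return links_; }

    // Returning by reference exposes the live map, which callers may mutate in place.
    std::unordered_map<std::string, std::string> & links_by_ref() { return links_; }

private:
    std::unordered_map<std::string, std::string> links_;
};

// Usage sketch: registry.links_by_ref()["alias"] = "collection1";  // persists in links_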


@@ -1,14 +1,8 @@
#pragma once
#include "http_client.h"
#include "collection_manager.h"
#include "collection.h"
#include "string_utils.h"
#include <json.hpp>
#include <thread>
#include <string>
#include <iostream>
#include "logger.h"
#include "http_server.h"
#include "store.h"
static constexpr const char* REPLICATION_EVENT_MSG = "replication_event";
@@ -31,208 +25,16 @@ public:
}
void Put(const rocksdb::Slice& key, const rocksdb::Slice& value) {
std::vector<std::string> parts;
StringUtils::split(key.ToString(), parts, "_");
void Put(const rocksdb::Slice& key, const rocksdb::Slice& value);
std::string val = value.ToString() + "\n";
void Delete(const rocksdb::Slice& key);
if(parts.size() >= 2 && parts[0] == Collection::COLLECTION_NEXT_SEQ_PREFIX) {
// nothing to do here as this is called only when a new collection is created and it's always "0"
}
if(parts.size() == 1 && parts[0] == CollectionManager::NEXT_COLLECTION_ID_KEY) {
ReplicationEvent* replication_event = new ReplicationEvent("UPDATE_NEXT_COLLECTION_ID", 0,
key.ToString(), value.ToString());
server->send_message(REPLICATION_EVENT_MSG, replication_event);
}
if(parts.size() >= 2 && parts[0] == Collection::COLLECTION_META_PREFIX) {
ReplicationEvent* replication_event = new ReplicationEvent("ADD_COLLECTION_META", 0,
key.ToString(), value.ToString());
server->send_message(REPLICATION_EVENT_MSG, replication_event);
}
if(parts.size() == 3 && parts[1] == Collection::SEQ_ID_PREFIX) {
ReplicationEvent* replication_event = new ReplicationEvent("ADD_DOCUMENT", std::stoi(parts[0]),
key.ToString(), value.ToString());
server->send_message(REPLICATION_EVENT_MSG, replication_event);
}
if(parts.size() >= 2 && parts[0] == CollectionManager::SYMLINK_PREFIX) {
ReplicationEvent* replication_event = new ReplicationEvent("ADD_SYMLINK", 0,
key.ToString(), value.ToString());
server->send_message(REPLICATION_EVENT_MSG, replication_event);
}
}
void Delete(const rocksdb::Slice& key) {
std::vector<std::string> parts;
StringUtils::split(key.ToString(), parts, "_");
if(parts.size() == 3 && parts[1] == Collection::DOC_ID_PREFIX) {
ReplicationEvent* replication_event = new ReplicationEvent("REMOVE_DOCUMENT", 0, key.ToString(), "");
server->send_message(REPLICATION_EVENT_MSG, replication_event);
}
if(parts.size() >= 2 && parts[0] == Collection::COLLECTION_META_PREFIX) {
ReplicationEvent* replication_event = new ReplicationEvent("DROP_COLLECTION", 0, key.ToString(), "");
server->send_message(REPLICATION_EVENT_MSG, replication_event);
}
if(parts.size() >= 2 && parts[0] == CollectionManager::SYMLINK_PREFIX) {
ReplicationEvent* replication_event = new ReplicationEvent("REMOVE_SYMLINK", 0,
key.ToString(), "");
server->send_message(REPLICATION_EVENT_MSG, replication_event);
}
}
void Merge(const rocksdb::Slice& key, const rocksdb::Slice& value) {
std::vector<std::string> parts;
StringUtils::split(key.ToString(), parts, "_");
if(parts.size() >= 2 && parts[0] == Collection::COLLECTION_NEXT_SEQ_PREFIX) {
ReplicationEvent* replication_event = new ReplicationEvent("INCR_COLLECTION_NEXT_SEQ", 0,
key.ToString(), value.ToString());
server->send_message(REPLICATION_EVENT_MSG, replication_event);
}
}
void Merge(const rocksdb::Slice& key, const rocksdb::Slice& value);
};
class Replicator {
public:
static void start(HttpServer* server, const std::string master_host_port, const std::string api_key, Store& store) {
size_t total_runs = 0;
static void start(HttpServer* server, const std::string master_host_port, const std::string api_key, Store& store);
while(true) {
IterateBatchHandler handler(server);
uint64_t latest_seq_num = store.get_latest_seq_number();
if(total_runs++ % 20 == 0) {
// roughly every 60 seconds
LOG(INFO) << "Replica's latest sequence number: " << latest_seq_num;
}
HttpClient client(
master_host_port+"/replication/updates?seq_number="+std::to_string(latest_seq_num+1), api_key
);
std::string json_response;
long status_code = client.get_reponse(json_response);
if(status_code == 200) {
nlohmann::json json_content = nlohmann::json::parse(json_response);
nlohmann::json updates = json_content["updates"];
// first write to memory
for (nlohmann::json::iterator update = updates.begin(); update != updates.end(); ++update) {
const std::string update_decoded = StringUtils::base64_decode(*update);
rocksdb::WriteBatch write_batch(update_decoded);
write_batch.Iterate(&handler);
}
// now write to store
for (nlohmann::json::iterator update = updates.begin(); update != updates.end(); ++update) {
const std::string update_decoded = StringUtils::base64_decode(*update);
rocksdb::WriteBatch write_batch(update_decoded);
store._get_db_unsafe()->Write(rocksdb::WriteOptions(), &write_batch);
}
if(updates.size() > 0) {
LOG(INFO) << "Replica has consumed " << latest_seq_num+updates.size() << "/"
<< json_content["latest_seq_num"] << " updates from master.";
}
} else {
LOG(ERR) << "Replication error while fetching records from master, status_code=" << status_code;
if(status_code != 0) {
LOG(ERR) << json_response;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(3000));
}
}
static void on_replication_event(void *data) {
ReplicationEvent* replication_event = static_cast<ReplicationEvent*>(data);
if(replication_event->type == "UPDATE_NEXT_COLLECTION_ID") {
CollectionManager & collection_manager = CollectionManager::get_instance();
collection_manager.set_next_collection_id(std::stoi(replication_event->value));
}
if(replication_event->type == "ADD_COLLECTION_META") {
nlohmann::json collection_meta;
try {
collection_meta = nlohmann::json::parse(replication_event->value);
} catch(...) {
LOG(ERR) << "Failed to parse collection meta JSON.";
LOG(ERR) << "Replication event value: " << replication_event->value;
delete replication_event;
exit(1);
}
CollectionManager & collection_manager = CollectionManager::get_instance();
Collection* collection = collection_manager.init_collection(collection_meta, 0);
collection_manager.add_to_collections(collection);
}
if(replication_event->type == "ADD_DOCUMENT") {
CollectionManager & collection_manager = CollectionManager::get_instance();
std::vector<std::string> parts;
StringUtils::split(replication_event->key, parts, "_"); // collection_id, seq_id_prefix, seq_id
Collection* collection = collection_manager.get_collection_with_id(std::stoi(parts[0]));
nlohmann::json document = nlohmann::json::parse(replication_event->value);
// last 4 bytes of the key would be the serialized version of the sequence id
std::string serialized_seq_id = replication_event->key.substr(replication_event->key.length() - 4);
uint32_t seq_id = StringUtils::deserialize_uint32_t(serialized_seq_id);
collection->index_in_memory(document, seq_id);
}
if(replication_event->type == "ADD_SYMLINK") {
CollectionManager & collection_manager = CollectionManager::get_instance();
std::vector<std::string> parts;
std::string symlink_prefix_key = std::string(CollectionManager::SYMLINK_PREFIX) + "_";
StringUtils::split(replication_event->key, parts, symlink_prefix_key); // symlink_prefix, symlink_name
std::string & symlink_name = parts[0];
collection_manager.upsert_symlink(symlink_name, replication_event->value);
}
if(replication_event->type == "INCR_COLLECTION_NEXT_SEQ") {
CollectionManager & collection_manager = CollectionManager::get_instance();
const std::string & collection_name = replication_event->key.substr(strlen(Collection::COLLECTION_NEXT_SEQ_PREFIX)+1);
Collection* collection = collection_manager.get_collection(collection_name);
collection->increment_next_seq_id_field();
}
if(replication_event->type == "REMOVE_DOCUMENT") {
std::vector<std::string> parts;
StringUtils::split(replication_event->key, parts, "_"); // collection_id, doc_id_prefix, doc_id
CollectionManager & collection_manager = CollectionManager::get_instance();
Collection* collection = collection_manager.get_collection_with_id(std::stoi(parts[0]));
collection->remove(parts[2], false);
}
if(replication_event->type == "REMOVE_SYMLINK") {
CollectionManager & collection_manager = CollectionManager::get_instance();
std::vector<std::string> parts;
std::string symlink_prefix_key = std::string(CollectionManager::SYMLINK_PREFIX) + "_";
StringUtils::split(replication_event->key, parts, symlink_prefix_key); // symlink_prefix, symlink_name
std::string & symlink_name = parts[0];
collection_manager.delete_symlink(symlink_name);
}
if(replication_event->type == "DROP_COLLECTION") {
CollectionManager & collection_manager = CollectionManager::get_instance();
// <collection_meta_prefix>_<collection_name>
const std::string & collection_name = replication_event->key.substr(strlen(Collection::COLLECTION_META_PREFIX)+1);
collection_manager.drop_collection(collection_name, false);
}
delete replication_event;
}
static void on_replication_event(void *data);
};


@@ -164,6 +164,7 @@ public:
rocksdb::Status status = db->GetUpdatesSince(seq_number, &iter);
if(!status.ok()) {
LOG(ERR) << "Error while fetching updates for replication: " << status.ToString();
std::ostringstream error;
error << "Unable to fetch updates. " << "Master's latest sequence number is " << local_latest_seq_num;
return Option<std::vector<std::string>*>(400, error.str());


@@ -0,0 +1,25 @@
#pragma once
#include <string>
#include <iostream>
#include <cmdline.h>
#include "logger.h"
#include "store.h"
#include "collection_manager.h"
#include "http_server.h"
#include "replicator.h"
#include <signal.h>
#include <sys/types.h>
#include <sys/stat.h>
extern HttpServer* server;
void catch_interrupt(int sig);
bool directory_exists(const std::string & dir_path);
void init_cmdline_options(cmdline::parser & options, int argc, char **argv);
int init_logger(cmdline::parser & options, std::unique_ptr<g3::LogWorker> & log_worker);
int run_server(cmdline::parser & options, void (*master_server_routes)(), void (*replica_server_routes)());
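A rough sketch of how a server binary might wire up these declarations; the placeholder route installers and the exact call sequence are assumptions for illustration, not taken from this commit:

#include <memory>
#include "typesense_server_utils.h"

HttpServer* server = nullptr;  // definition for the extern declared in the header

// Placeholder route installers; the real ones are declared in core_api.h.
void master_server_routes() { /* register master-side HTTP routes */ }
void replica_server_routes() { /* register replica-side HTTP routes */ }

int main(int argc, char **argv) {
    cmdline::parser options;
    init_cmdline_options(options, argc, argv);   // add the supported flags
    options.parse_check(argc, argv);             // cmdline.h: parse argv or print usage and exit

    std::unique_ptr<g3::LogWorker> log_worker;
    if(init_logger(options, log_worker) != 0) {  // assumption: non-zero return signals failure
        return 1;
    }

    return run_server(options, &master_server_routes, &replica_server_routes);
}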


@@ -276,7 +276,7 @@ void CollectionManager::set_next_collection_id(uint32_t next_id) {
next_collection_id = next_id;
}
spp::sparse_hash_map<std::string, std::string> CollectionManager::get_symlinks() {
spp::sparse_hash_map<std::string, std::string> & CollectionManager::get_symlinks() {
return collection_symlinks;
}


@@ -1,5 +1,5 @@
#include "core_api.h"
#include "typesense_server.h"
#include "typesense_server_utils.h"
void master_server_routes() {
// collection management

src/replicator.cpp (new file, 214 lines)

@@ -0,0 +1,214 @@
#include "replicator.h"
#include <iostream>
#include "http_client.h"
#include "collection_manager.h"
#include "collection.h"
#include "string_utils.h"
#include <json.hpp>
#include <thread>
#include "logger.h"
void IterateBatchHandler::Put(const rocksdb::Slice& key, const rocksdb::Slice& value) {
std::vector<std::string> parts;
StringUtils::split(key.ToString(), parts, "_");
std::string val = value.ToString() + "\n";
if(parts.size() >= 2 && parts[0] == Collection::COLLECTION_NEXT_SEQ_PREFIX) {
// nothing to do here as this is called only when a new collection is created and it's always "0"
}
if(parts.size() == 1 && parts[0] == CollectionManager::NEXT_COLLECTION_ID_KEY) {
ReplicationEvent* replication_event = new ReplicationEvent("UPDATE_NEXT_COLLECTION_ID", 0,
key.ToString(), value.ToString());
server->send_message(REPLICATION_EVENT_MSG, replication_event);
}
if(parts.size() >= 2 && parts[0] == Collection::COLLECTION_META_PREFIX) {
ReplicationEvent* replication_event = new ReplicationEvent("ADD_COLLECTION_META", 0,
key.ToString(), value.ToString());
server->send_message(REPLICATION_EVENT_MSG, replication_event);
}
if(parts.size() == 3 && parts[1] == Collection::SEQ_ID_PREFIX) {
ReplicationEvent* replication_event = new ReplicationEvent("ADD_DOCUMENT", std::stoi(parts[0]),
key.ToString(), value.ToString());
server->send_message(REPLICATION_EVENT_MSG, replication_event);
}
if(parts.size() >= 2 && parts[0] == CollectionManager::SYMLINK_PREFIX) {
ReplicationEvent* replication_event = new ReplicationEvent("ADD_SYMLINK", 0,
key.ToString(), value.ToString());
server->send_message(REPLICATION_EVENT_MSG, replication_event);
}
}
void IterateBatchHandler::Delete(const rocksdb::Slice& key) {
std::vector<std::string> parts;
StringUtils::split(key.ToString(), parts, "_");
if(parts.size() == 3 && parts[1] == Collection::DOC_ID_PREFIX) {
ReplicationEvent* replication_event = new ReplicationEvent("REMOVE_DOCUMENT", 0, key.ToString(), "");
server->send_message(REPLICATION_EVENT_MSG, replication_event);
}
if(parts.size() >= 2 && parts[0] == Collection::COLLECTION_META_PREFIX) {
ReplicationEvent* replication_event = new ReplicationEvent("DROP_COLLECTION", 0, key.ToString(), "");
server->send_message(REPLICATION_EVENT_MSG, replication_event);
}
if(parts.size() >= 2 && parts[0] == CollectionManager::SYMLINK_PREFIX) {
ReplicationEvent* replication_event = new ReplicationEvent("REMOVE_SYMLINK", 0,
key.ToString(), "");
server->send_message(REPLICATION_EVENT_MSG, replication_event);
}
}
void IterateBatchHandler::Merge(const rocksdb::Slice& key, const rocksdb::Slice& value) {
std::vector<std::string> parts;
StringUtils::split(key.ToString(), parts, "_");
if(parts.size() >= 2 && parts[0] == Collection::COLLECTION_NEXT_SEQ_PREFIX) {
ReplicationEvent* replication_event = new ReplicationEvent("INCR_COLLECTION_NEXT_SEQ", 0,
key.ToString(), value.ToString());
server->send_message(REPLICATION_EVENT_MSG, replication_event);
}
}
void Replicator::start(HttpServer* server, const std::string master_host_port, const std::string api_key, Store& store) {
size_t total_runs = 0;
while(true) {
IterateBatchHandler handler(server);
uint64_t latest_seq_num = store.get_latest_seq_number();
if(total_runs++ % 20 == 0) {
// roughly every 60 seconds
LOG(INFO) << "Replica's latest sequence number: " << latest_seq_num;
}
HttpClient client(
master_host_port+"/replication/updates?seq_number="+std::to_string(latest_seq_num+1), api_key
);
std::string json_response;
long status_code = client.get_reponse(json_response);
if(status_code == 200) {
nlohmann::json json_content = nlohmann::json::parse(json_response);
nlohmann::json updates = json_content["updates"];
// first write to memory
for (nlohmann::json::iterator update = updates.begin(); update != updates.end(); ++update) {
const std::string update_decoded = StringUtils::base64_decode(*update);
rocksdb::WriteBatch write_batch(update_decoded);
write_batch.Iterate(&handler);
}
// now write to store
for (nlohmann::json::iterator update = updates.begin(); update != updates.end(); ++update) {
const std::string update_decoded = StringUtils::base64_decode(*update);
rocksdb::WriteBatch write_batch(update_decoded);
store._get_db_unsafe()->Write(rocksdb::WriteOptions(), &write_batch);
}
if(updates.size() > 0) {
LOG(INFO) << "Replica has consumed " << store.get_latest_seq_number() << "/"
<< json_content["latest_seq_num"] << " updates from master.";
}
} else {
LOG(ERR) << "Replication error while fetching records from master, status_code=" << status_code
<< ", replica's latest sequence number: " << latest_seq_num;
if(status_code != 0) {
LOG(ERR) << json_response;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(3000));
}
}
void Replicator::on_replication_event(void *data) {
ReplicationEvent* replication_event = static_cast<ReplicationEvent*>(data);
if(replication_event->type == "UPDATE_NEXT_COLLECTION_ID") {
CollectionManager & collection_manager = CollectionManager::get_instance();
collection_manager.set_next_collection_id(std::stoi(replication_event->value));
}
if(replication_event->type == "ADD_COLLECTION_META") {
nlohmann::json collection_meta;
try {
collection_meta = nlohmann::json::parse(replication_event->value);
} catch(...) {
LOG(ERR) << "Failed to parse collection meta JSON.";
LOG(ERR) << "Replication event value: " << replication_event->value;
delete replication_event;
exit(1);
}
CollectionManager & collection_manager = CollectionManager::get_instance();
Collection* collection = collection_manager.init_collection(collection_meta, 0);
collection_manager.add_to_collections(collection);
}
if(replication_event->type == "ADD_DOCUMENT") {
CollectionManager & collection_manager = CollectionManager::get_instance();
std::vector<std::string> parts;
StringUtils::split(replication_event->key, parts, "_"); // collection_id, seq_id_prefix, seq_id
Collection* collection = collection_manager.get_collection_with_id(std::stoi(parts[0]));
nlohmann::json document = nlohmann::json::parse(replication_event->value);
// last 4 bytes of the key would be the serialized version of the sequence id
std::string serialized_seq_id = replication_event->key.substr(replication_event->key.length() - 4);
uint32_t seq_id = StringUtils::deserialize_uint32_t(serialized_seq_id);
collection->index_in_memory(document, seq_id);
}
if(replication_event->type == "ADD_SYMLINK") {
CollectionManager & collection_manager = CollectionManager::get_instance();
std::vector<std::string> parts;
std::string symlink_prefix_key = std::string(CollectionManager::SYMLINK_PREFIX) + "_";
StringUtils::split(replication_event->key, parts, symlink_prefix_key); // symlink_prefix, symlink_name
std::string & symlink_name = parts[0];
spp::sparse_hash_map<std::string, std::string> & simlinks = collection_manager.get_symlinks();
simlinks[symlink_name] = replication_event->value;
}
if(replication_event->type == "INCR_COLLECTION_NEXT_SEQ") {
CollectionManager & collection_manager = CollectionManager::get_instance();
const std::string & collection_name = replication_event->key.substr(strlen(Collection::COLLECTION_NEXT_SEQ_PREFIX)+1);
Collection* collection = collection_manager.get_collection(collection_name);
collection->increment_next_seq_id_field();
}
if(replication_event->type == "REMOVE_DOCUMENT") {
std::vector<std::string> parts;
StringUtils::split(replication_event->key, parts, "_"); // collection_id, doc_id_prefix, doc_id
CollectionManager & collection_manager = CollectionManager::get_instance();
Collection* collection = collection_manager.get_collection_with_id(std::stoi(parts[0]));
collection->remove(parts[2], false);
}
if(replication_event->type == "REMOVE_SYMLINK") {
CollectionManager & collection_manager = CollectionManager::get_instance();
std::vector<std::string> parts;
std::string symlink_prefix_key = std::string(CollectionManager::SYMLINK_PREFIX) + "_";
StringUtils::split(replication_event->key, parts, symlink_prefix_key); // symlink_prefix, symlink_name
std::string & symlink_name = parts[0];
collection_manager.delete_symlink(symlink_name);
}
if(replication_event->type == "DROP_COLLECTION") {
CollectionManager & collection_manager = CollectionManager::get_instance();
// <collection_meta_prefix>_<collection_name>
const std::string & collection_name = replication_event->key.substr(strlen(Collection::COLLECTION_META_PREFIX)+1);
collection_manager.drop_collection(collection_name, false);
}
delete replication_event;
}


@@ -1,16 +1,5 @@
#pragma once
#include <string>
#include <iostream>
#include <cmdline.h>
#include "logger.h"
#include "store.h"
#include "collection_manager.h"
#include "http_server.h"
#include "replicator.h"
#include <signal.h>
#include <sys/types.h>
#include <sys/stat.h>
#include "core_api.h"
#include "typesense_server_utils.h"
HttpServer* server;


@@ -26,8 +26,8 @@ TEST(StoreTest, GetUpdatesSince) {
ASSERT_EQ(0, primary_store.get_latest_seq_number());
delete updates_op.get();
// querying for a seq_num > 0 on a fresh store
updates_op = primary_store.get_updates_since(10, 10);
// querying for a seq_num > 1 on a fresh store
updates_op = primary_store.get_updates_since(2, 10);
ASSERT_FALSE(updates_op.ok());
ASSERT_EQ("Unable to fetch updates. Master's latest sequence number is 0", updates_op.error());
@@ -53,6 +53,10 @@ TEST(StoreTest, GetUpdatesSince) {
ASSERT_EQ(3, updates_op.get()->size());
delete updates_op.get();
updates_op = primary_store.get_updates_since(1, 10);
ASSERT_EQ(3, updates_op.get()->size());
delete updates_op.get();
updates_op = primary_store.get_updates_since(3, 10);
ASSERT_EQ(1, updates_op.get()->size());
delete updates_op.get();