Basic write replication is working.

This commit is contained in:
kishorenc 2020-03-11 22:31:17 +05:30
parent a975f39685
commit b81ce1062d
10 changed files with 88 additions and 74 deletions

View File

@ -88,7 +88,7 @@ public:
this->indices_per_collection = indices_per_collection;
}
void set_raft_port(int v) {
void set_raft_port(int raft_port) {
this->raft_port = raft_port;
}

View File

@ -32,4 +32,6 @@ bool get_replication_updates(http_req &req, http_res &res);
bool collection_export_handler(http_req* req, http_res* res, void* data);
bool async_write_request(void *data);
static constexpr const char* SEND_RESPONSE_MSG = "send_response";

View File

@ -4,6 +4,7 @@
#include <string>
#include <map>
#include <vector>
#include <future>
#include "json.hpp"
#define H2O_USE_LIBUV 0
@ -89,20 +90,20 @@ struct http_res {
struct http_req {
h2o_req_t* _req;
std::string http_method;
std::string path_without_query;
int route_index;
std::map<std::string, std::string> params;
std::string body;
http_req() {}
http_req(): route_index(-1) {}
http_req(h2o_req_t* _req, const std::string & http_method, const std::string & path_without_query,
http_req(h2o_req_t* _req, const std::string & http_method, size_t route_index,
const std::map<std::string, std::string> & params,
std::string body): _req(_req), http_method(http_method), path_without_query(path_without_query),
std::string body): _req(_req), http_method(http_method), route_index(route_index),
params(params), body(body) {}
void deserialize(const std::string& serialized_content) {
nlohmann::json content = nlohmann::json::parse(serialized_content);
path_without_query = content["path"];
route_index = content["route_index"];
body = content["body"];
for (nlohmann::json::iterator it = content["params"].begin(); it != content["params"].end(); ++it) {
@ -114,7 +115,7 @@ struct http_req {
std::string serialize() const {
nlohmann::json content;
content["path"] = path_without_query;
content["route_index"] = route_index;
content["params"] = params;
content["body"] = body;
@ -197,4 +198,10 @@ struct http_message_dispatcher {
void on(const std::string & message, bool (*handler)(void*)) {
message_handlers.emplace(message, handler);
}
};
};
struct AsyncIndexArg {
http_req* req;
http_res* res;
std::promise<bool>* promise;
};

View File

@ -90,9 +90,11 @@ public:
void send_response(http_req* request, const http_res* response);
bool find_route(const std::vector<std::string> & path_parts, const std::string & http_method,
int find_route(const std::vector<std::string> & path_parts, const std::string & http_method,
route_path** found_rpath);
void get_route(size_t index, route_path** found_rpath);
int run(ReplicationState* replication_state);
void stop();

View File

@ -7,6 +7,7 @@
#include <braft/util.h> // braft::AsyncClosureGuard
#include <braft/protobuf_file.h> // braft::ProtoBufFile
#include <rocksdb/db.h>
#include <future>
#include "http_data.h"
@ -14,14 +15,11 @@
// Implements the callback for the state machine
class ReplicationClosure : public braft::Closure {
private:
http_message_dispatcher* message_dispatcher;
http_req* request;
http_res* response;
public:
ReplicationClosure(http_message_dispatcher* message_dispatcher,
http_req* request, http_res* response): message_dispatcher(message_dispatcher),
request(request), response(response) {
ReplicationClosure(http_req* request, http_res* response): request(request), response(response) {
}
@ -111,8 +109,6 @@ public:
static constexpr const char* REPLICATION_MSG = "raft_replication";
static bool on_raft_replication(void *data);
private:
friend class ReplicationClosure;
@ -120,12 +116,6 @@ private:
// redirecting request to leader
void redirect(http_res* response);
struct ReplicationArg {
http_req* req;
http_res* res;
braft::Closure* done;
};
// actual application of writes onto the WAL
void on_apply(braft::Iterator& iter);

View File

@ -47,6 +47,7 @@ private:
rocksdb::DB *db;
rocksdb::Options options;
rocksdb::WriteOptions write_options;
public:
@ -69,6 +70,10 @@ public:
options.WAL_ttl_seconds = wal_ttl_secs;
options.WAL_size_limit_MB = wal_size_mb;
// Disable WAL for master writes (Raft's WAL is used)
// The replica uses native WAL, though.
write_options.disableWAL = true;
// open DB
rocksdb::Status s = rocksdb::DB::Open(options, state_dir_path, &db);
@ -91,12 +96,12 @@ public:
}
bool insert(const std::string& key, const std::string& value) {
rocksdb::Status status = db->Put(rocksdb::WriteOptions(), key, value);
rocksdb::Status status = db->Put(write_options, key, value);
return status.ok();
}
bool batch_write(rocksdb::WriteBatch& batch) {
rocksdb::Status status = db->Write(rocksdb::WriteOptions(), &batch);
rocksdb::Status status = db->Write(write_options, &batch);
return status.ok();
}
@ -122,7 +127,7 @@ public:
}
bool remove(const std::string& key) {
rocksdb::Status status = db->Delete(rocksdb::WriteOptions(), key);
rocksdb::Status status = db->Delete(write_options, key);
return status.ok();
}
@ -147,7 +152,7 @@ public:
}
void increment(const std::string & key, uint32_t value) {
db->Merge(rocksdb::WriteOptions(), key, StringUtils::serialize_uint32_t(value));
db->Merge(write_options, key, StringUtils::serialize_uint32_t(value));
}
uint64_t get_latest_seq_number() const {

View File

@ -607,3 +607,26 @@ bool get_replication_updates(http_req & req, http_res & res) {
response_thread.detach();
return true;
}
bool async_write_request(void *data) {
LOG(INFO) << "async_write_request called";
AsyncIndexArg* index_arg = static_cast<AsyncIndexArg*>(data);
if(index_arg->req->route_index == -1) {
return false;
}
route_path* found_rpath;
server->get_route(index_arg->req->route_index, &found_rpath);
// call the underlying http handler
found_rpath->handler(*index_arg->req, *index_arg->res);
if(index_arg->req->_req != nullptr) {
// we have to return a response to the client
server->send_response(index_arg->req, index_arg->res);
}
index_arg->promise->set_value(true); // returns control back to caller
return true;
}

View File

@ -207,8 +207,9 @@ std::map<std::string, std::string> HttpServer::parse_query(const std::string& qu
return query_map;
}
bool HttpServer::find_route(const std::vector<std::string> & path_parts, const std::string & http_method, route_path** found_rpath) {
for(const route_path & rpath: routes) {
int HttpServer::find_route(const std::vector<std::string> & path_parts, const std::string & http_method, route_path** found_rpath) {
for(size_t i = 0; i < routes.size(); i++) {
const route_path & rpath = routes[i];
if(rpath.path_parts.size() != path_parts.size() || rpath.http_method != http_method) {
continue;
}
@ -226,11 +227,11 @@ bool HttpServer::find_route(const std::vector<std::string> & path_parts, const s
if(found) {
*found_rpath = const_cast<route_path *>(&rpath);
return true;
return i;
}
}
return false;
return -1;
}
int HttpServer::catch_all_handler(h2o_handler_t *_self, h2o_req_t *req) {
@ -302,9 +303,9 @@ int HttpServer::catch_all_handler(h2o_handler_t *_self, h2o_req_t *req) {
}
route_path *rpath = nullptr;
bool found = self->http_server->find_route(path_parts, http_method, &rpath);
int route_index = self->http_server->find_route(path_parts, http_method, &rpath);
if(found) {
if(route_index != -1) {
bool authenticated = self->http_server->auth_handler(*rpath, auth_key_from_header);
if(!authenticated) {
return send_401_unauthorized(req);
@ -318,12 +319,11 @@ int HttpServer::catch_all_handler(h2o_handler_t *_self, h2o_req_t *req) {
}
}
http_req* request = new http_req(req, http_method, path_without_query, query_map, req_body);
http_req* request = new http_req(req, http_method, route_index, query_map, req_body);
http_res* response = new http_res();
// for writes, we defer to replication_state
if(http_method != "GET") {
LOG(INFO) << "*** server thread id: " << std::this_thread::get_id();
self->http_server->get_replication_state()->write(request, response);
return 0;
}
@ -453,3 +453,9 @@ http_message_dispatcher* HttpServer::get_message_dispatcher() const {
ReplicationState* HttpServer::get_replication_state() const {
return replication_state;
}
void HttpServer::get_route(size_t index, route_path** found_rpath) {
if(index >= 0 && index < routes.size()) {
*found_rpath = &routes[index];
}
}

View File

@ -70,7 +70,7 @@ void ReplicationState::write(http_req* request, http_res* response) {
braft::Task task;
task.data = &log;
// This callback would be invoked when the task actually executes or fails
task.done = new ReplicationClosure(message_dispatcher, request, response);
task.done = new ReplicationClosure(request, response);
// To avoid ABA problem
task.expected_term = _leader_term.load(butil::memory_order_relaxed);
@ -108,6 +108,9 @@ void ReplicationState::on_apply(braft::Iterator& iter) {
http_res* response;
http_req* request;
// Guard invokes replication_arg->done->Run() asynchronously to avoid the callback blocking the main thread
braft::AsyncClosureGuard closure_guard(iter.done());
if (iter.done()) {
// This task is applied by this node, get value from the closure to avoid additional parsing.
ReplicationClosure* c = dynamic_cast<ReplicationClosure*>(iter.done());
@ -125,20 +128,21 @@ void ReplicationState::on_apply(braft::Iterator& iter) {
// Now that the log has been parsed, perform the actual operation
// Call http server thread for write and response back to client (if `response` is NOT null)
//auto replication_arg = new ReplicationArg{request, response, iter.done()};
//message_dispatcher->send_message(REPLICATION_MSG, replication_arg);
// We use a future to block current thread until the async flow finishes
std::promise<bool> promise;
std::future<bool> future = promise.get_future();
auto replication_arg = new AsyncIndexArg{request, response, &promise};
message_dispatcher->send_message(REPLICATION_MSG, replication_arg);
bool value = future.get();
LOG(INFO) << "*** on_apply thread id: " << std::this_thread::get_id();
LOG(INFO) << request->body;
LOG(INFO) << "Done";
//iter.done()->Run();
}
}
void* ReplicationState::save_snapshot(void* arg) {
ReplicationState::SnapshotArg* sa = (ReplicationState::SnapshotArg*) arg;
std::unique_ptr<ReplicationState::SnapshotArg> arg_guard(sa);
SnapshotArg* sa = static_cast<SnapshotArg*>(arg);
std::unique_ptr<SnapshotArg> arg_guard(sa);
brpc::ClosureGuard done_guard(sa->done);
std::string snapshot_path = sa->writer->get_path() + "/rocksdb_snapshot";
@ -245,6 +249,10 @@ int ReplicationState::on_snapshot_load(braft::SnapshotReader* reader) {
}
void ReplicationState::refresh_peers(const std::string & peers) {
if(db == NULL) {
LOG(ERROR) << "DB IS NULL!";
}
if(_node && is_leader()) {
LOG(INFO) << "Refreshing peer config";
braft::Configuration conf;
@ -254,29 +262,3 @@ void ReplicationState::refresh_peers(const std::string & peers) {
}
}
bool ReplicationState::on_raft_replication(void *data) {
ReplicationArg* replication_arg = static_cast<ReplicationArg*>(data);
// Guard invokes replication_arg->done->Run() asynchronously to avoid the callback blocking the main thread
braft::AsyncClosureGuard closure_guard(replication_arg->done);
std::vector<std::string> path_parts;
StringUtils::split(replication_arg->req->path_without_query, path_parts, "/");
/*route_path* found_rpath = nullptr;
bool found = replication_arg->res->server->find_route(path_parts, replication_arg->req->http_method, &found_rpath);
if(found) {
// call handler function -- this effectively does the write
found_rpath->handler(*replication_arg->req, *replication_arg->res);
if(replication_arg->req->_req != nullptr) {
// local request, respond to request
replication_arg->res->server->send_response(replication_arg->req, replication_arg->res);
}
}
delete replication_arg;*/
return true;
}

View File

@ -5,9 +5,6 @@
#include <brpc/controller.h>
#include <brpc/server.h>
#include <braft/raft.h>
#include <braft/storage.h>
#include <braft/util.h>
#include <braft/protobuf_file.h>
#include <raft_server.h>
#include <fstream>
@ -198,7 +195,7 @@ int run_server(const Config & config, const std::string & version,
server->on(SEND_RESPONSE_MSG, on_send_response);
server->on(REPLICATION_EVENT_MSG, Replicator::on_replication_event);
server->on(ReplicationState::REPLICATION_MSG, ReplicationState::on_raft_replication);
server->on(ReplicationState::REPLICATION_MSG, async_write_request);
if(config.get_master().empty()) {
master_server_routes();
@ -225,7 +222,7 @@ int run_server(const Config & config, const std::string & version,
ReplicationState replication_state(server->get_message_dispatcher(), store._get_db_unsafe());
std::thread raft_thread([&replication_state, &store, &config]() {
std::thread raft_thread([&replication_state, &config]() {
std::string path_to_peers = config.get_raft_peers();
if(path_to_peers.empty()) {