Follower should forward write requests to leader.

This commit is contained in:
kishorenc 2020-03-23 06:53:05 +05:30
parent 3d8350aaaa
commit b16614eb20
8 changed files with 261 additions and 86 deletions

View File

@ -1,3 +1,5 @@
#pragma once
bool directory_exists(const std::string & dir_path);
bool file_exists(const std::string & file_path);

View File

@ -2,76 +2,37 @@
#include <string>
#include <curl/curl.h>
#include "file_utils.h"
/*
NOTE: This is a really primitive blocking client meant only for specific Typesense use cases.
*/
class HttpClient {
private:
std::string buffer;
std::string url;
std::string api_key;
static std::string api_key;
static std::string ca_cert_path;
std::string ca_cert_path;
HttpClient() = default;
inline bool file_exists (const std::string & name) {
struct stat buffer;
return (stat (name.c_str(), &buffer) == 0);
}
~HttpClient() = default;
static size_t curl_write (void *contents, size_t size, size_t nmemb, std::string *s);
static CURL* init_curl(const std::string & url, std::string & buffer);
static long perform_curl(CURL *curl);
public:
HttpClient(std::string url, std::string api_key): url(url), api_key(api_key) {
// try to locate ca cert file (from: https://serverfault.com/a/722646/117601)
std::vector<std::string> locations = {
"/etc/ssl/certs/ca-certificates.crt", // Debian/Ubuntu/Gentoo etc.
"/etc/pki/tls/certs/ca-bundle.crt", // Fedora/RHEL 6
"/etc/ssl/ca-bundle.pem", // OpenSUSE
"/etc/pki/tls/cacert.pem", // OpenELEC
"/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", // CentOS/RHEL 7
"/usr/local/etc/openssl/cert.pem", // OSX
};
ca_cert_path = "";
for(const std::string & location: locations) {
if(file_exists(location)) {
ca_cert_path = location;
break;
}
}
static HttpClient & get_instance() {
static HttpClient instance;
return instance;
}
static size_t curl_write (void *contents, size_t size, size_t nmemb, std::string *s) {
s->append((char*)contents, size*nmemb);
return size*nmemb;
}
HttpClient(HttpClient const&) = delete;
void operator=(HttpClient const&) = delete;
long get_reponse(std::string & response) {
CURL *curl = curl_easy_init();
void init(const std::string & api_key);
if(!ca_cert_path.empty()) {
curl_easy_setopt(curl, CURLOPT_CAINFO, ca_cert_path.c_str());
} else {
LOG(ERROR) << "Unable to locate system SSL certificates.";
return 0;
}
static long get_response(const std::string & url, std::string & response);
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L); // to allow self-signed certs
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, HttpClient::curl_write);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buffer);
struct curl_slist *chunk = NULL;
std::string api_key_header = std::string("x-typesense-api-key: ") + api_key;
chunk = curl_slist_append(chunk, api_key_header.c_str());
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk);
curl_easy_perform(curl);
long http_code = 0;
curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &http_code);
curl_easy_cleanup(curl);
response = buffer;
return http_code;
}
};
static long post_response(const std::string & url, const std::string & body, std::string & response);
};

View File

@ -10,6 +10,7 @@
#include <future>
#include "http_data.h"
#include "threadpool.h"
class Store;
class ReplicationState;
@ -81,6 +82,7 @@ private:
butil::atomic<int64_t> leader_term;
Store *store;
ThreadPool* thread_pool;
http_message_dispatcher* message_dispatcher;
butil::atomic<bool> has_initialized;
@ -93,7 +95,7 @@ public:
static constexpr const char* meta_dir_name = "meta";
static constexpr const char* snapshot_dir_name = "snapshot";
ReplicationState(Store* store, http_message_dispatcher* message_dispatcher,
ReplicationState(Store* store, ThreadPool* thread_pool, http_message_dispatcher* message_dispatcher,
std::promise<bool>* ready, bool create_init_db_snapshot);
~ReplicationState() {
@ -101,7 +103,7 @@ public:
}
// Starts this node
int start(int port, int election_timeout_ms, int snapshot_interval_s,
int start(int api_port, int raft_port, int election_timeout_ms, int snapshot_interval_s,
const std::string & raft_dir, const std::string & peers);
// Generic write method for synchronizing all writes

110
include/threadpool.h Normal file
View File

@ -0,0 +1,110 @@
/*
Copyright (c) 2012 Jakob Progsch, Václav Zeman
This software is provided 'as-is', without any express or implied
warranty. In no event will the authors be held liable for any damages
arising from the use of this software.
Permission is granted to anyone to use this software for any purpose,
including commercial applications, and to alter it and redistribute it
freely, subject to the following restrictions:
1. The origin of this software must not be misrepresented; you must not
claim that you wrote the original software. If you use this software
in a product, an acknowledgment in the product documentation would be
appreciated but is not required.
2. Altered source versions must be plainly marked as such, and must not be
misrepresented as being the original software.
3. This notice may not be removed or altered from any source
distribution.
*/
#pragma once
// containers
#include <vector>
#include <queue>
// threading
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <future>
// utility wrappers
#include <memory>
#include <functional>
// exceptions
#include <stdexcept>
// std::thread pool for resources recycling
class ThreadPool {
public:
// the constructor just launches some amount of workers
ThreadPool(size_t threads_n = std::thread::hardware_concurrency()) : stop(false)
{
if(!threads_n)
throw std::invalid_argument("more than zero threads expected");
this->workers.reserve(threads_n);
for(; threads_n; --threads_n)
this->workers.emplace_back(
[this]
{
while(true)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
}
);
}
// deleted copy&move ctors&assignments
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
ThreadPool(ThreadPool&&) = delete;
ThreadPool& operator=(ThreadPool&&) = delete;
// add new work item to the pool
template<class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> enqueue(F&& f, Args&&... args)
{
using packaged_task_t = std::packaged_task<typename std::result_of<F(Args...)>::type ()>;
std::shared_ptr<packaged_task_t> task(new packaged_task_t(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
));
auto res = task->get_future();
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->tasks.emplace([task](){ (*task)(); });
}
this->condition.notify_one();
return res;
}
// the destructor joins all threads
virtual ~ThreadPool()
{
this->stop = true;
this->condition.notify_all();
for(std::thread& worker : this->workers)
worker.join();
}
private:
// need to keep track of threads so we can join them
std::vector< std::thread > workers;
// the task queue
std::queue< std::function<void()> > tasks;
// synchronization
std::mutex queue_mutex;
std::condition_variable condition;
// workers finalization flag
std::atomic_bool stop;
};

81
src/http_client.cpp Normal file
View File

@ -0,0 +1,81 @@
#include "http_client.h"
#include "file_utils.h"
#include "logger.h"
#include <vector>
std::string HttpClient::api_key = "";
std::string HttpClient::ca_cert_path = "";
long HttpClient::post_response(const std::string &url, const std::string &body, std::string &response) {
CURL *curl = init_curl(url, response);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.c_str());
return perform_curl(curl);
}
long HttpClient::get_response(const std::string &url, std::string &response) {
CURL *curl = init_curl(url, response);
if(curl == nullptr) {
return 0;
}
return perform_curl(curl);
}
void HttpClient::init(const std::string &api_key) {
HttpClient::api_key = api_key;
// try to locate ca cert file (from: https://serverfault.com/a/722646/117601)
std::vector<std::string> locations = {
"/etc/ssl/certs/ca-certificates.crt", // Debian/Ubuntu/Gentoo etc.
"/etc/pki/tls/certs/ca-bundle.crt", // Fedora/RHEL 6
"/etc/ssl/ca-bundle.pem", // OpenSUSE
"/etc/pki/tls/cacert.pem", // OpenELEC
"/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", // CentOS/RHEL 7
"/usr/local/etc/openssl/cert.pem", // OSX
};
HttpClient::ca_cert_path = "";
for(const std::string & location: locations) {
if(file_exists(location)) {
HttpClient::ca_cert_path = location;
break;
}
}
}
long HttpClient::perform_curl(CURL *curl) {
struct curl_slist *chunk = nullptr;
std::string api_key_header = std::string("x-typesense-api-key: ") + HttpClient::api_key;
chunk = curl_slist_append(chunk, api_key_header.c_str());
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk);
curl_easy_perform(curl);
long http_code = 0;
curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &http_code);
curl_easy_cleanup(curl);
return http_code;
}
CURL *HttpClient::init_curl(const std::string &url, std::string &buffer) {
CURL *curl = curl_easy_init();
if(!ca_cert_path.empty()) {
curl_easy_setopt(curl, CURLOPT_CAINFO, ca_cert_path.c_str());
} else {
LOG(ERROR) << "Unable to locate system SSL certificates.";
return nullptr;
}
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L); // to allow self-signed certs
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, HttpClient::curl_write);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buffer);
return curl;
}
size_t HttpClient::curl_write(void *contents, size_t size, size_t nmemb, std::string *s) {
s->append((char*)contents, size*nmemb);
return size*nmemb;
}

View File

@ -5,6 +5,7 @@
#include <string_utils.h>
#include <file_utils.h>
#include <collection_manager.h>
#include <http_client.h>
#include "rocksdb/utilities/checkpoint.h"
@ -16,10 +17,10 @@ void ReplicationClosure::Run() {
// State machine implementation
int ReplicationState::start(int port, int election_timeout_ms, int snapshot_interval_s,
int ReplicationState::start(const int api_port, int raft_port, int election_timeout_ms, int snapshot_interval_s,
const std::string & raft_dir, const std::string & peers) {
butil::EndPoint addr(butil::my_ip(), port);
butil::EndPoint addr(butil::my_ip(), raft_port);
braft::NodeOptions node_options;
std::string actual_peers = peers;
@ -27,7 +28,7 @@ int ReplicationState::start(int port, int election_timeout_ms, int snapshot_inte
if(actual_peers.empty()) {
char str[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &(addr.ip.s_addr), str, INET_ADDRSTRLEN);
actual_peers = std::string(str) + ":" + std::to_string(port) + ":0";
actual_peers = std::string(str) + ":" + std::to_string(raft_port) + ":" + std::to_string(api_port);
}
if(node_options.initial_conf.parse_from(actual_peers) != 0) {
@ -45,7 +46,8 @@ int ReplicationState::start(int port, int election_timeout_ms, int snapshot_inte
node_options.snapshot_uri = prefix + "/" + snapshot_dir_name;
node_options.disable_cli = true;
braft::Node* node = new braft::Node("default_group", braft::PeerId(addr));
// api_port is used as the node identifier
braft::Node* node = new braft::Node("default_group", braft::PeerId(addr, api_port));
std::string snapshot_dir = raft_dir + "/" + snapshot_dir_name;
bool snapshot_exists = dir_enum_count(snapshot_dir) > 0;
@ -89,12 +91,27 @@ int ReplicationState::start(int port, int election_timeout_ms, int snapshot_inte
void ReplicationState::write(http_req* request, http_res* response) {
if (!is_leader()) {
LOG(INFO) << "Rejecting write sent to follower.";
LOG(INFO) << "Redirecting write to leader.";
thread_pool->enqueue([](http_req* request, http_res* response,
http_message_dispatcher* message_dispatcher, braft::Node* node) {
auto raw_req = request->_req;
std::string scheme = std::string(raw_req->scheme->name.base, raw_req->scheme->name.len);
std::vector<std::string> addr_parts;
StringUtils::split(node->leader_id().to_string(), addr_parts, ":");
std::string leader_host_port = addr_parts[0] + ":" + addr_parts[2];
const std::string & path = std::string(raw_req->path.base, raw_req->path.len);
std::string url = scheme + "://" + leader_host_port + path;
std::string api_res;
long status = HttpClient::post_response(url, request->body, api_res);
response->send(status, api_res);
auto replication_arg = new AsyncIndexArg{request, response, nullptr};
replication_arg->req->route_index = static_cast<int>(ROUTE_CODES::RETURN_EARLY);
message_dispatcher->send_message(REPLICATION_MSG, replication_arg);
}, request, response, message_dispatcher, node);
response->send_405("Not the leader.");
auto replication_arg = new AsyncIndexArg{request, response, nullptr};
replication_arg->req->route_index = static_cast<int>(ROUTE_CODES::RETURN_EARLY);
message_dispatcher->send_message(REPLICATION_MSG, replication_arg);
return ;
}
@ -287,10 +304,11 @@ void ReplicationState::refresh_peers(const std::string & peers) {
}
}
ReplicationState::ReplicationState(Store *store, http_message_dispatcher *message_dispatcher,
ReplicationState::ReplicationState(Store *store, ThreadPool* thread_pool, http_message_dispatcher *message_dispatcher,
std::promise<bool> *ready, bool create_init_db_snapshot):
node(nullptr), leader_term(-1), store(store), message_dispatcher(message_dispatcher),
has_initialized(false), ready(ready), create_init_db_snapshot(create_init_db_snapshot) {
node(nullptr), leader_term(-1), store(store), thread_pool(thread_pool),
message_dispatcher(message_dispatcher),has_initialized(false), ready(ready),
create_init_db_snapshot(create_init_db_snapshot) {
}

View File

@ -91,10 +91,9 @@ void Replicator::start(http_message_dispatcher* message_dispatcher, const std::s
}
std::string url = master_host_port+"/replication/updates?seq_number="+std::to_string(latest_seq_num+1);
HttpClient client(url, api_key);
std::string json_response;
long status_code = client.get_reponse(json_response);
long status_code = HttpClient::get_response(url, json_response);
if(status_code == 200) {
nlohmann::json json_content = nlohmann::json::parse(json_response);

View File

@ -8,10 +8,12 @@
#include <raft_server.h>
#include <fstream>
#include <execinfo.h>
#include <http_client.h>
#include "core_api.h"
#include "typesense_server_utils.h"
#include "file_utils.h"
#include "threadpool.h"
HttpServer* server;
std::atomic<bool> quit_raft_service;
@ -169,9 +171,9 @@ bool on_send_response(void *data) {
}
int start_raft_server(ReplicationState& replication_state, const std::string& state_dir,
const std::string& path_to_peers, uint32_t raft_port) {
const std::string& path_to_peers, uint32_t api_port, uint32_t raft_port) {
std::string peer_ips_string;
std::string peer_ips;
if(path_to_peers.empty()) {
LOG(INFO) << "Since no --peers argument is provided, starting a single node Typesense cluster.";
@ -182,12 +184,12 @@ int start_raft_server(ReplicationState& replication_state, const std::string& st
LOG(ERROR) << peers_op.error();
exit(-1);
} else {
peer_ips_string = peers_op.get();
if(peer_ips_string.empty()) {
peer_ips = peers_op.get();
if(peer_ips.empty()) {
LOG(ERROR) << "File containing raft peers is empty.";
exit(-1);
} else {
peer_ips_string = peers_op.get();
peer_ips = peers_op.get();
}
}
}
@ -205,11 +207,7 @@ int start_raft_server(ReplicationState& replication_state, const std::string& st
exit(-1);
}
std::vector<std::string> peer_ips;
StringUtils::split(peer_ips_string, peer_ips, ",");
std::string peers = StringUtils::join(peer_ips, ":0,");
if (replication_state.start(raft_port, 1000, 600, state_dir, peers) != 0) {
if (replication_state.start(api_port, raft_port, 1000, 600, state_dir, peer_ips) != 0) {
LOG(ERROR) << "Failed to start raft state";
exit(-1);
}
@ -288,6 +286,8 @@ int run_server(const Config & config, const std::string & version,
config.get_api_key(), config.get_search_only_api_key());
curl_global_init(CURL_GLOBAL_SSL);
HttpClient & httpClient = HttpClient::get_instance();
httpClient.init(config.get_api_key());
server = new HttpServer(
version,
@ -308,11 +308,13 @@ int run_server(const Config & config, const std::string & version,
std::promise<bool> ready_promise;
std::future<bool> ready_future = ready_promise.get_future();
ReplicationState replication_state(&store, server->get_message_dispatcher(), &ready_promise, create_init_db_snapshot);
ThreadPool thread_pool(4);
ReplicationState replication_state(&store, &thread_pool, server->get_message_dispatcher(), &ready_promise,
create_init_db_snapshot);
std::thread raft_thread([&replication_state, &config, &state_dir]() {
std::string path_to_peers = config.get_raft_peers();
start_raft_server(replication_state, state_dir, path_to_peers, config.get_raft_port());
start_raft_server(replication_state, state_dir, path_to_peers, config.get_listen_port(), config.get_raft_port());
});
// wait for raft service to be ready before starting http