Parameterize API key used by replication.

This commit is contained in:
Kishore Nallan 2017-12-16 22:12:03 +05:30
parent bae67169d6
commit 3a9743aa90
5 changed files with 46 additions and 11 deletions

View File

@ -10,8 +10,10 @@ class HttpClient {
private:
std::string buffer;
std::string url;
std::string api_key;
public:
HttpClient(std::string url): url(url) {
HttpClient(std::string url, std::string api_key): url(url), api_key(api_key) {
}
@ -28,7 +30,8 @@ public:
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buffer);
struct curl_slist *chunk = NULL;
chunk = curl_slist_append(chunk, "x-typesense-api-key: abcd");
std::string api_key_header = std::string("x-typesense-api-key: ") + api_key;
chunk = curl_slist_append(chunk, api_key_header.c_str());
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk);
curl_easy_perform(curl);

View File

@ -90,13 +90,15 @@ public:
class Replicator {
public:
static void start(HttpServer* server, const std::string& master_host_port, Store& store) {
static void start(HttpServer* server, const std::string master_host_port, const std::string api_key, Store& store) {
while(true) {
IterateBatchHandler handler(server);
uint64_t latest_seq_num = store.get_latest_seq_number();
std::cout << "latest_seq_num: " << latest_seq_num << std::endl;
HttpClient client(master_host_port+"/replication/updates?seq_number="+std::to_string(latest_seq_num+1));
HttpClient client(
master_host_port+"/replication/updates?seq_number="+std::to_string(latest_seq_num+1), api_key
);
std::string json_response;
long status_code = client.get_reponse(json_response);

View File

@ -134,12 +134,17 @@ public:
}
/*
Since `GetLatestSequenceNumber` returns 0 when the DB is empty and when there is 1 record:
get_updates_since(0) == get_updates_since(1) - so always query for 1 sequence number greater than the number
Since: GetUpdatesSince(0) == GetUpdatesSince(1), always query for 1 sequence number greater than the number
returned by GetLatestSequenceNumber() locally.
*/
Option<std::vector<std::string>*> get_updates_since(const uint64_t seq_number, const uint64_t max_updates) const {
rocksdb::unique_ptr<rocksdb::TransactionLogIterator> iter;
if(seq_number == db->GetLatestSequenceNumber()+1) {
std::vector<std::string>* updates = new std::vector<std::string>();
return Option<std::vector<std::string>*>(updates);
}
rocksdb::Status status = db->GetUpdatesSince(seq_number, &iter);
if(!status.ok()) {
return Option<std::vector<std::string>*>(204, "Invalid sequence number.");

View File

@ -86,8 +86,8 @@ int main(int argc, char **argv) {
}
std::cout << "Typesense server started as a read-only replica... Spawning replication thread..." << std::endl;
std::thread replication_thread([&master_host_port, &store]() {
Replicator::start(::server, master_host_port, store);
std::thread replication_thread([&master_host_port, &store, &options]() {
Replicator::start(::server, master_host_port, options.get<std::string>("api-key"), store);
});
replication_thread.detach();

View File

@ -9,12 +9,26 @@ TEST(StoreTest, GetUpdatesSince) {
system(("rm -rf "+primary_store_path+" && mkdir -p "+primary_store_path).c_str());
// add some records, get the updates and restore them in a new store
Store primary_store(primary_store_path);
// on a fresh store, sequence number is 0
Option<std::vector<std::string>*> updates_op = primary_store.get_updates_since(0, 10);
ASSERT_TRUE(updates_op.ok());
ASSERT_EQ(0, updates_op.get()->size());
ASSERT_EQ(0, primary_store.get_latest_seq_number());
primary_store.insert("foo1", "bar1");
ASSERT_EQ(1, primary_store.get_latest_seq_number());
updates_op = primary_store.get_updates_since(1, 10);
ASSERT_TRUE(updates_op.ok());
ASSERT_EQ(1, updates_op.get()->size());
primary_store.insert("foo2", "bar2");
primary_store.insert("foo3", "bar3");
ASSERT_EQ(3, primary_store.get_latest_seq_number());
Option<std::vector<std::string>*> updates_op = primary_store.get_updates_since(0, 10);
updates_op = primary_store.get_updates_since(0, 10);
ASSERT_EQ(3, updates_op.get()->size());
std::string replica_store_path = "/tmp/typesense_test/replica_store_test";
@ -41,7 +55,18 @@ TEST(StoreTest, GetUpdatesSince) {
delete updates_op.get();
// Ensure that updates are limited to max_updates argument
updates_op = primary_store.get_updates_since(0, 2);
ASSERT_EQ(2, updates_op.get()->size());
updates_op = primary_store.get_updates_since(0, 10);
ASSERT_EQ(3, updates_op.get()->size());
// sequence numbers 0 and 1 are the same
updates_op = primary_store.get_updates_since(0, 10);
ASSERT_EQ(3, updates_op.get()->size());
updates_op = primary_store.get_updates_since(1, 10);
ASSERT_EQ(3, updates_op.get()->size());
updates_op = primary_store.get_updates_since(3, 100);
ASSERT_TRUE(updates_op.ok());
ASSERT_EQ(1, updates_op.get()->size());
delete updates_op.get();
}