diff --git a/TODO.md b/TODO.md index 74aecf53..91a28f60 100644 --- a/TODO.md +++ b/TODO.md @@ -79,6 +79,9 @@ - ~~get collection should show schema~~ - ~~API key should be allowed as a GET parameter also (for JSONP)~~ - ~~Don't crash when the data directory is not found~~ +- ~~When the first sequence ID is not zero, bail out~~ +- ~~Proper status code when sequence number to fetch is bad~~ +- Replica should be read-only - handle hyphens (replace them) - clean special chars before indexing - NOT operator support diff --git a/include/replicator.h b/include/replicator.h index 24aaf48f..5430ce6c 100644 --- a/include/replicator.h +++ b/include/replicator.h @@ -118,8 +118,12 @@ public: store._get_db_unsafe()->Write(rocksdb::WriteOptions(), &write_batch); } } else { - std::cout << "Replication error while fetching records from master, status_code=" + std::cerr << "Replication error while fetching records from master, status_code=" << status_code << std::endl; + + if(status_code != 0) { + std::cerr << json_response << std::endl; + } } std::this_thread::sleep_for(std::chrono::milliseconds(3000)); diff --git a/include/store.h b/include/store.h index 31781cb2..de410e30 100644 --- a/include/store.h +++ b/include/store.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -138,16 +139,27 @@ public: returned by GetLatestSequenceNumber() locally. */ Option*> get_updates_since(const uint64_t seq_number, const uint64_t max_updates) const { - rocksdb::unique_ptr iter; + const uint64_t local_latest_seq_num = db->GetLatestSequenceNumber(); - if(seq_number == db->GetLatestSequenceNumber()+1) { + if(seq_number == local_latest_seq_num+1) { + // replica has caught up, send an empty list as result std::vector* updates = new std::vector(); return Option*>(updates); } + rocksdb::unique_ptr iter; rocksdb::Status status = db->GetUpdatesSince(seq_number, &iter); + if(!status.ok()) { - return Option*>(204, "Invalid sequence number."); + std::ostringstream error; + error << "Unable to fetch updates. " << "Master's latest sequence number is " << local_latest_seq_num; + return Option*>(400, error.str()); + } + + if(!iter->Valid() && !(local_latest_seq_num == 0 && seq_number == 0)) { + std::ostringstream error; + error << "Invalid iterator. " << "Master's latest sequence number is " << local_latest_seq_num; + return Option*>(400, error.str()); } uint64_t num_updates = 0; diff --git a/src/api.cpp b/src/api.cpp index 2f667b35..1feed9c9 100644 --- a/src/api.cpp +++ b/src/api.cpp @@ -407,6 +407,9 @@ void get_replication_updates(http_req & req, http_res & res) { json_response["updates"].push_back(StringUtils::base64_encode(update)); } + uint64_t latest_seq_num = store->get_latest_seq_number(); + json_response["latest_seq_num"] = latest_seq_num; + res.send_200(json_response.dump()); res.server->send_message(SEND_RESPONSE_MSG, new request_response{&req, &res}); delete updates; diff --git a/src/main/typesense_server.cpp b/src/main/typesense_server.cpp index 6f678339..f43cbab2 100644 --- a/src/main/typesense_server.cpp +++ b/src/main/typesense_server.cpp @@ -33,8 +33,8 @@ int main(int argc, char **argv) { options.add("listen-address", 'h', "Address to which Typesense server binds.", false, "0.0.0.0"); options.add("listen-port", 'p', "Port on which Typesense server listens.", false, 8108); - options.add("master", 'm', "Master host in http(s)://: format " - "to start the server as a read-only replica.", false, ""); + options.add("master", 'm', "Provide the master's address in http(s)://: " + "format to start the server as a read-only replica.", false, ""); options.add("ssl-certificate", 'c', "Path to the SSL certificate file.", false, ""); options.add("ssl-certificate-key", 'e', "Path to the SSL certificate key file.", false, ""); diff --git a/test/store_test.cpp b/test/store_test.cpp index b25518fd..563a9050 100644 --- a/test/store_test.cpp +++ b/test/store_test.cpp @@ -17,12 +17,19 @@ TEST(StoreTest, GetUpdatesSince) { ASSERT_TRUE(updates_op.ok()); ASSERT_EQ(0, updates_op.get()->size()); ASSERT_EQ(0, primary_store.get_latest_seq_number()); + delete updates_op.get(); + + // querying for a seq_num > 0 on a fresh store + updates_op = primary_store.get_updates_since(10, 10); + ASSERT_FALSE(updates_op.ok()); + ASSERT_EQ("Unable to fetch updates. Master's latest sequence number is 0", updates_op.error()); primary_store.insert("foo1", "bar1"); ASSERT_EQ(1, primary_store.get_latest_seq_number()); updates_op = primary_store.get_updates_since(1, 10); ASSERT_TRUE(updates_op.ok()); ASSERT_EQ(1, updates_op.get()->size()); + delete updates_op.get(); primary_store.insert("foo2", "bar2"); primary_store.insert("foo3", "bar3"); @@ -57,20 +64,28 @@ TEST(StoreTest, GetUpdatesSince) { // Ensure that updates are limited to max_updates argument updates_op = primary_store.get_updates_since(0, 10); ASSERT_EQ(3, updates_op.get()->size()); + delete updates_op.get(); // sequence numbers 0 and 1 are the same updates_op = primary_store.get_updates_since(0, 10); ASSERT_EQ(3, updates_op.get()->size()); + delete updates_op.get(); + updates_op = primary_store.get_updates_since(1, 10); ASSERT_EQ(3, updates_op.get()->size()); + delete updates_op.get(); updates_op = primary_store.get_updates_since(3, 100); ASSERT_TRUE(updates_op.ok()); ASSERT_EQ(1, updates_op.get()->size()); + delete updates_op.get(); updates_op = primary_store.get_updates_since(4, 100); ASSERT_TRUE(updates_op.ok()); ASSERT_EQ(0, updates_op.get()->size()); - delete updates_op.get(); + + updates_op = primary_store.get_updates_since(50, 100); + ASSERT_FALSE(updates_op.ok()); + ASSERT_EQ("Unable to fetch updates. Master's latest sequence number is 3", updates_op.error()); } \ No newline at end of file