Improve handling of replication errors / edge cases.

This commit is contained in:
Kishore Nallan 2017-12-24 21:03:30 +05:30
parent a84a0a55bc
commit 21d3de6145
6 changed files with 44 additions and 7 deletions

View File

@ -79,6 +79,9 @@
- ~~get collection should show schema~~
- ~~API key should be allowed as a GET parameter also (for JSONP)~~
- ~~Don't crash when the data directory is not found~~
- ~~When the first sequence ID is not zero, bail out~~
- ~~Proper status code when sequence number to fetch is bad~~
- Replica should be read-only
- handle hyphens (replace them)
- clean special chars before indexing
- NOT operator support

View File

@ -118,8 +118,12 @@ public:
store._get_db_unsafe()->Write(rocksdb::WriteOptions(), &write_batch);
}
} else {
std::cout << "Replication error while fetching records from master, status_code="
std::cerr << "Replication error while fetching records from master, status_code="
<< status_code << std::endl;
if(status_code != 0) {
std::cerr << json_response << std::endl;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(3000));

View File

@ -3,6 +3,7 @@
#include <stdint.h>
#include <cstdlib>
#include <string>
#include <sstream>
#include <memory>
#include <option.h>
#include <rocksdb/db.h>
@ -138,16 +139,27 @@ public:
returned by GetLatestSequenceNumber() locally.
*/
Option<std::vector<std::string>*> get_updates_since(const uint64_t seq_number, const uint64_t max_updates) const {
rocksdb::unique_ptr<rocksdb::TransactionLogIterator> iter;
const uint64_t local_latest_seq_num = db->GetLatestSequenceNumber();
if(seq_number == db->GetLatestSequenceNumber()+1) {
if(seq_number == local_latest_seq_num+1) {
// replica has caught up, send an empty list as result
std::vector<std::string>* updates = new std::vector<std::string>();
return Option<std::vector<std::string>*>(updates);
}
rocksdb::unique_ptr<rocksdb::TransactionLogIterator> iter;
rocksdb::Status status = db->GetUpdatesSince(seq_number, &iter);
if(!status.ok()) {
return Option<std::vector<std::string>*>(204, "Invalid sequence number.");
std::ostringstream error;
error << "Unable to fetch updates. " << "Master's latest sequence number is " << local_latest_seq_num;
return Option<std::vector<std::string>*>(400, error.str());
}
if(!iter->Valid() && !(local_latest_seq_num == 0 && seq_number == 0)) {
std::ostringstream error;
error << "Invalid iterator. " << "Master's latest sequence number is " << local_latest_seq_num;
return Option<std::vector<std::string>*>(400, error.str());
}
uint64_t num_updates = 0;

View File

@ -407,6 +407,9 @@ void get_replication_updates(http_req & req, http_res & res) {
json_response["updates"].push_back(StringUtils::base64_encode(update));
}
uint64_t latest_seq_num = store->get_latest_seq_number();
json_response["latest_seq_num"] = latest_seq_num;
res.send_200(json_response.dump());
res.server->send_message(SEND_RESPONSE_MSG, new request_response{&req, &res});
delete updates;

View File

@ -33,8 +33,8 @@ int main(int argc, char **argv) {
options.add<std::string>("listen-address", 'h', "Address to which Typesense server binds.", false, "0.0.0.0");
options.add<uint32_t>("listen-port", 'p', "Port on which Typesense server listens.", false, 8108);
options.add<std::string>("master", 'm', "Master host in http(s)://<master_address>:<master_port> format "
"to start the server as a read-only replica.", false, "");
options.add<std::string>("master", 'm', "Provide the master's address in http(s)://<master_address>:<master_port> "
"format to start the server as a read-only replica.", false, "");
options.add<std::string>("ssl-certificate", 'c', "Path to the SSL certificate file.", false, "");
options.add<std::string>("ssl-certificate-key", 'e', "Path to the SSL certificate key file.", false, "");

View File

@ -17,12 +17,19 @@ TEST(StoreTest, GetUpdatesSince) {
ASSERT_TRUE(updates_op.ok());
ASSERT_EQ(0, updates_op.get()->size());
ASSERT_EQ(0, primary_store.get_latest_seq_number());
delete updates_op.get();
// querying for a seq_num > 0 on a fresh store
updates_op = primary_store.get_updates_since(10, 10);
ASSERT_FALSE(updates_op.ok());
ASSERT_EQ("Unable to fetch updates. Master's latest sequence number is 0", updates_op.error());
primary_store.insert("foo1", "bar1");
ASSERT_EQ(1, primary_store.get_latest_seq_number());
updates_op = primary_store.get_updates_since(1, 10);
ASSERT_TRUE(updates_op.ok());
ASSERT_EQ(1, updates_op.get()->size());
delete updates_op.get();
primary_store.insert("foo2", "bar2");
primary_store.insert("foo3", "bar3");
@ -57,20 +64,28 @@ TEST(StoreTest, GetUpdatesSince) {
// Ensure that updates are limited to max_updates argument
updates_op = primary_store.get_updates_since(0, 10);
ASSERT_EQ(3, updates_op.get()->size());
delete updates_op.get();
// sequence numbers 0 and 1 are the same
updates_op = primary_store.get_updates_since(0, 10);
ASSERT_EQ(3, updates_op.get()->size());
delete updates_op.get();
updates_op = primary_store.get_updates_since(1, 10);
ASSERT_EQ(3, updates_op.get()->size());
delete updates_op.get();
updates_op = primary_store.get_updates_since(3, 100);
ASSERT_TRUE(updates_op.ok());
ASSERT_EQ(1, updates_op.get()->size());
delete updates_op.get();
updates_op = primary_store.get_updates_since(4, 100);
ASSERT_TRUE(updates_op.ok());
ASSERT_EQ(0, updates_op.get()->size());
delete updates_op.get();
updates_op = primary_store.get_updates_since(50, 100);
ASSERT_FALSE(updates_op.ok());
ASSERT_EQ("Unable to fetch updates. Master's latest sequence number is 3", updates_op.error());
}