When replication begins, ensure that it starts from sequence number 0.

This commit is contained in:
kishorenc 2019-10-18 07:50:48 +05:30
parent bfa975ef85
commit 5328509637

View File

@ -154,13 +154,12 @@ public:
return db->GetLatestSequenceNumber();
}
/*
Since: GetUpdatesSince(0) == GetUpdatesSince(1), always query for 1 sequence number greater than the number
returned by GetLatestSequenceNumber() locally.
*/
Option<std::vector<std::string>*> get_updates_since(const uint64_t seq_number, const uint64_t max_updates) const {
Option<std::vector<std::string>*> get_updates_since(const uint64_t seq_number_org, const uint64_t max_updates) const {
const uint64_t local_latest_seq_num = db->GetLatestSequenceNumber();
// Since GetUpdatesSince(0) == GetUpdatesSince(1)
const uint64_t seq_number = (seq_number_org == 0) ? 1 : seq_number_org;
if(seq_number == local_latest_seq_num+1) {
// replica has caught up, send an empty list as result
std::vector<std::string>* updates = new std::vector<std::string>();
@ -177,7 +176,7 @@ public:
return Option<std::vector<std::string>*>(400, error.str());
}
if(!iter->Valid() && !(local_latest_seq_num == 0 && seq_number == 0)) {
if(!iter->Valid()) {
std::ostringstream error;
error << "Invalid iterator. Master's latest sequence number is " << local_latest_seq_num << " but "
<< "updates are requested from sequence number " << seq_number << ". "
@ -188,9 +187,22 @@ public:
uint64_t num_updates = 0;
std::vector<std::string>* updates = new std::vector<std::string>();
bool first_iteration = true;
while(iter->Valid() && num_updates < max_updates) {
rocksdb::BatchResult batch_result = iter->GetBatch();
const std::string & write_batch_serialized = batch_result.writeBatchPtr->Data();
const rocksdb::BatchResult & batch = iter->GetBatch();
if(first_iteration) {
first_iteration = false;
if(batch.sequence != seq_number) {
std::ostringstream error;
error << "Invalid iterator. Requested sequence number is " << seq_number << " but "
<< "updates are available only from sequence number " << batch.sequence << ". "
<< "The master's WAL entries might have expired (they are kept only for 24 hours).";
return Option<std::vector<std::string>*>(400, error.str());
}
}
const std::string & write_batch_serialized = batch.writeBatchPtr->Data();
updates->push_back(write_batch_serialized);
num_updates += 1;
iter->Next();