From 53285096378f32479b43f99f8cb2d263b1425a67 Mon Sep 17 00:00:00 2001 From: kishorenc Date: Fri, 18 Oct 2019 07:50:48 +0530 Subject: [PATCH] When replication begins, ensure that it starts from sequence number 0. --- include/store.h | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/include/store.h b/include/store.h index b56a2c8e..55d81a10 100644 --- a/include/store.h +++ b/include/store.h @@ -154,13 +154,12 @@ public: return db->GetLatestSequenceNumber(); } - /* - Since: GetUpdatesSince(0) == GetUpdatesSince(1), always query for 1 sequence number greater than the number - returned by GetLatestSequenceNumber() locally. - */ - Option*> get_updates_since(const uint64_t seq_number, const uint64_t max_updates) const { + Option*> get_updates_since(const uint64_t seq_number_org, const uint64_t max_updates) const { const uint64_t local_latest_seq_num = db->GetLatestSequenceNumber(); + // Since GetUpdatesSince(0) == GetUpdatesSince(1) + const uint64_t seq_number = (seq_number_org == 0) ? 1 : seq_number_org; + if(seq_number == local_latest_seq_num+1) { // replica has caught up, send an empty list as result std::vector* updates = new std::vector(); @@ -177,7 +176,7 @@ public: return Option*>(400, error.str()); } - if(!iter->Valid() && !(local_latest_seq_num == 0 && seq_number == 0)) { + if(!iter->Valid()) { std::ostringstream error; error << "Invalid iterator. Master's latest sequence number is " << local_latest_seq_num << " but " << "updates are requested from sequence number " << seq_number << ". " @@ -188,9 +187,22 @@ public: uint64_t num_updates = 0; std::vector* updates = new std::vector(); + bool first_iteration = true; + while(iter->Valid() && num_updates < max_updates) { - rocksdb::BatchResult batch_result = iter->GetBatch(); - const std::string & write_batch_serialized = batch_result.writeBatchPtr->Data(); + const rocksdb::BatchResult & batch = iter->GetBatch(); + if(first_iteration) { + first_iteration = false; + if(batch.sequence != seq_number) { + std::ostringstream error; + error << "Invalid iterator. Requested sequence number is " << seq_number << " but " + << "updates are available only from sequence number " << batch.sequence << ". " + << "The master's WAL entries might have expired (they are kept only for 24 hours)."; + return Option*>(400, error.str()); + } + } + + const std::string & write_batch_serialized = batch.writeBatchPtr->Data(); updates->push_back(write_batch_serialized); num_updates += 1; iter->Next();