mirror of
https://github.com/typesense/typesense.git
synced 2025-05-19 21:22:25 +08:00
Improve error message logged when a replica is pointed to an old master.
In such a case, the WAL entries would have expired - logging that explicitly now.
This commit is contained in:
parent
0a9e5e1562
commit
b451a559e0
2
TODO.md
2
TODO.md
@ -92,7 +92,7 @@
|
||||
- ~~Handle store-get() not finding a key~~
|
||||
- ~~Deprecate converting integer to string verbatim~~
|
||||
- ~~Deprecate union type punning~~
|
||||
- Replica server should fail when pointed to "old" master
|
||||
- ~~Replica server should fail when pointed to "old" master~~
|
||||
- Parameterize replica's MAX_UPDATES_TO_SEND
|
||||
- gzip compress responses
|
||||
- NOT operator support
|
||||
|
@ -52,7 +52,9 @@ public:
|
||||
|
||||
Store() = delete;
|
||||
|
||||
Store(const std::string & state_dir_path): state_dir_path(state_dir_path) {
|
||||
Store(const std::string & state_dir_path,
|
||||
const size_t wal_ttl_secs = 24*60*60,
|
||||
const size_t wal_size_mb = 1024): state_dir_path(state_dir_path) {
|
||||
// Optimize RocksDB
|
||||
options.IncreaseParallelism();
|
||||
options.OptimizeLevelStyleCompaction();
|
||||
@ -63,8 +65,8 @@ public:
|
||||
options.merge_operator.reset(new UInt64AddOperator);
|
||||
|
||||
// these need to be high for replication scenarios
|
||||
options.WAL_ttl_seconds = 24*60*60;
|
||||
options.WAL_size_limit_MB = 1024;
|
||||
options.WAL_ttl_seconds = wal_ttl_secs;
|
||||
options.WAL_size_limit_MB = wal_size_mb;
|
||||
|
||||
// open DB
|
||||
rocksdb::Status s = rocksdb::DB::Open(options, state_dir_path, &db);
|
||||
@ -168,7 +170,9 @@ public:
|
||||
|
||||
if(!iter->Valid() && !(local_latest_seq_num == 0 && seq_number == 0)) {
|
||||
std::ostringstream error;
|
||||
error << "Invalid iterator. " << "Master's latest sequence number is " << local_latest_seq_num;
|
||||
error << "Invalid iterator. Master's latest sequence number is " << local_latest_seq_num << " but "
|
||||
<< "updates are requested from sequence number " << seq_number << ". "
|
||||
<< "The master's WAL entries might have expired (they are kept only for 24 hours).";
|
||||
return Option<std::vector<std::string>*>(400, error.str());
|
||||
}
|
||||
|
||||
@ -191,6 +195,11 @@ public:
|
||||
db = nullptr;
|
||||
}
|
||||
|
||||
void flush() {
|
||||
rocksdb::FlushOptions options;
|
||||
db->Flush(options);
|
||||
}
|
||||
|
||||
// Only for internal tests
|
||||
rocksdb::DB* _get_db_unsafe() const {
|
||||
return db;
|
||||
|
@ -19,11 +19,19 @@ TEST(StoreTest, GetUpdatesSince) {
|
||||
ASSERT_EQ(0, primary_store.get_latest_seq_number());
|
||||
delete updates_op.get();
|
||||
|
||||
// get_updates_since(1) == get_updates_since(0)
|
||||
updates_op = primary_store.get_updates_since(1, 10);
|
||||
ASSERT_TRUE(updates_op.ok());
|
||||
ASSERT_EQ(0, updates_op.get()->size());
|
||||
ASSERT_EQ(0, primary_store.get_latest_seq_number());
|
||||
delete updates_op.get();
|
||||
|
||||
// querying for a seq_num > 0 on a fresh store
|
||||
updates_op = primary_store.get_updates_since(10, 10);
|
||||
ASSERT_FALSE(updates_op.ok());
|
||||
ASSERT_EQ("Unable to fetch updates. Master's latest sequence number is 0", updates_op.error());
|
||||
|
||||
// get_updates_since(1) == get_updates_since(0) even after inserting a record
|
||||
primary_store.insert("foo1", "bar1");
|
||||
ASSERT_EQ(1, primary_store.get_latest_seq_number());
|
||||
updates_op = primary_store.get_updates_since(1, 10);
|
||||
@ -31,12 +39,23 @@ TEST(StoreTest, GetUpdatesSince) {
|
||||
ASSERT_EQ(1, updates_op.get()->size());
|
||||
delete updates_op.get();
|
||||
|
||||
updates_op = primary_store.get_updates_since(0, 10);
|
||||
ASSERT_TRUE(updates_op.ok());
|
||||
ASSERT_EQ(1, updates_op.get()->size());
|
||||
delete updates_op.get();
|
||||
|
||||
// add more records
|
||||
primary_store.insert("foo2", "bar2");
|
||||
primary_store.insert("foo3", "bar3");
|
||||
ASSERT_EQ(3, primary_store.get_latest_seq_number());
|
||||
|
||||
updates_op = primary_store.get_updates_since(0, 10);
|
||||
ASSERT_EQ(3, updates_op.get()->size());
|
||||
delete updates_op.get();
|
||||
|
||||
updates_op = primary_store.get_updates_since(3, 10);
|
||||
ASSERT_EQ(1, updates_op.get()->size());
|
||||
delete updates_op.get();
|
||||
|
||||
std::string replica_store_path = "/tmp/typesense_test/replica_store_test";
|
||||
LOG(INFO) << "Truncating and creating: " << replica_store_path;
|
||||
@ -88,4 +107,25 @@ TEST(StoreTest, GetUpdatesSince) {
|
||||
updates_op = primary_store.get_updates_since(50, 100);
|
||||
ASSERT_FALSE(updates_op.ok());
|
||||
ASSERT_EQ("Unable to fetch updates. Master's latest sequence number is 3", updates_op.error());
|
||||
}
|
||||
|
||||
TEST(StoreTest, GetUpdateSinceInvalidIterator) {
|
||||
std::string primary_store_path = "/tmp/typesense_test/primary_store_test";
|
||||
LOG(INFO) << "Truncating and creating: " << primary_store_path;
|
||||
system(("rm -rf "+primary_store_path+" && mkdir -p "+primary_store_path).c_str());
|
||||
|
||||
// add some records, get the updates and restore them in a new store
|
||||
|
||||
Store primary_store(primary_store_path, 0, 0); // disable WAL
|
||||
primary_store.insert("foo1", "bar1");
|
||||
primary_store.insert("foo2", "bar2");
|
||||
primary_store.insert("foo3", "bar3");
|
||||
primary_store.insert("foo4", "bar4");
|
||||
|
||||
primary_store.flush();
|
||||
|
||||
Option<std::vector<std::string>*> updates_op = primary_store.get_updates_since(2, 10);
|
||||
ASSERT_FALSE(updates_op.ok());
|
||||
ASSERT_EQ("Invalid iterator. Master's latest sequence number is 4 but updates are requested from sequence number 2. "
|
||||
"The master's WAL entries might have expired (they are kept only for 24 hours).", updates_op.error());
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user