Fix race condition in snapshot restoration.

This commit is contained in:
kishorenc 2021-02-12 16:15:32 +05:30
parent 0175ac0df4
commit e2882c5828
4 changed files with 92 additions and 68 deletions

View File

@ -177,7 +177,7 @@ public:
int init_db();
void reset_db();
Store* get_store();
void do_snapshot(const std::string& snapshot_path, http_req& req, http_res& res);

View File

@ -11,8 +11,11 @@
#include <rocksdb/options.h>
#include <rocksdb/merge_operator.h>
#include <rocksdb/transaction_log.h>
#include <butil/file_util.h>
#include <rocksdb/utilities/checkpoint.h>
#include "string_utils.h"
#include "logger.h"
#include "file_utils.h"
class UInt64AddOperator : public rocksdb::AssociativeMergeOperator {
public:
@ -48,6 +51,22 @@ private:
rocksdb::Options options;
rocksdb::WriteOptions write_options;
rocksdb::Status init_db() {
rocksdb::Status s = rocksdb::DB::Open(options, state_dir_path, &db);
if(!s.ok()) {
LOG(ERROR) << "Error while initializing store: " << s.ToString();
if(s.code() == rocksdb::Status::Code::kIOError) {
LOG(ERROR) << "It seems like the data directory " << state_dir_path << " is already being used by "
<< "another Typesense server. ";
LOG(ERROR) << "If you are SURE that this is not the case, delete the LOCK file "
<< "in the data db directory and try again.";
}
}
assert(s.ok());
return s;
}
public:
Store() = delete;
@ -82,22 +101,6 @@ public:
close();
}
rocksdb::Status init_db() {
rocksdb::Status s = rocksdb::DB::Open(options, state_dir_path, &db);
if(!s.ok()) {
LOG(ERROR) << "Error while initializing store: " << s.ToString();
if(s.code() == rocksdb::Status::Code::kIOError) {
LOG(ERROR) << "It seems like the data directory " << state_dir_path << " is already being used by "
<< "another Typesense server. ";
LOG(ERROR) << "If you are SURE that this is not the case, delete the LOCK file "
<< "in the data db directory and try again.";
}
}
assert(s.ok());
return s;
}
bool insert(const std::string& key, const std::string& value) {
rocksdb::Status status = db->Put(write_options, key, value);
return status.ok();
@ -243,11 +246,66 @@ public:
db = nullptr;
}
int reload(bool clear_state_dir, const std::string& snapshot_path) {
close();
if(clear_state_dir) {
if (!butil::DeleteFile(butil::FilePath(state_dir_path), true)) {
LOG(WARNING) << "rm " << state_dir_path << " failed";
return -1;
}
LOG(INFO) << "rm " << state_dir_path << " success";
}
if(!snapshot_path.empty()) {
// tries to use link if possible, or else copies
if (!copy_dir(snapshot_path, state_dir_path)) {
LOG(WARNING) << "copy snapshot " << snapshot_path << " to " << state_dir_path << " failed";
return -1;
}
LOG(INFO) << "copy snapshot " << snapshot_path << " to " << state_dir_path << " success";
}
if (!butil::CreateDirectory(butil::FilePath(state_dir_path))) {
LOG(WARNING) << "CreateDirectory " << state_dir_path << " failed";
return -1;
}
const rocksdb::Status& status = init_db();
if (!status.ok()) {
LOG(WARNING) << "Open DB " << state_dir_path << " failed, msg: " << status.ToString();
return -1;
}
LOG(INFO) << "DB open success!";
return 0;
}
void flush() {
rocksdb::FlushOptions options;
db->Flush(options);
}
rocksdb::Status create_check_point(rocksdb::Checkpoint** checkpoint_ptr, const std::string& db_snapshot_path) {
rocksdb::Status status = rocksdb::Checkpoint::Create(db, checkpoint_ptr);
if(!status.ok()) {
LOG(ERROR) << "Checkpoint Create failed, msg:" << status.ToString();
return status;
}
status = (*checkpoint_ptr)->CreateCheckpoint(db_snapshot_path);
if(!status.ok()) {
LOG(WARNING) << "Checkpoint CreateCheckpoint failed at snapshot path: "
<< db_snapshot_path << ", msg:" << status.ToString();
}
return status;
}
// Only for internal tests
rocksdb::DB* _get_db_unsafe() const {
return db;

View File

@ -231,7 +231,7 @@ size_t HttpClient::curl_write_async(char *buffer, size_t size, size_t nmemb, voi
}
size_t HttpClient::curl_write_async_done(void *context, curl_socket_t item) {
LOG(INFO) << "curl_write_async_done";
//LOG(INFO) << "curl_write_async_done";
deferred_req_res_t* req_res = static_cast<deferred_req_res_t *>(context);
if(req_res->req->_req == nullptr) {

View File

@ -77,10 +77,9 @@ int ReplicationState::start(const butil::EndPoint & peering_endpoint, const int
// `create_init_db_snapshot` can be handled separately only after leader starts
LOG(INFO) << "Snapshot does not exist. We will remove db dir and init db fresh.";
reset_db();
if (!butil::DeleteFile(butil::FilePath(store->get_state_dir_path()), true)) {
LOG(WARNING) << "rm " << store->get_state_dir_path() << " failed";
return -1;
int reload_store = store->reload(true, "");
if(reload_store != 0) {
return reload_store;
}
int init_db_status = init_db();
@ -358,23 +357,14 @@ void* ReplicationState::save_snapshot(void* arg) {
void ReplicationState::on_snapshot_save(braft::SnapshotWriter* writer, braft::Closure* done) {
LOG(INFO) << "on_snapshot_save";
rocksdb::Checkpoint* checkpoint = nullptr;
rocksdb::Status status = rocksdb::Checkpoint::Create(store->_get_db_unsafe(), &checkpoint);
if(!status.ok()) {
LOG(WARNING) << "Checkpoint Create failed, msg:" << status.ToString();
done->status().set_error(EIO, "Checkpoint Create failed.");
}
std::string db_snapshot_path = writer->get_path() + "/" + db_snapshot_name;
rocksdb::Checkpoint* checkpoint = nullptr;
rocksdb::Status status = store->create_check_point(&checkpoint, db_snapshot_path);
std::unique_ptr<rocksdb::Checkpoint> checkpoint_guard(checkpoint);
status = checkpoint->CreateCheckpoint(db_snapshot_path);
if(!status.ok()) {
LOG(WARNING) << "Checkpoint CreateCheckpoint failed at snapshot path: "
<< db_snapshot_path << ", msg:" << status.ToString();
done->status().set_error(EIO, "CreateCheckpoint failed.");
LOG(ERROR) << "Failure during checkpoint creation, msg:" << status.ToString();
done->status().set_error(EIO, "Checkpoint creation failure.");
}
SnapshotArg* arg = new SnapshotArg;
@ -396,18 +386,6 @@ void ReplicationState::on_snapshot_save(braft::SnapshotWriter* writer, braft::Cl
}
int ReplicationState::init_db() {
if (!butil::CreateDirectory(butil::FilePath(store->get_state_dir_path()))) {
LOG(WARNING) << "CreateDirectory " << store->get_state_dir_path() << " failed";
return -1;
}
const rocksdb::Status& status = store->init_db();
if (!status.ok()) {
LOG(WARNING) << "Open DB " << store->get_state_dir_path() << " failed, msg: " << status.ToString();
return -1;
}
LOG(INFO) << "DB open success!";
LOG(INFO) << "Loading collections from disk...";
Option<bool> init_op = CollectionManager::get_instance().load();
@ -427,27 +405,15 @@ int ReplicationState::on_snapshot_load(braft::SnapshotReader* reader) {
LOG(INFO) << "on_snapshot_load";
// Load snapshot from reader, replacing the running StateMachine
reset_db();
if (!butil::DeleteFile(butil::FilePath(store->get_state_dir_path()), true)) {
LOG(WARNING) << "rm " << store->get_state_dir_path() << " failed";
return -1;
}
LOG(INFO) << "rm " << store->get_state_dir_path() << " success";
// Load snapshot from leader, replacing the running StateMachine
std::string snapshot_path = reader->get_path();
snapshot_path.append(std::string("/") + db_snapshot_name);
// tries to use link if possible, or else copies
if (!copy_dir(snapshot_path, store->get_state_dir_path())) {
LOG(WARNING) << "copy snapshot " << snapshot_path << " to " << store->get_state_dir_path() << " failed";
return -1;
int reload_store = store->reload(true, snapshot_path);
if(reload_store != 0) {
return reload_store;
}
LOG(INFO) << "copy snapshot " << snapshot_path << " to " << store->get_state_dir_path() << " success";
return init_db();
}
@ -539,10 +505,6 @@ ReplicationState::ReplicationState(Store *store, ThreadPool* thread_pool, http_m
}
void ReplicationState::reset_db() {
store->close();
}
bool ReplicationState::is_alive() const {
if(node == nullptr || !is_ready()) {
return false;
@ -612,13 +574,17 @@ http_message_dispatcher* ReplicationState::get_message_dispatcher() const {
return message_dispatcher;
}
Store* ReplicationState::get_store() {
return store;
}
void InitSnapshotClosure::Run() {
// Auto delete this after Run()
std::unique_ptr<InitSnapshotClosure> self_guard(this);
if(status().ok()) {
LOG(INFO) << "Init snapshot succeeded!";
replication_state->reset_db();
replication_state->get_store()->reload(false, "");
replication_state->init_db();
} else {
LOG(ERROR) << "Init snapshot failed, error: " << status().error_str() << ", code: " << status().error_code();