diff --git a/include/raft_server.h b/include/raft_server.h index 09eb5d21..9beab1cd 100644 --- a/include/raft_server.h +++ b/include/raft_server.h @@ -177,7 +177,7 @@ public: int init_db(); - void reset_db(); + Store* get_store(); void do_snapshot(const std::string& snapshot_path, http_req& req, http_res& res); diff --git a/include/store.h b/include/store.h index 2b2cbc32..b700df0d 100644 --- a/include/store.h +++ b/include/store.h @@ -11,8 +11,11 @@ #include #include #include +#include +#include #include "string_utils.h" #include "logger.h" +#include "file_utils.h" class UInt64AddOperator : public rocksdb::AssociativeMergeOperator { public: @@ -48,6 +51,22 @@ private: rocksdb::Options options; rocksdb::WriteOptions write_options; + rocksdb::Status init_db() { + rocksdb::Status s = rocksdb::DB::Open(options, state_dir_path, &db); + if(!s.ok()) { + LOG(ERROR) << "Error while initializing store: " << s.ToString(); + if(s.code() == rocksdb::Status::Code::kIOError) { + LOG(ERROR) << "It seems like the data directory " << state_dir_path << " is already being used by " + << "another Typesense server. "; + LOG(ERROR) << "If you are SURE that this is not the case, delete the LOCK file " + << "in the data db directory and try again."; + } + } + + assert(s.ok()); + return s; + } + public: Store() = delete; @@ -82,22 +101,6 @@ public: close(); } - rocksdb::Status init_db() { - rocksdb::Status s = rocksdb::DB::Open(options, state_dir_path, &db); - if(!s.ok()) { - LOG(ERROR) << "Error while initializing store: " << s.ToString(); - if(s.code() == rocksdb::Status::Code::kIOError) { - LOG(ERROR) << "It seems like the data directory " << state_dir_path << " is already being used by " - << "another Typesense server. "; - LOG(ERROR) << "If you are SURE that this is not the case, delete the LOCK file " - << "in the data db directory and try again."; - } - } - - assert(s.ok()); - return s; - } - bool insert(const std::string& key, const std::string& value) { rocksdb::Status status = db->Put(write_options, key, value); return status.ok(); @@ -243,11 +246,66 @@ public: db = nullptr; } + int reload(bool clear_state_dir, const std::string& snapshot_path) { + close(); + + if(clear_state_dir) { + if (!butil::DeleteFile(butil::FilePath(state_dir_path), true)) { + LOG(WARNING) << "rm " << state_dir_path << " failed"; + return -1; + } + + LOG(INFO) << "rm " << state_dir_path << " success"; + } + + if(!snapshot_path.empty()) { + // tries to use link if possible, or else copies + if (!copy_dir(snapshot_path, state_dir_path)) { + LOG(WARNING) << "copy snapshot " << snapshot_path << " to " << state_dir_path << " failed"; + return -1; + } + + LOG(INFO) << "copy snapshot " << snapshot_path << " to " << state_dir_path << " success"; + } + + if (!butil::CreateDirectory(butil::FilePath(state_dir_path))) { + LOG(WARNING) << "CreateDirectory " << state_dir_path << " failed"; + return -1; + } + + const rocksdb::Status& status = init_db(); + if (!status.ok()) { + LOG(WARNING) << "Open DB " << state_dir_path << " failed, msg: " << status.ToString(); + return -1; + } + + LOG(INFO) << "DB open success!"; + + return 0; + } + void flush() { rocksdb::FlushOptions options; db->Flush(options); } + rocksdb::Status create_check_point(rocksdb::Checkpoint** checkpoint_ptr, const std::string& db_snapshot_path) { + rocksdb::Status status = rocksdb::Checkpoint::Create(db, checkpoint_ptr); + if(!status.ok()) { + LOG(ERROR) << "Checkpoint Create failed, msg:" << status.ToString(); + return status; + } + + status = (*checkpoint_ptr)->CreateCheckpoint(db_snapshot_path); + + if(!status.ok()) { + LOG(WARNING) << "Checkpoint CreateCheckpoint failed at snapshot path: " + << db_snapshot_path << ", msg:" << status.ToString(); + } + + return status; + } + // Only for internal tests rocksdb::DB* _get_db_unsafe() const { return db; diff --git a/src/http_client.cpp b/src/http_client.cpp index 1ebe9145..8908ecc0 100644 --- a/src/http_client.cpp +++ b/src/http_client.cpp @@ -231,7 +231,7 @@ size_t HttpClient::curl_write_async(char *buffer, size_t size, size_t nmemb, voi } size_t HttpClient::curl_write_async_done(void *context, curl_socket_t item) { - LOG(INFO) << "curl_write_async_done"; + //LOG(INFO) << "curl_write_async_done"; deferred_req_res_t* req_res = static_cast(context); if(req_res->req->_req == nullptr) { diff --git a/src/raft_server.cpp b/src/raft_server.cpp index 2d419fde..f3afaf31 100644 --- a/src/raft_server.cpp +++ b/src/raft_server.cpp @@ -77,10 +77,9 @@ int ReplicationState::start(const butil::EndPoint & peering_endpoint, const int // `create_init_db_snapshot` can be handled separately only after leader starts LOG(INFO) << "Snapshot does not exist. We will remove db dir and init db fresh."; - reset_db(); - if (!butil::DeleteFile(butil::FilePath(store->get_state_dir_path()), true)) { - LOG(WARNING) << "rm " << store->get_state_dir_path() << " failed"; - return -1; + int reload_store = store->reload(true, ""); + if(reload_store != 0) { + return reload_store; } int init_db_status = init_db(); @@ -358,23 +357,14 @@ void* ReplicationState::save_snapshot(void* arg) { void ReplicationState::on_snapshot_save(braft::SnapshotWriter* writer, braft::Closure* done) { LOG(INFO) << "on_snapshot_save"; - rocksdb::Checkpoint* checkpoint = nullptr; - rocksdb::Status status = rocksdb::Checkpoint::Create(store->_get_db_unsafe(), &checkpoint); - - if(!status.ok()) { - LOG(WARNING) << "Checkpoint Create failed, msg:" << status.ToString(); - done->status().set_error(EIO, "Checkpoint Create failed."); - } - std::string db_snapshot_path = writer->get_path() + "/" + db_snapshot_name; - + rocksdb::Checkpoint* checkpoint = nullptr; + rocksdb::Status status = store->create_check_point(&checkpoint, db_snapshot_path); std::unique_ptr checkpoint_guard(checkpoint); - status = checkpoint->CreateCheckpoint(db_snapshot_path); if(!status.ok()) { - LOG(WARNING) << "Checkpoint CreateCheckpoint failed at snapshot path: " - << db_snapshot_path << ", msg:" << status.ToString(); - done->status().set_error(EIO, "CreateCheckpoint failed."); + LOG(ERROR) << "Failure during checkpoint creation, msg:" << status.ToString(); + done->status().set_error(EIO, "Checkpoint creation failure."); } SnapshotArg* arg = new SnapshotArg; @@ -396,18 +386,6 @@ void ReplicationState::on_snapshot_save(braft::SnapshotWriter* writer, braft::Cl } int ReplicationState::init_db() { - if (!butil::CreateDirectory(butil::FilePath(store->get_state_dir_path()))) { - LOG(WARNING) << "CreateDirectory " << store->get_state_dir_path() << " failed"; - return -1; - } - - const rocksdb::Status& status = store->init_db(); - if (!status.ok()) { - LOG(WARNING) << "Open DB " << store->get_state_dir_path() << " failed, msg: " << status.ToString(); - return -1; - } - - LOG(INFO) << "DB open success!"; LOG(INFO) << "Loading collections from disk..."; Option init_op = CollectionManager::get_instance().load(); @@ -427,27 +405,15 @@ int ReplicationState::on_snapshot_load(braft::SnapshotReader* reader) { LOG(INFO) << "on_snapshot_load"; - // Load snapshot from reader, replacing the running StateMachine - - reset_db(); - if (!butil::DeleteFile(butil::FilePath(store->get_state_dir_path()), true)) { - LOG(WARNING) << "rm " << store->get_state_dir_path() << " failed"; - return -1; - } - - LOG(INFO) << "rm " << store->get_state_dir_path() << " success"; - + // Load snapshot from leader, replacing the running StateMachine std::string snapshot_path = reader->get_path(); snapshot_path.append(std::string("/") + db_snapshot_name); - // tries to use link if possible, or else copies - if (!copy_dir(snapshot_path, store->get_state_dir_path())) { - LOG(WARNING) << "copy snapshot " << snapshot_path << " to " << store->get_state_dir_path() << " failed"; - return -1; + int reload_store = store->reload(true, snapshot_path); + if(reload_store != 0) { + return reload_store; } - LOG(INFO) << "copy snapshot " << snapshot_path << " to " << store->get_state_dir_path() << " success"; - return init_db(); } @@ -539,10 +505,6 @@ ReplicationState::ReplicationState(Store *store, ThreadPool* thread_pool, http_m } -void ReplicationState::reset_db() { - store->close(); -} - bool ReplicationState::is_alive() const { if(node == nullptr || !is_ready()) { return false; @@ -612,13 +574,17 @@ http_message_dispatcher* ReplicationState::get_message_dispatcher() const { return message_dispatcher; } +Store* ReplicationState::get_store() { + return store; +} + void InitSnapshotClosure::Run() { // Auto delete this after Run() std::unique_ptr self_guard(this); if(status().ok()) { LOG(INFO) << "Init snapshot succeeded!"; - replication_state->reset_db(); + replication_state->get_store()->reload(false, ""); replication_state->init_db(); } else { LOG(ERROR) << "Init snapshot failed, error: " << status().error_str() << ", code: " << status().error_code();