diff --git a/include/raft_server.h b/include/raft_server.h index a19efa8d..c8ad0142 100644 --- a/include/raft_server.h +++ b/include/raft_server.h @@ -112,8 +112,6 @@ private: const bool api_uses_ssl; - bool create_init_db_snapshot; - std::string raft_dir_path; std::string ext_snapshot_path; @@ -134,8 +132,7 @@ public: static constexpr const char* snapshot_dir_name = "snapshot"; ReplicationState(Store* store, ThreadPool* thread_pool, http_message_dispatcher* message_dispatcher, - bool api_uses_ssl, size_t catchup_min_sequence_diff, size_t catch_up_threshold_percentage, - bool create_init_db_snapshot); + bool api_uses_ssl, size_t catchup_min_sequence_diff, size_t catch_up_threshold_percentage); // Starts this node int start(const butil::EndPoint & peering_endpoint, int api_port, @@ -221,16 +218,6 @@ private: void on_leader_start(int64_t term) { leader_term.store(term, butil::memory_order_release); - - // have to do a dummy write, otherwise snapshot will not trigger - if(create_init_db_snapshot) { - std::map params; - std::shared_ptr request = std::make_shared(nullptr, "POST", "/INIT_SNAPSHOT", 0, params, - "INIT_SNAPSHOT"); - std::shared_ptr response = std::make_shared(); - write(request, response); - } - LOG(INFO) << "Node becomes leader, term: " << term; } @@ -260,7 +247,7 @@ private: LOG(INFO) << "Node stops following " << ctx; } - void write_to_leader(const std::shared_ptr& request, const std::shared_ptr& response) const; + void write_to_leader(const std::shared_ptr& request, const std::shared_ptr& response); void do_dummy_write(); diff --git a/src/raft_server.cpp b/src/raft_server.cpp index d0eb94a7..cdd4fc82 100644 --- a/src/raft_server.cpp +++ b/src/raft_server.cpp @@ -73,8 +73,7 @@ int ReplicationState::start(const butil::EndPoint & peering_endpoint, const int if(snapshot_exists) { // we will be assured of on_snapshot_load() firing and we will wait for that to init_db() - } else if(!create_init_db_snapshot) { - // `create_init_db_snapshot` can be handled separately only after leader starts + } else { LOG(INFO) << "Snapshot does not exist. We will remove db dir and init db fresh."; int reload_store = store->reload(true, ""); @@ -116,6 +115,11 @@ std::string ReplicationState::to_nodes_config(const butil::EndPoint& peering_end void ReplicationState::write(const std::shared_ptr& request, const std::shared_ptr& response) { if(shutting_down) { + //LOG(INFO) << "write(), force shutdown"; + response->set_503("Shutting down."); + response->final = true; + request->_req = nullptr; + request->notify(); return ; } @@ -152,7 +156,7 @@ void ReplicationState::write(const std::shared_ptr& request, const std return node->apply(task); } -void ReplicationState::write_to_leader(const std::shared_ptr& request, const std::shared_ptr& response) const { +void ReplicationState::write_to_leader(const std::shared_ptr& request, const std::shared_ptr& response) { // no lock on `node` needed as caller uses the lock if(!node || node->leader_id().is_empty()) { // Handle no leader scenario @@ -174,7 +178,7 @@ void ReplicationState::write_to_leader(const std::shared_ptr& request, if (request->_req->proceed_req && response->proxied_stream) { // indicates async request body of in-flight request - //LOG(INFO) << "Inflight proxied request, returning control to caller, body_size=" << request->body.size(); + LOG(INFO) << "Inflight proxied request, returning control to caller, body_size=" << request->body.size(); request->notify(); return ; } @@ -191,6 +195,8 @@ void ReplicationState::write_to_leader(const std::shared_ptr& request, const std::string& scheme = std::string(raw_req->scheme->name.base, raw_req->scheme->name.len); const std::string url = get_leader_url_path(leader_addr, path, scheme); + pending_writes++; + std::map res_headers; if(request->http_method == "POST") { @@ -202,12 +208,11 @@ void ReplicationState::write_to_leader(const std::shared_ptr& request, response->proxied_stream = true; long status = HttpClient::post_response_async(url, request, response, server); - //LOG(INFO) << "Import call done."; - if(status == 500) { response->content_type_header = res_headers["content-type"]; response->set_500(""); } else { + pending_writes--; return ; } } else { @@ -240,6 +245,8 @@ void ReplicationState::write_to_leader(const std::shared_ptr& request, auto replication_arg = new request_response_t{request, response}; replication_arg->req->route_hash = static_cast(ROUTE_CODES::ALREADY_HANDLED); message_dispatcher->send_message(REPLICATION_MSG, replication_arg); + + pending_writes--; }); } @@ -283,14 +290,6 @@ void ReplicationState::on_apply(braft::Iterator& iter) { }*/ } - if(request_generated->_req == nullptr && request_generated->body == "INIT_SNAPSHOT") { - // We attempt to trigger a cold snapshot against an existing stand-alone DB for backward compatibility - InitSnapshotClosure* init_snapshot_closure = new InitSnapshotClosure(this); - std::shared_lock lock(mutex); - node->snapshot(init_snapshot_closure); - continue ; - } - // Now that the log has been parsed, perform the actual operation // Call http server thread for write and response back to client (if `response` is NOT null) // We use a future to block current thread until the async flow finishes @@ -536,12 +535,12 @@ void ReplicationState::refresh_nodes(const std::string & nodes) { } ReplicationState::ReplicationState(Store *store, ThreadPool* thread_pool, http_message_dispatcher *message_dispatcher, - bool api_uses_ssl, size_t catchup_min_sequence_diff, size_t catch_up_threshold_percentage, - bool create_init_db_snapshot): + bool api_uses_ssl, size_t catchup_min_sequence_diff, + size_t catch_up_threshold_percentage): node(nullptr), leader_term(-1), store(store), thread_pool(thread_pool), message_dispatcher(message_dispatcher), catchup_min_sequence_diff(catchup_min_sequence_diff), catch_up_threshold_percentage(catch_up_threshold_percentage), - api_uses_ssl(api_uses_ssl), create_init_db_snapshot(create_init_db_snapshot), + api_uses_ssl(api_uses_ssl), ready(false), shutting_down(false), pending_writes(0) { } diff --git a/src/typesense_server_utils.cpp b/src/typesense_server_utils.cpp index 39f1629d..b5fdcc17 100644 --- a/src/typesense_server_utils.cpp +++ b/src/typesense_server_utils.cpp @@ -340,26 +340,6 @@ int run_server(const Config & config, const std::string & version, void (*master std::string db_dir = config.get_data_dir() + "/db"; std::string state_dir = config.get_data_dir() + "/state"; - bool create_init_db_snapshot = false; // for importing raw DB from earlier versions - - if(!directory_exists(db_dir) && file_exists(data_dir+"/CURRENT") && file_exists(data_dir+"/IDENTITY")) { - if(!config.get_nodes().empty()) { - LOG(ERROR) << "Your data directory needs to be migrated to the new format."; - LOG(ERROR) << "To do that, please start Typesense server without the --nodes argument."; - return 1; - } - - LOG(INFO) << "Migrating contents of data directory in a `db` sub-directory, as per the new data layout."; - bool moved = mv_dir(data_dir, db_dir); - if(!moved) { - LOG(ERROR) << "CRITICAL ERROR! Failed to migrate all files in the data directory into a `db` sub-directory."; - LOG(ERROR) << "NOTE: Please move remaining files manually. Failure to do so **WILL** lead to **DATA LOSS**."; - return 1; - } - - create_init_db_snapshot = true; - } - const size_t proc_count = std::max(1, std::thread::hardware_concurrency()); const size_t num_threads = std::max(proc_count * 8, 16); @@ -398,8 +378,7 @@ int run_server(const Config & config, const std::string & version, void (*master ReplicationState replication_state(&store, &app_thread_pool, server->get_message_dispatcher(), ssl_enabled, config.get_catch_up_min_sequence_diff(), - config.get_catch_up_threshold_percentage(), - create_init_db_snapshot); + config.get_catch_up_threshold_percentage()); std::thread raft_thread([&replication_state, &config, &state_dir, &app_thread_pool, &server_thread_pool]() { std::string path_to_nodes = config.get_nodes();