mirror of
https://github.com/typesense/typesense.git
synced 2025-05-19 21:22:25 +08:00
Remove support for init db snapshot.
Also, handle termination of follower process during import gracefully.
This commit is contained in:
parent
9f159b5d3d
commit
3159712ca3
@ -112,8 +112,6 @@ private:
|
||||
|
||||
const bool api_uses_ssl;
|
||||
|
||||
bool create_init_db_snapshot;
|
||||
|
||||
std::string raft_dir_path;
|
||||
|
||||
std::string ext_snapshot_path;
|
||||
@ -134,8 +132,7 @@ public:
|
||||
static constexpr const char* snapshot_dir_name = "snapshot";
|
||||
|
||||
ReplicationState(Store* store, ThreadPool* thread_pool, http_message_dispatcher* message_dispatcher,
|
||||
bool api_uses_ssl, size_t catchup_min_sequence_diff, size_t catch_up_threshold_percentage,
|
||||
bool create_init_db_snapshot);
|
||||
bool api_uses_ssl, size_t catchup_min_sequence_diff, size_t catch_up_threshold_percentage);
|
||||
|
||||
// Starts this node
|
||||
int start(const butil::EndPoint & peering_endpoint, int api_port,
|
||||
@ -221,16 +218,6 @@ private:
|
||||
|
||||
void on_leader_start(int64_t term) {
|
||||
leader_term.store(term, butil::memory_order_release);
|
||||
|
||||
// have to do a dummy write, otherwise snapshot will not trigger
|
||||
if(create_init_db_snapshot) {
|
||||
std::map<std::string, std::string> params;
|
||||
std::shared_ptr<http_req> request = std::make_shared<http_req>(nullptr, "POST", "/INIT_SNAPSHOT", 0, params,
|
||||
"INIT_SNAPSHOT");
|
||||
std::shared_ptr<http_res> response = std::make_shared<http_res>();
|
||||
write(request, response);
|
||||
}
|
||||
|
||||
LOG(INFO) << "Node becomes leader, term: " << term;
|
||||
}
|
||||
|
||||
@ -260,7 +247,7 @@ private:
|
||||
LOG(INFO) << "Node stops following " << ctx;
|
||||
}
|
||||
|
||||
void write_to_leader(const std::shared_ptr<http_req>& request, const std::shared_ptr<http_res>& response) const;
|
||||
void write_to_leader(const std::shared_ptr<http_req>& request, const std::shared_ptr<http_res>& response);
|
||||
|
||||
void do_dummy_write();
|
||||
|
||||
|
@ -73,8 +73,7 @@ int ReplicationState::start(const butil::EndPoint & peering_endpoint, const int
|
||||
|
||||
if(snapshot_exists) {
|
||||
// we will be assured of on_snapshot_load() firing and we will wait for that to init_db()
|
||||
} else if(!create_init_db_snapshot) {
|
||||
// `create_init_db_snapshot` can be handled separately only after leader starts
|
||||
} else {
|
||||
LOG(INFO) << "Snapshot does not exist. We will remove db dir and init db fresh.";
|
||||
|
||||
int reload_store = store->reload(true, "");
|
||||
@ -116,6 +115,11 @@ std::string ReplicationState::to_nodes_config(const butil::EndPoint& peering_end
|
||||
|
||||
void ReplicationState::write(const std::shared_ptr<http_req>& request, const std::shared_ptr<http_res>& response) {
|
||||
if(shutting_down) {
|
||||
//LOG(INFO) << "write(), force shutdown";
|
||||
response->set_503("Shutting down.");
|
||||
response->final = true;
|
||||
request->_req = nullptr;
|
||||
request->notify();
|
||||
return ;
|
||||
}
|
||||
|
||||
@ -152,7 +156,7 @@ void ReplicationState::write(const std::shared_ptr<http_req>& request, const std
|
||||
return node->apply(task);
|
||||
}
|
||||
|
||||
void ReplicationState::write_to_leader(const std::shared_ptr<http_req>& request, const std::shared_ptr<http_res>& response) const {
|
||||
void ReplicationState::write_to_leader(const std::shared_ptr<http_req>& request, const std::shared_ptr<http_res>& response) {
|
||||
// no lock on `node` needed as caller uses the lock
|
||||
if(!node || node->leader_id().is_empty()) {
|
||||
// Handle no leader scenario
|
||||
@ -174,7 +178,7 @@ void ReplicationState::write_to_leader(const std::shared_ptr<http_req>& request,
|
||||
|
||||
if (request->_req->proceed_req && response->proxied_stream) {
|
||||
// indicates async request body of in-flight request
|
||||
//LOG(INFO) << "Inflight proxied request, returning control to caller, body_size=" << request->body.size();
|
||||
LOG(INFO) << "Inflight proxied request, returning control to caller, body_size=" << request->body.size();
|
||||
request->notify();
|
||||
return ;
|
||||
}
|
||||
@ -191,6 +195,8 @@ void ReplicationState::write_to_leader(const std::shared_ptr<http_req>& request,
|
||||
const std::string& scheme = std::string(raw_req->scheme->name.base, raw_req->scheme->name.len);
|
||||
const std::string url = get_leader_url_path(leader_addr, path, scheme);
|
||||
|
||||
pending_writes++;
|
||||
|
||||
std::map<std::string, std::string> res_headers;
|
||||
|
||||
if(request->http_method == "POST") {
|
||||
@ -202,12 +208,11 @@ void ReplicationState::write_to_leader(const std::shared_ptr<http_req>& request,
|
||||
response->proxied_stream = true;
|
||||
long status = HttpClient::post_response_async(url, request, response, server);
|
||||
|
||||
//LOG(INFO) << "Import call done.";
|
||||
|
||||
if(status == 500) {
|
||||
response->content_type_header = res_headers["content-type"];
|
||||
response->set_500("");
|
||||
} else {
|
||||
pending_writes--;
|
||||
return ;
|
||||
}
|
||||
} else {
|
||||
@ -240,6 +245,8 @@ void ReplicationState::write_to_leader(const std::shared_ptr<http_req>& request,
|
||||
auto replication_arg = new request_response_t{request, response};
|
||||
replication_arg->req->route_hash = static_cast<uint64_t>(ROUTE_CODES::ALREADY_HANDLED);
|
||||
message_dispatcher->send_message(REPLICATION_MSG, replication_arg);
|
||||
|
||||
pending_writes--;
|
||||
});
|
||||
}
|
||||
|
||||
@ -283,14 +290,6 @@ void ReplicationState::on_apply(braft::Iterator& iter) {
|
||||
}*/
|
||||
}
|
||||
|
||||
if(request_generated->_req == nullptr && request_generated->body == "INIT_SNAPSHOT") {
|
||||
// We attempt to trigger a cold snapshot against an existing stand-alone DB for backward compatibility
|
||||
InitSnapshotClosure* init_snapshot_closure = new InitSnapshotClosure(this);
|
||||
std::shared_lock lock(mutex);
|
||||
node->snapshot(init_snapshot_closure);
|
||||
continue ;
|
||||
}
|
||||
|
||||
// Now that the log has been parsed, perform the actual operation
|
||||
// Call http server thread for write and response back to client (if `response` is NOT null)
|
||||
// We use a future to block current thread until the async flow finishes
|
||||
@ -536,12 +535,12 @@ void ReplicationState::refresh_nodes(const std::string & nodes) {
|
||||
}
|
||||
|
||||
ReplicationState::ReplicationState(Store *store, ThreadPool* thread_pool, http_message_dispatcher *message_dispatcher,
|
||||
bool api_uses_ssl, size_t catchup_min_sequence_diff, size_t catch_up_threshold_percentage,
|
||||
bool create_init_db_snapshot):
|
||||
bool api_uses_ssl, size_t catchup_min_sequence_diff,
|
||||
size_t catch_up_threshold_percentage):
|
||||
node(nullptr), leader_term(-1), store(store), thread_pool(thread_pool),
|
||||
message_dispatcher(message_dispatcher),
|
||||
catchup_min_sequence_diff(catchup_min_sequence_diff), catch_up_threshold_percentage(catch_up_threshold_percentage),
|
||||
api_uses_ssl(api_uses_ssl), create_init_db_snapshot(create_init_db_snapshot),
|
||||
api_uses_ssl(api_uses_ssl),
|
||||
ready(false), shutting_down(false), pending_writes(0) {
|
||||
|
||||
}
|
||||
|
@ -340,26 +340,6 @@ int run_server(const Config & config, const std::string & version, void (*master
|
||||
std::string db_dir = config.get_data_dir() + "/db";
|
||||
std::string state_dir = config.get_data_dir() + "/state";
|
||||
|
||||
bool create_init_db_snapshot = false; // for importing raw DB from earlier versions
|
||||
|
||||
if(!directory_exists(db_dir) && file_exists(data_dir+"/CURRENT") && file_exists(data_dir+"/IDENTITY")) {
|
||||
if(!config.get_nodes().empty()) {
|
||||
LOG(ERROR) << "Your data directory needs to be migrated to the new format.";
|
||||
LOG(ERROR) << "To do that, please start Typesense server without the --nodes argument.";
|
||||
return 1;
|
||||
}
|
||||
|
||||
LOG(INFO) << "Migrating contents of data directory in a `db` sub-directory, as per the new data layout.";
|
||||
bool moved = mv_dir(data_dir, db_dir);
|
||||
if(!moved) {
|
||||
LOG(ERROR) << "CRITICAL ERROR! Failed to migrate all files in the data directory into a `db` sub-directory.";
|
||||
LOG(ERROR) << "NOTE: Please move remaining files manually. Failure to do so **WILL** lead to **DATA LOSS**.";
|
||||
return 1;
|
||||
}
|
||||
|
||||
create_init_db_snapshot = true;
|
||||
}
|
||||
|
||||
const size_t proc_count = std::max<size_t>(1, std::thread::hardware_concurrency());
|
||||
const size_t num_threads = std::max<size_t>(proc_count * 8, 16);
|
||||
|
||||
@ -398,8 +378,7 @@ int run_server(const Config & config, const std::string & version, void (*master
|
||||
|
||||
ReplicationState replication_state(&store, &app_thread_pool, server->get_message_dispatcher(),
|
||||
ssl_enabled, config.get_catch_up_min_sequence_diff(),
|
||||
config.get_catch_up_threshold_percentage(),
|
||||
create_init_db_snapshot);
|
||||
config.get_catch_up_threshold_percentage());
|
||||
|
||||
std::thread raft_thread([&replication_state, &config, &state_dir, &app_thread_pool, &server_thread_pool]() {
|
||||
std::string path_to_nodes = config.get_nodes();
|
||||
|
Loading…
x
Reference in New Issue
Block a user