diff --git a/include/backward.hpp b/include/backward.hpp index 14a58a69..2a2a94a8 100644 --- a/include/backward.hpp +++ b/include/backward.hpp @@ -4148,9 +4148,9 @@ private: bool loaded() const { return _loaded; } - static void (*_callback)(StackTrace&); + static void (*_callback)(int sig, StackTrace&); - static void handleSignal(int, siginfo_t *info, void *_ctx) { + static void handleSignal(int sig, siginfo_t *info, void *_ctx) { ucontext_t *uctx = static_cast(_ctx); StackTrace st; @@ -4189,10 +4189,6 @@ private: st.load_here(32, reinterpret_cast(uctx), info->si_addr); } - if(_callback) { - _callback(st); - } - Printer printer; printer.address = true; //printer.print(st, stderr); @@ -4208,6 +4204,10 @@ private: #else (void)info; #endif + + if(_callback) { + _callback(sig, st); + } } private: diff --git a/include/http_data.h b/include/http_data.h index a9f051bc..9f7f980b 100644 --- a/include/http_data.h +++ b/include/http_data.h @@ -239,6 +239,7 @@ struct http_req { std::condition_variable cv; bool ready; + int64_t log_index; http_req(): _req(nullptr), route_hash(1), first_chunk_aggregate(true), last_chunk_aggregate(false), @@ -320,6 +321,7 @@ struct http_req { first_chunk_aggregate = content.count("first_chunk_aggregate") != 0 ? content["first_chunk_aggregate"].get() : true; last_chunk_aggregate = content.count("last_chunk_aggregate") != 0 ? content["last_chunk_aggregate"].get() : false; start_ts = content.count("start_ts") != 0 ? content["start_ts"].get() : 0; + log_index = content.count("log_index") != 0 ? content["log_index"].get() : 0; _req = nullptr; deserialized_request = true; @@ -334,6 +336,7 @@ struct http_req { content["body"] = body; content["metadata"] = metadata; content["start_ts"] = start_ts; + content["log_index"] = log_index; return content.dump(-1, ' ', false, nlohmann::detail::error_handler_t::ignore); } diff --git a/include/thread_local_vars.h b/include/thread_local_vars.h new file mode 100644 index 00000000..37845ac5 --- /dev/null +++ b/include/thread_local_vars.h @@ -0,0 +1 @@ +extern thread_local int64_t write_log_index; \ No newline at end of file diff --git a/src/batched_indexer.cpp b/src/batched_indexer.cpp index 4acd83ed..0ab631bc 100644 --- a/src/batched_indexer.cpp +++ b/src/batched_indexer.cpp @@ -1,5 +1,6 @@ #include "batched_indexer.h" #include "core_api.h" +#include "thread_local_vars.h" BatchedIndexer::BatchedIndexer(HttpServer* server, Store* store, const size_t num_threads): server(server), store(store), num_threads(num_threads), @@ -111,6 +112,9 @@ void BatchedIndexer::run() { orig_req->deserialize(iter->value().ToString()); orig_req->_req = _req; + // update thread local for reference during a crash + write_log_index = orig_req->log_index; + //LOG(INFO) << "original request: " << orig_req_res.req << ", _req: " << orig_req_res.req->_req; route_path* found_rpath = nullptr; diff --git a/src/collection.cpp b/src/collection.cpp index 683ad911..ef05bc78 100644 --- a/src/collection.cpp +++ b/src/collection.cpp @@ -18,6 +18,7 @@ #include #include "topster.h" #include "logger.h" +#include "thread_local_vars.h" const std::string override_t::MATCH_EXACT = "exact"; const std::string override_t::MATCH_CONTAINS = "contains"; @@ -375,8 +376,15 @@ size_t Collection::par_index_in_memory(std::vector> & for(size_t index_id = 0; index_id < indices.size(); index_id++) { Index* index = indices[index_id]; + auto parent_write_log_index = write_log_index; + CollectionManager::get_instance().get_thread_pool()->enqueue( - [index, index_id, &num_indexed_vec, &iter_batch, this, &m_process, &num_processed, &cv_process]() { + [parent_write_log_index, index, index_id, &num_indexed_vec, &iter_batch, this, + &m_process, &num_processed, &cv_process]() { + + // ensures that a crash can be traced back to the write log index + write_log_index = parent_write_log_index; + size_t num_indexed = Index::batch_memory_index(index, std::ref(iter_batch[index_id]), default_sorting_field, search_schema, facet_schema, fallback_field_type); std::unique_lock lock(m_process); diff --git a/src/main/typesense_server.cpp b/src/main/typesense_server.cpp index a2645912..c265ba18 100644 --- a/src/main/typesense_server.cpp +++ b/src/main/typesense_server.cpp @@ -72,17 +72,21 @@ void master_server_routes() { server->post("/config", post_config, false, false); } -void (*backward::SignalHandling::_callback)(backward::StackTrace&) = nullptr; +void (*backward::SignalHandling::_callback)(int sig, backward::StackTrace&) = nullptr; -void crash_callback(backward::StackTrace& st) { +void crash_callback(int sig, backward::StackTrace& st) { backward::TraceResolver tr; tr.load_stacktrace(st); for (size_t i = 0; i < st.size(); ++i) { backward::ResolvedTrace trace = tr.resolve(st[i]); - if(trace.object_function.find("ReplicationState::on_apply") != std::string::npos) { + if(trace.object_function.find("BatchedIndexer") != std::string::npos || + trace.object_function.find("batch_memory_index") != std::string::npos) { server->persist_applying_index(); break; } } + + LOG(ERROR) << "Typesense is terminating abruptly."; + exit(-1); } int main(int argc, char **argv) { diff --git a/src/raft_server.cpp b/src/raft_server.cpp index bea91fdb..9367bafc 100644 --- a/src/raft_server.cpp +++ b/src/raft_server.cpp @@ -8,6 +8,7 @@ #include #include #include "rocksdb/utilities/checkpoint.h" +#include "thread_local_vars.h" namespace braft { DECLARE_int32(raft_do_snapshot_min_index_gap); @@ -366,6 +367,8 @@ void ReplicationState::on_apply(braft::Iterator& iter) { request_generated->deserialize(iter.data().to_string()); } + request_generated->log_index = iter.index(); + // To avoid blocking the serial Raft write thread persist the log entry in local storage. // Actual operations will be done in collection-sharded batch indexing threads. @@ -764,15 +767,12 @@ void ReplicationState::persist_applying_index() { return ; } - braft::NodeStatus node_status; - node->get_status(&node_status); - lock.unlock(); - LOG(INFO) << "Saving currently applying index: " << node_status.applying_index; + LOG(INFO) << "Saving currently applying index: " << write_log_index; - std::string key = SKIP_INDICES_PREFIX + std::to_string(node_status.applying_index); - meta_store->insert(key, std::to_string(node_status.applying_index)); + std::string key = SKIP_INDICES_PREFIX + std::to_string(write_log_index); + meta_store->insert(key, std::to_string(write_log_index)); } void OnDemandSnapshotClosure::Run() { diff --git a/src/thread_local_vars.cpp b/src/thread_local_vars.cpp new file mode 100644 index 00000000..79ba502b --- /dev/null +++ b/src/thread_local_vars.cpp @@ -0,0 +1,4 @@ +#include +#include "thread_local_vars.h" + +thread_local int64_t write_log_index = 0;