Handle stack trace during batched indexing.

Kishore Nallan 2021-08-21 21:50:37 +05:30
parent 76915eed20
commit ced4163062
8 changed files with 40 additions and 16 deletions

View File

@@ -4148,9 +4148,9 @@ private:
bool loaded() const { return _loaded; }
static void (*_callback)(StackTrace&);
static void (*_callback)(int sig, StackTrace&);
static void handleSignal(int, siginfo_t *info, void *_ctx) {
static void handleSignal(int sig, siginfo_t *info, void *_ctx) {
ucontext_t *uctx = static_cast<ucontext_t *>(_ctx);
StackTrace st;
@@ -4189,10 +4189,6 @@ private:
st.load_here(32, reinterpret_cast<void *>(uctx), info->si_addr);
}
if(_callback) {
_callback(st);
}
Printer printer;
printer.address = true;
//printer.print(st, stderr);
@@ -4208,6 +4204,10 @@ private:
#else
(void)info;
#endif
if(_callback) {
_callback(sig, st);
}
}
private:

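The patched handler keeps backward-cpp's SA_SIGINFO entry point but now threads the originating signal number through to the callback, and only invokes the callback after the trace has been captured. For orientation, a minimal standalone sketch of installing such a three-argument handler; install_crash_handler and crash_handler are illustrative names, not the vendored backward.hpp code:

#include <signal.h>
#include <unistd.h>
#include <cstring>
#include <initializer_list>

// Illustrative handler: in the patched backward.hpp this is where the stack
// trace is loaded and the (sig, StackTrace&) callback is invoked.
static void crash_handler(int sig, siginfo_t*, void*) {
    (void) sig;
    const char msg[] = "fatal signal caught\n";
    write(STDERR_FILENO, msg, sizeof(msg) - 1);   // async-signal-safe
    _exit(1);
}

void install_crash_handler() {
    struct sigaction sa;
    std::memset(&sa, 0, sizeof(sa));
    sa.sa_sigaction = crash_handler;              // three-argument SA_SIGINFO form
    sa.sa_flags = SA_SIGINFO | SA_ONSTACK;        // pass siginfo_t, run on the alternate stack if one is set
    for (int sig : {SIGSEGV, SIGABRT, SIGBUS, SIGFPE, SIGILL}) {
        sigaction(sig, &sa, nullptr);
    }
}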
View File

@@ -239,6 +239,7 @@ struct http_req {
std::condition_variable cv;
bool ready;
int64_t log_index;
http_req(): _req(nullptr), route_hash(1),
first_chunk_aggregate(true), last_chunk_aggregate(false),
@@ -320,6 +321,7 @@ struct http_req {
first_chunk_aggregate = content.count("first_chunk_aggregate") != 0 ? content["first_chunk_aggregate"].get<bool>() : true;
last_chunk_aggregate = content.count("last_chunk_aggregate") != 0 ? content["last_chunk_aggregate"].get<bool>() : false;
start_ts = content.count("start_ts") != 0 ? content["start_ts"].get<uint64_t>() : 0;
log_index = content.count("log_index") != 0 ? content["log_index"].get<int64_t>() : 0;
_req = nullptr;
deserialized_request = true;
@@ -334,6 +336,7 @@ struct http_req {
content["body"] = body;
content["metadata"] = metadata;
content["start_ts"] = start_ts;
content["log_index"] = log_index;
return content.dump(-1, ' ', false, nlohmann::detail::error_handler_t::ignore);
}

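http_req now carries the Raft log index of the write it represents and round-trips it through the JSON form used for the queued payload, defaulting to 0 when an older payload lacks the field. A small standalone sketch of the same guard-and-default pattern with nlohmann::json; the struct, field names, and include path are illustrative assumptions:

#include <cstdint>
#include <string>
#include "json.hpp"   // nlohmann::json single header; include path is an assumption

struct pending_write {
    int64_t log_index = 0;
    std::string body;

    std::string serialize() const {
        nlohmann::json content;
        content["log_index"] = log_index;
        content["body"] = body;
        return content.dump();
    }

    void deserialize(const std::string& serialized) {
        nlohmann::json content = nlohmann::json::parse(serialized);
        // Older payloads may lack the field, so fall back to a default.
        log_index = content.count("log_index") != 0 ? content["log_index"].get<int64_t>() : 0;
        body = content["body"].get<std::string>();
    }
};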
View File

@@ -0,0 +1 @@
extern thread_local int64_t write_log_index;

View File

@@ -1,5 +1,6 @@
#include "batched_indexer.h"
#include "core_api.h"
#include "thread_local_vars.h"
BatchedIndexer::BatchedIndexer(HttpServer* server, Store* store, const size_t num_threads):
server(server), store(store), num_threads(num_threads),
@@ -111,6 +112,9 @@ void BatchedIndexer::run() {
orig_req->deserialize(iter->value().ToString());
orig_req->_req = _req;
// update thread local for reference during a crash
write_log_index = orig_req->log_index;
//LOG(INFO) << "original request: " << orig_req_res.req << ", _req: " << orig_req_res.req->_req;
route_path* found_rpath = nullptr;

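Before dispatching each queued request to its handler, the indexer records the request's log index in the thread-local write_log_index, so a crash while applying that request can be traced back to the exact Raft log entry. A minimal sketch of that set-before-apply pattern; process_queued_request is an illustrative name, and the definition of write_log_index comes from thread_local_vars.cpp:

#include <cstdint>
#include <string>

extern thread_local int64_t write_log_index;   // declared in thread_local_vars.h

void process_queued_request(int64_t log_index, const std::string& body) {
    // Record which log entry this thread is about to apply; a crash handler
    // running on this thread can read it afterwards.
    write_log_index = log_index;
    // ... look up the route handler and apply the write here ...
    (void) body;
}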
View File

@@ -18,6 +18,7 @@
#include <posting.h>
#include "topster.h"
#include "logger.h"
#include "thread_local_vars.h"
const std::string override_t::MATCH_EXACT = "exact";
const std::string override_t::MATCH_CONTAINS = "contains";
@@ -375,8 +376,15 @@ size_t Collection::par_index_in_memory(std::vector<std::vector<index_record>> &
for(size_t index_id = 0; index_id < indices.size(); index_id++) {
Index* index = indices[index_id];
auto parent_write_log_index = write_log_index;
CollectionManager::get_instance().get_thread_pool()->enqueue(
[index, index_id, &num_indexed_vec, &iter_batch, this, &m_process, &num_processed, &cv_process]() {
[parent_write_log_index, index, index_id, &num_indexed_vec, &iter_batch, this,
&m_process, &num_processed, &cv_process]() {
// ensures that a crash can be traced back to the write log index
write_log_index = parent_write_log_index;
size_t num_indexed = Index::batch_memory_index(index, std::ref(iter_batch[index_id]), default_sorting_field,
search_schema, facet_schema, fallback_field_type);
std::unique_lock<std::mutex> lock(m_process);

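write_log_index is thread_local, so its value does not automatically follow a job onto a pool thread: each thread starts with its own zero-initialized copy. The hunk therefore snapshots the dispatching thread's value into parent_write_log_index and re-assigns it at the top of the lambda. A standalone sketch of that propagation pattern, with std::thread standing in for the project's thread pool:

#include <cstdint>
#include <thread>
#include <iostream>

thread_local int64_t write_log_index = 0;

void index_batch_on_worker() {
    // Capture the dispatching thread's value by copy, then restore it inside
    // the worker, since each thread has its own independent thread_local copy.
    int64_t parent_write_log_index = write_log_index;

    std::thread worker([parent_write_log_index]() {
        write_log_index = parent_write_log_index;
        std::cout << "worker sees log index " << write_log_index << "\n";
        // ... batch indexing happens here; a crash now reports the right log index
    });
    worker.join();
}

int main() {
    write_log_index = 42;
    index_batch_on_worker();
}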
View File

@@ -72,17 +72,21 @@ void master_server_routes() {
server->post("/config", post_config, false, false);
}
void (*backward::SignalHandling::_callback)(backward::StackTrace&) = nullptr;
void (*backward::SignalHandling::_callback)(int sig, backward::StackTrace&) = nullptr;
void crash_callback(backward::StackTrace& st) {
void crash_callback(int sig, backward::StackTrace& st) {
backward::TraceResolver tr; tr.load_stacktrace(st);
for (size_t i = 0; i < st.size(); ++i) {
backward::ResolvedTrace trace = tr.resolve(st[i]);
if(trace.object_function.find("ReplicationState::on_apply") != std::string::npos) {
if(trace.object_function.find("BatchedIndexer") != std::string::npos ||
trace.object_function.find("batch_memory_index") != std::string::npos) {
server->persist_applying_index();
break;
}
}
LOG(ERROR) << "Typesense is terminating abruptly.";
exit(-1);
}
int main(int argc, char **argv) {

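crash_callback decides whether to persist the currently applying index by scanning the resolved frames for the batched-indexing code path. The same frame-name matching, factored into a standalone helper for clarity; it uses only the backward-cpp calls already present in the hunk above, and the include path is an assumption:

#include <string>
#include "backward.hpp"   // vendored backward-cpp header; include path is an assumption

// Returns true if any resolved frame appears to come from the batched-indexing path.
// Mirrors the frame-name matching done in crash_callback above.
bool crash_is_in_batched_indexing(backward::StackTrace& st) {
    backward::TraceResolver tr;
    tr.load_stacktrace(st);
    for (size_t i = 0; i < st.size(); ++i) {
        backward::ResolvedTrace trace = tr.resolve(st[i]);
        if (trace.object_function.find("BatchedIndexer") != std::string::npos ||
            trace.object_function.find("batch_memory_index") != std::string::npos) {
            return true;
        }
    }
    return false;
}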
View File

@@ -8,6 +8,7 @@
#include <collection_manager.h>
#include <http_client.h>
#include "rocksdb/utilities/checkpoint.h"
#include "thread_local_vars.h"
namespace braft {
DECLARE_int32(raft_do_snapshot_min_index_gap);
@@ -366,6 +367,8 @@ void ReplicationState::on_apply(braft::Iterator& iter) {
request_generated->deserialize(iter.data().to_string());
}
request_generated->log_index = iter.index();
// To avoid blocking the serial Raft write thread, persist the log entry in local storage.
// Actual operations will be done in collection-sharded batch indexing threads.
@@ -764,15 +767,12 @@ void ReplicationState::persist_applying_index() {
return ;
}
braft::NodeStatus node_status;
node->get_status(&node_status);
lock.unlock();
LOG(INFO) << "Saving currently applying index: " << node_status.applying_index;
LOG(INFO) << "Saving currently applying index: " << write_log_index;
std::string key = SKIP_INDICES_PREFIX + std::to_string(node_status.applying_index);
meta_store->insert(key, std::to_string(node_status.applying_index));
std::string key = SKIP_INDICES_PREFIX + std::to_string(write_log_index);
meta_store->insert(key, std::to_string(write_log_index));
}
void OnDemandSnapshotClosure::Run() {

View File

@@ -0,0 +1,4 @@
#include <cstdint>
#include "thread_local_vars.h"
thread_local int64_t write_log_index = 0;
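Defining write_log_index as a thread_local is what makes the whole chain work: a fatal signal is delivered on the thread that faulted, so the handler reads that thread's copy of the variable, which the batched indexer set just before applying the entry. A small standalone demonstration of that property; fprintf inside a signal handler is not async-signal-safe and the crash below is deliberate, so this is illustration only, while the real path goes through backward-cpp:

#include <cstdint>
#include <cstdio>
#include <csignal>
#include <unistd.h>

thread_local int64_t write_log_index = 0;

void on_fatal_signal(int sig) {
    // Runs on the faulting thread, so it sees that thread's write_log_index.
    std::fprintf(stderr, "signal %d while applying log index %lld\n",
                 sig, (long long) write_log_index);
    _exit(1);
}

int main() {
    std::signal(SIGSEGV, on_fatal_signal);
    write_log_index = 1234;            // pretend we are applying raft log entry 1234
    volatile int* p = nullptr;
    *p = 7;                            // deliberate crash to exercise the handler
}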