Return document with bad JSON in import response.

kishorenc 2020-08-28 18:50:35 +05:30
parent 1e5dd2bdbc
commit 9dc7450a09
2 changed files with 13 additions and 22 deletions
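
In short: when a line in an import batch fails JSON parsing, the per-line result written back to the client now carries the parser's actual error message and echoes the offending document, instead of a bare "Bad JSON."; several noisy debug logs are also removed or commented out. As a rough illustration of the new per-line result shape (a sketch using nlohmann::json, with a made-up input line and error message, not the exact server output):

    #include <iostream>
    #include <string>
    #include <nlohmann/json.hpp>

    int main() {
        std::string json_line = "{\"title\": \"broken";  // hypothetical malformed import line

        nlohmann::json index_res;
        index_res["error"] = "Bad JSON: unexpected end of input";  // illustrative message only
        index_res["success"] = false;
        index_res["document"] = json_line;  // new in this commit: the bad document is echoed back

        // Prints one JSONL result line, e.g.
        // {"document":"{\"title\": \"broken","error":"Bad JSON: unexpected end of input","success":false}
        std::cout << index_res.dump() << std::endl;
    }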


@@ -104,11 +104,11 @@ Option<uint32_t> Collection::to_doc(const std::string & json_str, nlohmann::json
         document = nlohmann::json::parse(json_str);
     } catch(const std::exception& e) {
         LOG(ERROR) << "JSON error: " << e.what();
-        return Option<uint32_t>(400, "Bad JSON.");
+        return Option<uint32_t>(400, std::string("Bad JSON: ") + e.what());
     }

     if(!document.is_object()) {
-        return Option<uint32_t>(400, "Bad JSON.");
+        return Option<uint32_t>(400, "Bad JSON: not a properly formed document.");
     }

     uint32_t seq_id = get_next_seq_id();
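
This hunk replaces the generic "Bad JSON." message with the exception text from nlohmann::json::parse, and distinguishes a parse failure from input that parses but is not a JSON object. A minimal standalone sketch of that behaviour (hypothetical input, outside the Option<uint32_t> plumbing):

    #include <iostream>
    #include <string>
    #include <nlohmann/json.hpp>

    int main() {
        const std::string json_str = "{\"points\": }";  // hypothetical malformed line
        try {
            nlohmann::json document = nlohmann::json::parse(json_str);
            if(!document.is_object()) {
                // e.g. a bare number or array parses fine but is not a document
                std::cout << "Bad JSON: not a properly formed document." << std::endl;
            }
        } catch(const std::exception& e) {
            // e.what() carries nlohmann's detailed parse error (position, token, etc.)
            std::cout << std::string("Bad JSON: ") + e.what() << std::endl;
        }
    }
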
@@ -206,15 +206,14 @@ nlohmann::json Collection::add_many(std::vector<std::string>& json_lines) {
         nlohmann::json document;
         Option<uint32_t> doc_seq_id_op = to_doc(json_line, document);

-        // NOTE: we overwrite the input json_lines with result to avoid memory pressure
         if(!doc_seq_id_op.ok()) {
             nlohmann::json index_res;
             index_res["error"] = doc_seq_id_op.error();
             index_res["success"] = false;
+            index_res["document"] = json_line;
-            // FIXME:
-            LOG(INFO) << "Document parsing error, bad json_line is: " << json_line;
+            // NOTE: we overwrite the input json_lines with result to avoid memory pressure
             json_lines[i] = index_res.dump();
             continue;
         }
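
The NOTE comment is the key constraint here: add_many() reuses json_lines as its output buffer, replacing each input record with its own result so that the full request and the full response never sit in memory at the same time. A simplified sketch of the pattern (parse_line is a hypothetical stand-in for to_doc()):

    #include <string>
    #include <vector>
    #include <nlohmann/json.hpp>

    // Hypothetical stand-in for to_doc(): parse without exceptions.
    static bool parse_line(const std::string& line, nlohmann::json& doc, std::string& err) {
        doc = nlohmann::json::parse(line, nullptr, false);  // no-throw parse
        if(doc.is_discarded()) { err = "Bad JSON."; return false; }
        return true;
    }

    void process(std::vector<std::string>& json_lines) {
        for(size_t i = 0; i < json_lines.size(); i++) {
            nlohmann::json document, index_res;
            std::string err;
            if(!parse_line(json_lines[i], document, err)) {
                index_res["error"] = err;
                index_res["success"] = false;
                index_res["document"] = json_lines[i];  // echo the bad record back
            } else {
                index_res["success"] = true;
            }
            json_lines[i] = index_res.dump();  // overwrite input with result
        }
    }
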
@@ -229,7 +228,6 @@ nlohmann::json Collection::add_many(std::vector<std::string>& json_lines) {
             index_res["error"] = "Max memory ratio exceeded.";
             index_res["success"] = false;
+            // NOTE: we overwrite the input json_lines with result to avoid memory pressure
             json_lines[i] = index_res.dump();
             continue;
         }
@@ -736,8 +734,7 @@ Option<nlohmann::json> Collection::search(const std::string & query, const std::
     size_t total_found = 0;
     spp::sparse_hash_set<uint64_t> groups_processed;  // used to calculate total_found for grouped query

-    // FIXME:
-    LOG(INFO) << "Num indices used for querying: " << indices.size();
+    //LOG(INFO) << "Num indices used for querying: " << indices.size();

     // send data to individual index threads
     size_t index_id = 0;
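
For context on the surrounding code: groups_processed is presumably a set rather than a counter because the query is fanned out across several index threads, so the same group key can come back from more than one shard yet must be counted once in total_found. A rough sketch of that aggregation, using std::unordered_set in place of spp::sparse_hash_set:

    #include <cstdint>
    #include <unordered_set>
    #include <vector>

    // Count distinct group hashes across per-shard results, so total_found
    // reflects unique groups rather than raw hits.
    size_t grouped_total_found(const std::vector<std::vector<uint64_t>>& per_shard_group_hashes) {
        std::unordered_set<uint64_t> groups_processed;
        for(const auto& shard : per_shard_group_hashes) {
            for(uint64_t group_hash : shard) {
                groups_processed.insert(group_hash);
            }
        }
        return groups_processed.size();
    }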


@@ -589,9 +589,8 @@ bool post_import_documents(http_req& req, http_res& res) {
     if(req.body_index == 0) {
         // will log for every major chunk of request body
         LOG(INFO) << "Import, req.body.size=" << req.body.size() << ", batch_size=" << IMPORT_BATCH_SIZE;

-        // FIXME:
-        int nminusten_pos = std::max(0, int(req.body.size())-10);
-        LOG(INFO) << "Last 10 chars: " << req.body.substr(nminusten_pos);
+        //int nminusten_pos = std::max(0, int(req.body.size())-10);
+        //LOG(INFO) << "Last 10 chars: " << req.body.substr(nminusten_pos);
     }
CollectionManager & collectionManager = CollectionManager::get_instance();
@@ -604,15 +603,13 @@ bool post_import_documents(http_req& req, http_res& res) {
         return false;
     }

-    // FIXME:
-    LOG(INFO) << "Import, " << "req.body_index=" << req.body_index << ", req.body.size: " << req.body.size();
+    //LOG(INFO) << "Import, " << "req.body_index=" << req.body_index << ", req.body.size: " << req.body.size();
     //LOG(INFO) << "req body %: " << (float(req.body_index)/req.body.size())*100;

     std::vector<std::string> json_lines;
     req.body_index = StringUtils::split(req.body, json_lines, "\n", false, req.body_index, IMPORT_BATCH_SIZE);

-    // FIXME:
-    LOG(INFO) << "json_lines.size before: " << json_lines.size();
+    //LOG(INFO) << "json_lines.size before: " << json_lines.size() << ", req.body_index: " << req.body_index;

     bool stream_proceed = false; // default state
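
The split call above is what makes the import streamable: it consumes at most IMPORT_BATCH_SIZE newline-delimited records from req.body starting at req.body_index and returns the offset to resume from on the next invocation. A simplified sketch of that contract (the real StringUtils::split takes more options; this stand-in and its name are assumptions):

    #include <algorithm>
    #include <cstddef>
    #include <string>
    #include <vector>

    // Consume at most batch_size newline-delimited records from body, starting
    // at offset start; return the offset where the next call should resume.
    size_t split_batch(const std::string& body, std::vector<std::string>& out,
                       size_t start, size_t batch_size) {
        size_t pos = start;
        while(out.size() < batch_size && pos < body.size()) {
            size_t newline_pos = body.find('\n', pos);
            if(newline_pos == std::string::npos) {
                newline_pos = body.size();
            }
            out.push_back(body.substr(pos, newline_pos - pos));
            pos = newline_pos + 1;
        }
        return std::min(pos, body.size());
    }
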
@@ -622,8 +619,7 @@ bool post_import_documents(http_req& req, http_res& res) {
         stream_proceed = true;

         if(req.last_chunk_aggregate) {
-            // FIXME:
-            LOG(INFO) << "req.last_chunk_aggregate is true";
+            //LOG(INFO) << "req.last_chunk_aggregate is true";
             req.body = "";
         } else {
             if(!json_lines.empty()) {
@@ -648,16 +644,14 @@ bool post_import_documents(http_req& req, http_res& res) {
         }
     }

-    // FIXME:
-    LOG(INFO) << "json_lines.size after: " << json_lines.size();
+    //LOG(INFO) << "json_lines.size after: " << json_lines.size() << ", stream_proceed: " << stream_proceed;
     //LOG(INFO) << "json_lines.size: " << json_lines.size() << ", req.stream_state: " << req.stream_state;

     // When only one partial record arrives as a chunk, an empty body is pushed to response stream
     bool single_partial_record_body = (json_lines.empty() && !req.body.empty());
     std::stringstream response_stream;

-    // FIXME:
-    LOG(INFO) << "single_partial_record_body: " << single_partial_record_body;
+    //LOG(INFO) << "single_partial_record_body: " << single_partial_record_body;

     if(!single_partial_record_body) {
         nlohmann::json json_res = collection->add_many(json_lines);
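
The results produced by add_many() go out as a JSONL body: one JSON object per input record, in input order, so clients can correlate failures (and now the echoed documents) with what they sent. A sketch of that serialization, assuming the per-record results are already dumped strings as in add_many(); the helper name here is hypothetical:

    #include <sstream>
    #include <string>
    #include <vector>

    // Join per-record JSON results into a newline-delimited response body.
    std::string to_response_body(const std::vector<std::string>& result_lines) {
        std::stringstream response_stream;
        for(const std::string& line : result_lines) {
            response_stream << line << "\n";
        }
        return response_stream.str();
    }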