Free request & response objects via generator dispose callback.

This commit is contained in:
kishorenc 2020-08-17 07:35:00 +05:30
parent 4a173ee49d
commit e2c0b52671
10 changed files with 149 additions and 187 deletions

View File

@ -223,7 +223,7 @@ public:
Option<nlohmann::json> add(const std::string & json_str);
Option<nlohmann::json> add_many(std::vector<std::string>& json_lines);
nlohmann::json add_many(std::vector<std::string>& json_lines);
Option<nlohmann::json> search(const std::string & query, const std::vector<std::string> & search_fields,
const std::string & simple_filter_query, const std::vector<std::string> & facet_fields,

View File

@ -7,65 +7,65 @@ bool handle_authentication(std::map<std::string, std::string>& req_params, const
// Collections
bool get_collections(http_req & req, http_res & res);
bool get_collections(http_req& req, http_res& res);
bool post_create_collection(http_req & req, http_res & res);
bool post_create_collection(http_req& req, http_res& res);
bool del_drop_collection(http_req & req, http_res & res);
bool del_drop_collection(http_req& req, http_res& res);
bool get_collection_summary(http_req & req, http_res & res);
bool get_collection_summary(http_req& req, http_res& res);
// Documents
bool get_search(http_req & req, http_res & res);
bool get_search(http_req& req, http_res& res);
bool get_export_documents(http_req & req, http_res & res);
bool get_export_documents(http_req& req, http_res& res);
bool post_add_document(http_req & req, http_res & res);
bool post_add_document(http_req& req, http_res& res);
bool post_import_documents(http_req & req, http_res & res);
bool post_import_documents(http_req& req, http_res& res);
bool get_fetch_document(http_req & req, http_res & res);
bool get_fetch_document(http_req& req, http_res& res);
bool del_remove_document(http_req & req, http_res & res);
bool del_remove_document(http_req& req, http_res& res);
// Alias
bool get_alias(http_req & req, http_res & res);
bool get_alias(http_req& req, http_res& res);
bool get_aliases(http_req & req, http_res & res);
bool get_aliases(http_req& req, http_res& res);
bool put_upsert_alias(http_req & req, http_res & res);
bool put_upsert_alias(http_req& req, http_res& res);
bool del_alias(http_req & req, http_res & res);
bool del_alias(http_req& req, http_res& res);
// Overrides
bool get_overrides(http_req & req, http_res & res);
bool get_overrides(http_req& req, http_res& res);
bool get_override(http_req & req, http_res & res);
bool get_override(http_req& req, http_res& res);
bool put_override(http_req & req, http_res & res);
bool put_override(http_req& req, http_res& res);
bool del_override(http_req & req, http_res & res);
bool del_override(http_req& req, http_res& res);
// Keys
bool get_keys(http_req & req, http_res & res);
bool get_keys(http_req& req, http_res& res);
bool post_create_key(http_req & req, http_res & res);
bool post_create_key(http_req& req, http_res& res);
bool get_key(http_req & req, http_res & res);
bool get_key(http_req& req, http_res& res);
bool del_key(http_req & req, http_res & res);
bool del_key(http_req& req, http_res& res);
// Metrics
bool get_debug(http_req & req, http_res & res);
bool get_debug(http_req& req, http_res& res);
bool get_health(http_req & req, http_res & res);
bool get_health(http_req& req, http_res& res);
bool get_metrics_json(http_req & req, http_res & res);
bool get_metrics_json(http_req& req, http_res& res);
// Misc helpers

View File

@ -119,7 +119,6 @@ struct http_req {
uint64_t route_hash;
std::map<std::string, std::string> params;
bool streaming;
std::string stream_state;
size_t chunk_length;
@ -133,7 +132,7 @@ struct http_req {
http_req(h2o_req_t* _req, const std::string & http_method, uint64_t route_hash,
const std::map<std::string, std::string> & params, const std::string& body):
_req(_req), http_method(http_method), route_hash(route_hash), params(params),
streaming(false), stream_state("NON_STREAMING"), body(body) {
stream_state("NON_STREAMING"), body(body) {
}
@ -150,7 +149,6 @@ struct http_req {
}
metadata = content.count("metadata") != 0 ? content["metadata"] : "";
streaming = content.count("streaming") != 0 ? content["streaming"] : "";
stream_state = content.count("stream_state") != 0 ? content["stream_state"] : "";
_req = nullptr;
@ -160,7 +158,6 @@ struct http_req {
nlohmann::json content;
content["route_hash"] = route_hash;
content["params"] = params;
content["streaming"] = streaming;
content["stream_state"] = stream_state;
content["body"] = body;
content["metadata"] = metadata;
@ -187,6 +184,10 @@ struct route_path {
http_method(httpMethod), path_parts(pathParts), handler(handler),
async_req(async_req), async_res(async_res) {
action = _get_action();
if(async_req) {
// once a request is async, response also needs to be async
this->async_res = true;
}
}
inline bool operator< (const route_path& rhs) const {
@ -252,13 +253,6 @@ struct route_path {
}
};
struct h2o_custom_generator_t {
h2o_generator_t super;
bool (*req_handler)(http_req& req, http_res& res);
http_req* req;
http_res* res;
};
struct h2o_custom_res_message_t {
h2o_multithread_message_t super;
std::map<std::string, bool (*)(void*)> *message_handlers;

View File

@ -34,16 +34,12 @@ struct h2o_custom_req_handler_t {
HttpServer* http_server;
};
struct async_req_ctx_t {
struct h2o_custom_generator_t {
h2o_generator_t super;
h2o_custom_req_handler_t* h2o_handler;
route_path *rpath;
http_req* request;
http_res* response;
h2o_custom_req_handler_t* handler;
route_path *rpath;
async_req_ctx_t(http_req *request, http_res* response, h2o_custom_req_handler_t *handler, route_path *rpath) :
request(request), response(response), handler(handler), rpath(rpath) {
}
};
class HttpServer {
@ -136,6 +132,8 @@ public:
void send_response(http_req* request, http_res* response);
static void stream_response(http_req& request, http_res& response);
uint64_t find_route(const std::vector<std::string> & path_parts, const std::string & http_method,
route_path** found_rpath);

View File

@ -16,14 +16,11 @@ extern HttpServer* server;
void catch_interrupt(int sig);
bool directory_exists(const std::string & dir_path);
bool directory_exists(const std::string& dir_path);
void stream_response(bool (*req_handler)(http_req* req, http_res* res, void* data),
http_req & request, http_res & response, void* data);
void init_cmdline_options(cmdline::parser& options, int argc, char **argv);
void init_cmdline_options(cmdline::parser & options, int argc, char **argv);
int init_logger(Config& config, const std::string& server_version);
int init_logger(Config & config, const std::string & server_version);
int run_server(const Config & config, const std::string & version,
int run_server(const Config& config, const std::string& version,
void (*master_server_routes)());

View File

@ -185,11 +185,7 @@ Option<nlohmann::json> Collection::add(const std::string & json_str) {
return Option<nlohmann::json>(document);
}
Option<nlohmann::json> Collection::add_many(std::vector<std::string>& json_lines) {
if(json_lines.empty()) {
return Option<nlohmann::json>(400, "The request body was empty. So, no records were imported.");
}
nlohmann::json Collection::add_many(std::vector<std::string>& json_lines) {
LOG(INFO) << "Memory ratio. Max = " << max_memory_ratio << ", Used = " << SystemMetrics::used_memory_ratio();
std::vector<std::vector<index_record>> iter_batch;
@ -249,7 +245,7 @@ Option<nlohmann::json> Collection::add_many(std::vector<std::string>& json_lines
resp_summary["num_imported"] = num_indexed;
resp_summary["success"] = (num_indexed == json_lines.size());
return Option<nlohmann::json>(resp_summary);
return resp_summary;
}
bool Collection::is_exceeding_memory_threshold() const {

View File

@ -534,12 +534,14 @@ bool collection_export_handler(http_req* req, http_res* res, void* data) {
}
bool get_export_documents(http_req & req, http_res & res) {
// NOTE: this is a streaming response end-point so this handler will be called multiple times
CollectionManager & collectionManager = CollectionManager::get_instance();
Collection* collection = collectionManager.get_collection(req.params["collection"]);
if(collection == nullptr) {
req.stream_state = "NON_STREAMING";
res.set_404();
server->send_message(SEND_RESPONSE_MSG, new request_response{&req, &res});
server->stream_response(req, res);
return false;
}
@ -550,50 +552,23 @@ bool get_export_documents(http_req & req, http_res & res) {
res.content_type_header = "application/octet-stream";
res.status_code = 200;
stream_response(collection_export_handler, req, res, (void *) it);
//stream_response(collection_export_handler, req, res, (void *) it);
return true;
}
bool post_add_document(http_req & req, http_res & res) {
bool post_import_documents(http_req& req, http_res& res) {
CollectionManager & collectionManager = CollectionManager::get_instance();
Collection* collection = collectionManager.get_collection(req.params["collection"]);
if(collection == nullptr) {
req.stream_state = "NON_STREAMING";
res.set_404();
server->stream_response(req, res);
return false;
}
Option<nlohmann::json> inserted_doc_op = collection->add(req.body);
if(!inserted_doc_op.ok()) {
res.set(inserted_doc_op.code(), inserted_doc_op.error());
return false;
}
res.set_201(inserted_doc_op.get().dump());
return true;
}
bool collection_import_handler(http_req* request, http_res* response, void* data) {
if(request->_req->proceed_req) {
int is_end_stream = (response->final) ? 1: 0;
size_t written = request->chunk_length;
request->_req->proceed_req(request->_req, written, is_end_stream);
}
return true;
}
bool post_import_documents(http_req & req, http_res & res) {
CollectionManager & collectionManager = CollectionManager::get_instance();
Collection* collection = collectionManager.get_collection(req.params["collection"]);
if(collection == nullptr) {
res.set_404();
return false;
}
//LOG(INFO) << "req.body.size: " << req.body.size();
LOG(INFO) << "Import, req.body.size: " << req.body.size() << ", state: " << req.stream_state;
std::vector<std::string> json_lines;
StringUtils::split(req.body, json_lines, "\n");
@ -618,16 +593,10 @@ bool post_import_documents(http_req & req, http_res & res) {
return true;
}
const Option<nlohmann::json>& res_op = collection->add_many(json_lines);
if(!res_op.ok()) {
// FIXME: cannot set new status code midst of an ongoing import response
res.set(res_op.code(), res_op.error());
return false;
}
nlohmann::json json_res = collection->add_many(json_lines);
std::stringstream ss;
const std::string& import_summary_json = res_op.get().dump();
const std::string& import_summary_json = json_res.dump();
ss << import_summary_json << "\n";
@ -642,10 +611,27 @@ bool post_import_documents(http_req & req, http_res & res) {
res.content_type_header = "text/plain; charset=utf8";
res.set_200(ss.str());
if(req._req) {
stream_response(collection_import_handler, req, res, nullptr);
server->stream_response(req, res);
return true;
}
bool post_add_document(http_req & req, http_res & res) {
CollectionManager & collectionManager = CollectionManager::get_instance();
Collection* collection = collectionManager.get_collection(req.params["collection"]);
if(collection == nullptr) {
res.set_404();
return false;
}
Option<nlohmann::json> inserted_doc_op = collection->add(req.body);
if(!inserted_doc_op.ok()) {
res.set(inserted_doc_op.code(), inserted_doc_op.error());
return false;
}
res.set_201(inserted_doc_op.get().dump());
return true;
}

View File

@ -269,28 +269,16 @@ uint64_t HttpServer::find_route(const std::vector<std::string> & path_parts, con
}
void HttpServer::on_res_generator_dispose(void *self) {
//LOG(INFO) << "on_res_generator_dispose fires";
LOG(INFO) << "on_res_generator_dispose fires";
h2o_custom_generator_t* res_generator = static_cast<h2o_custom_generator_t*>(self);
// res_generator itself is reference counted, so we only free members
delete res_generator->req;
delete res_generator->res;
}
void HttpServer::response_proceed(h2o_generator_t *generator, h2o_req_t *req) {
//LOG(INFO) << "response_proceed called";
h2o_custom_generator_t* custom_generator = reinterpret_cast<h2o_custom_generator_t*>(generator);
custom_generator->req_handler(*custom_generator->req, *custom_generator->res);
if(custom_generator->req->_req->proceed_req) {
int is_end_stream = (custom_generator->res->final) ? 1: 0;
size_t written = custom_generator->req->chunk_length;
custom_generator->req->_req->proceed_req(custom_generator->req->_req, written, is_end_stream);
}
// res_generator itself is reference counted, so we only delete the members
delete res_generator->request;
delete res_generator->response;
}
int HttpServer::catch_all_handler(h2o_handler_t *_h2o_handler, h2o_req_t *req) {
h2o_custom_req_handler_t *h2o_handler = (h2o_custom_req_handler_t *)_h2o_handler;
h2o_custom_req_handler_t* h2o_handler = (h2o_custom_req_handler_t *)_h2o_handler;
const std::string & http_method = std::string(req->method.base, req->method.len);
const std::string & path = std::string(req->path.base, req->path.len);
@ -387,17 +375,19 @@ int HttpServer::catch_all_handler(h2o_handler_t *_h2o_handler, h2o_req_t *req) {
}
const std::string & body = std::string(req->entity.base, req->entity.len);
http_req* request = new http_req(req, rpath->http_method, route_hash, query_map, body);
http_res* response = new http_res();
// add custom generator with a dipose function for cleaning up resources
h2o_custom_generator_t* custom_generator = static_cast<h2o_custom_generator_t*>(
h2o_mem_alloc_shared(&req->pool, sizeof(*custom_generator), on_res_generator_dispose)
h2o_mem_alloc_shared(&req->pool, sizeof(*custom_generator), on_res_generator_dispose)
);
custom_generator->super = h2o_generator_t {response_proceed, nullptr};
custom_generator->req = request;
custom_generator->res = response;
custom_generator->req_handler = rpath->handler;
custom_generator->request = request;
custom_generator->response = response;
custom_generator->rpath = rpath;
custom_generator->h2o_handler = h2o_handler;
response->generator = &custom_generator->super;
// routes match and is an authenticated request
@ -412,14 +402,12 @@ int HttpServer::catch_all_handler(h2o_handler_t *_h2o_handler, h2o_req_t *req) {
return process_request(request, response, rpath, h2o_handler);
} else {
// Only partial request body is available.
// If async_req is true, the request handler function will be invoked multiple times, for each chunk
request->streaming = rpath->async_req;
async_req_ctx_t* async_req_ctx = new async_req_ctx_t(request, response, h2o_handler, rpath);
// If rpath->async_req is true, the request handler function will be invoked multiple times, for each chunk
//LOG(INFO) << "Partial request body length: " << req->entity.len;
req->write_req.cb = async_req_cb;
req->write_req.ctx = async_req_ctx;
req->write_req.ctx = custom_generator;
int is_end_entity = 0;
req->proceed_req(req, req->entity.len, is_end_entity);
}
@ -428,25 +416,27 @@ int HttpServer::catch_all_handler(h2o_handler_t *_h2o_handler, h2o_req_t *req) {
}
int HttpServer::async_req_cb(void *ctx, h2o_iovec_t chunk, int is_end_stream) {
async_req_ctx_t* req_ctx = static_cast<async_req_ctx_t*>(ctx);
h2o_custom_generator_t* custom_generator = static_cast<h2o_custom_generator_t*>(ctx);
http_req* request = req_ctx->request;
http_res* response = req_ctx->response;
http_req* request = custom_generator->request;
http_res* response = custom_generator->response;
LOG(INFO) << "chunk.len: " << chunk.len << ", is_end_stream: " << is_end_stream;
//LOG(INFO) << "async_req_cb, chunk.len: " << chunk.len << ", is_end_stream: " << is_end_stream;
request->chunk_length = chunk.len;
std::string chunk_str(chunk.base, chunk.len);
request->body += chunk_str;
if(request->streaming || is_end_stream) {
// Handler should be invoked for every chunk for streaming requests
bool async_req = custom_generator->rpath->async_req;
if(async_req || is_end_stream) {
// Handler should be invoked for every chunk for async streaming requests
// For a non streaming request, buffer body and invoke only at the end
// default value for response is NON_STREAMING
// for streaming requests, we have to set: START, IN_PROGRESS or END
if(request->streaming) {
if(async_req) {
if(is_end_stream) {
request->stream_state = "END";
} else if(request->stream_state == "NON_STREAMING") {
@ -457,10 +447,10 @@ int HttpServer::async_req_cb(void *ctx, h2o_iovec_t chunk, int is_end_stream) {
}
}
process_request(request, response, req_ctx->rpath, req_ctx->handler);
process_request(request, response, custom_generator->rpath, custom_generator->h2o_handler);
}
if(!is_end_stream && !request->streaming) {
if(!is_end_stream && !async_req) {
// streaming requests will call proceed_req in an async fashion
// so we have to handle this only for non streaming
size_t written = chunk.len;
@ -468,11 +458,6 @@ int HttpServer::async_req_cb(void *ctx, h2o_iovec_t chunk, int is_end_stream) {
request->_req->proceed_req(request->_req, written, stream_ended);
}
if (is_end_stream) {
// deletes only the container -- individual requests and response are deleted by response handler
delete req_ctx;
}
return 0;
}
@ -526,6 +511,50 @@ void HttpServer::send_message(const std::string & type, void* data) {
message_dispatcher->send_message(type, data);
}
void HttpServer::response_proceed(h2o_generator_t *generator, h2o_req_t *req) {
//LOG(INFO) << "response_proceed called";
h2o_custom_generator_t* custom_generator = reinterpret_cast<h2o_custom_generator_t*>(generator);
// if the request itself is async, we will proceed the request to fetch input content
// otherwise, call the handler since it will be the handler that will be producing content
if (custom_generator->rpath->async_req &&
custom_generator->request->_req && custom_generator->request->_req->proceed_req) {
int is_end_stream = (custom_generator->response->final) ? 1 : 0;
size_t written = custom_generator->request->chunk_length;
custom_generator->request->_req->proceed_req(custom_generator->request->_req, written, is_end_stream);
} else {
custom_generator->rpath->handler(*custom_generator->request, *custom_generator->response);
}
}
void HttpServer::stream_response(http_req& request, http_res& response) {
if(!request._req) {
// underlying request is no longer available
return ;
}
h2o_req_t* req = request._req;
h2o_custom_generator_t *custom_generator = reinterpret_cast<h2o_custom_generator_t *>(response.generator);
if (request.stream_state == "START" || request.stream_state == "NON_STREAMING") {
req->res.status = response.status_code;
req->res.reason = http_res::get_status_reason(response.status_code);
h2o_add_header(&req->pool, &req->res.headers, H2O_TOKEN_CONTENT_TYPE, NULL,
response.content_type_header.c_str(),
response.content_type_header.size());
h2o_start_response(req, &custom_generator->super);
}
custom_generator->response->final = (request.stream_state == "END" || request.stream_state == "NON_STREAMING");
h2o_iovec_t body = h2o_strdup(&req->pool, response.body.c_str(), SIZE_MAX);
const h2o_send_state_t state = custom_generator->response->final ? H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS;
h2o_send(req, &body, 1, state);
}
void HttpServer::set_auth_handler(bool (*handler)(std::map<std::string, std::string>& params, const route_path& rpath,
const std::string& auth_key)) {
auth_handler = handler;

View File

@ -68,33 +68,6 @@ Option<std::string> fetch_file_contents(const std::string & file_path) {
return Option<std::string>(content);
}
void stream_response(bool (*req_handler)(http_req* req, http_res* res, void* data),
http_req & request, http_res & response, void* data) {
h2o_req_t* req = request._req;
h2o_custom_generator_t *custom_generator = reinterpret_cast<h2o_custom_generator_t *>(response.generator);
if (request.stream_state == "START" || request.stream_state == "NON_STREAMING") {
req->res.status = response.status_code;
req->res.reason = http_res::get_status_reason(response.status_code);
h2o_add_header(&req->pool, &req->res.headers, H2O_TOKEN_CONTENT_TYPE, NULL,
response.content_type_header.c_str(),
response.content_type_header.size());
h2o_start_response(req, &custom_generator->super);
}
custom_generator->res->final = (request.stream_state == "END" || request.stream_state == "NON_STREAMING");
h2o_iovec_t body = h2o_strdup(&req->pool, response.body.c_str(), SIZE_MAX);
const h2o_send_state_t state = custom_generator->res->final ? H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS;
h2o_send(req, &body, 1, state);
if(state == H2O_SEND_STATE_FINAL) {
delete custom_generator->req;
delete custom_generator->res;
delete custom_generator;
}
}
void init_cmdline_options(cmdline::parser & options, int argc, char **argv) {
options.set_program_name("./typesense-server");

View File

@ -1289,9 +1289,7 @@ TEST_F(CollectionTest, ImportDocuments) {
// try importing records
Option<nlohmann::json> import_res = coll_mul_fields->add_many(import_records);
ASSERT_TRUE(import_res.ok());
nlohmann::json import_response = import_res.get();
nlohmann::json import_response = coll_mul_fields->add_many(import_records);
ASSERT_TRUE(import_response["success"].get<bool>());
ASSERT_EQ(18, import_response["num_imported"].get<int>());
@ -1314,11 +1312,11 @@ TEST_F(CollectionTest, ImportDocuments) {
ASSERT_STREQ(id.c_str(), result_id.c_str());
}
// verify that empty import is caught gracefully
// verify that empty import is handled gracefully
std::vector<std::string> empty_records;
import_res = coll_mul_fields->add_many(empty_records);
ASSERT_FALSE(import_res.ok());
ASSERT_STREQ("The request body was empty. So, no records were imported.", import_res.error().c_str());
import_response = coll_mul_fields->add_many(empty_records);
ASSERT_TRUE(import_response["success"].get<bool>());
ASSERT_EQ(0, import_response["num_imported"].get<int>());
// verify that only bad records are rejected, rest must be imported (records 2 and 4 are bad)
std::vector<std::string> more_records = {"{\"id\": \"id1\", \"title\": \"Test1\", \"starring\": \"Rand Fish\", \"points\": 12, "
@ -1330,10 +1328,7 @@ TEST_F(CollectionTest, ImportDocuments) {
"{\"title\": \"Test4\", \"points\": 55, "
"\"cast\": [\"Tom Skerritt\"] }"};
import_res = coll_mul_fields->add_many(more_records);
ASSERT_TRUE(import_res.ok());
import_response = import_res.get();
import_response = coll_mul_fields->add_many(more_records);
ASSERT_FALSE(import_response["success"].get<bool>());
ASSERT_EQ(2, import_response["num_imported"].get<int>());
@ -1356,10 +1351,7 @@ TEST_F(CollectionTest, ImportDocuments) {
"{\"id\": \"id2\", \"title\": \"Test1\", \"starring\": \"Rand Fish\", \"points\": 12, "
"\"cast\": [\"Tom Skerritt\"] }"};
import_res = coll_mul_fields->add_many(more_records);
ASSERT_TRUE(import_res.ok());
import_response = import_res.get();
import_response = coll_mul_fields->add_many(more_records);
ASSERT_FALSE(import_response["success"].get<bool>());
ASSERT_EQ(1, import_response["num_imported"].get<int>());
@ -1374,10 +1366,7 @@ TEST_F(CollectionTest, ImportDocuments) {
// handle bad import json
more_records = {"[]"};
import_res = coll_mul_fields->add_many(more_records);
ASSERT_TRUE(import_res.ok());
import_response = import_res.get();
import_response = coll_mul_fields->add_many(more_records);
ASSERT_FALSE(import_response["success"].get<bool>());
ASSERT_EQ(0, import_response["num_imported"].get<int>());