diff --git a/include/collection.h b/include/collection.h index 92355bc7..665f8aaf 100644 --- a/include/collection.h +++ b/include/collection.h @@ -223,7 +223,7 @@ public: Option add(const std::string & json_str); - Option add_many(std::vector& json_lines); + nlohmann::json add_many(std::vector& json_lines); Option search(const std::string & query, const std::vector & search_fields, const std::string & simple_filter_query, const std::vector & facet_fields, diff --git a/include/core_api.h b/include/core_api.h index 59e2d1dd..ee9a50be 100644 --- a/include/core_api.h +++ b/include/core_api.h @@ -7,65 +7,65 @@ bool handle_authentication(std::map& req_params, const // Collections -bool get_collections(http_req & req, http_res & res); +bool get_collections(http_req& req, http_res& res); -bool post_create_collection(http_req & req, http_res & res); +bool post_create_collection(http_req& req, http_res& res); -bool del_drop_collection(http_req & req, http_res & res); +bool del_drop_collection(http_req& req, http_res& res); -bool get_collection_summary(http_req & req, http_res & res); +bool get_collection_summary(http_req& req, http_res& res); // Documents -bool get_search(http_req & req, http_res & res); +bool get_search(http_req& req, http_res& res); -bool get_export_documents(http_req & req, http_res & res); +bool get_export_documents(http_req& req, http_res& res); -bool post_add_document(http_req & req, http_res & res); +bool post_add_document(http_req& req, http_res& res); -bool post_import_documents(http_req & req, http_res & res); +bool post_import_documents(http_req& req, http_res& res); -bool get_fetch_document(http_req & req, http_res & res); +bool get_fetch_document(http_req& req, http_res& res); -bool del_remove_document(http_req & req, http_res & res); +bool del_remove_document(http_req& req, http_res& res); // Alias -bool get_alias(http_req & req, http_res & res); +bool get_alias(http_req& req, http_res& res); -bool get_aliases(http_req & req, http_res & res); +bool get_aliases(http_req& req, http_res& res); -bool put_upsert_alias(http_req & req, http_res & res); +bool put_upsert_alias(http_req& req, http_res& res); -bool del_alias(http_req & req, http_res & res); +bool del_alias(http_req& req, http_res& res); // Overrides -bool get_overrides(http_req & req, http_res & res); +bool get_overrides(http_req& req, http_res& res); -bool get_override(http_req & req, http_res & res); +bool get_override(http_req& req, http_res& res); -bool put_override(http_req & req, http_res & res); +bool put_override(http_req& req, http_res& res); -bool del_override(http_req & req, http_res & res); +bool del_override(http_req& req, http_res& res); // Keys -bool get_keys(http_req & req, http_res & res); +bool get_keys(http_req& req, http_res& res); -bool post_create_key(http_req & req, http_res & res); +bool post_create_key(http_req& req, http_res& res); -bool get_key(http_req & req, http_res & res); +bool get_key(http_req& req, http_res& res); -bool del_key(http_req & req, http_res & res); +bool del_key(http_req& req, http_res& res); // Metrics -bool get_debug(http_req & req, http_res & res); +bool get_debug(http_req& req, http_res& res); -bool get_health(http_req & req, http_res & res); +bool get_health(http_req& req, http_res& res); -bool get_metrics_json(http_req & req, http_res & res); +bool get_metrics_json(http_req& req, http_res& res); // Misc helpers diff --git a/include/http_data.h b/include/http_data.h index ed07bf52..0fa8cb15 100644 --- a/include/http_data.h +++ b/include/http_data.h @@ -119,7 +119,6 @@ struct http_req { uint64_t route_hash; std::map params; - bool streaming; std::string stream_state; size_t chunk_length; @@ -133,7 +132,7 @@ struct http_req { http_req(h2o_req_t* _req, const std::string & http_method, uint64_t route_hash, const std::map & params, const std::string& body): _req(_req), http_method(http_method), route_hash(route_hash), params(params), - streaming(false), stream_state("NON_STREAMING"), body(body) { + stream_state("NON_STREAMING"), body(body) { } @@ -150,7 +149,6 @@ struct http_req { } metadata = content.count("metadata") != 0 ? content["metadata"] : ""; - streaming = content.count("streaming") != 0 ? content["streaming"] : ""; stream_state = content.count("stream_state") != 0 ? content["stream_state"] : ""; _req = nullptr; @@ -160,7 +158,6 @@ struct http_req { nlohmann::json content; content["route_hash"] = route_hash; content["params"] = params; - content["streaming"] = streaming; content["stream_state"] = stream_state; content["body"] = body; content["metadata"] = metadata; @@ -187,6 +184,10 @@ struct route_path { http_method(httpMethod), path_parts(pathParts), handler(handler), async_req(async_req), async_res(async_res) { action = _get_action(); + if(async_req) { + // once a request is async, response also needs to be async + this->async_res = true; + } } inline bool operator< (const route_path& rhs) const { @@ -252,13 +253,6 @@ struct route_path { } }; -struct h2o_custom_generator_t { - h2o_generator_t super; - bool (*req_handler)(http_req& req, http_res& res); - http_req* req; - http_res* res; -}; - struct h2o_custom_res_message_t { h2o_multithread_message_t super; std::map *message_handlers; diff --git a/include/http_server.h b/include/http_server.h index 4c4322fa..aedbbf34 100644 --- a/include/http_server.h +++ b/include/http_server.h @@ -34,16 +34,12 @@ struct h2o_custom_req_handler_t { HttpServer* http_server; }; -struct async_req_ctx_t { +struct h2o_custom_generator_t { + h2o_generator_t super; + h2o_custom_req_handler_t* h2o_handler; + route_path *rpath; http_req* request; http_res* response; - h2o_custom_req_handler_t* handler; - route_path *rpath; - - async_req_ctx_t(http_req *request, http_res* response, h2o_custom_req_handler_t *handler, route_path *rpath) : - request(request), response(response), handler(handler), rpath(rpath) { - - } }; class HttpServer { @@ -136,6 +132,8 @@ public: void send_response(http_req* request, http_res* response); + static void stream_response(http_req& request, http_res& response); + uint64_t find_route(const std::vector & path_parts, const std::string & http_method, route_path** found_rpath); diff --git a/include/typesense_server_utils.h b/include/typesense_server_utils.h index 4ccca45d..6feb4572 100644 --- a/include/typesense_server_utils.h +++ b/include/typesense_server_utils.h @@ -16,14 +16,11 @@ extern HttpServer* server; void catch_interrupt(int sig); -bool directory_exists(const std::string & dir_path); +bool directory_exists(const std::string& dir_path); -void stream_response(bool (*req_handler)(http_req* req, http_res* res, void* data), - http_req & request, http_res & response, void* data); +void init_cmdline_options(cmdline::parser& options, int argc, char **argv); -void init_cmdline_options(cmdline::parser & options, int argc, char **argv); +int init_logger(Config& config, const std::string& server_version); -int init_logger(Config & config, const std::string & server_version); - -int run_server(const Config & config, const std::string & version, +int run_server(const Config& config, const std::string& version, void (*master_server_routes)()); diff --git a/src/collection.cpp b/src/collection.cpp index a3955d00..efedf3a3 100644 --- a/src/collection.cpp +++ b/src/collection.cpp @@ -185,11 +185,7 @@ Option Collection::add(const std::string & json_str) { return Option(document); } -Option Collection::add_many(std::vector& json_lines) { - if(json_lines.empty()) { - return Option(400, "The request body was empty. So, no records were imported."); - } - +nlohmann::json Collection::add_many(std::vector& json_lines) { LOG(INFO) << "Memory ratio. Max = " << max_memory_ratio << ", Used = " << SystemMetrics::used_memory_ratio(); std::vector> iter_batch; @@ -249,7 +245,7 @@ Option Collection::add_many(std::vector& json_lines resp_summary["num_imported"] = num_indexed; resp_summary["success"] = (num_indexed == json_lines.size()); - return Option(resp_summary); + return resp_summary; } bool Collection::is_exceeding_memory_threshold() const { diff --git a/src/core_api.cpp b/src/core_api.cpp index 1948b358..9dedb747 100644 --- a/src/core_api.cpp +++ b/src/core_api.cpp @@ -534,12 +534,14 @@ bool collection_export_handler(http_req* req, http_res* res, void* data) { } bool get_export_documents(http_req & req, http_res & res) { + // NOTE: this is a streaming response end-point so this handler will be called multiple times CollectionManager & collectionManager = CollectionManager::get_instance(); Collection* collection = collectionManager.get_collection(req.params["collection"]); if(collection == nullptr) { + req.stream_state = "NON_STREAMING"; res.set_404(); - server->send_message(SEND_RESPONSE_MSG, new request_response{&req, &res}); + server->stream_response(req, res); return false; } @@ -550,50 +552,23 @@ bool get_export_documents(http_req & req, http_res & res) { res.content_type_header = "application/octet-stream"; res.status_code = 200; - stream_response(collection_export_handler, req, res, (void *) it); + + //stream_response(collection_export_handler, req, res, (void *) it); return true; } -bool post_add_document(http_req & req, http_res & res) { +bool post_import_documents(http_req& req, http_res& res) { CollectionManager & collectionManager = CollectionManager::get_instance(); Collection* collection = collectionManager.get_collection(req.params["collection"]); if(collection == nullptr) { + req.stream_state = "NON_STREAMING"; res.set_404(); + server->stream_response(req, res); return false; } - Option inserted_doc_op = collection->add(req.body); - - if(!inserted_doc_op.ok()) { - res.set(inserted_doc_op.code(), inserted_doc_op.error()); - return false; - } - - res.set_201(inserted_doc_op.get().dump()); - return true; -} - -bool collection_import_handler(http_req* request, http_res* response, void* data) { - if(request->_req->proceed_req) { - int is_end_stream = (response->final) ? 1: 0; - size_t written = request->chunk_length; - request->_req->proceed_req(request->_req, written, is_end_stream); - } - - return true; -} - -bool post_import_documents(http_req & req, http_res & res) { - CollectionManager & collectionManager = CollectionManager::get_instance(); - Collection* collection = collectionManager.get_collection(req.params["collection"]); - - if(collection == nullptr) { - res.set_404(); - return false; - } - - //LOG(INFO) << "req.body.size: " << req.body.size(); + LOG(INFO) << "Import, req.body.size: " << req.body.size() << ", state: " << req.stream_state; std::vector json_lines; StringUtils::split(req.body, json_lines, "\n"); @@ -618,16 +593,10 @@ bool post_import_documents(http_req & req, http_res & res) { return true; } - const Option& res_op = collection->add_many(json_lines); - - if(!res_op.ok()) { - // FIXME: cannot set new status code midst of an ongoing import response - res.set(res_op.code(), res_op.error()); - return false; - } + nlohmann::json json_res = collection->add_many(json_lines); std::stringstream ss; - const std::string& import_summary_json = res_op.get().dump(); + const std::string& import_summary_json = json_res.dump(); ss << import_summary_json << "\n"; @@ -642,10 +611,27 @@ bool post_import_documents(http_req & req, http_res & res) { res.content_type_header = "text/plain; charset=utf8"; res.set_200(ss.str()); - if(req._req) { - stream_response(collection_import_handler, req, res, nullptr); + server->stream_response(req, res); + return true; +} + +bool post_add_document(http_req & req, http_res & res) { + CollectionManager & collectionManager = CollectionManager::get_instance(); + Collection* collection = collectionManager.get_collection(req.params["collection"]); + + if(collection == nullptr) { + res.set_404(); + return false; } + Option inserted_doc_op = collection->add(req.body); + + if(!inserted_doc_op.ok()) { + res.set(inserted_doc_op.code(), inserted_doc_op.error()); + return false; + } + + res.set_201(inserted_doc_op.get().dump()); return true; } diff --git a/src/http_server.cpp b/src/http_server.cpp index 798ef38b..c9febb85 100644 --- a/src/http_server.cpp +++ b/src/http_server.cpp @@ -269,28 +269,16 @@ uint64_t HttpServer::find_route(const std::vector & path_parts, con } void HttpServer::on_res_generator_dispose(void *self) { - //LOG(INFO) << "on_res_generator_dispose fires"; + LOG(INFO) << "on_res_generator_dispose fires"; h2o_custom_generator_t* res_generator = static_cast(self); - // res_generator itself is reference counted, so we only free members - delete res_generator->req; - delete res_generator->res; -} - -void HttpServer::response_proceed(h2o_generator_t *generator, h2o_req_t *req) { - //LOG(INFO) << "response_proceed called"; - h2o_custom_generator_t* custom_generator = reinterpret_cast(generator); - custom_generator->req_handler(*custom_generator->req, *custom_generator->res); - - if(custom_generator->req->_req->proceed_req) { - int is_end_stream = (custom_generator->res->final) ? 1: 0; - size_t written = custom_generator->req->chunk_length; - custom_generator->req->_req->proceed_req(custom_generator->req->_req, written, is_end_stream); - } + // res_generator itself is reference counted, so we only delete the members + delete res_generator->request; + delete res_generator->response; } int HttpServer::catch_all_handler(h2o_handler_t *_h2o_handler, h2o_req_t *req) { - h2o_custom_req_handler_t *h2o_handler = (h2o_custom_req_handler_t *)_h2o_handler; + h2o_custom_req_handler_t* h2o_handler = (h2o_custom_req_handler_t *)_h2o_handler; const std::string & http_method = std::string(req->method.base, req->method.len); const std::string & path = std::string(req->path.base, req->path.len); @@ -387,17 +375,19 @@ int HttpServer::catch_all_handler(h2o_handler_t *_h2o_handler, h2o_req_t *req) { } const std::string & body = std::string(req->entity.base, req->entity.len); + http_req* request = new http_req(req, rpath->http_method, route_hash, query_map, body); http_res* response = new http_res(); // add custom generator with a dipose function for cleaning up resources h2o_custom_generator_t* custom_generator = static_cast( - h2o_mem_alloc_shared(&req->pool, sizeof(*custom_generator), on_res_generator_dispose) + h2o_mem_alloc_shared(&req->pool, sizeof(*custom_generator), on_res_generator_dispose) ); custom_generator->super = h2o_generator_t {response_proceed, nullptr}; - custom_generator->req = request; - custom_generator->res = response; - custom_generator->req_handler = rpath->handler; + custom_generator->request = request; + custom_generator->response = response; + custom_generator->rpath = rpath; + custom_generator->h2o_handler = h2o_handler; response->generator = &custom_generator->super; // routes match and is an authenticated request @@ -412,14 +402,12 @@ int HttpServer::catch_all_handler(h2o_handler_t *_h2o_handler, h2o_req_t *req) { return process_request(request, response, rpath, h2o_handler); } else { // Only partial request body is available. - // If async_req is true, the request handler function will be invoked multiple times, for each chunk - request->streaming = rpath->async_req; - async_req_ctx_t* async_req_ctx = new async_req_ctx_t(request, response, h2o_handler, rpath); + // If rpath->async_req is true, the request handler function will be invoked multiple times, for each chunk //LOG(INFO) << "Partial request body length: " << req->entity.len; req->write_req.cb = async_req_cb; - req->write_req.ctx = async_req_ctx; + req->write_req.ctx = custom_generator; int is_end_entity = 0; req->proceed_req(req, req->entity.len, is_end_entity); } @@ -428,25 +416,27 @@ int HttpServer::catch_all_handler(h2o_handler_t *_h2o_handler, h2o_req_t *req) { } int HttpServer::async_req_cb(void *ctx, h2o_iovec_t chunk, int is_end_stream) { - async_req_ctx_t* req_ctx = static_cast(ctx); + h2o_custom_generator_t* custom_generator = static_cast(ctx); - http_req* request = req_ctx->request; - http_res* response = req_ctx->response; + http_req* request = custom_generator->request; + http_res* response = custom_generator->response; - LOG(INFO) << "chunk.len: " << chunk.len << ", is_end_stream: " << is_end_stream; + //LOG(INFO) << "async_req_cb, chunk.len: " << chunk.len << ", is_end_stream: " << is_end_stream; request->chunk_length = chunk.len; std::string chunk_str(chunk.base, chunk.len); request->body += chunk_str; - if(request->streaming || is_end_stream) { - // Handler should be invoked for every chunk for streaming requests + bool async_req = custom_generator->rpath->async_req; + + if(async_req || is_end_stream) { + // Handler should be invoked for every chunk for async streaming requests // For a non streaming request, buffer body and invoke only at the end // default value for response is NON_STREAMING // for streaming requests, we have to set: START, IN_PROGRESS or END - if(request->streaming) { + if(async_req) { if(is_end_stream) { request->stream_state = "END"; } else if(request->stream_state == "NON_STREAMING") { @@ -457,10 +447,10 @@ int HttpServer::async_req_cb(void *ctx, h2o_iovec_t chunk, int is_end_stream) { } } - process_request(request, response, req_ctx->rpath, req_ctx->handler); + process_request(request, response, custom_generator->rpath, custom_generator->h2o_handler); } - if(!is_end_stream && !request->streaming) { + if(!is_end_stream && !async_req) { // streaming requests will call proceed_req in an async fashion // so we have to handle this only for non streaming size_t written = chunk.len; @@ -468,11 +458,6 @@ int HttpServer::async_req_cb(void *ctx, h2o_iovec_t chunk, int is_end_stream) { request->_req->proceed_req(request->_req, written, stream_ended); } - if (is_end_stream) { - // deletes only the container -- individual requests and response are deleted by response handler - delete req_ctx; - } - return 0; } @@ -526,6 +511,50 @@ void HttpServer::send_message(const std::string & type, void* data) { message_dispatcher->send_message(type, data); } +void HttpServer::response_proceed(h2o_generator_t *generator, h2o_req_t *req) { + //LOG(INFO) << "response_proceed called"; + h2o_custom_generator_t* custom_generator = reinterpret_cast(generator); + + // if the request itself is async, we will proceed the request to fetch input content + // otherwise, call the handler since it will be the handler that will be producing content + + if (custom_generator->rpath->async_req && + custom_generator->request->_req && custom_generator->request->_req->proceed_req) { + + int is_end_stream = (custom_generator->response->final) ? 1 : 0; + size_t written = custom_generator->request->chunk_length; + custom_generator->request->_req->proceed_req(custom_generator->request->_req, written, is_end_stream); + + } else { + custom_generator->rpath->handler(*custom_generator->request, *custom_generator->response); + } +} + +void HttpServer::stream_response(http_req& request, http_res& response) { + if(!request._req) { + // underlying request is no longer available + return ; + } + + h2o_req_t* req = request._req; + h2o_custom_generator_t *custom_generator = reinterpret_cast(response.generator); + + if (request.stream_state == "START" || request.stream_state == "NON_STREAMING") { + req->res.status = response.status_code; + req->res.reason = http_res::get_status_reason(response.status_code); + h2o_add_header(&req->pool, &req->res.headers, H2O_TOKEN_CONTENT_TYPE, NULL, + response.content_type_header.c_str(), + response.content_type_header.size()); + h2o_start_response(req, &custom_generator->super); + } + + custom_generator->response->final = (request.stream_state == "END" || request.stream_state == "NON_STREAMING"); + + h2o_iovec_t body = h2o_strdup(&req->pool, response.body.c_str(), SIZE_MAX); + const h2o_send_state_t state = custom_generator->response->final ? H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS; + h2o_send(req, &body, 1, state); +} + void HttpServer::set_auth_handler(bool (*handler)(std::map& params, const route_path& rpath, const std::string& auth_key)) { auth_handler = handler; diff --git a/src/typesense_server_utils.cpp b/src/typesense_server_utils.cpp index b7725320..3946a090 100644 --- a/src/typesense_server_utils.cpp +++ b/src/typesense_server_utils.cpp @@ -68,33 +68,6 @@ Option fetch_file_contents(const std::string & file_path) { return Option(content); } -void stream_response(bool (*req_handler)(http_req* req, http_res* res, void* data), - http_req & request, http_res & response, void* data) { - h2o_req_t* req = request._req; - h2o_custom_generator_t *custom_generator = reinterpret_cast(response.generator); - - if (request.stream_state == "START" || request.stream_state == "NON_STREAMING") { - req->res.status = response.status_code; - req->res.reason = http_res::get_status_reason(response.status_code); - h2o_add_header(&req->pool, &req->res.headers, H2O_TOKEN_CONTENT_TYPE, NULL, - response.content_type_header.c_str(), - response.content_type_header.size()); - h2o_start_response(req, &custom_generator->super); - } - - custom_generator->res->final = (request.stream_state == "END" || request.stream_state == "NON_STREAMING"); - - h2o_iovec_t body = h2o_strdup(&req->pool, response.body.c_str(), SIZE_MAX); - const h2o_send_state_t state = custom_generator->res->final ? H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS; - h2o_send(req, &body, 1, state); - - if(state == H2O_SEND_STATE_FINAL) { - delete custom_generator->req; - delete custom_generator->res; - delete custom_generator; - } -} - void init_cmdline_options(cmdline::parser & options, int argc, char **argv) { options.set_program_name("./typesense-server"); diff --git a/test/collection_test.cpp b/test/collection_test.cpp index 31873bfe..571561c3 100644 --- a/test/collection_test.cpp +++ b/test/collection_test.cpp @@ -1289,9 +1289,7 @@ TEST_F(CollectionTest, ImportDocuments) { // try importing records - Option import_res = coll_mul_fields->add_many(import_records); - ASSERT_TRUE(import_res.ok()); - nlohmann::json import_response = import_res.get(); + nlohmann::json import_response = coll_mul_fields->add_many(import_records); ASSERT_TRUE(import_response["success"].get()); ASSERT_EQ(18, import_response["num_imported"].get()); @@ -1314,11 +1312,11 @@ TEST_F(CollectionTest, ImportDocuments) { ASSERT_STREQ(id.c_str(), result_id.c_str()); } - // verify that empty import is caught gracefully + // verify that empty import is handled gracefully std::vector empty_records; - import_res = coll_mul_fields->add_many(empty_records); - ASSERT_FALSE(import_res.ok()); - ASSERT_STREQ("The request body was empty. So, no records were imported.", import_res.error().c_str()); + import_response = coll_mul_fields->add_many(empty_records); + ASSERT_TRUE(import_response["success"].get()); + ASSERT_EQ(0, import_response["num_imported"].get()); // verify that only bad records are rejected, rest must be imported (records 2 and 4 are bad) std::vector more_records = {"{\"id\": \"id1\", \"title\": \"Test1\", \"starring\": \"Rand Fish\", \"points\": 12, " @@ -1330,10 +1328,7 @@ TEST_F(CollectionTest, ImportDocuments) { "{\"title\": \"Test4\", \"points\": 55, " "\"cast\": [\"Tom Skerritt\"] }"}; - import_res = coll_mul_fields->add_many(more_records); - ASSERT_TRUE(import_res.ok()); - - import_response = import_res.get(); + import_response = coll_mul_fields->add_many(more_records); ASSERT_FALSE(import_response["success"].get()); ASSERT_EQ(2, import_response["num_imported"].get()); @@ -1356,10 +1351,7 @@ TEST_F(CollectionTest, ImportDocuments) { "{\"id\": \"id2\", \"title\": \"Test1\", \"starring\": \"Rand Fish\", \"points\": 12, " "\"cast\": [\"Tom Skerritt\"] }"}; - import_res = coll_mul_fields->add_many(more_records); - ASSERT_TRUE(import_res.ok()); - - import_response = import_res.get(); + import_response = coll_mul_fields->add_many(more_records); ASSERT_FALSE(import_response["success"].get()); ASSERT_EQ(1, import_response["num_imported"].get()); @@ -1374,10 +1366,7 @@ TEST_F(CollectionTest, ImportDocuments) { // handle bad import json more_records = {"[]"}; - import_res = coll_mul_fields->add_many(more_records); - ASSERT_TRUE(import_res.ok()); - - import_response = import_res.get(); + import_response = coll_mul_fields->add_many(more_records); ASSERT_FALSE(import_response["success"].get()); ASSERT_EQ(0, import_response["num_imported"].get());