From f3e630f9de6ca8335e00bc0418cfa44091e73a20 Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Sat, 16 Dec 2017 13:02:26 +0530 Subject: [PATCH] Collection export API. --- TODO.md | 6 ++-- include/api.h | 4 +++ include/http_data.h | 7 +++- include/http_server.h | 15 +++++--- src/api.cpp | 44 +++++++++++++++++++++++ src/http_server.cpp | 66 ++++++++++++++++++++++++++++++----- src/main/benchmark.cpp | 2 +- src/main/typesense_server.cpp | 20 ++++++----- 8 files changed, 137 insertions(+), 27 deletions(-) diff --git a/TODO.md b/TODO.md index 3b3ba8d9..ed388415 100644 --- a/TODO.md +++ b/TODO.md @@ -75,11 +75,11 @@ - ~~https support~~ - ~~Validate before string to int conversion in the http api layer~~ - ~~art bool support~~ -- NOT operator support -- handle hyphens (replace them) +- ~~Export collection~~ - get collection should show schema -- Iterator +- handle hyphens (replace them) - clean special chars before indexing +- NOT operator support - > INT32_MAX validation for float field - Proper logging - highlight of string arrays? 
diff --git a/include/api.h b/include/api.h index 1fb59584..e93a3b98 100644 --- a/include/api.h +++ b/include/api.h @@ -14,6 +14,8 @@ void get_search(http_req & req, http_res & res); void get_collection_summary(http_req & req, http_res & res); +void get_collection_export(http_req & req, http_res & res); + void post_add_document(http_req & req, http_res & res); void get_fetch_document(http_req & req, http_res & res); @@ -24,5 +26,7 @@ void get_replication_updates(http_req &req, http_res &res); void on_send_response(void *data); +void collection_export_handler(http_req* req, http_res* res, void* data); + static constexpr const char* SEND_RESPONSE_MSG = "send_response"; static constexpr const char* REPLICATION_EVENT_MSG = "replication_event"; \ No newline at end of file diff --git a/include/http_data.h b/include/http_data.h index 2e55beb8..5c3dbe40 100644 --- a/include/http_data.h +++ b/include/http_data.h @@ -14,8 +14,14 @@ class HttpServer; struct http_res { uint32_t status_code; + std::string content_type_header; std::string body; HttpServer* server; + bool final; + + http_res(): content_type_header("application/json; charset=utf-8"), final(true) { + + } void send_200(const std::string & res_body) { status_code = 200; @@ -78,7 +84,6 @@ struct route_path { std::string http_method; std::vector path_parts; void (*handler)(http_req &, http_res &); - bool authenticated; bool async; inline bool operator< (const route_path& rhs) const { diff --git a/include/http_server.h b/include/http_server.h index 6542309f..d75ef8a3 100644 --- a/include/http_server.h +++ b/include/http_server.h @@ -73,13 +73,13 @@ public: void set_auth_handler(bool (*handler)(const route_path & rpath, const std::string & auth_key)); - void get(const std::string & path, void (*handler)(http_req & req, http_res & res), bool authenticated, bool async = false); + void get(const std::string & path, void (*handler)(http_req & req, http_res & res), bool async = false); - void post(const std::string & path, 
void (*handler)(http_req & req, http_res & res), bool authenticated, bool async = false); + void post(const std::string & path, void (*handler)(http_req & req, http_res & res), bool async = false); - void put(const std::string & path, void (*handler)(http_req & req, http_res & res), bool authenticated, bool async = false); + void put(const std::string & path, void (*handler)(http_req & req, http_res & res), bool async = false); - void del(const std::string & path, void (*handler)(http_req & req, http_res & res), bool authenticated, bool async = false); + void del(const std::string & path, void (*handler)(http_req & req, http_res & res), bool async = false); void on(const std::string & message, void (*handler)(void*)); @@ -87,6 +87,13 @@ public: void send_response(http_req* request, const http_res* response); + void stream_response(void (*handler)(http_req* req, http_res* res, void* data), http_req & request, + http_res & response, void* data); + + static void response_proceed(h2o_generator_t *generator, h2o_req_t *req); + + static void response_stop(h2o_generator_t *generator, h2o_req_t *req); + int run(); void stop(); diff --git a/src/api.cpp b/src/api.cpp index a35a0e33..a2fab3b5 100644 --- a/src/api.cpp +++ b/src/api.cpp @@ -259,6 +259,50 @@ void get_collection_summary(http_req & req, http_res & res) { res.send_200(json_response.dump()); } +void collection_export_handler(http_req* req, http_res* res, void* data) { + CollectionManager & collectionManager = CollectionManager::get_instance(); + Collection* collection = collectionManager.get_collection(req->params["collection"]); + const std::string seq_id_prefix = collection->get_seq_id_collection_prefix(); + + rocksdb::Iterator* it = reinterpret_cast<rocksdb::Iterator*>(data); + + if(it->Valid() && it->key().ToString().compare(0, seq_id_prefix.size(), seq_id_prefix) == 0) { + res->body = it->value().ToString(); + res->final = false; + it->Next(); + + // append a new line character if there is going to be one more record to send +
if(it->Valid() && it->key().ToString().compare(0, seq_id_prefix.size(), seq_id_prefix) == 0) { + res->body += "\n"; + } + + } else { + res->body = ""; + res->final = true; + delete it; + } +} + +void get_collection_export(http_req & req, http_res & res) { + CollectionManager & collectionManager = CollectionManager::get_instance(); + Collection* collection = collectionManager.get_collection(req.params["collection"]); + + if(collection == nullptr) { + res.send_404(); + res.server->send_message(SEND_RESPONSE_MSG, new request_response{&req, &res}); + return ; + } + + const std::string seq_id_prefix = collection->get_seq_id_collection_prefix(); + + rocksdb::Iterator* it = collectionManager.get_store()->get_iterator(); + it->Seek(seq_id_prefix); + + res.content_type_header = "application/octet-stream"; + res.status_code = 200; + res.server->stream_response(collection_export_handler, req, res, (void *) it); +} + void post_add_document(http_req & req, http_res & res) { CollectionManager & collectionManager = CollectionManager::get_instance(); Collection* collection = collectionManager.get_collection(req.params["collection"]); diff --git a/src/http_server.cpp b/src/http_server.cpp index 2a009052..e3e034be 100644 --- a/src/http_server.cpp +++ b/src/http_server.cpp @@ -19,6 +19,13 @@ struct h2o_custom_res_message_t { void* data; }; +struct h2o_custom_generator_t { + h2o_generator_t super; + void (*handler)(http_req* req, http_res* res, void* data); + request_response req_res; + void* data; +}; + HttpServer::HttpServer(std::string listen_address, uint32_t listen_port, std::string ssl_cert_path, std::string ssl_cert_key_path): listen_address(listen_address), listen_port(listen_port), @@ -144,7 +151,6 @@ void HttpServer::on_stop_server(void *data) { void HttpServer::clear_timeouts(std::vector & timeouts) { for(h2o_timeout_t* timeout: timeouts) { while (!h2o_linklist_is_empty(&timeout->_entries)) { - std::cout << "Removing entry..."
<< std::endl; h2o_timeout_entry_t *entry = H2O_STRUCT_FROM_MEMBER(h2o_timeout_entry_t, _link, timeout->_entries.next); h2o_linklist_unlink(&entry->_link); entry->registered_at = 0; @@ -350,6 +356,48 @@ void HttpServer::send_response(http_req* request, const http_res* response) { delete response; } +void HttpServer::response_proceed(h2o_generator_t *generator, h2o_req_t *req) { + h2o_custom_generator_t* custom_generator = reinterpret_cast(generator); + custom_generator->handler(custom_generator->req_res.req, custom_generator->req_res.response, + custom_generator->data); + + h2o_iovec_t body = h2o_strdup(&req->pool, custom_generator->req_res.response->body.c_str(), SIZE_MAX); + const h2o_send_state_t state = custom_generator->req_res.response->final ? + H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS; + h2o_send(req, &body, 1, state); + + if(custom_generator->req_res.response->final) { + h2o_dispose_request(req); + delete custom_generator->req_res.req; + delete custom_generator->req_res.response; + } +} + +void HttpServer::response_stop(h2o_generator_t *generator, h2o_req_t *req) { + h2o_custom_generator_t* custom_generator = reinterpret_cast(generator); + + h2o_dispose_request(req); + delete custom_generator->req_res.req; + delete custom_generator->req_res.response; +} + +void HttpServer::stream_response(void (*handler)(http_req* req, http_res* res, void* data), + http_req & request, http_res & response, void* data) { + h2o_req_t* req = request._req; + h2o_custom_generator_t* custom_generator = new h2o_custom_generator_t { + h2o_generator_t {response_proceed, response_stop}, handler, request_response {&request, &response}, data + }; + + req->res.status = response.status_code; + req->res.reason = get_status_reason(response.status_code); + h2o_add_header(&req->pool, &req->res.headers, H2O_TOKEN_CONTENT_TYPE, NULL, response.content_type_header.c_str(), + response.content_type_header.size()); + h2o_start_response(req, &custom_generator->super); + + h2o_iovec_t body = 
h2o_strdup(&req->pool, "", SIZE_MAX); + h2o_send(req, &body, 1, H2O_SEND_STATE_IN_PROGRESS); +} + int HttpServer::send_401_unauthorized(h2o_req_t *req) { h2o_generator_t generator = {NULL, NULL}; std::string res_body = std::string("{\"message\": \"Forbidden - ") + AUTH_HEADER + " header is invalid or not present.\"}"; @@ -366,31 +414,31 @@ void HttpServer::set_auth_handler(bool (*handler)(const route_path & rpath, cons auth_handler = handler; } -void HttpServer::get(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated, bool async) { +void HttpServer::get(const std::string & path, void (*handler)(http_req &, http_res &), bool async) { std::vector path_parts; StringUtils::split(path, path_parts, "/"); - route_path rpath = {"GET", path_parts, handler, authenticated, async}; + route_path rpath = {"GET", path_parts, handler, async}; routes.push_back(rpath); } -void HttpServer::post(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated, bool async) { +void HttpServer::post(const std::string & path, void (*handler)(http_req &, http_res &), bool async) { std::vector path_parts; StringUtils::split(path, path_parts, "/"); - route_path rpath = {"POST", path_parts, handler, authenticated, async}; + route_path rpath = {"POST", path_parts, handler, async}; routes.push_back(rpath); } -void HttpServer::put(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated, bool async) { +void HttpServer::put(const std::string & path, void (*handler)(http_req &, http_res &), bool async) { std::vector path_parts; StringUtils::split(path, path_parts, "/"); - route_path rpath = {"PUT", path_parts, handler, authenticated, async}; + route_path rpath = {"PUT", path_parts, handler, async}; routes.push_back(rpath); } -void HttpServer::del(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated, bool async) { +void HttpServer::del(const std::string & path, void 
(*handler)(http_req &, http_res &), bool async) { std::vector path_parts; StringUtils::split(path, path_parts, "/"); - route_path rpath = {"DELETE", path_parts, handler, authenticated, async}; + route_path rpath = {"DELETE", path_parts, handler, async}; routes.push_back(rpath); } diff --git a/src/main/benchmark.cpp b/src/main/benchmark.cpp index fdf5e8fc..b00372b8 100644 --- a/src/main/benchmark.cpp +++ b/src/main/benchmark.cpp @@ -21,7 +21,7 @@ int main(int argc, char* argv[]) { Store *store = new Store("/tmp/typesense-data"); CollectionManager & collectionManager = CollectionManager::get_instance(); - collectionManager.init(store, "abcd"); + collectionManager.init(store, "abcd", "1234"); Collection *collection = collectionManager.get_collection("hnstories_direct"); if(collection == nullptr) { diff --git a/src/main/typesense_server.cpp b/src/main/typesense_server.cpp index 1dd4a071..fbb2cb23 100644 --- a/src/main/typesense_server.cpp +++ b/src/main/typesense_server.cpp @@ -56,21 +56,23 @@ int main(int argc, char **argv) { // collection management server->set_auth_handler(handle_authentication); - server->post("/collections", post_create_collection, true); - server->get("/collections", get_collections, true); - server->del("/collections/:collection", del_drop_collection, true); + server->post("/collections", post_create_collection); + server->get("/collections", get_collections); + server->del("/collections/:collection", del_drop_collection); + server->get("/collections/:collection", get_collection_summary); + server->get("/collections/:collection/export", get_collection_export, true); // document management - server->post("/collections/:collection", post_add_document, true); - server->get("/collections/:collection", get_collection_summary, true); - server->get("/collections/:collection/search", get_search, false); - server->get("/collections/:collection/:id", get_fetch_document, true); - server->del("/collections/:collection/:id", del_remove_document, true); + 
server->post("/collections/:collection", post_add_document); + server->get("/collections/:collection/search", get_search); + server->get("/collections/:collection/:id", get_fetch_document); + server->del("/collections/:collection/:id", del_remove_document); // replication - server->get("/replication/updates", get_replication_updates, true, true); + server->get("/replication/updates", get_replication_updates, true); server->on(SEND_RESPONSE_MSG, on_send_response); + server->on(REPLICATION_EVENT_MSG, Replicator::on_replication_event); // start a background replication thread if the server is started as a read-only replica