Collection export API.

This commit is contained in:
Kishore Nallan 2017-12-16 13:02:26 +05:30
parent fe0db59877
commit f3e630f9de
8 changed files with 137 additions and 27 deletions

View File

@ -75,11 +75,11 @@
- ~~https support~~
- ~~Validate before string to int conversion in the http api layer~~
- ~~art bool support~~
- NOT operator support
- handle hyphens (replace them)
- ~~Export collection~~
- get collection should show schema
- Iterator
- handle hyphens (replace them)
- clean special chars before indexing
- NOT operator support
- > INT32_MAX validation for float field
- Proper logging
- highlight of string arrays?

View File

@ -14,6 +14,8 @@ void get_search(http_req & req, http_res & res);
void get_collection_summary(http_req & req, http_res & res);
void get_collection_export(http_req & req, http_res & res);
void post_add_document(http_req & req, http_res & res);
void get_fetch_document(http_req & req, http_res & res);
@ -24,5 +26,7 @@ void get_replication_updates(http_req &req, http_res &res);
void on_send_response(void *data);
void collection_export_handler(http_req* req, http_res* res, void* data);
static constexpr const char* SEND_RESPONSE_MSG = "send_response";
static constexpr const char* REPLICATION_EVENT_MSG = "replication_event";

View File

@ -14,8 +14,14 @@ class HttpServer;
struct http_res {
uint32_t status_code;
std::string content_type_header;
std::string body;
HttpServer* server;
bool final;
http_res(): content_type_header("application/json; charset=utf-8"), final(true) {
}
void send_200(const std::string & res_body) {
status_code = 200;
@ -78,7 +84,6 @@ struct route_path {
std::string http_method;
std::vector<std::string> path_parts;
void (*handler)(http_req &, http_res &);
bool authenticated;
bool async;
inline bool operator< (const route_path& rhs) const {

View File

@ -73,13 +73,13 @@ public:
void set_auth_handler(bool (*handler)(const route_path & rpath, const std::string & auth_key));
void get(const std::string & path, void (*handler)(http_req & req, http_res & res), bool authenticated, bool async = false);
void get(const std::string & path, void (*handler)(http_req & req, http_res & res), bool async = false);
void post(const std::string & path, void (*handler)(http_req & req, http_res & res), bool authenticated, bool async = false);
void post(const std::string & path, void (*handler)(http_req & req, http_res & res), bool async = false);
void put(const std::string & path, void (*handler)(http_req & req, http_res & res), bool authenticated, bool async = false);
void put(const std::string & path, void (*handler)(http_req & req, http_res & res), bool async = false);
void del(const std::string & path, void (*handler)(http_req & req, http_res & res), bool authenticated, bool async = false);
void del(const std::string & path, void (*handler)(http_req & req, http_res & res), bool async = false);
void on(const std::string & message, void (*handler)(void*));
@ -87,6 +87,13 @@ public:
void send_response(http_req* request, const http_res* response);
void stream_response(void (*handler)(http_req* req, http_res* res, void* data), http_req & request,
http_res & response, void* data);
static void response_proceed(h2o_generator_t *generator, h2o_req_t *req);
static void response_stop(h2o_generator_t *generator, h2o_req_t *req);
int run();
void stop();

View File

@ -259,6 +259,50 @@ void get_collection_summary(http_req & req, http_res & res) {
res.send_200(json_response.dump());
}
void collection_export_handler(http_req* req, http_res* res, void* data) {
CollectionManager & collectionManager = CollectionManager::get_instance();
Collection* collection = collectionManager.get_collection(req->params["collection"]);
const std::string seq_id_prefix = collection->get_seq_id_collection_prefix();
rocksdb::Iterator* it = reinterpret_cast<rocksdb::Iterator*>(data);
if(it->Valid() && it->key().ToString().compare(0, seq_id_prefix.size(), seq_id_prefix) == 0) {
res->body = it->value().ToString();
res->final = false;
it->Next();
// apppend a new line character if there is going to be one more record to send
if(it->Valid() && it->key().ToString().compare(0, seq_id_prefix.size(), seq_id_prefix) == 0) {
res->body += "\n";
}
} else {
res->body = true;
res->final = true;
delete it;
}
}
void get_collection_export(http_req & req, http_res & res) {
CollectionManager & collectionManager = CollectionManager::get_instance();
Collection* collection = collectionManager.get_collection(req.params["collection"]);
if(collection == nullptr) {
res.send_404();
res.server->send_message(SEND_RESPONSE_MSG, new request_response{&req, &res});
return ;
}
const std::string seq_id_prefix = collection->get_seq_id_collection_prefix();
rocksdb::Iterator* it = collectionManager.get_store()->get_iterator();
it->Seek(seq_id_prefix);
res.content_type_header = "application/octet-stream";
res.status_code = 200;
res.server->stream_response(collection_export_handler, req, res, (void *) it);
}
void post_add_document(http_req & req, http_res & res) {
CollectionManager & collectionManager = CollectionManager::get_instance();
Collection* collection = collectionManager.get_collection(req.params["collection"]);

View File

@ -19,6 +19,13 @@ struct h2o_custom_res_message_t {
void* data;
};
struct h2o_custom_generator_t {
h2o_generator_t super;
void (*handler)(http_req* req, http_res* res, void* data);
request_response req_res;
void* data;
};
HttpServer::HttpServer(std::string listen_address, uint32_t listen_port,
std::string ssl_cert_path, std::string ssl_cert_key_path):
listen_address(listen_address), listen_port(listen_port),
@ -144,7 +151,6 @@ void HttpServer::on_stop_server(void *data) {
void HttpServer::clear_timeouts(std::vector<h2o_timeout_t*> & timeouts) {
for(h2o_timeout_t* timeout: timeouts) {
while (!h2o_linklist_is_empty(&timeout->_entries)) {
std::cout << "Removing entry..." << std::endl;
h2o_timeout_entry_t *entry = H2O_STRUCT_FROM_MEMBER(h2o_timeout_entry_t, _link, timeout->_entries.next);
h2o_linklist_unlink(&entry->_link);
entry->registered_at = 0;
@ -350,6 +356,48 @@ void HttpServer::send_response(http_req* request, const http_res* response) {
delete response;
}
void HttpServer::response_proceed(h2o_generator_t *generator, h2o_req_t *req) {
h2o_custom_generator_t* custom_generator = reinterpret_cast<h2o_custom_generator_t*>(generator);
custom_generator->handler(custom_generator->req_res.req, custom_generator->req_res.response,
custom_generator->data);
h2o_iovec_t body = h2o_strdup(&req->pool, custom_generator->req_res.response->body.c_str(), SIZE_MAX);
const h2o_send_state_t state = custom_generator->req_res.response->final ?
H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS;
h2o_send(req, &body, 1, state);
if(custom_generator->req_res.response->final) {
h2o_dispose_request(req);
delete custom_generator->req_res.req;
delete custom_generator->req_res.response;
}
}
void HttpServer::response_stop(h2o_generator_t *generator, h2o_req_t *req) {
h2o_custom_generator_t* custom_generator = reinterpret_cast<h2o_custom_generator_t*>(generator);
h2o_dispose_request(req);
delete custom_generator->req_res.req;
delete custom_generator->req_res.response;
}
void HttpServer::stream_response(void (*handler)(http_req* req, http_res* res, void* data),
http_req & request, http_res & response, void* data) {
h2o_req_t* req = request._req;
h2o_custom_generator_t* custom_generator = new h2o_custom_generator_t {
h2o_generator_t {response_proceed, response_stop}, handler, request_response {&request, &response}, data
};
req->res.status = response.status_code;
req->res.reason = get_status_reason(response.status_code);
h2o_add_header(&req->pool, &req->res.headers, H2O_TOKEN_CONTENT_TYPE, NULL, response.content_type_header.c_str(),
response.content_type_header.size());
h2o_start_response(req, &custom_generator->super);
h2o_iovec_t body = h2o_strdup(&req->pool, "", SIZE_MAX);
h2o_send(req, &body, 1, H2O_SEND_STATE_IN_PROGRESS);
}
int HttpServer::send_401_unauthorized(h2o_req_t *req) {
h2o_generator_t generator = {NULL, NULL};
std::string res_body = std::string("{\"message\": \"Forbidden - ") + AUTH_HEADER + " header is invalid or not present.\"}";
@ -366,31 +414,31 @@ void HttpServer::set_auth_handler(bool (*handler)(const route_path & rpath, cons
auth_handler = handler;
}
void HttpServer::get(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated, bool async) {
void HttpServer::get(const std::string & path, void (*handler)(http_req &, http_res &), bool async) {
std::vector<std::string> path_parts;
StringUtils::split(path, path_parts, "/");
route_path rpath = {"GET", path_parts, handler, authenticated, async};
route_path rpath = {"GET", path_parts, handler, async};
routes.push_back(rpath);
}
void HttpServer::post(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated, bool async) {
void HttpServer::post(const std::string & path, void (*handler)(http_req &, http_res &), bool async) {
std::vector<std::string> path_parts;
StringUtils::split(path, path_parts, "/");
route_path rpath = {"POST", path_parts, handler, authenticated, async};
route_path rpath = {"POST", path_parts, handler, async};
routes.push_back(rpath);
}
void HttpServer::put(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated, bool async) {
void HttpServer::put(const std::string & path, void (*handler)(http_req &, http_res &), bool async) {
std::vector<std::string> path_parts;
StringUtils::split(path, path_parts, "/");
route_path rpath = {"PUT", path_parts, handler, authenticated, async};
route_path rpath = {"PUT", path_parts, handler, async};
routes.push_back(rpath);
}
void HttpServer::del(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated, bool async) {
void HttpServer::del(const std::string & path, void (*handler)(http_req &, http_res &), bool async) {
std::vector<std::string> path_parts;
StringUtils::split(path, path_parts, "/");
route_path rpath = {"DELETE", path_parts, handler, authenticated, async};
route_path rpath = {"DELETE", path_parts, handler, async};
routes.push_back(rpath);
}

View File

@ -21,7 +21,7 @@ int main(int argc, char* argv[]) {
Store *store = new Store("/tmp/typesense-data");
CollectionManager & collectionManager = CollectionManager::get_instance();
collectionManager.init(store, "abcd");
collectionManager.init(store, "abcd", "1234");
Collection *collection = collectionManager.get_collection("hnstories_direct");
if(collection == nullptr) {

View File

@ -56,21 +56,23 @@ int main(int argc, char **argv) {
// collection management
server->set_auth_handler(handle_authentication);
server->post("/collections", post_create_collection, true);
server->get("/collections", get_collections, true);
server->del("/collections/:collection", del_drop_collection, true);
server->post("/collections", post_create_collection);
server->get("/collections", get_collections);
server->del("/collections/:collection", del_drop_collection);
server->get("/collections/:collection", get_collection_summary);
server->get("/collections/:collection/export", get_collection_export, true);
// document management
server->post("/collections/:collection", post_add_document, true);
server->get("/collections/:collection", get_collection_summary, true);
server->get("/collections/:collection/search", get_search, false);
server->get("/collections/:collection/:id", get_fetch_document, true);
server->del("/collections/:collection/:id", del_remove_document, true);
server->post("/collections/:collection", post_add_document);
server->get("/collections/:collection/search", get_search);
server->get("/collections/:collection/:id", get_fetch_document);
server->del("/collections/:collection/:id", del_remove_document);
// replication
server->get("/replication/updates", get_replication_updates, true, true);
server->get("/replication/updates", get_replication_updates, true);
server->on(SEND_RESPONSE_MSG, on_send_response);
server->on(REPLICATION_EVENT_MSG, Replicator::on_replication_event);
// start a background replication thread if the server is started as a read-only replica