Reuse new streaming framework for exports.

This commit is contained in:
kishorenc 2020-08-17 07:54:30 +05:30
parent e2c0b52671
commit 7f928fb0bc
4 changed files with 53 additions and 42 deletions

View File

@ -69,8 +69,6 @@ bool get_metrics_json(http_req& req, http_res& res);
// Misc helpers
bool collection_export_handler(http_req* req, http_res* res, void* data);
bool async_write_request(void *data);
static constexpr const char* SEND_RESPONSE_MSG = "send_response";

View File

@ -125,14 +125,16 @@ struct http_req {
std::string body;
std::string metadata;
http_req(): route_hash(1) {
void* data;
http_req(): _req(nullptr), route_hash(1), chunk_length(0), data(nullptr) {
}
http_req(h2o_req_t* _req, const std::string & http_method, uint64_t route_hash,
const std::map<std::string, std::string> & params, const std::string& body):
_req(_req), http_method(http_method), route_hash(route_hash), params(params),
stream_state("NON_STREAMING"), body(body) {
stream_state("NON_STREAMING"), chunk_length(0), body(body), data(nullptr) {
}

View File

@ -502,37 +502,6 @@ bool get_collection_summary(http_req & req, http_res & res) {
return true;
}
/**
 * Streaming callback that emits one exported document per invocation.
 *
 * @param req  Request; `req->params["collection"]` names the collection being exported.
 * @param res  Response; `body` receives the current record's value and `final` is
 *             set to false while records are being emitted, true once exhausted.
 * @param data Opaque pointer to the `rocksdb::Iterator` that walks the store. The
 *             iterator is deleted here once no further matching record exists.
 * @return false (after `res->set_404()`) when the collection cannot be found,
 *         true otherwise.
 */
bool collection_export_handler(http_req* req, http_res* res, void* data) {
    CollectionManager& collectionManager = CollectionManager::get_instance();
    Collection* collection = collectionManager.get_collection(req->params["collection"]);

    if(!collection) {
        // NOTE(review): the iterator carried in `data` is not released on this path.
        // Whether the caller invokes this handler again after a `false` return is not
        // visible from here, so ownership is left untouched -- confirm and free it in
        // whichever place is guaranteed to run exactly once.
        res->set_404();
        return false;
    }

    const std::string seq_id_prefix = collection->get_seq_id_collection_prefix();

    // `data` is a plain void* round-trip of the iterator pointer, so static_cast suffices.
    rocksdb::Iterator* it = static_cast<rocksdb::Iterator*>(data);

    // True while the iterator is positioned on a key that belongs to this collection.
    // Compares on the Slice directly, so the key is not copied into a std::string,
    // and tolerates a null iterator by treating it as an exhausted one.
    const auto on_collection_record = [it, &seq_id_prefix]() {
        return it != nullptr && it->Valid() && it->key().starts_with(seq_id_prefix);
    };

    if(on_collection_record()) {
        res->body = it->value().ToString();
        res->final = false;
        it->Next();

        // append a new line character only if there is going to be one more record to send
        if(on_collection_record()) {
            res->body += "\n";
        }
    } else {
        // nothing (more) to send: finish the stream and release the iterator
        res->body = "";
        res->final = true;
        delete it;  // deleting a null pointer is a no-op
    }

    return true;
}
bool get_export_documents(http_req & req, http_res & res) {
// NOTE: this is a streaming response end-point so this handler will be called multiple times
CollectionManager & collectionManager = CollectionManager::get_instance();
@ -541,19 +510,50 @@ bool get_export_documents(http_req & req, http_res & res) {
if(collection == nullptr) {
req.stream_state = "NON_STREAMING";
res.set_404();
server->stream_response(req, res);
HttpServer::stream_response(req, res);
return false;
}
const std::string seq_id_prefix = collection->get_seq_id_collection_prefix();
rocksdb::Iterator* it = nullptr;
rocksdb::Iterator* it = collectionManager.get_store()->get_iterator();
it->Seek(seq_id_prefix);
if(req.data == nullptr) {
it = collectionManager.get_store()->get_iterator();
it->Seek(seq_id_prefix);
req.data = it;
} else {
it = static_cast<rocksdb::Iterator*>(req.data);
}
if(req.stream_state == "DISPOSE") {
LOG(INFO) << "Disposing export iterator";
delete it;
req.data = nullptr;
return true;
}
if(req.stream_state == "NON_STREAMING") {
req.stream_state = "START";
} else if(req.stream_state == "START") {
req.stream_state = "IN_PROGRESS";
}
if(it->Valid() && it->key().ToString().compare(0, seq_id_prefix.size(), seq_id_prefix) == 0) {
res.body = it->value().ToString();
it->Next();
// append a new line character if there is going to be one more record to send
if(it->Valid() && it->key().ToString().compare(0, seq_id_prefix.size(), seq_id_prefix) == 0) {
res.body += "\n";
} else {
req.stream_state = "END";
}
}
res.content_type_header = "application/octet-stream";
res.status_code = 200;
//stream_response(collection_export_handler, req, res, (void *) it);
HttpServer::stream_response(req, res);
return true;
}
@ -562,14 +562,19 @@ bool post_import_documents(http_req& req, http_res& res) {
Collection* collection = collectionManager.get_collection(req.params["collection"]);
if(collection == nullptr) {
req.stream_state = "NON_STREAMING";
req.stream_state = (req.stream_state == "IN_PROGRESS") ? "END" : "NON_STREAMING";
res.set_404();
server->stream_response(req, res);
HttpServer::stream_response(req, res);
return false;
}
LOG(INFO) << "Import, req.body.size: " << req.body.size() << ", state: " << req.stream_state;
if(req.stream_state == "DISPOSE") {
// we don't cache any state across iterations, so just return
return true;
}
std::vector<std::string> json_lines;
StringUtils::split(req.body, json_lines, "\n");
@ -611,7 +616,7 @@ bool post_import_documents(http_req& req, http_res& res) {
res.content_type_header = "text/plain; charset=utf8";
res.set_200(ss.str());
server->stream_response(req, res);
HttpServer::stream_response(req, res);
return true;
}

View File

@ -272,6 +272,12 @@ void HttpServer::on_res_generator_dispose(void *self) {
LOG(INFO) << "on_res_generator_dispose fires";
h2o_custom_generator_t* res_generator = static_cast<h2o_custom_generator_t*>(self);
if(res_generator->rpath->async_res) {
// for the handler to free any cached resources like an iterator
res_generator->request->stream_state = "DISPOSE";
res_generator->rpath->handler(*res_generator->request, *res_generator->response);
}
// res_generator itself is reference counted, so we only delete the members
delete res_generator->request;
delete res_generator->response;