error handling

This commit is contained in:
krunal1313 2023-07-04 17:32:44 +05:30
parent 46a2aa140b
commit c39126ee79
2 changed files with 40 additions and 31 deletions

View File

@ -142,7 +142,7 @@ private:
butil::EndPoint peering_endpoint;
void handle_gzip(const std::shared_ptr<http_req>& request);
Option<bool> handle_gzip(const std::shared_ptr<http_req>& request);
public:

View File

@ -180,50 +180,52 @@ string ReplicationState::resolve_node_hosts(const string& nodes_config) {
return final_nodes_config;
}
void ReplicationState::handle_gzip(const std::shared_ptr<http_req>& request) {
if(!request->zstream_initialized) {
Option<bool> ReplicationState::handle_gzip(const std::shared_ptr<http_req>& request) {
if (!request->zstream_initialized) {
request->zs.zalloc = Z_NULL;
request->zs.zfree = Z_NULL;
request->zs.opaque = Z_NULL;
request->zs.avail_in = 0;
request->zs.next_in = Z_NULL;
if (inflateInit2(&request->zs, 16 + MAX_WBITS) != Z_OK)
throw (std::runtime_error("inflateInit failed while decompressing."));
if (inflateInit2(&request->zs, 16 + MAX_WBITS) != Z_OK) {
return Option<bool>(404, "inflateInit failed while decompressing");
}
request->zstream_initialized = true;
}
if(request->zstream_initialized) {
std::string outbuffer;
outbuffer.resize(10 * request->body.size());
std::string outbuffer;
outbuffer.resize(10 * request->body.size());
request->zs.next_in = (Bytef *) request->body.c_str();
request->zs.avail_in = request->body.size();
std::size_t size_uncompressed = 0;
int ret = 0;
do {
request->zs.avail_out = static_cast<unsigned int>(outbuffer.size());
request->zs.next_out = reinterpret_cast<Bytef *>(&outbuffer[0] + size_uncompressed);
ret = inflate(&request->zs, Z_FINISH);
if (ret != Z_STREAM_END && ret != Z_OK && ret != Z_BUF_ERROR) {
std::string error_msg = request->zs.msg;
inflateEnd(&request->zs);
throw std::runtime_error(error_msg);
}
size_uncompressed += (outbuffer.size() - request->zs.avail_out);
} while (request->zs.avail_out == 0);
if (ret == Z_STREAM_END) {
request->zstream_initialized = false;
request->zs.next_in = (Bytef *) request->body.c_str();
request->zs.avail_in = request->body.size();
std::size_t size_uncompressed = 0;
int ret = 0;
do {
request->zs.avail_out = static_cast<unsigned int>(outbuffer.size());
request->zs.next_out = reinterpret_cast<Bytef *>(&outbuffer[0] + size_uncompressed);
ret = inflate(&request->zs, Z_FINISH);
if (ret != Z_STREAM_END && ret != Z_OK && ret != Z_BUF_ERROR) {
std::string error_msg = request->zs.msg;
inflateEnd(&request->zs);
return Option<bool>(404, error_msg);
}
outbuffer.resize(size_uncompressed);
size_uncompressed += (outbuffer.size() - request->zs.avail_out);
} while (request->zs.avail_out == 0);
request->body = outbuffer;
request->chunk_len = outbuffer.size();
if (ret == Z_STREAM_END) {
request->zstream_initialized = false;
inflateEnd(&request->zs);
}
outbuffer.resize(size_uncompressed);
request->body = outbuffer;
request->chunk_len = outbuffer.size();
return Option<bool>(true);
}
void ReplicationState::write(const std::shared_ptr<http_req>& request, const std::shared_ptr<http_res>& response) {
@ -269,7 +271,14 @@ void ReplicationState::write(const std::shared_ptr<http_req>& request, const std
//check if it's first gzip chunk or is gzip stream initialized
if(((request->body.size() > 2) &&
(31 == (int)request->body[0] && -117 == (int)request->body[1])) || request->zstream_initialized) {
handle_gzip(request);
auto res = handle_gzip(request);
if(!res.ok()) {
response->set_422(res.error());
response->final = true;
auto req_res = new async_req_res_t(request, response, true);
return message_dispatcher->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res);
}
}
// Serialize request to replicated WAL so that all the nodes in the group receive it as well.