From c39126ee798894bfb3a6543cf74e0bc280df40a3 Mon Sep 17 00:00:00 2001 From: krunal1313 Date: Tue, 4 Jul 2023 17:32:44 +0530 Subject: [PATCH] error handling --- include/raft_server.h | 2 +- src/raft_server.cpp | 69 ++++++++++++++++++++++++------------------- 2 files changed, 40 insertions(+), 31 deletions(-) diff --git a/include/raft_server.h b/include/raft_server.h index 2ea9c8d0..dddaccb9 100644 --- a/include/raft_server.h +++ b/include/raft_server.h @@ -142,7 +142,7 @@ private: butil::EndPoint peering_endpoint; - void handle_gzip(const std::shared_ptr& request); + Option handle_gzip(const std::shared_ptr& request); public: diff --git a/src/raft_server.cpp b/src/raft_server.cpp index 51027bf1..053e8bd0 100644 --- a/src/raft_server.cpp +++ b/src/raft_server.cpp @@ -180,50 +180,52 @@ string ReplicationState::resolve_node_hosts(const string& nodes_config) { return final_nodes_config; } -void ReplicationState::handle_gzip(const std::shared_ptr& request) { - if(!request->zstream_initialized) { +Option ReplicationState::handle_gzip(const std::shared_ptr& request) { + if (!request->zstream_initialized) { request->zs.zalloc = Z_NULL; request->zs.zfree = Z_NULL; request->zs.opaque = Z_NULL; request->zs.avail_in = 0; request->zs.next_in = Z_NULL; - if (inflateInit2(&request->zs, 16 + MAX_WBITS) != Z_OK) - throw (std::runtime_error("inflateInit failed while decompressing.")); + if (inflateInit2(&request->zs, 16 + MAX_WBITS) != Z_OK) { + return Option(404, "inflateInit failed while decompressing"); + } + request->zstream_initialized = true; } - if(request->zstream_initialized) { - std::string outbuffer; - outbuffer.resize(10 * request->body.size()); + std::string outbuffer; + outbuffer.resize(10 * request->body.size()); - request->zs.next_in = (Bytef *) request->body.c_str(); - request->zs.avail_in = request->body.size(); - std::size_t size_uncompressed = 0; - int ret = 0; - do { - request->zs.avail_out = static_cast(outbuffer.size()); - request->zs.next_out = reinterpret_cast(&outbuffer[0] + size_uncompressed); - ret = inflate(&request->zs, Z_FINISH); - if (ret != Z_STREAM_END && ret != Z_OK && ret != Z_BUF_ERROR) { - std::string error_msg = request->zs.msg; - inflateEnd(&request->zs); - throw std::runtime_error(error_msg); - } - - size_uncompressed += (outbuffer.size() - request->zs.avail_out); - } while (request->zs.avail_out == 0); - - if (ret == Z_STREAM_END) { - request->zstream_initialized = false; + request->zs.next_in = (Bytef *) request->body.c_str(); + request->zs.avail_in = request->body.size(); + std::size_t size_uncompressed = 0; + int ret = 0; + do { + request->zs.avail_out = static_cast(outbuffer.size()); + request->zs.next_out = reinterpret_cast(&outbuffer[0] + size_uncompressed); + ret = inflate(&request->zs, Z_FINISH); + if (ret != Z_STREAM_END && ret != Z_OK && ret != Z_BUF_ERROR) { + std::string error_msg = request->zs.msg; inflateEnd(&request->zs); + return Option(404, error_msg); } - outbuffer.resize(size_uncompressed); + size_uncompressed += (outbuffer.size() - request->zs.avail_out); + } while (request->zs.avail_out == 0); - request->body = outbuffer; - request->chunk_len = outbuffer.size(); + if (ret == Z_STREAM_END) { + request->zstream_initialized = false; + inflateEnd(&request->zs); } + + outbuffer.resize(size_uncompressed); + + request->body = outbuffer; + request->chunk_len = outbuffer.size(); + + return Option(true); } void ReplicationState::write(const std::shared_ptr& request, const std::shared_ptr& response) { @@ -269,7 +271,14 @@ void ReplicationState::write(const std::shared_ptr& request, const std //check if it's first gzip chunk or is gzip stream initialized if(((request->body.size() > 2) && (31 == (int)request->body[0] && -117 == (int)request->body[1])) || request->zstream_initialized) { - handle_gzip(request); + auto res = handle_gzip(request); + + if(!res.ok()) { + response->set_422(res.error()); + response->final = true; + auto req_res = new async_req_res_t(request, response, true); + return message_dispatcher->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res); + } } // Serialize request to replicated WAL so that all the nodes in the group receive it as well.