Fix http streaming on upgraded h2o lib.

This commit is contained in:
Kishore Nallan 2023-03-29 17:36:03 +05:30
parent 56184aff26
commit 9c8b2eed72
3 changed files with 14 additions and 48 deletions

View File

@ -244,13 +244,12 @@ struct http_req {
int64_t log_index;
std::atomic<bool> is_http_v1;
std::atomic<bool> is_diposed;
std::string client_ip = "0.0.0.0";
http_req(): _req(nullptr), route_hash(1),
first_chunk_aggregate(true), last_chunk_aggregate(false),
chunk_len(0), body_index(0), data(nullptr), ready(false), log_index(0), is_http_v1(true),
chunk_len(0), body_index(0), data(nullptr), ready(false), log_index(0),
is_diposed(false) {
start_ts = std::chrono::duration_cast<std::chrono::microseconds>(
@ -272,7 +271,6 @@ struct http_req {
if(_req != nullptr) {
const auto& tv = _req->processed_at.at;
conn_ts = (tv.tv_sec * 1000 * 1000) + tv.tv_usec;
is_http_v1 = (_req->version < 0x200);
} else {
conn_ts = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();

View File

@ -48,7 +48,6 @@ private:
public:
bool is_req_early_exit = false;
bool is_req_http1 = true;
bool is_res_start = true;
h2o_send_state_t send_state = H2O_SEND_STATE_IN_PROGRESS;
@ -125,7 +124,6 @@ public:
h2o_custom_generator_t* res_generator = static_cast<h2o_custom_generator_t*>(res->generator.load());
res_state.is_req_early_exit = (res_generator->rpath->async_req && res->final && !req->last_chunk_aggregate);
res_state.is_req_http1 = req->is_http_v1;
res_state.send_state = res->final ? H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS;
res_state.generator = (res_generator == nullptr) ? nullptr : &res_generator->h2o_generator;
res_state.set_response(res->status_code, res->content_type_header, res->body);

View File

@ -544,17 +544,18 @@ bool HttpServer::is_write_request(const std::string& root_resource, const std::s
}
int HttpServer::async_req_cb(void *ctx, int is_end_stream) {
// NOTE: this callback is triggered multiple times by HTTP 2 but only once by HTTP 1
// This quirk is because of the underlying buffer/window sizes. We will have to deal with both cases.
h2o_custom_generator_t* custom_generator = static_cast<h2o_custom_generator_t*>(ctx);
const std::shared_ptr<http_req>& request = custom_generator->req();
const std::shared_ptr<http_res>& response = custom_generator->res();
h2o_iovec_t chunk = request->_req->entity;
bool async_req = custom_generator->rpath->async_req;
bool is_http_v1 = (0x101 <= request->_req->version && request->_req->version < 0x200);
/*
LOG(INFO) << "async_req_cb, chunk.len=" << chunk.len
<< ", is_http_v1: " << is_http_v1
<< ", request->req->entity.len=" << request->req->entity.len
<< ", content_len: " << request->req->content_length
<< ", is_end_stream=" << is_end_stream;
@ -562,8 +563,12 @@ int HttpServer::async_req_cb(void *ctx, int is_end_stream) {
// disallow specific curl clients from using import call via http2
// detects: https://github.com/curl/curl/issues/1410
if(request->_req->version >= 0x200 && request->path_without_query.find("import") != std::string::npos) {
ssize_t agent_header_cursor = h2o_find_header_by_str(&request->_req->headers, http_req::AGENT_HEADER, strlen(http_req::AGENT_HEADER), -1);
if(!is_http_v1 && async_req && request->first_chunk_aggregate && request->chunk_len == 0 &&
request->path_without_query.find("import") != std::string::npos) {
ssize_t agent_header_cursor = h2o_find_header_by_str(&request->_req->headers,
http_req::AGENT_HEADER,
strlen(http_req::AGENT_HEADER), -1);
if(agent_header_cursor != -1) {
h2o_iovec_t & slot = request->_req->headers.entries[agent_header_cursor].value;
const std::string user_agent = std::string(slot.base, slot.len);
@ -601,25 +606,7 @@ int HttpServer::async_req_cb(void *ctx, int is_end_stream) {
//LOG(INFO) << "request->body.size(): " << request->body.size() << ", request->chunk_len=" << request->chunk_len;
// LOG(INFO) << "req->entity.len: " << request->req->entity.len << ", request->chunk_len=" << request->chunk_len;
bool async_req = custom_generator->rpath->async_req;
/*
On HTTP2, the request body callback is invoked multiple times with chunks of 16,384 bytes until the
`active_stream_window_size` is reached. For the first iteration, `active_stream_window_size`
includes initial request entity size and as well as chunk sizes
On HTTP 1, though, the handler is called only once with a small chunk size and requires proceed_req() to
be called for fetching further chunks. We need to handle this difference.
*/
bool exceeds_chunk_limit;
if(!request->is_http_v1 && request->first_chunk_aggregate) {
exceeds_chunk_limit = ((request->chunk_len + request->_req->entity.len) >= ACTIVE_STREAM_WINDOW_SIZE);
} else {
exceeds_chunk_limit = (request->chunk_len >= ACTIVE_STREAM_WINDOW_SIZE);
}
bool exceeds_chunk_limit = (request->chunk_len >= ACTIVE_STREAM_WINDOW_SIZE);
bool can_process_async = async_req && exceeds_chunk_limit;
/*if(is_end_stream == 1) {
@ -641,20 +628,7 @@ int HttpServer::async_req_cb(void *ctx, int is_end_stream) {
return 0;
}
// we are not ready to fire the request handler, so that means we need to buffer the request further
// this could be because we are a) dealing with a HTTP v1 request or b) a synchronous request
if(request->is_http_v1) {
// http v1 callbacks fire on small chunk sizes, so fetch more to match window size of http v2 buffer
request->_req->proceed_req(request->_req, NULL);
}
if(!async_req) {
// progress ONLY non-streaming type request body since
// streaming requests will call proceed_req in an async fashion
request->_req->proceed_req(request->_req, NULL);
}
request->_req->proceed_req(request->_req, NULL);
return 0;
}
@ -825,12 +799,8 @@ void HttpServer::stream_response(stream_response_state_t& state) {
h2o_start_response(req, state.generator);
}
if(state.is_req_http1) {
h2o_send(req, &state.res_body, 1, H2O_SEND_STATE_FINAL);
h2o_dispose_request(req);
} else {
h2o_send(req, &state.res_body, 1, H2O_SEND_STATE_ERROR);
}
h2o_send(req, &state.res_body, 1, H2O_SEND_STATE_FINAL);
h2o_dispose_request(req);
return ;
}