/*
 * RESTClient.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "fdbclient/RESTClient.h"

#include "fdbclient/HTTP.h"
#include "flow/IRateControl.h"
#include "fdbclient/RESTUtils.h"
#include "flow/Arena.h"
#include "flow/Error.h"
#include "flow/FastRef.h"
#include "flow/Knobs.h"
#include "flow/Net2Packet.h"
#include "flow/flow.h"
#include "flow/network.h"
#include "flow/serialize.h"
#include "flow/Trace.h"
#include "flow/UnitTest.h"

#include <memory>
#include <unordered_map>

#include "flow/actorcompiler.h" // always the last include

// Returns the counters of this Stats object as a JSON object keyed by
// "host_service", "requests_failed", "requests_successful" and "bytes_sent".
json_spirit::mObject RESTClient::Stats::getJSON() {
    json_spirit::mObject o;
    o["host_service"] = host_service;
    o["requests_failed"] = requests_failed;
    o["requests_successful"] = requests_successful;
    o["bytes_sent"] = bytes_sent;
    return o;
}

// Returns a Stats carrying this object's host_service and, for every counter, the difference (this - rhs).
RESTClient::Stats RESTClient::Stats::operator-(const Stats& rhs) {
    Stats r(host_service);
    r.requests_failed = requests_failed - rhs.requests_failed;
    r.requests_successful = requests_successful - rhs.requests_successful;
    r.bytes_sent = bytes_sent - rhs.bytes_sent;
    return r;
}

// Creates a client that keeps the default-constructed knobs.
RESTClient::RESTClient() {}

// Creates a client and applies the supplied knob overrides.
RESTClient::RESTClient(std::unordered_map<std::string, int>& knobSettings) {
    knobs.set(knobSettings);
}

// Applies the supplied knob overrides to this client. The unit test at the bottom of this file expects an unknown
// knob name to raise rest_invalid_rest_client_knob.
void RESTClient::setKnobs(const std::unordered_map<std::string, int>& knobSettings) {
    knobs.set(knobSettings);
}

// Returns the current knob values keyed by knob name.
std::unordered_map<std::string, int> RESTClient::getKnobs() const {
    return knobs.get();
}

// Sends one HTTP request (`verb` on `url`, with `headers` and the body stored in `url`) and retries it on failure.
//
// Returns the response as soon as its status code is a member of `successCodes`.
//
// Every other outcome counts as a failed attempt. Any caught error, as well as the response codes
// INTERNAL_SERVER_ERROR, BAD_GATEWAY, SERVICE_UNAVAILABLE and TOO_MANY_REQUESTS, is retried while
// thisTry < min(request_tries, connect_tries). The delay between attempts starts at 2 seconds, doubles each time up
// to 60 seconds, and is raised to the server's Retry-After value when that header is present.
//
// Throws, once the request is not retryable any more:
//   - http_not_accepted     if the last response code was NOT_ACCEPTABLE
//   - http_auth_failed      if the last response code was UNAUTHORIZED
//   - connection_failed     if the attempt timed out before a connection was established
//   - the caught error      if it was timed_out, connection_failed or lookup_failed
//   - http_request_failed   otherwise
// actor_cancelled is always rethrown immediately.
ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<RESTClient> client,
                                                       std::string verb,
                                                       HTTP::Headers headers,
                                                       RESTUrl* url,
                                                       std::set<unsigned int> successCodes) {
    // The public entry points in this file pass the address of a stack-local RESTUrl and return the Future right
    // away, so `url` must not be dereferenced once this actor has suspended in a wait(). Copy the fields that are
    // needed later into actor state; everything else read from `url` below happens before the first wait().
    state std::string host = url->host;
    state std::string resource = url->resource;

    // Serialize the request body (if any) into a packet queue.
    // NOTE(review): `content` is built once and handed to every attempt. If HTTP::doRequest consumes the queue, a
    // retried PUT/POST may not resend the body -- confirm against HTTP::doRequest.
    state UnsentPacketQueue content;
    state int contentLen = url->body.size();
    if (url->body.size() > 0) {
        PacketWriter pw(content.getWriteBuffer(url->body.size()), nullptr, Unversioned());
        pw.serializeBytes(url->body);
    }

    // Make sure a Stats entry exists for this destination. The key is built from host and service (the Stats
    // field is named host_service); passing the service twice would merge all hosts of a service into one entry.
    std::string statsKey = RESTClient::getStatsKey(host, url->service);
    auto sItr = client->statsMap.find(statsKey);
    if (sItr == client->statsMap.end()) {
        client->statsMap.emplace(statsKey, std::make_unique<RESTClient::Stats>(statsKey));
    }

    headers["Content-Length"] = format("%d", contentLen);
    headers["Host"] = host;

    state int maxTries = std::min(client->knobs.request_tries, client->knobs.connect_tries);
    state int thisTry = 1;
    state double nextRetryDelay = 2.0;
    state Reference<IRateControl> sendReceiveRate = makeReference<Unlimited>();
    // NOTE(review): the knob is named *_secs but is divided by 60 before being used as the request timeout --
    // confirm the intended unit.
    state double reqTimeout = (client->knobs.request_timeout_secs * 1.0) / 60;
    state RESTConnectionPoolKey connectPoolKey = RESTConnectionPool::getConnectionPoolKey(host, url->service);
    state RESTClient::Stats* statsPtr = client->statsMap[statsKey].get();

    loop {
        state Optional<Error> err;
        state Optional<NetworkAddress> remoteAddress;
        state bool connectionEstablished = false;
        state Reference<HTTP::Response> r;

        try {
            // Start connecting
            Future<RESTConnectionPool::ReusableConnection> frconn = client->conectionPool->connect(
                connectPoolKey, client->knobs.secure_connection, client->knobs.max_connection_life);

            // Finish connecting, do request
            state RESTConnectionPool::ReusableConnection rconn =
                wait(timeoutError(frconn, client->knobs.connect_timeout));
            connectionEstablished = true;

            remoteAddress = rconn.conn->getPeerAddress();
            Reference<HTTP::Response> _r = wait(timeoutError(HTTP::doRequest(rconn.conn,
                                                                             verb,
                                                                             resource,
                                                                             headers,
                                                                             contentLen > 0 ? &content : nullptr,
                                                                             contentLen,
                                                                             sendReceiveRate,
                                                                             &statsPtr->bytes_sent,
                                                                             sendReceiveRate),
                                                             reqTimeout));
            r = _r;

            // Since the response was parsed successfully (which is why we are here) reuse the connection unless we
            // received the "Connection: close" header.
            if (r->headers["Connection"] != "close") {
                client->conectionPool->returnConnection(connectPoolKey, rconn, client->knobs.connection_pool_size);
            }
            rconn.conn.clear();
        } catch (Error& e) {
            if (e.code() == error_code_actor_cancelled) {
                throw;
            }
            err = e;
        }

        // If err is not present then r is valid.
        // If r->code is in successCodes then record the successful request and return r.
        if (!err.present() && successCodes.count(r->code) != 0) {
            statsPtr->requests_successful++;
            return r;
        }

        // Otherwise, this request is considered failed. Update failure count.
        statsPtr->requests_failed++;

        // All errors in err are potentially retryable as well as certain HTTP response codes...
        bool retryable = err.present() || r->code == HTTP::HTTP_STATUS_CODE_INTERNAL_SERVER_ERROR ||
                         r->code == HTTP::HTTP_STATUS_CODE_BAD_GATEWAY ||
                         r->code == HTTP::HTTP_STATUS_CODE_SERVICE_UNAVAILABLE ||
                         r->code == HTTP::HTTP_STATUS_CODE_TOO_MANY_REQUESTS;

        // But only if our previous attempt was not the last allowable try.
        retryable = retryable && (thisTry < maxTries);

        TraceEvent event(SevWarn, retryable ? "RESTClient_FailedRetryable" : "RESTClient_RequestFailed");

        // Attach err to trace event if present, otherwise extract some stuff from the response
        if (err.present()) {
            event.errorUnsuppressed(err.get());
        }
        event.suppressFor(60);
        if (!err.present()) {
            event.detail("ResponseCode", r->code);
        }

        event.detail("ConnectionEstablished", connectionEstablished);

        if (remoteAddress.present())
            event.detail("RemoteEndpoint", remoteAddress.get());
        else
            event.detail("RemoteHost", host);

        event.detail("Verb", verb).detail("Resource", resource).detail("ThisTry", thisTry);

        // If r is not valid or not code TOO_MANY_REQUESTS then increment the try count.
        // TOO_MANY_REQUEST's will not count against the attempt limit.
        if (!r || r->code != HTTP::HTTP_STATUS_CODE_TOO_MANY_REQUESTS) {
            ++thisTry;
        }

        // We will wait delay seconds before the next retry, start with nextRetryDelay.
        double delay = nextRetryDelay;
        // Double but limit the *next* nextRetryDelay.
        nextRetryDelay = std::min(nextRetryDelay * 2, 60.0);

        if (retryable) {
            // If r is valid then obey the Retry-After response header if present.
            if (r) {
                auto iRetryAfter = r->headers.find("Retry-After");
                if (iRetryAfter != r->headers.end()) {
                    event.detail("RetryAfterHeader", iRetryAfter->second);
                    char* pEnd;
                    double retryAfter = strtod(iRetryAfter->second.c_str(), &pEnd);
                    if (*pEnd) {
                        // If there were other characters then don't trust the parsed value
                        retryAfter = HTTP::HTTP_RETRYAFTER_DELAY_SECS;
                    }
                    // Update delay
                    delay = std::max(delay, retryAfter);
                }
            }

            // Log the delay then wait.
            event.detail("RetryDelay", delay);
            wait(::delay(delay));
        } else {
            // We can't retry, so throw something.

            // This error code means the authentication header was not accepted, likely the account or key is wrong.
            if (r && r->code == HTTP::HTTP_STATUS_CODE_NOT_ACCEPTABLE) {
                throw http_not_accepted();
            }

            if (r && r->code == HTTP::HTTP_STATUS_CODE_UNAUTHORIZED) {
                throw http_auth_failed();
            }

            // Recognize and throw specific errors
            if (err.present()) {
                int code = err.get().code();

                // If we get a timed_out error during the connect() phase, we'll call that connection_failed despite
                // the fact that there was technically never a 'connection' to begin with. It differentiates between
                // an active connection timing out vs a connection timing out, though not between an active
                // connection failing vs connection attempt failing.
                // TODO: Add more error types?
                if (code == error_code_timed_out && !connectionEstablished) {
                    throw connection_failed();
                }

                if (code == error_code_timed_out || code == error_code_connection_failed ||
                    code == error_code_lookup_failed) {
                    throw err.get();
                }
            }

            throw http_request_failed();
        }
    }
}

// Shared implementation of the body-carrying verbs: starts doRequest_impl with `verb`, the optional caller headers
// (an empty header set when absent), `url` and the accepted `successCodes`.
Future<Reference<HTTP::Response>> RESTClient::doPutOrPost(const std::string& verb,
                                                          Optional<HTTP::Headers> optHeaders,
                                                          RESTUrl* url,
                                                          std::set<unsigned int> successCodes) {
    HTTP::Headers headers;
    if (optHeaders.present()) {
        headers = optHeaders.get();
    }

    return doRequest_impl(Reference<RESTClient>::addRef(this), verb, headers, url, successCodes);
}

// Issues a POST of `requestBody` to `fullUrl`; only status OK is treated as success.
Future<Reference<HTTP::Response>> RESTClient::doPost(const std::string& fullUrl,
                                                     const std::string& requestBody,
                                                     Optional<HTTP::Headers> optHeaders) {
    // `url` lives only until this function returns; doRequest_impl copies what it needs before its first wait().
    RESTUrl url(fullUrl, requestBody, knobs.secure_connection);
    return doPutOrPost(HTTP::HTTP_VERB_POST, optHeaders, std::addressof(url), { HTTP::HTTP_STATUS_CODE_OK });
}

// Issues a PUT of `requestBody` to `fullUrl`; status OK, CREATED and NO_CONTENT are treated as success.
Future<Reference<HTTP::Response>> RESTClient::doPut(const std::string& fullUrl,
                                                    const std::string& requestBody,
                                                    Optional<HTTP::Headers> optHeaders) {
    // `url` lives only until this function returns; doRequest_impl copies what it needs before its first wait().
    RESTUrl url(fullUrl, requestBody, knobs.secure_connection);
    return doPutOrPost(
        HTTP::HTTP_VERB_PUT,
        optHeaders,
        std::addressof(url),
        // 201 - on successful resource create
        // 200 / 204 - if target resource representation was successfully modified with the desired state
        { HTTP::HTTP_STATUS_CODE_OK, HTTP::HTTP_STATUS_CODE_CREATED, HTTP::HTTP_STATUS_CODE_NO_CONTENT });
}

// Shared implementation of the body-less verbs (GET, HEAD, DELETE, TRACE): starts doRequest_impl with `verb`, the
// optional caller headers (an empty header set when absent), `url` and the accepted `successCodes`.
Future<Reference<HTTP::Response>> RESTClient::doGetHeadDeleteOrTrace(const std::string& verb,
                                                                     Optional<HTTP::Headers> optHeaders,
                                                                     RESTUrl* url,
                                                                     std::set<unsigned int> successCodes) {
    HTTP::Headers headers;
    if (optHeaders.present()) {
        headers = optHeaders.get();
    }

    // Forward the caller's verb; hard-coding GET here would turn every HEAD/DELETE/TRACE into a GET.
    return doRequest_impl(Reference<RESTClient>::addRef(this), verb, headers, url, successCodes);
}

// Issues a GET on `fullUrl`; only status OK is treated as success.
Future<Reference<HTTP::Response>> RESTClient::doGet(const std::string& fullUrl, Optional<HTTP::Headers> optHeaders) {
    // `url` lives only until this function returns; doRequest_impl copies what it needs before its first wait().
    RESTUrl url(fullUrl, knobs.secure_connection);
    return doGetHeadDeleteOrTrace(HTTP::HTTP_VERB_GET, optHeaders, std::addressof(url), { HTTP::HTTP_STATUS_CODE_OK });
}

// Issues a HEAD on `fullUrl`; only status OK is treated as success.
Future<Reference<HTTP::Response>> RESTClient::doHead(const std::string& fullUrl, Optional<HTTP::Headers> optHeaders) {
    // `url` lives only until this function returns; doRequest_impl copies what it needs before its first wait().
    RESTUrl url(fullUrl, knobs.secure_connection);
    return doGetHeadDeleteOrTrace(HTTP::HTTP_VERB_HEAD, optHeaders, std::addressof(url), { HTTP::HTTP_STATUS_CODE_OK });
}

// Issues a DELETE on `fullUrl`; status OK, NO_CONTENT and ACCEPTED are treated as success.
Future<Reference<HTTP::Response>> RESTClient::doDelete(const std::string& fullUrl,
                                                       Optional<HTTP::Headers> optHeaders) {
    // `url` lives only until this function returns; doRequest_impl copies what it needs before its first wait().
    RESTUrl url(fullUrl, knobs.secure_connection);
    return doGetHeadDeleteOrTrace(
        HTTP::HTTP_VERB_DELETE,
        optHeaders,
        std::addressof(url),
        // 200 - action has been enacted.
        // 202 - action will likely succeed, but, has not yet been enacted.
        // 204 - action has been enacted, no further information is to be supplied.
        { HTTP::HTTP_STATUS_CODE_OK, HTTP::HTTP_STATUS_CODE_NO_CONTENT, HTTP::HTTP_STATUS_CODE_ACCEPTED });
}

// Issues a TRACE on `fullUrl`; only status OK is treated as success.
Future<Reference<HTTP::Response>> RESTClient::doTrace(const std::string& fullUrl, Optional<HTTP::Headers> optHeaders) {
    // `url` lives only until this function returns; doRequest_impl copies what it needs before its first wait().
    RESTUrl url(fullUrl, knobs.secure_connection);
    return doGetHeadDeleteOrTrace(
        HTTP::HTTP_VERB_TRACE, optHeaders, std::addressof(url), { HTTP::HTTP_STATUS_CODE_OK });
}

// Only used to link unit tests
void forceLinkRESTClientTests() {}

// Checks that a default-constructed client reports the default knob values, that setKnobs() round-trips modified
// values through getKnobs(), and that an unknown knob name is rejected with rest_invalid_rest_client_knob.
TEST_CASE("fdbrpc/RESTClient") {
    RESTClient r;
    std::unordered_map<std::string, int> knobs = r.getKnobs();
    ASSERT_EQ(knobs["secure_connection"], RESTClientKnobs::SECURE_CONNECTION);
    ASSERT_EQ(knobs["connection_pool_size"], FLOW_KNOBS->RESTCLIENT_MAX_CONNECTIONPOOL_SIZE);
    ASSERT_EQ(knobs["connect_tries"], FLOW_KNOBS->RESTCLIENT_CONNECT_TRIES);
    ASSERT_EQ(knobs["connect_timeout"], FLOW_KNOBS->RESTCLIENT_CONNECT_TIMEOUT);
    ASSERT_EQ(knobs["max_connection_life"], FLOW_KNOBS->RESTCLIENT_MAX_CONNECTION_LIFE);
    ASSERT_EQ(knobs["request_tries"], FLOW_KNOBS->RESTCLIENT_REQUEST_TRIES);
    ASSERT_EQ(knobs["request_timeout_secs"], FLOW_KNOBS->RESTCLIENT_REQUEST_TIMEOUT_SEC);

    // Bump every knob by one and verify the client reports the new values.
    for (auto& itr : knobs) {
        itr.second++;
    }
    r.setKnobs(knobs);

    std::unordered_map<std::string, int> updated = r.getKnobs();
    for (auto& itr : updated) {
        ASSERT_EQ(knobs[itr.first], itr.second);
    }

    // invalid client knob
    knobs["foo"] = 100;
    try {
        r.setKnobs(knobs);
        ASSERT(false);
    } catch (Error& e) {
        if (e.code() != error_code_rest_invalid_rest_client_knob) {
            throw e;
        }
    }

    return Void();
}