Interface to enable clients to send/receive REST requests/responses (#6866)

* Interface to enable clients to send/receive REST requests/responses

Description

Major changes:
1. Add RESTClient interface enabling client to send/receive REST HTTP
   requests. Support REST APIs are: get, head, put, post, delete, trace
2. Add RESTUtil file introducing below interfaces:
 2.1. RESTUrl - Extract URI information: host, service, request-parameters.
 2.2. RESTConnectionPool-
      Connection establishment, life-cycle management, connection-pool (TTL)
 2.3. RESTClientKnobs - supports REST Knob parameter management and updates

Testing

Unit test - fdbrpc/RESTClient, fdbrpc/RESTUtils
This commit is contained in:
Ata E Husain Bohra 2022-04-27 12:17:52 -07:00 committed by GitHub
parent 6f841446a5
commit 333aadb903
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 911 additions and 4 deletions

View File

@ -65,7 +65,6 @@ set(FDBCLIENT_SRCS
GlobalConfig.actor.cpp
GrvProxyInterface.h
HighContentionPrefixAllocator.actor.h
HTTP.actor.cpp
IClientApi.h
IConfigTransaction.cpp
IConfigTransaction.h

View File

@ -26,7 +26,7 @@
#include "flow/Net2Packet.h"
#include "fdbclient/Knobs.h"
#include "fdbrpc/IRateControl.h"
#include "fdbclient/HTTP.h"
#include "fdbrpc/HTTP.h"
#include "fdbclient/JSONDoc.h"
// Representation of all the things you need to connect to a blob store instance with some credentials.

View File

@ -15,6 +15,7 @@ set(FDBRPC_SRCS
genericactors.actor.h
genericactors.actor.cpp
HealthMonitor.actor.cpp
HTTP.actor.cpp
IAsyncFile.actor.cpp
IPAllowList.cpp
LoadBalance.actor.cpp
@ -28,6 +29,10 @@ set(FDBRPC_SRCS
ReplicationPolicy.cpp
ReplicationTypes.cpp
ReplicationUtils.cpp
RESTClient.h
RESTClient.actor.cpp
RESTUtils.h
RESTUtils.actor.cpp
SimExternalConnection.actor.cpp
SimExternalConnection.h
Stats.actor.cpp

View File

@ -18,10 +18,12 @@
* limitations under the License.
*/
#include "fdbclient/HTTP.h"
#include "fdbrpc/HTTP.h"
#include "fdbclient/md5/md5.h"
#include "fdbclient/libb64/encode.h"
#include <cctype>
#include "flow/actorcompiler.h" // has to be last include
namespace HTTP {

View File

@ -18,6 +18,11 @@
* limitations under the License.
*/
#ifndef FDBRPC_HTTP_H
#define FDBRPC_HTTP_H
#pragma once
#include "flow/flow.h"
#include "flow/Net2Packet.h"
#include "fdbrpc/IRateControl.h"
@ -63,4 +68,27 @@ Future<Reference<Response>> doRequest(Reference<IConnection> const& conn,
int64_t* const& pSent,
Reference<IRateControl> const& recvRate,
const std::string& requestHeader = std::string());
constexpr int HTTP_STATUS_CODE_OK = 200;
constexpr int HTTP_STATUS_CODE_CREATED = 201;
constexpr int HTTP_STATUS_CODE_ACCEPTED = 202;
constexpr int HTTP_STATUS_CODE_NO_CONTENT = 204;
constexpr int HTTP_STATUS_CODE_UNAUTHORIZED = 401;
constexpr int HTTP_STATUS_CODE_NOT_ACCEPTABLE = 406;
constexpr int HTTP_STATUS_CODE_TOO_MANY_REQUESTS = 429;
constexpr int HTTP_STATUS_CODE_INTERNAL_SERVER_ERROR = 500;
constexpr int HTTP_STATUS_CODE_BAD_GATEWAY = 502;
constexpr int HTTP_STATUS_CODE_SERVICE_UNAVAILABLE = 503;
constexpr int HTTP_RETRYAFTER_DELAY_SECS = 300;
const std::string HTTP_VERB_GET = "GET";
const std::string HTTP_VERB_HEAD = "HEAD";
const std::string HTTP_VERB_DELETE = "DELETE";
const std::string HTTP_VERB_TRACE = "TRACE";
const std::string HTTP_VERB_PUT = "PUT";
const std::string HTTP_VERB_POST = "POST";
} // namespace HTTP
#endif

363
fdbrpc/RESTClient.actor.cpp Normal file
View File

@ -0,0 +1,363 @@
/*
* RESTClient.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbrpc/RESTClient.h"
#include "fdbrpc/HTTP.h"
#include "fdbrpc/IRateControl.h"
#include "fdbrpc/RESTUtils.h"
#include "flow/Arena.h"
#include "flow/Error.h"
#include "flow/FastRef.h"
#include "flow/Knobs.h"
#include "flow/Net2Packet.h"
#include "flow/flow.h"
#include "flow/network.h"
#include "flow/serialize.h"
#include "flow/Trace.h"
#include "flow/UnitTest.h"
#include <memory>
#include <unordered_map>
#include "flow/actorcompiler.h" // always the last include
json_spirit::mObject RESTClient::Stats::getJSON() {
json_spirit::mObject o;
o["host_service"] = host_service;
o["requests_failed"] = requests_failed;
o["requests_successful"] = requests_successful;
o["bytes_sent"] = bytes_sent;
return o;
}
RESTClient::Stats RESTClient::Stats::operator-(const Stats& rhs) {
Stats r(host_service);
r.requests_failed = requests_failed - rhs.requests_failed;
r.requests_successful = requests_successful - rhs.requests_successful;
r.bytes_sent = bytes_sent - rhs.bytes_sent;
return r;
}
RESTClient::RESTClient() {}
RESTClient::RESTClient(std::unordered_map<std::string, int>& knobSettings) {
knobs.set(knobSettings);
}
void RESTClient::setKnobs(const std::unordered_map<std::string, int>& knobSettings) {
knobs.set(knobSettings);
}
std::unordered_map<std::string, int> RESTClient::getKnobs() const {
return knobs.get();
}
ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<RESTClient> client,
std::string verb,
HTTP::Headers headers,
RESTUrl* url,
std::set<unsigned int> successCodes) {
state UnsentPacketQueue content;
state int contentLen = url->body.size();
if (url->body.size() > 0) {
PacketWriter pw(content.getWriteBuffer(url->body.size()), nullptr, Unversioned());
pw.serializeBytes(url->body);
}
std::string statsKey = RESTClient::getStatsKey(url->service, url->service);
auto sItr = client->statsMap.find(statsKey);
if (sItr == client->statsMap.end()) {
client->statsMap.emplace(statsKey, std::make_unique<RESTClient::Stats>(statsKey));
}
headers["Content-Length"] = format("%d", contentLen);
headers["Host"] = url->host;
state int maxTries = std::min(client->knobs.request_tries, client->knobs.connect_tries);
state int thisTry = 1;
state double nextRetryDelay = 2.0;
state Reference<IRateControl> sendReceiveRate = makeReference<Unlimited>();
state double reqTimeout = (client->knobs.request_timeout_secs * 1.0) / 60;
state RESTConnectionPoolKey connectPoolKey = RESTConnectionPool::getConnectionPoolKey(url->host, url->service);
state RESTClient::Stats* statsPtr = client->statsMap[statsKey].get();
loop {
state Optional<Error> err;
state Optional<NetworkAddress> remoteAddress;
state bool connectionEstablished = false;
state Reference<HTTP::Response> r;
try {
// Start connecting
Future<RESTConnectionPool::ReusableConnection> frconn = client->conectionPool->connect(
connectPoolKey, client->knobs.secure_connection, client->knobs.max_connection_life);
// Finish connecting, do request
state RESTConnectionPool::ReusableConnection rconn =
wait(timeoutError(frconn, client->knobs.connect_timeout));
connectionEstablished = true;
remoteAddress = rconn.conn->getPeerAddress();
Reference<HTTP::Response> _r = wait(timeoutError(HTTP::doRequest(rconn.conn,
verb,
url->resource,
headers,
contentLen > 0 ? &content : nullptr,
contentLen,
sendReceiveRate,
&statsPtr->bytes_sent,
sendReceiveRate),
reqTimeout));
r = _r;
// Since the response was parsed successfully (which is why we are here) reuse the connection unless we
// received the "Connection: close" header.
if (r->headers["Connection"] != "close") {
client->conectionPool->returnConnection(connectPoolKey, rconn, client->knobs.connection_pool_size);
}
rconn.conn.clear();
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
}
err = e;
}
// If err is not present then r is valid.
// If r->code is in successCodes then record the successful request and return r.
if (!err.present() && successCodes.count(r->code) != 0) {
statsPtr->requests_successful++;
return r;
}
// Otherwise, this request is considered failed. Update failure count.
statsPtr->requests_failed++;
// All errors in err are potentially retryable as well as certain HTTP response codes...
bool retryable = err.present() || r->code == HTTP::HTTP_STATUS_CODE_INTERNAL_SERVER_ERROR ||
r->code == HTTP::HTTP_STATUS_CODE_BAD_GATEWAY ||
r->code == HTTP::HTTP_STATUS_CODE_SERVICE_UNAVAILABLE ||
r->code == HTTP::HTTP_STATUS_CODE_TOO_MANY_REQUESTS;
// But only if our previous attempt was not the last allowable try.
retryable = retryable && (thisTry < maxTries);
TraceEvent event(SevWarn, retryable ? "RESTClient_FailedRetryable" : "RESTClient_RequestFailed");
// Attach err to trace event if present, otherwise extract some stuff from the response
if (err.present()) {
event.errorUnsuppressed(err.get());
}
event.suppressFor(60);
if (!err.present()) {
event.detail("ResponseCode", r->code);
}
event.detail("ConnectionEstablished", connectionEstablished);
if (remoteAddress.present())
event.detail("RemoteEndpoint", remoteAddress.get());
else
event.detail("RemoteHost", url->host);
event.detail("Verb", verb).detail("Resource", url->resource).detail("ThisTry", thisTry);
// If r is not valid or not code TOO_MANY_REQUESTS then increment the try count.
// TOO_MANY_REQUEST's will not count against the attempt limit.
if (!r || r->code != HTTP::HTTP_STATUS_CODE_TOO_MANY_REQUESTS) {
++thisTry;
}
// We will wait delay seconds before the next retry, start with nextRetryDelay.
double delay = nextRetryDelay;
// Double but limit the *next* nextRetryDelay.
nextRetryDelay = std::min(nextRetryDelay * 2, 60.0);
if (retryable) {
// If r is valid then obey the Retry-After response header if present.
if (r) {
auto iRetryAfter = r->headers.find("Retry-After");
if (iRetryAfter != r->headers.end()) {
event.detail("RetryAfterHeader", iRetryAfter->second);
char* pEnd;
double retryAfter = strtod(iRetryAfter->second.c_str(), &pEnd);
if (*pEnd) {
// If there were other characters then don't trust the parsed value
retryAfter = HTTP::HTTP_RETRYAFTER_DELAY_SECS;
}
// Update delay
delay = std::max(delay, retryAfter);
}
}
// Log the delay then wait.
event.detail("RetryDelay", delay);
wait(::delay(delay));
} else {
// We can't retry, so throw something.
// This error code means the authentication header was not accepted, likely the account or key is wrong.
if (r && r->code == HTTP::HTTP_STATUS_CODE_NOT_ACCEPTABLE) {
throw http_not_accepted();
}
if (r && r->code == HTTP::HTTP_STATUS_CODE_UNAUTHORIZED) {
throw http_auth_failed();
}
// Recognize and throw specific errors
if (err.present()) {
int code = err.get().code();
// If we get a timed_out error during the the connect() phase, we'll call that connection_failed despite
// the fact that there was technically never a 'connection' to begin with. It differentiates between an
// active connection timing out vs a connection timing out, though not between an active connection
// failing vs connection attempt failing.
// TODO: Add more error types?
if (code == error_code_timed_out && !connectionEstablished) {
throw connection_failed();
}
if (code == error_code_timed_out || code == error_code_connection_failed ||
code == error_code_lookup_failed) {
throw err.get();
}
}
throw http_request_failed();
}
}
}
Future<Reference<HTTP::Response>> RESTClient::doPutOrPost(const std::string& verb,
Optional<HTTP::Headers> optHeaders,
RESTUrl* url,
std::set<unsigned int> successCodes) {
HTTP::Headers headers;
if (optHeaders.present()) {
headers = optHeaders.get();
}
return doRequest_impl(Reference<RESTClient>::addRef(this), verb, headers, url, successCodes);
}
Future<Reference<HTTP::Response>> RESTClient::doPost(const std::string& fullUrl,
const std::string& requestBody,
Optional<HTTP::Headers> optHeaders) {
RESTUrl url(fullUrl, requestBody, knobs.secure_connection);
return doPutOrPost(HTTP::HTTP_VERB_POST, optHeaders, std::addressof(url), { HTTP::HTTP_STATUS_CODE_OK });
}
Future<Reference<HTTP::Response>> RESTClient::doPut(const std::string& fullUrl,
const std::string& requestBody,
Optional<HTTP::Headers> optHeaders) {
RESTUrl url(fullUrl, requestBody, knobs.secure_connection);
return doPutOrPost(
HTTP::HTTP_VERB_PUT,
optHeaders,
std::addressof(url),
// 201 - on successful resource create
// 200 / 204 - if target resource representation was successfully modified with the desired state
{ HTTP::HTTP_STATUS_CODE_OK, HTTP::HTTP_STATUS_CODE_CREATED, HTTP::HTTP_STATUS_CODE_NO_CONTENT });
}
Future<Reference<HTTP::Response>> RESTClient::doGetHeadDeleteOrTrace(const std::string& verb,
Optional<HTTP::Headers> optHeaders,
RESTUrl* url,
std::set<unsigned int> successCodes) {
HTTP::Headers headers;
if (optHeaders.present()) {
headers = optHeaders.get();
}
return doRequest_impl(Reference<RESTClient>::addRef(this), HTTP::HTTP_VERB_GET, headers, url, successCodes);
}
Future<Reference<HTTP::Response>> RESTClient::doGet(const std::string& fullUrl, Optional<HTTP::Headers> optHeaders) {
RESTUrl url(fullUrl, knobs.secure_connection);
return doGetHeadDeleteOrTrace(HTTP::HTTP_VERB_GET, optHeaders, std::addressof(url), { HTTP::HTTP_STATUS_CODE_OK });
}
Future<Reference<HTTP::Response>> RESTClient::doHead(const std::string& fullUrl, Optional<HTTP::Headers> optHeaders) {
RESTUrl url(fullUrl, knobs.secure_connection);
return doGetHeadDeleteOrTrace(HTTP::HTTP_VERB_HEAD, optHeaders, std::addressof(url), { HTTP::HTTP_STATUS_CODE_OK });
}
Future<Reference<HTTP::Response>> RESTClient::doDelete(const std::string& fullUrl, Optional<HTTP::Headers> optHeaders) {
RESTUrl url(fullUrl, knobs.secure_connection);
return doGetHeadDeleteOrTrace(
HTTP::HTTP_VERB_DELETE,
optHeaders,
std::addressof(url),
// 200 - action has been enacted.
// 202 - action will likely succeed, but, has not yet been enacted.
// 204 - action has been enated, no further information is to supplied.
{ HTTP::HTTP_STATUS_CODE_OK, HTTP::HTTP_STATUS_CODE_NO_CONTENT, HTTP::HTTP_STATUS_CODE_ACCEPTED });
}
Future<Reference<HTTP::Response>> RESTClient::doTrace(const std::string& fullUrl, Optional<HTTP::Headers> optHeaders) {
RESTUrl url(fullUrl, knobs.secure_connection);
return doGetHeadDeleteOrTrace(
HTTP::HTTP_VERB_TRACE, optHeaders, std::addressof(url), { HTTP::HTTP_STATUS_CODE_OK });
}
// Only used to link unit tests
void forceLinkRESTClientTests() {}
TEST_CASE("fdbrpc/RESTClient") {
RESTClient r;
std::unordered_map<std::string, int> knobs = r.getKnobs();
ASSERT_EQ(knobs["secure_connection"], RESTClientKnobs::SECURE_CONNECTION);
ASSERT_EQ(knobs["connection_pool_size"], FLOW_KNOBS->RESTCLIENT_MAX_CONNECTIONPOOL_SIZE);
ASSERT_EQ(knobs["connect_tries"], FLOW_KNOBS->RESTCLIENT_CONNECT_TRIES);
ASSERT_EQ(knobs["connect_timeout"], FLOW_KNOBS->RESTCLIENT_CONNECT_TIMEOUT);
ASSERT_EQ(knobs["max_connection_life"], FLOW_KNOBS->RESTCLIENT_MAX_CONNECTION_LIFE);
ASSERT_EQ(knobs["request_tries"], FLOW_KNOBS->RESTCLIENT_REQUEST_TRIES);
ASSERT_EQ(knobs["request_timeout_secs"], FLOW_KNOBS->RESTCLIENT_REQUEST_TIMEOUT_SEC);
for (auto& itr : knobs) {
itr.second++;
}
r.setKnobs(knobs);
std::unordered_map<std::string, int> updated = r.getKnobs();
for (auto& itr : updated) {
ASSERT_EQ(knobs[itr.first], itr.second);
}
// invalid client knob
knobs["foo"] = 100;
try {
r.setKnobs(knobs);
ASSERT(false);
} catch (Error& e) {
if (e.code() != error_code_rest_invalid_rest_client_knob) {
throw e;
}
}
return Void();
}

97
fdbrpc/RESTClient.h Normal file
View File

@ -0,0 +1,97 @@
/*
* RESTClient.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBRPC_RESTCLIENT_H
#define FDBRPC_RESTCLIENT_H
#include <memory>
#pragma once
#include "fdbclient/JSONDoc.h"
#include "fdbrpc/HTTP.h"
#include "fdbrpc/RESTUtils.h"
#include "flow/Arena.h"
#include "flow/FastRef.h"
#include "flow/flow.h"
#include "flow/Net2Packet.h"
// This interface enables sending REST HTTP requests and receiving REST HTTP responses from a resource identified by a
// URI.
class RESTClient : public ReferenceCounted<RESTClient> {
public:
struct Stats {
explicit Stats(const std::string& hService)
: host_service(hService), requests_successful(0), requests_failed(0), bytes_sent(0) {}
Stats operator-(const Stats& rhs);
void clear() { requests_failed = requests_successful = bytes_sent = 0; }
json_spirit::mObject getJSON();
std::string host_service;
int64_t requests_successful;
int64_t requests_failed;
int64_t bytes_sent;
};
RESTClientKnobs knobs;
Reference<RESTConnectionPool> conectionPool;
// Connection stats maintained per "host:service"
std::unordered_map<std::string, std::unique_ptr<Stats>> statsMap;
RESTClient();
explicit RESTClient(std::unordered_map<std::string, int>& params);
void setKnobs(const std::unordered_map<std::string, int>& knobSettings);
std::unordered_map<std::string, int> getKnobs() const;
// Supports common REST APIs.
// On invocation of below methods, input 'fullUrl' is parsed using RESTUrl interface,
// RESTConnectionPool is used to leverage cached connection if any for 'host:service' pair. API then leverage
// HTTP::doRequest to accomplish the specified operation
Future<Reference<HTTP::Response>> doGet(const std::string& fullUrl,
Optional<HTTP::Headers> optHeaders = Optional<HTTP::Headers>());
Future<Reference<HTTP::Response>> doHead(const std::string& fullUrl,
Optional<HTTP::Headers> optHeaders = Optional<HTTP::Headers>());
Future<Reference<HTTP::Response>> doDelete(const std::string& fullUrl,
Optional<HTTP::Headers> optHeaders = Optional<HTTP::Headers>());
Future<Reference<HTTP::Response>> doTrace(const std::string& fullUrl,
Optional<HTTP::Headers> optHeaders = Optional<HTTP::Headers>());
Future<Reference<HTTP::Response>> doPut(const std::string& fullUrl,
const std::string& requestBody,
Optional<HTTP::Headers> optHeaders = Optional<HTTP::Headers>());
Future<Reference<HTTP::Response>> doPost(const std::string& fullUrl,
const std::string& requestBody,
Optional<HTTP::Headers> optHeaders = Optional<HTTP::Headers>());
static std::string getStatsKey(const std::string& host, const std::string& service) { return host + ":" + service; }
private:
Future<Reference<HTTP::Response>> doGetHeadDeleteOrTrace(const std::string& verb,
Optional<HTTP::Headers> optHeaders,
RESTUrl* url,
std::set<unsigned int> successCodes);
Future<Reference<HTTP::Response>> doPutOrPost(const std::string& verb,
Optional<HTTP::Headers> headers,
RESTUrl* url,
std::set<unsigned int> successCodes);
};
#endif

276
fdbrpc/RESTUtils.actor.cpp Normal file
View File

@ -0,0 +1,276 @@
/*
* RESTUtils.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbrpc/RESTUtils.h"
#include "flow/flat_buffers.h"
#include "flow/UnitTest.h"
#include <boost/algorithm/string.hpp>
#include "flow/actorcompiler.h" // always the last include
namespace {
std::unordered_set<std::string> protocols = { "http", "https" };
bool isProtocolSupported(const std::string& protocol) {
return protocols.find(protocol) != protocols.end();
}
bool isSecurePrototol(const std::string& protocol) {
return protocol.compare("https") == 0;
}
} // namespace
RESTClientKnobs::RESTClientKnobs() {
secure_connection = RESTClientKnobs::SECURE_CONNECTION;
connection_pool_size = FLOW_KNOBS->RESTCLIENT_MAX_CONNECTIONPOOL_SIZE;
connect_tries = FLOW_KNOBS->RESTCLIENT_CONNECT_TRIES;
connect_timeout = FLOW_KNOBS->RESTCLIENT_CONNECT_TIMEOUT;
max_connection_life = FLOW_KNOBS->RESTCLIENT_MAX_CONNECTION_LIFE;
request_tries = FLOW_KNOBS->RESTCLIENT_REQUEST_TRIES;
request_timeout_secs = FLOW_KNOBS->RESTCLIENT_REQUEST_TIMEOUT_SEC;
knobMap["connection_pool_size"] = std::addressof(connection_pool_size);
knobMap["pz"] = std::addressof(connection_pool_size);
knobMap["secure_connection"] = std::addressof(secure_connection);
knobMap["sc"] = std::addressof(secure_connection);
knobMap["connect_tries"] = std::addressof(connect_tries);
knobMap["ct"] = std::addressof(connect_tries);
knobMap["connect_timeout"] = std::addressof(connect_timeout);
knobMap["cto"] = std::addressof(connect_timeout);
knobMap["max_connection_life"] = std::addressof(max_connection_life);
knobMap["mcl"] = std::addressof(max_connection_life);
knobMap["request_tries"] = std::addressof(request_tries);
knobMap["rt"] = std::addressof(request_tries);
knobMap["request_timeout_secs"] = std::addressof(request_timeout_secs);
knobMap["rtom"] = std::addressof(request_timeout_secs);
}
void RESTClientKnobs::set(const std::unordered_map<std::string, int>& knobSettings) {
TraceEvent trace = TraceEvent("RESTClient_SetKnobs");
for (const auto& itr : knobSettings) {
const auto& kItr = RESTClientKnobs::knobMap.find(itr.first);
if (kItr == RESTClientKnobs::knobMap.end()) {
trace.detail("RESTClient_InvalidKnobName", itr.first);
throw rest_invalid_rest_client_knob();
}
*(kItr->second) = itr.second;
trace.detail(itr.first.c_str(), itr.second);
}
}
std::unordered_map<std::string, int> RESTClientKnobs::get() const {
std::unordered_map<std::string, int> details = {
{ "connection_pool_size", connection_pool_size },
{ "secure_connection", secure_connection },
{ "connect_tries", connect_tries },
{ "connect_timeout", connect_timeout },
{ "max_connection_life", max_connection_life },
{ "request_tries", request_tries },
{ "request_timeout_secs", request_timeout_secs },
};
return details;
}
ACTOR Future<RESTConnectionPool::ReusableConnection> connect_impl(Reference<RESTConnectionPool> connectionPool,
RESTConnectionPoolKey connectKey,
bool isSecure,
int maxConnLife) {
auto poolItr = connectionPool->connectionPoolMap.find(connectKey);
if (poolItr == connectionPool->connectionPoolMap.end()) {
throw rest_connectpool_key_not_found();
}
while (!poolItr->second.empty()) {
RESTConnectionPool::ReusableConnection rconn = poolItr->second.front();
poolItr->second.pop();
if (rconn.expirationTime > now()) {
TraceEvent("RESTClient_ReusableConnection")
.suppressFor(60)
.detail("RemoteEndpoint", rconn.conn->getPeerAddress())
.detail("ExpireIn", rconn.expirationTime - now());
return rconn;
}
}
state Reference<IConnection> conn =
wait(INetworkConnections::net()->connect(connectKey.first, connectKey.second, isSecure));
wait(conn->connectHandshake());
return RESTConnectionPool::ReusableConnection({ conn, now() + maxConnLife });
}
Future<RESTConnectionPool::ReusableConnection> RESTConnectionPool::connect(RESTConnectionPoolKey connectKey,
const bool isSecure,
const int maxConnLife) {
return connect_impl(Reference<RESTConnectionPool>::addRef(this), connectKey, isSecure, maxConnLife);
}
void RESTConnectionPool::returnConnection(RESTConnectionPoolKey connectKey,
ReusableConnection& rconn,
const int maxConnections) {
auto poolItr = connectionPoolMap.find(connectKey);
if (poolItr == connectionPoolMap.end()) {
throw rest_connectpool_key_not_found();
}
// If it expires in the future then add it to the pool in the front iff connection pool size is not maxed
if (rconn.expirationTime > now() && poolItr->second.size() < maxConnections) {
poolItr->second.push(rconn);
}
rconn.conn = Reference<IConnection>();
}
RESTUrl::RESTUrl(const std::string& fUrl, const bool isSecure) {
parseUrl(fUrl, isSecure);
}
RESTUrl::RESTUrl(const std::string& fullUrl, const std::string& b, const bool isSecure) : body(b) {
parseUrl(fullUrl, isSecure);
}
void RESTUrl::parseUrl(const std::string& fullUrl, const bool isSecure) {
// Sample valid URIs
// 1. With 'host' & 'resource' := '<protocol>://<host>/<resource>'
// 2. With 'host', 'service' & 'resource' := '<protocol>://<host>:port/<resource>'
// 3. With 'host', 'service', 'resource' & 'reqParameters' := '<protocol>://<host>:port/<resource>?<parameter-list>'
try {
StringRef t(fullUrl);
StringRef p = t.eat("://");
std::string protocol = p.toString();
boost::algorithm::to_lower(protocol);
if (!isProtocolSupported(protocol)) {
throw format("Invalid REST URI protocol '%s'", protocol.c_str());
}
// Ensure connection secure knob setting matches with the input URI
if ((isSecurePrototol(protocol) && !isSecure) || (!isSecurePrototol(protocol) && isSecure)) {
throw format("Invalid REST URI protocol secure knob '%s'", fullUrl.c_str());
}
// extract 'resource' and optional 'parameter list' if supplied in the URL
uint8_t foundSeparator = 0;
StringRef hostPort = t.eatAny("/?", &foundSeparator);
if (foundSeparator == '/') {
resource = t.eat("?").toString();
reqParameters = t.eat().toString();
}
// hostPort is at least a host or IP address, optionally followed by :portNumber or :serviceName
StringRef hRef(hostPort);
StringRef h = hRef.eat(":");
if (h.size() == 0) {
throw std::string("host cannot be empty");
}
host = h.toString();
service = hRef.eat().toString();
TraceEvent("RESTClient_ParseURI")
.detail("URI", fullUrl)
.detail("Host", host)
.detail("Service", service)
.detail("Resource", resource)
.detail("ReqParameters", reqParameters);
} catch (std::string& err) {
TraceEvent("RESTClient_ParseError").detail("URI", fullUrl).detail("Error", err);
throw rest_invalid_uri();
}
}
// Only used to link unit tests
void forceLinkRESTUtilsTests() {}
TEST_CASE("fdbrpc/RESTUtils") {
// invalid protocol
try {
std::string uri("httpx://foo/bar");
RESTUrl r(uri, false);
ASSERT(false);
} catch (Error& e) {
if (e.code() != error_code_rest_invalid_uri) {
throw e;
}
}
// mismatch protocol and knob values
try {
std::string uri("http://foo/bar");
RESTUrl r(uri, true);
ASSERT(false);
} catch (Error& e) {
if (e.code() != error_code_rest_invalid_uri) {
throw e;
}
}
// missing host
try {
std::string uri("https://:/bar");
RESTUrl r(uri, true);
ASSERT(false);
} catch (Error& e) {
if (e.code() != error_code_rest_invalid_uri) {
throw e;
}
}
// valid URI with service
try {
std::string uri("https://host:80/foo/bar");
RESTUrl r(uri, true);
ASSERT_EQ(r.host.compare("host"), 0);
ASSERT_EQ(r.service.compare("80"), 0);
ASSERT_EQ(r.resource.compare("foo/bar"), 0);
} catch (Error& e) {
throw e;
}
// valid URI with-out service
try {
std::string uri("https://host/foo/bar");
RESTUrl r(uri, true);
ASSERT_EQ(r.host.compare("host"), 0);
ASSERT(r.service.empty());
ASSERT_EQ(r.resource.compare("foo/bar"), 0);
} catch (Error& e) {
throw e;
}
// valid URI with parameters
try {
std::string uri("https://host/foo/bar?param1,param2");
RESTUrl r(uri, true);
ASSERT_EQ(r.host.compare("host"), 0);
ASSERT(r.service.empty());
ASSERT_EQ(r.resource.compare("foo/bar"), 0);
ASSERT_EQ(r.reqParameters.compare("param1,param2"), 0);
} catch (Error& e) {
throw e;
}
// ensure RESTClient::Knob default values and updates
return Void();
}

113
fdbrpc/RESTUtils.h Normal file
View File

@ -0,0 +1,113 @@
/*
* RESTUtils.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDRPC_REST_UTILS_H
#define FDRPC_REST_UTILS_H
#pragma once
#include "flow/flow.h"
#include "flow/FastRef.h"
#include "flow/Net2Packet.h"
#include <unordered_map>
#include <utility>
// Util interface managing REST active connection pool.
// The interface internally constructs and maintains map {"host:service" -> activeConnection}; any new connection
// request would first access cached connection if possible (not expired), if none exists, it would establish a new
// connection and return to the caller. Caller on accomplishing the task at-hand, should return the connection back to
// the pool.
using RESTConnectionPoolKey = std::pair<std::string, std::string>;
class RESTConnectionPool : public ReferenceCounted<RESTConnectionPool> {
public:
struct ReusableConnection {
Reference<IConnection> conn;
double expirationTime;
};
// Maximum number of connections cached in the connection-pool.
int maxConnPerConnectKey;
std::map<RESTConnectionPoolKey, std::queue<ReusableConnection>> connectionPoolMap;
RESTConnectionPool(const int maxConnsPerKey) : maxConnPerConnectKey(maxConnsPerKey) {}
// Routine is responsible to provide an usable TCP connection object; it reuses an active connection from
// connection-pool if availalbe, otherwise, establish a new TCP connection
Future<ReusableConnection> connect(RESTConnectionPoolKey connectKey, const bool isSecure, const int maxConnLife);
void returnConnection(RESTConnectionPoolKey connectKey, ReusableConnection& conn, const int maxConnections);
static RESTConnectionPoolKey getConnectionPoolKey(const std::string& host, const std::string& service) {
return std::make_pair(host, service);
}
};
// Util interface facilitating management and update for RESTClient knob parameters
struct RESTClientKnobs {
int connection_pool_size, secure_connection, connect_timeout, connect_tries, max_connection_life, request_tries,
request_timeout_secs;
constexpr static int SECURE_CONNECTION = 1;
constexpr static int NOT_SECURE_CONNECTION = 0;
RESTClientKnobs();
void set(const std::unordered_map<std::string, int>& knobSettings);
std::unordered_map<std::string, int> get() const;
std::unordered_map<std::string, int*> knobMap;
static std::vector<std::string> getKnobDescriptions() {
return {
"connection_pool_size (pz) Maximum numbers of active connections in the connection-pool",
"secure_connection (or sc) Set 1 for secure connection and 0 for insecure connection.",
"connect_tries (or ct) Number of times to try to connect for each request.",
"connect_timeout (or cto) Number of seconds to wait for a connect request to succeed.",
"max_connection_life (or mcl) Maximum number of seconds to use a single TCP connection.",
"request_tries (or rt) Number of times to try each request until a parsable HTTP "
"response other than 429 is received.",
"request_timeout_secs (or rtom) Number of seconds to wait for a request to succeed after a "
"connection is established.",
};
}
};
// Util interface facilitating parsing of an input REST 'full_url'
struct RESTUrl {
public:
// Connection resources - host and port details
std::string host;
std::string service;
// resource identified by URI
std::string resource;
// optional REST request parameters
std::string reqParameters;
// Request 'body' payload
std::string body;
explicit RESTUrl(const std::string& fullUrl, const bool isSecure);
explicit RESTUrl(const std::string& fullUrl, const std::string& body, const bool isSecure);
private:
void parseUrl(const std::string& fullUrl, bool isSecure);
};
#endif

View File

@ -30,7 +30,7 @@ void forceLinkMemcpyTests();
void forceLinkMemcpyPerfTests();
#if (!defined(TLS_DISABLED) && !defined(_WIN32))
void forceLinkStreamCipherTests();
void forceLinkBLockCiherTests();
void forceLinkBlobCipherTests();
#endif
void forceLinkParallelStreamTests();
void forceLinkSimExternalConnectionTests();
@ -39,6 +39,8 @@ void forceLinkSimKmsConnectorTests();
void forceLinkIThreadPoolTests();
void forceLinkTokenSignTests();
void forceLinkVersionVectorTests();
void forceLinkRESTClientTests();
void forceLinkRESTUtilsTests();
struct UnitTestWorkload : TestWorkload {
bool enabled;
@ -88,6 +90,8 @@ struct UnitTestWorkload : TestWorkload {
forceLinkIThreadPoolTests();
forceLinkTokenSignTests();
forceLinkVersionVectorTests();
forceLinkRESTClientTests();
forceLinkRESTUtilsTests();
}
std::string description() const override { return "UnitTests"; }

View File

@ -276,6 +276,14 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
if ( randomize && BUGGIFY) { ENCRYPT_CIPHER_KEY_CACHE_TTL = deterministicRandom()->randomInt(50, 100); }
init( ENCRYPT_KEY_REFRESH_INTERVAL, isSimulated ? 60 : 8 * 60 );
if ( randomize && BUGGIFY) { ENCRYPT_KEY_REFRESH_INTERVAL = deterministicRandom()->randomInt(20, 40); }
// REST Client
init( RESTCLIENT_MAX_CONNECTIONPOOL_SIZE, 10 );
init( RESTCLIENT_CONNECT_TRIES, 10 );
init( RESTCLIENT_CONNECT_TIMEOUT, 10 );
init( RESTCLIENT_MAX_CONNECTION_LIFE, 120 );
init( RESTCLIENT_REQUEST_TRIES, 10 );
init( RESTCLIENT_REQUEST_TIMEOUT_SEC, 120 );
}
// clang-format on

View File

@ -344,6 +344,14 @@ public:
int64_t ENCRYPT_CIPHER_KEY_CACHE_TTL;
int64_t ENCRYPT_KEY_REFRESH_INTERVAL;
// RESTClient
int RESTCLIENT_MAX_CONNECTIONPOOL_SIZE;
int RESTCLIENT_CONNECT_TRIES;
int RESTCLIENT_CONNECT_TIMEOUT;
int RESTCLIENT_MAX_CONNECTION_LIFE;
int RESTCLIENT_REQUEST_TRIES;
int RESTCLIENT_REQUEST_TIMEOUT_SEC;
FlowKnobs(class Randomize, class IsSimulated);
void initialize(class Randomize, class IsSimulated);
};

View File

@ -146,6 +146,10 @@ ERROR( file_corrupt, 1522, "A structurally corrupt data file was detected" )
ERROR( http_request_failed, 1523, "HTTP response code not received or indicated failure" )
ERROR( http_auth_failed, 1524, "HTTP request failed due to bad credentials" )
ERROR( http_bad_request_id, 1525, "HTTP response contained an unexpected X-Request-ID header" )
ERROR( rest_invalid_uri, 1526, "Invalid REST URI")
ERROR( rest_invalid_rest_client_knob, 1527, "Invalid RESTClient knob")
ERROR( rest_connectpool_key_not_found, 1528, "ConnectKey not found in connection pool")
// 2xxx Attempt (presumably by a _client_) to do something illegal. If an error is known to
// be internally caused, it should be 41xx