mirror of https://github.com/apple/foundationdb.git

Rearrange things so that the backoff delay has no impact unless it's needed.

A.J. Beamon 2021-04-07 15:23:50 -07:00
parent 36f4c17ef1
commit 040ba0c587
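In short: before this change, setupRequest always materialized a backoffDelay future (an already-ready Void() when backoff was zero) that the caller had to drain in a choose loop before it could call startRequest, so the delay plumbing sat on the critical path of every request. After the change, a single startRequest sends the request immediately unless a positive backoff was requested, in which case it chains the send onto the delay with mapAsync. A minimal sketch of that shape, assuming the flow/fdbrpc primitives used in this file (Future, delay, mapAsync, RequestStream::tryGetReply); the helper name sendAfterBackoff is hypothetical and not part of the commit:

#include <functional>
#include "flow/genericactors.actor.h" // delay(), mapAsync()
#include "fdbrpc/fdbrpc.h"            // RequestStream, REPLY_TYPE

// Hypothetical helper illustrating the new startRequest shape: the delay
// future is only constructed when a backoff is actually requested, so the
// common (no-backoff) case pays nothing for it.
template <class Request>
Future<ErrorOr<REPLY_TYPE(Request)>> sendAfterBackoff(double backoff,
                                                      RequestStream<Request> const* stream,
                                                      Request const& request) {
    typedef ErrorOr<REPLY_TYPE(Request)> Reply;
    if (backoff > 0) {
        // Chain the send onto the delay: nothing goes on the wire until it fires.
        return mapAsync<Void, std::function<Future<Reply>(Void)>, Reply>(
            delay(backoff), [stream, request](Void) { return stream->tryGetReply(request); });
    }
    // No backoff requested: send immediately.
    return stream->tryGetReply(request);
}

Unlike the committed code, this sketch captures request by value; the commit captures it by reference and relies on the caller (the loadBalance actor, where the request is actor state) keeping it alive across the delay.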

@@ -78,10 +78,10 @@ Optional<LoadBalancedReply> getLoadBalancedReply(const void*);
 // Stores state for a request made by the load balancer
 template <class Request>
 struct RequestData : NonCopyable {
-    Future<ErrorOr<REPLY_TYPE(Request)>> response;
+    typedef ErrorOr<REPLY_TYPE(Request)> Reply;
+
+    Future<Reply> response;
     Reference<ModelHolder> modelHolder;
-    Future<Void> backoffDelay;
     RequestStream<Request> const* stream = nullptr;
     bool triedAllOptions = false;
     bool requestStarted = false; // true once the request has been sent to an alternative
@@ -91,36 +91,38 @@ struct RequestData : NonCopyable {
     // This is true once setupRequest is called, even though at that point the response is Never().
     bool isValid() { return response.isValid(); }

-    // Initializes the request state and starts the backoff delay
-    void setupRequest(double backoff, bool triedAllOptions, RequestStream<Request> const* stream) {
-        backoffDelay = (backoff > 0) ? delay(backoff) : Void();
-        response = Never();
+    // Initializes the request state and starts it, possibly after a backoff delay
+    void startRequest(double backoff,
+                      bool triedAllOptions,
+                      RequestStream<Request> const* stream,
+                      Request const& request,
+                      QueueModel* model) {
         modelHolder = Reference<ModelHolder>();
         requestStarted = false;
+
+        if (backoff > 0) {
+            response = mapAsync<Void, std::function<Future<Reply>(Void)>, Reply>(
+                delay(backoff), [this, stream, &request, model](Void _) {
+                    requestStarted = true;
+                    modelHolder = Reference<ModelHolder>(new ModelHolder(model, stream->getEndpoint().token.first()));
+                    return stream->tryGetReply(request);
+                });
+        } else {
+            requestStarted = true;
+            modelHolder = Reference<ModelHolder>(new ModelHolder(model, stream->getEndpoint().token.first()));
+            response = stream->tryGetReply(request);
+        }
+
         requestProcessed = false;

         this->stream = stream;
         this->triedAllOptions = triedAllOptions;
     }

-    // Sends the request to the configured stream
-    // This should not be called until after setupRequest has been called and the backoff delay has elapsed
-    void startRequest(Request request, QueueModel* model) {
-        ASSERT(stream);
-        ASSERT(backoffDelay.isReady());
-        backoffDelay = Never();
-        modelHolder = Reference<ModelHolder>(new ModelHolder(model, stream->getEndpoint().token.first()));
-        response = stream->tryGetReply(request);
-        requestStarted = true;
-    }
-
     // Implementation of the logic to handle a response.
     // Checks the state of the response, updates the queue model, and returns one of the following outcomes:
     // A return value of true means that the request completed successfully
     // A return value of false means that the request failed but should be retried
     // A return value with an error means that the error should be thrown back to original caller
-    static ErrorOr<bool> checkAndProcessResultImpl(ErrorOr<REPLY_TYPE(Request)> result,
+    static ErrorOr<bool> checkAndProcessResultImpl(Reply result,
                                                    Reference<ModelHolder> modelHolder,
                                                    bool atMostOnce,
                                                    bool triedAllOptions) {
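The comment above defines a three-way outcome for checkAndProcessResultImpl. For illustration, a hedged sketch of a retry loop consuming such an ErrorOr<bool> (retryUntilDone and attempt are hypothetical; flow ACTOR syntax, so it needs the actor compiler):

// Hypothetical consumer of a tri-state ErrorOr<bool> outcome:
// error => rethrow to the original caller, true => done, false => retry.
ACTOR template <class Fn>
Future<Void> retryUntilDone(Fn attempt) {
    loop {
        wait(delay(0.1)); // illustrative retry backoff before each attempt
        ErrorOr<bool> outcome = attempt();
        if (outcome.isError()) {
            throw outcome.getError(); // hand the error back to the caller
        }
        if (outcome.get()) {
            return Void(); // request completed successfully
        }
        // false: the request failed but is retryable; go around the loop
    }
}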
@@ -189,7 +191,7 @@ struct RequestData : NonCopyable {
         if (outcome.isError()) {
             throw outcome.getError();
         } else if (!outcome.get()) {
-            response = Future<ErrorOr<REPLY_TYPE(Request)>>();
+            response = Future<Reply>();
         }

         return outcome.get();
@@ -215,11 +217,10 @@ struct RequestData : NonCopyable {
         // We need to process the lagging request in order to update the queue model
         Reference<ModelHolder> holderCapture = std::move(modelHolder);
         bool triedAllOptionsCapture = triedAllOptions;
-        Future<Void> updateModel =
-            map(response, [holderCapture, triedAllOptionsCapture](ErrorOr<REPLY_TYPE(Request)> result) {
-                checkAndProcessResultImpl(result, holderCapture, false, triedAllOptionsCapture);
-                return Void();
-            });
+        Future<Void> updateModel = map(response, [holderCapture, triedAllOptionsCapture](Reply result) {
+            checkAndProcessResultImpl(result, holderCapture, false, triedAllOptionsCapture);
+            return Void();
+        });
         model->addActor.send(updateModel);
     }
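For context, the map call above is flow's future combinator: it applies a plain function to a future's eventual value and yields a new future. A tiny hedged usage sketch (recordReply and lastReply are hypothetical):

// Hypothetical use of flow's map(): turn a reply future into a
// fire-and-forget Future<Void> side effect, the same shape that
// addLaggingRequest sends to model->addActor above.
Future<Void> recordReply(Future<int> reply, int* lastReply) {
    return map(reply, [lastReply](int r) {
        *lastReply = r; // stand-in for checkAndProcessResultImpl's bookkeeping
        return Void();
    });
}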
@@ -453,25 +454,18 @@ Future<REPLY_TYPE(Request)> loadBalance(
             numAttempts = 0; // now that we've got a server back, reset the backoff
         } else if (!stream) {
             // Only the first location is available.
-            loop choose {
-                when(wait(firstRequestData.backoffDelay)) { firstRequestData.startRequest(request, model); }
-                when(ErrorOr<REPLY_TYPE(Request)> result = wait(firstRequestData.response)) {
-                    if (firstRequestData.checkAndProcessResult(atMostOnce)) {
-                        return result.get();
-                    }
-                    firstRequestEndpoint = Optional<uint64_t>();
-                    break;
-                }
-            }
+            ErrorOr<REPLY_TYPE(Request)> result = wait(firstRequestData.response);
+            if (firstRequestData.checkAndProcessResult(atMostOnce)) {
+                return result.get();
+            }
+            firstRequestEndpoint = Optional<uint64_t>();
         } else if (firstRequestData.isValid()) {
             // Issue a second request, the first one is taking a long time.
-            secondRequestData.setupRequest(backoff, triedAllOptions, stream);
+            secondRequestData.startRequest(backoff, triedAllOptions, stream, request, model);
             state bool firstFinished = false;
             loop choose {
-                when(wait(firstRequestData.backoffDelay)) { firstRequestData.startRequest(request, model); }
-                when(wait(secondRequestData.backoffDelay)) { secondRequestData.startRequest(request, model); }
                 when(ErrorOr<REPLY_TYPE(Request)> result =
                          wait(firstRequestData.response.isValid() ? firstRequestData.response : Never())) {
                     if (firstRequestData.checkAndProcessResult(atMostOnce)) {
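With each request now handling its own backoff internally, the loop choose above races only the two response futures; the two backoffDelay whens are gone. For reference, a hedged sketch of the flow choose/when racing idiom in isolation (firstOf is hypothetical; ACTOR syntax, so it needs the actor compiler):

// Hypothetical: return whichever of two futures is ready first, leaving the
// other outstanding, as the two-request race above does with the responses.
ACTOR template <class T>
Future<T> firstOf(Future<T> a, Future<T> b) {
    choose {
        when(T ra = wait(a)) { return ra; }
        when(T rb = wait(b)) { return rb; }
    }
}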
@@ -497,12 +491,11 @@ Future<REPLY_TYPE(Request)> loadBalance(
             }
         } else {
             // Issue a request, if it takes too long to get a reply, go around the loop
-            firstRequestData.setupRequest(backoff, triedAllOptions, stream);
+            firstRequestData.startRequest(backoff, triedAllOptions, stream, request, model);
             firstRequestEndpoint = stream->getEndpoint().token.first();
             loop {
                 choose {
-                    when(wait(firstRequestData.backoffDelay)) { firstRequestData.startRequest(request, model); }
                     when(ErrorOr<REPLY_TYPE(Request)> result = wait(firstRequestData.response)) {
                         if (model) {
                             model->secondMultiplier =