diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index 62d8e48b2e..cea39db283 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -22,6 +22,7 @@ #include "fdbserver/Ratekeeper.h" #include "fdbserver/TagThrottler.h" #include "fdbserver/WaitFailure.h" +#include "flow/OwningResource.h" #include "flow/actorcompiler.h" // must be last include @@ -147,7 +148,11 @@ public: } } - ACTOR static Future trackStorageServerQueueInfo(Ratekeeper* self, StorageServerInterface ssi) { + ACTOR static Future trackStorageServerQueueInfo(ResourceWeakRef self, + StorageServerInterface ssi) { + if (!self.available()) { + return Void(); + } self->storageQueueInfo.insert(mapPair(ssi.id(), StorageQueueInfo(ssi.id(), ssi.locality))); state Map::iterator myQueueInfo = self->storageQueueInfo.find(ssi.id()); TraceEvent("RkTracking", self->id) @@ -157,6 +162,9 @@ public: loop { ErrorOr reply = wait(ssi.getQueuingMetrics.getReplyUnlessFailedFor( StorageQueuingMetricsRequest(), 0, 0)); // SOMEDAY: or tryGetReply? + if (!self.available()) { + return Void(); + } if (reply.present()) { myQueueInfo->value.update(reply.get(), self->smoothTotalDurableBytes); myQueueInfo->value.acceptingRequests = ssi.isAcceptingRequests(); @@ -173,7 +181,10 @@ public: } } catch (...) { // including cancellation - self->storageQueueInfo.erase(myQueueInfo); + if (self.available()) { + self->storageQueueInfo.erase(myQueueInfo); + self->storageServerInterfaces.erase(ssi.id()); + } throw; } } @@ -207,10 +218,12 @@ public: } ACTOR static Future trackEachStorageServer( - Ratekeeper* self, + ResourceWeakRef self, FutureStream>> serverChanges) { - state Map> actors; + + state std::unordered_map> storageServerTrackers; state Promise err; + loop choose { when(state std::pair> change = waitNext(serverChanges)) { wait(delay(0)); // prevent storageServerTracker from getting cancelled while on the call stack @@ -218,15 +231,23 @@ public: const UID& id = change.first; if (change.second.present()) { if (!change.second.get().isTss()) { - auto& a = actors[change.first]; + + auto& a = storageServerTrackers[change.first]; a = Future(); a = splitError(trackStorageServerQueueInfo(self, change.second.get()), err); + if (!self.available()) { + return Void(); + } self->storageServerInterfaces[id] = change.second.get(); } } else { + storageServerTrackers.erase(id); + + if (!self.available()) { + return Void(); + } self->storageServerInterfaces.erase(id); - actors.erase(id); } } when(wait(err.getFuture())) {} @@ -234,7 +255,9 @@ public: } ACTOR static Future run(RatekeeperInterface rkInterf, Reference const> dbInfo) { - state Ratekeeper self(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True)); + state ResourceOwningRef pSelf( + new Ratekeeper(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True))); + state Ratekeeper& self = *pSelf; state Future timeout = Void(); state std::vector> tlogTrackers; state std::vector tlogInterfs; @@ -247,7 +270,7 @@ public: PromiseStream>> serverChanges; self.addActor.send(self.monitorServerListChange(serverChanges)); - self.addActor.send(self.trackEachStorageServer(serverChanges.getFuture())); + self.addActor.send(RatekeeperImpl::trackEachStorageServer(pSelf, serverChanges.getFuture())); self.addActor.send(traceRole(Role::RATEKEEPER, rkInterf.id())); self.addActor.send(self.monitorThrottlingChanges()); @@ -405,15 +428,6 @@ Future Ratekeeper::monitorServerListChange( return RatekeeperImpl::monitorServerListChange(this, serverChanges); } -Future Ratekeeper::trackEachStorageServer( - FutureStream>> serverChanges) { - return RatekeeperImpl::trackEachStorageServer(this, serverChanges); -} - -Future Ratekeeper::trackStorageServerQueueInfo(StorageServerInterface ssi) { - return RatekeeperImpl::trackStorageServerQueueInfo(this, ssi); -} - Future Ratekeeper::trackTLogQueueInfo(TLogInterface tli) { return RatekeeperImpl::trackTLogQueueInfo(this, tli); } @@ -957,12 +971,11 @@ ACTOR Future ratekeeper(RatekeeperInterface rkInterf, Reference(id.toString() + "/BusiestWriteTag")), valid(false), - id(id), locality(locality), acceptingRequests(false), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), - smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), - smoothDurableVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothLatestVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), - smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), - limitReason(limitReason_t::unlimited) { + : valid(false), id(id), locality(locality), acceptingRequests(false), + smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), + verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), smoothDurableVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), + smoothLatestVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), + smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), limitReason(limitReason_t::unlimited) { // FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo lastReply.instanceID = -1; } diff --git a/fdbserver/include/fdbserver/Ratekeeper.h b/fdbserver/include/fdbserver/Ratekeeper.h index c58b3b2d2f..6af84d099b 100644 --- a/fdbserver/include/fdbserver/Ratekeeper.h +++ b/fdbserver/include/fdbserver/Ratekeeper.h @@ -18,6 +18,9 @@ * limitations under the License. */ +#ifndef FDBSERVER_RATEKEEPER_H +#define FDBSERVER_RATEKEEPER_H + #pragma once #include "fdbclient/DatabaseConfiguration.h" @@ -49,7 +52,6 @@ enum limitReason_t { class StorageQueueInfo { uint64_t totalWriteCosts{ 0 }; int totalWriteOps{ 0 }; - Reference busiestWriteTagEventHolder; // refresh periodically TransactionTagMap tagCostEst; @@ -167,7 +169,6 @@ class Ratekeeper { void updateRate(RatekeeperLimits* limits); Future refreshStorageServerCommitCosts(); Future monitorServerListChange(PromiseStream>> serverChanges); - Future trackEachStorageServer(FutureStream>> serverChanges); // SOMEDAY: template trackStorageServerQueueInfo and trackTLogQueueInfo into one function Future trackStorageServerQueueInfo(StorageServerInterface); @@ -180,3 +181,5 @@ class Ratekeeper { public: static Future run(RatekeeperInterface rkInterf, Reference const> dbInfo); }; + +#endif // FDBSERVER_RATEKEEPER_H \ No newline at end of file diff --git a/flow/OwningResource.h b/flow/OwningResource.h new file mode 100644 index 0000000000..5636aac9de --- /dev/null +++ b/flow/OwningResource.h @@ -0,0 +1,119 @@ +/* + * OwningResource.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLOW_SAFE_ACCESS_REF_H +#define FLOW_SAFE_ACCESS_REF_H + +#include "flow/FastRef.h" + +// Consider the following situation: +// +// 1. An ACTOR A0 allocates an object O +// 2. A0 spawns another ACTOR A1, which depends on O +// 3. A0 triggers A1 and then terminates, destroying O +// 4. Since A1 is triggered by A0 while not knowing A0 is terminated and O is released, it would cause a SEGV error. +// +// In this header file, two classes +// +// * ResourceOwningRef +// * ResourceWeakRef +// +// are provided. The ResourceOwningRef is the reference that "holds" the resource, When it is destructed, the resource +// is also released; while ResourceWeakRef is the reference that "weakly holds" the resource. Before each access, it is +// the user's responsibility to verify if the resource is still available, via the available() method. +// +// With the two classes, the issue above can be solved by: +// +// 1. A0 allocates the object O via ResourceOwningRef +// 2. A0 forwards O to A1, via ResourceWeakRef +// 3. Every time A1 accesses O, it will verify if the resource is still available. +// 4. When A0 terminates, O is released and all ResourceWeakRef available() call will report the resource is not +// available anymore, preventing the SEGV error being raised. + +namespace details { + +// The class holding the pointer to the resource. +// SOMEDAY: Think using std::unique_ptr +template +struct Resource : public ReferenceCounted>, NonCopyable { + T* resource; + + Resource(T* resource_) : resource(resource_) {} + ~Resource() { delete resource; } + + void reset(T* resource_) { + delete resource; + resource = resource_; + } +}; + +template +class ResourceRef { +protected: + Reference> resourceRef; + +public: + ResourceRef(const Reference>& ref) : resourceRef(ref) {} + ResourceRef(Reference>&& ref) : resourceRef(std::move(ref)) {} + ResourceRef& operator=(const Reference>& ref) { + resourceRef = ref.resourceRef; + return *this; + } + ResourceRef& operator=(Reference>&& ref) { + resourceRef = std::move(ref); + return *this; + } + + T* operator->() { return resourceRef->resource; } + T& operator*() { + if (resourceRef->resource == nullptr) { + throw internal_error(); + } else { + return *(resourceRef->resource); + } + } + + bool available() const { return resourceRef->resource != nullptr; } +}; + +} // namespace details + +// The class that holds a Reference to the details::Resource which holds the real object. If the instance is destroyed, +// the object is destroyed, too. +template +class ResourceOwningRef : public details::ResourceRef, NonCopyable { + template + friend class ResourceWeakRef; + +public: + ResourceOwningRef(T* resource) : details::ResourceRef(makeReference>(resource)) {} + ~ResourceOwningRef() { details::ResourceRef::resourceRef->reset(nullptr); } +}; + +// The class that weakly holds a Reference tot he etails::Resource. Destroying the reference will have no impact to the +// real object. On the other hand, each time accessing the object requires a verification that the object is still alive +template +class ResourceWeakRef : public details::ResourceRef { +public: + ResourceWeakRef(const ResourceOwningRef& ref) : details::ResourceRef(ref.resourceRef) {} + ResourceWeakRef(const ResourceWeakRef& ref) : details::ResourceRef(ref.resourceRef) {} +}; + +#endif // FLOW_SAFE_ACCESS_REF_H \ No newline at end of file