mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-15 02:18:39 +08:00
fixup! Non-owning reference to an object
See documents in flow/OwningResource.h
This commit is contained in:
parent
26877a8924
commit
cf04afe925
@ -22,6 +22,7 @@
|
||||
#include "fdbserver/Ratekeeper.h"
|
||||
#include "fdbserver/TagThrottler.h"
|
||||
#include "fdbserver/WaitFailure.h"
|
||||
#include "flow/OwningResource.h"
|
||||
|
||||
#include "flow/actorcompiler.h" // must be last include
|
||||
|
||||
@ -147,7 +148,11 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> trackStorageServerQueueInfo(Ratekeeper* self, StorageServerInterface ssi) {
|
||||
ACTOR static Future<Void> trackStorageServerQueueInfo(ResourceWeakRef<Ratekeeper> self,
|
||||
StorageServerInterface ssi) {
|
||||
if (!self.available()) {
|
||||
return Void();
|
||||
}
|
||||
self->storageQueueInfo.insert(mapPair(ssi.id(), StorageQueueInfo(ssi.id(), ssi.locality)));
|
||||
state Map<UID, StorageQueueInfo>::iterator myQueueInfo = self->storageQueueInfo.find(ssi.id());
|
||||
TraceEvent("RkTracking", self->id)
|
||||
@ -157,6 +162,9 @@ public:
|
||||
loop {
|
||||
ErrorOr<StorageQueuingMetricsReply> reply = wait(ssi.getQueuingMetrics.getReplyUnlessFailedFor(
|
||||
StorageQueuingMetricsRequest(), 0, 0)); // SOMEDAY: or tryGetReply?
|
||||
if (!self.available()) {
|
||||
return Void();
|
||||
}
|
||||
if (reply.present()) {
|
||||
myQueueInfo->value.update(reply.get(), self->smoothTotalDurableBytes);
|
||||
myQueueInfo->value.acceptingRequests = ssi.isAcceptingRequests();
|
||||
@ -173,7 +181,10 @@ public:
|
||||
}
|
||||
} catch (...) {
|
||||
// including cancellation
|
||||
if (self.available()) {
|
||||
self->storageQueueInfo.erase(myQueueInfo);
|
||||
self->storageServerInterfaces.erase(ssi.id());
|
||||
}
|
||||
throw;
|
||||
}
|
||||
}
|
||||
@ -207,10 +218,12 @@ public:
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> trackEachStorageServer(
|
||||
Ratekeeper* self,
|
||||
ResourceWeakRef<Ratekeeper> self,
|
||||
FutureStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges) {
|
||||
state Map<UID, Future<Void>> actors;
|
||||
|
||||
state std::unordered_map<UID, Future<Void>> storageServerTrackers;
|
||||
state Promise<Void> err;
|
||||
|
||||
loop choose {
|
||||
when(state std::pair<UID, Optional<StorageServerInterface>> change = waitNext(serverChanges)) {
|
||||
wait(delay(0)); // prevent storageServerTracker from getting cancelled while on the call stack
|
||||
@ -218,15 +231,23 @@ public:
|
||||
const UID& id = change.first;
|
||||
if (change.second.present()) {
|
||||
if (!change.second.get().isTss()) {
|
||||
auto& a = actors[change.first];
|
||||
|
||||
auto& a = storageServerTrackers[change.first];
|
||||
a = Future<Void>();
|
||||
a = splitError(trackStorageServerQueueInfo(self, change.second.get()), err);
|
||||
|
||||
if (!self.available()) {
|
||||
return Void();
|
||||
}
|
||||
self->storageServerInterfaces[id] = change.second.get();
|
||||
}
|
||||
} else {
|
||||
storageServerTrackers.erase(id);
|
||||
|
||||
if (!self.available()) {
|
||||
return Void();
|
||||
}
|
||||
self->storageServerInterfaces.erase(id);
|
||||
actors.erase(id);
|
||||
}
|
||||
}
|
||||
when(wait(err.getFuture())) {}
|
||||
@ -234,7 +255,9 @@ public:
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> run(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
|
||||
state Ratekeeper self(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True));
|
||||
state ResourceOwningRef<Ratekeeper> pSelf(
|
||||
new Ratekeeper(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True)));
|
||||
state Ratekeeper& self = *pSelf;
|
||||
state Future<Void> timeout = Void();
|
||||
state std::vector<Future<Void>> tlogTrackers;
|
||||
state std::vector<TLogInterface> tlogInterfs;
|
||||
@ -247,7 +270,7 @@ public:
|
||||
|
||||
PromiseStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges;
|
||||
self.addActor.send(self.monitorServerListChange(serverChanges));
|
||||
self.addActor.send(self.trackEachStorageServer(serverChanges.getFuture()));
|
||||
self.addActor.send(RatekeeperImpl::trackEachStorageServer(pSelf, serverChanges.getFuture()));
|
||||
self.addActor.send(traceRole(Role::RATEKEEPER, rkInterf.id()));
|
||||
|
||||
self.addActor.send(self.monitorThrottlingChanges());
|
||||
@ -405,15 +428,6 @@ Future<Void> Ratekeeper::monitorServerListChange(
|
||||
return RatekeeperImpl::monitorServerListChange(this, serverChanges);
|
||||
}
|
||||
|
||||
Future<Void> Ratekeeper::trackEachStorageServer(
|
||||
FutureStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges) {
|
||||
return RatekeeperImpl::trackEachStorageServer(this, serverChanges);
|
||||
}
|
||||
|
||||
Future<Void> Ratekeeper::trackStorageServerQueueInfo(StorageServerInterface ssi) {
|
||||
return RatekeeperImpl::trackStorageServerQueueInfo(this, ssi);
|
||||
}
|
||||
|
||||
Future<Void> Ratekeeper::trackTLogQueueInfo(TLogInterface tli) {
|
||||
return RatekeeperImpl::trackTLogQueueInfo(this, tli);
|
||||
}
|
||||
@ -957,12 +971,11 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
|
||||
}
|
||||
|
||||
StorageQueueInfo::StorageQueueInfo(UID id, LocalityData locality)
|
||||
: busiestWriteTagEventHolder(makeReference<EventCacheHolder>(id.toString() + "/BusiestWriteTag")), valid(false),
|
||||
id(id), locality(locality), acceptingRequests(false), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
|
||||
smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
|
||||
smoothDurableVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothLatestVersion(SERVER_KNOBS->SMOOTHING_AMOUNT),
|
||||
smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
|
||||
limitReason(limitReason_t::unlimited) {
|
||||
: valid(false), id(id), locality(locality), acceptingRequests(false),
|
||||
smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
|
||||
verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), smoothDurableVersion(SERVER_KNOBS->SMOOTHING_AMOUNT),
|
||||
smoothLatestVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
|
||||
smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), limitReason(limitReason_t::unlimited) {
|
||||
// FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo
|
||||
lastReply.instanceID = -1;
|
||||
}
|
||||
|
@ -18,6 +18,9 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef FDBSERVER_RATEKEEPER_H
|
||||
#define FDBSERVER_RATEKEEPER_H
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "fdbclient/DatabaseConfiguration.h"
|
||||
@ -49,7 +52,6 @@ enum limitReason_t {
|
||||
class StorageQueueInfo {
|
||||
uint64_t totalWriteCosts{ 0 };
|
||||
int totalWriteOps{ 0 };
|
||||
Reference<EventCacheHolder> busiestWriteTagEventHolder;
|
||||
|
||||
// refresh periodically
|
||||
TransactionTagMap<TransactionCommitCostEstimation> tagCostEst;
|
||||
@ -167,7 +169,6 @@ class Ratekeeper {
|
||||
void updateRate(RatekeeperLimits* limits);
|
||||
Future<Void> refreshStorageServerCommitCosts();
|
||||
Future<Void> monitorServerListChange(PromiseStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges);
|
||||
Future<Void> trackEachStorageServer(FutureStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges);
|
||||
|
||||
// SOMEDAY: template trackStorageServerQueueInfo and trackTLogQueueInfo into one function
|
||||
Future<Void> trackStorageServerQueueInfo(StorageServerInterface);
|
||||
@ -180,3 +181,5 @@ class Ratekeeper {
|
||||
public:
|
||||
static Future<Void> run(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo);
|
||||
};
|
||||
|
||||
#endif // FDBSERVER_RATEKEEPER_H
|
119
flow/OwningResource.h
Normal file
119
flow/OwningResource.h
Normal file
@ -0,0 +1,119 @@
|
||||
/*
|
||||
* OwningResource.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef FLOW_SAFE_ACCESS_REF_H
|
||||
#define FLOW_SAFE_ACCESS_REF_H
|
||||
|
||||
#include "flow/FastRef.h"
|
||||
|
||||
// Consider the following situation:
|
||||
//
|
||||
// 1. An ACTOR A0 allocates an object O
|
||||
// 2. A0 spawns another ACTOR A1, which depends on O
|
||||
// 3. A0 triggers A1 and then terminates, destroying O
|
||||
// 4. Since A1 is triggered by A0 while not knowing A0 is terminated and O is released, it would cause a SEGV error.
|
||||
//
|
||||
// In this header file, two classes
|
||||
//
|
||||
// * ResourceOwningRef
|
||||
// * ResourceWeakRef
|
||||
//
|
||||
// are provided. The ResourceOwningRef is the reference that "holds" the resource, When it is destructed, the resource
|
||||
// is also released; while ResourceWeakRef is the reference that "weakly holds" the resource. Before each access, it is
|
||||
// the user's responsibility to verify if the resource is still available, via the available() method.
|
||||
//
|
||||
// With the two classes, the issue above can be solved by:
|
||||
//
|
||||
// 1. A0 allocates the object O via ResourceOwningRef
|
||||
// 2. A0 forwards O to A1, via ResourceWeakRef
|
||||
// 3. Every time A1 accesses O, it will verify if the resource is still available.
|
||||
// 4. When A0 terminates, O is released and all ResourceWeakRef available() call will report the resource is not
|
||||
// available anymore, preventing the SEGV error being raised.
|
||||
|
||||
namespace details {
|
||||
|
||||
// The class holding the pointer to the resource.
|
||||
// SOMEDAY: Think using std::unique_ptr
|
||||
template <typename T>
|
||||
struct Resource : public ReferenceCounted<Resource<T>>, NonCopyable {
|
||||
T* resource;
|
||||
|
||||
Resource(T* resource_) : resource(resource_) {}
|
||||
~Resource() { delete resource; }
|
||||
|
||||
void reset(T* resource_) {
|
||||
delete resource;
|
||||
resource = resource_;
|
||||
}
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
class ResourceRef {
|
||||
protected:
|
||||
Reference<Resource<T>> resourceRef;
|
||||
|
||||
public:
|
||||
ResourceRef(const Reference<Resource<T>>& ref) : resourceRef(ref) {}
|
||||
ResourceRef(Reference<Resource<T>>&& ref) : resourceRef(std::move(ref)) {}
|
||||
ResourceRef& operator=(const Reference<Resource<T>>& ref) {
|
||||
resourceRef = ref.resourceRef;
|
||||
return *this;
|
||||
}
|
||||
ResourceRef& operator=(Reference<Resource<T>>&& ref) {
|
||||
resourceRef = std::move(ref);
|
||||
return *this;
|
||||
}
|
||||
|
||||
T* operator->() { return resourceRef->resource; }
|
||||
T& operator*() {
|
||||
if (resourceRef->resource == nullptr) {
|
||||
throw internal_error();
|
||||
} else {
|
||||
return *(resourceRef->resource);
|
||||
}
|
||||
}
|
||||
|
||||
bool available() const { return resourceRef->resource != nullptr; }
|
||||
};
|
||||
|
||||
} // namespace details
|
||||
|
||||
// The class that holds a Reference to the details::Resource which holds the real object. If the instance is destroyed,
|
||||
// the object is destroyed, too.
|
||||
template <typename T>
|
||||
class ResourceOwningRef : public details::ResourceRef<T>, NonCopyable {
|
||||
template <typename U>
|
||||
friend class ResourceWeakRef;
|
||||
|
||||
public:
|
||||
ResourceOwningRef(T* resource) : details::ResourceRef<T>(makeReference<details::Resource<T>>(resource)) {}
|
||||
~ResourceOwningRef() { details::ResourceRef<T>::resourceRef->reset(nullptr); }
|
||||
};
|
||||
|
||||
// The class that weakly holds a Reference tot he etails::Resource. Destroying the reference will have no impact to the
|
||||
// real object. On the other hand, each time accessing the object requires a verification that the object is still alive
|
||||
template <typename T>
|
||||
class ResourceWeakRef : public details::ResourceRef<T> {
|
||||
public:
|
||||
ResourceWeakRef(const ResourceOwningRef<T>& ref) : details::ResourceRef<T>(ref.resourceRef) {}
|
||||
ResourceWeakRef(const ResourceWeakRef& ref) : details::ResourceRef<T>(ref.resourceRef) {}
|
||||
};
|
||||
|
||||
#endif // FLOW_SAFE_ACCESS_REF_H
|
Loading…
x
Reference in New Issue
Block a user