mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-15 02:18:39 +08:00
fixup! Add a wrapper of ResourceWeakRef for better support of self pointer
This commit is contained in:
parent
1ab8d388af
commit
ec40c6bfec
@ -148,13 +148,9 @@ public:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR static Future<Void> trackStorageServerQueueInfo(ResourceWeakRef<Ratekeeper> self,
|
ACTOR static Future<Void> trackStorageServerQueueInfo(ActorWeakSelfRef<Ratekeeper> self,
|
||||||
StorageServerInterface ssi) {
|
StorageServerInterface ssi) {
|
||||||
if (!self.available()) {
|
|
||||||
return Void();
|
|
||||||
}
|
|
||||||
self->storageQueueInfo.insert(mapPair(ssi.id(), StorageQueueInfo(ssi.id(), ssi.locality)));
|
self->storageQueueInfo.insert(mapPair(ssi.id(), StorageQueueInfo(ssi.id(), ssi.locality)));
|
||||||
state Map<UID, StorageQueueInfo>::iterator myQueueInfo = self->storageQueueInfo.find(ssi.id());
|
|
||||||
TraceEvent("RkTracking", self->id)
|
TraceEvent("RkTracking", self->id)
|
||||||
.detail("StorageServer", ssi.id())
|
.detail("StorageServer", ssi.id())
|
||||||
.detail("Locality", ssi.locality.toString());
|
.detail("Locality", ssi.locality.toString());
|
||||||
@ -162,9 +158,7 @@ public:
|
|||||||
loop {
|
loop {
|
||||||
ErrorOr<StorageQueuingMetricsReply> reply = wait(ssi.getQueuingMetrics.getReplyUnlessFailedFor(
|
ErrorOr<StorageQueuingMetricsReply> reply = wait(ssi.getQueuingMetrics.getReplyUnlessFailedFor(
|
||||||
StorageQueuingMetricsRequest(), 0, 0)); // SOMEDAY: or tryGetReply?
|
StorageQueuingMetricsRequest(), 0, 0)); // SOMEDAY: or tryGetReply?
|
||||||
if (!self.available()) {
|
Map<UID, StorageQueueInfo>::iterator myQueueInfo = self->storageQueueInfo.find(ssi.id());
|
||||||
return Void();
|
|
||||||
}
|
|
||||||
if (reply.present()) {
|
if (reply.present()) {
|
||||||
myQueueInfo->value.update(reply.get(), self->smoothTotalDurableBytes);
|
myQueueInfo->value.update(reply.get(), self->smoothTotalDurableBytes);
|
||||||
myQueueInfo->value.acceptingRequests = ssi.isAcceptingRequests();
|
myQueueInfo->value.acceptingRequests = ssi.isAcceptingRequests();
|
||||||
@ -181,10 +175,9 @@ public:
|
|||||||
}
|
}
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
// including cancellation
|
// including cancellation
|
||||||
if (self.available()) {
|
Map<UID, StorageQueueInfo>::iterator myQueueInfo = self->storageQueueInfo.find(ssi.id());
|
||||||
self->storageQueueInfo.erase(myQueueInfo);
|
self->storageQueueInfo.erase(myQueueInfo);
|
||||||
self->storageServerInterfaces.erase(ssi.id());
|
self->storageServerInterfaces.erase(ssi.id());
|
||||||
}
|
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -218,7 +211,7 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
ACTOR static Future<Void> trackEachStorageServer(
|
ACTOR static Future<Void> trackEachStorageServer(
|
||||||
ResourceWeakRef<Ratekeeper> self,
|
ActorWeakSelfRef<Ratekeeper> self,
|
||||||
FutureStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges) {
|
FutureStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges) {
|
||||||
|
|
||||||
state std::unordered_map<UID, Future<Void>> storageServerTrackers;
|
state std::unordered_map<UID, Future<Void>> storageServerTrackers;
|
||||||
@ -236,17 +229,11 @@ public:
|
|||||||
a = Future<Void>();
|
a = Future<Void>();
|
||||||
a = splitError(trackStorageServerQueueInfo(self, change.second.get()), err);
|
a = splitError(trackStorageServerQueueInfo(self, change.second.get()), err);
|
||||||
|
|
||||||
if (!self.available()) {
|
|
||||||
return Void();
|
|
||||||
}
|
|
||||||
self->storageServerInterfaces[id] = change.second.get();
|
self->storageServerInterfaces[id] = change.second.get();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
storageServerTrackers.erase(id);
|
storageServerTrackers.erase(id);
|
||||||
|
|
||||||
if (!self.available()) {
|
|
||||||
return Void();
|
|
||||||
}
|
|
||||||
self->storageServerInterfaces.erase(id);
|
self->storageServerInterfaces.erase(id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -255,7 +242,7 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
ACTOR static Future<Void> run(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
|
ACTOR static Future<Void> run(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
|
||||||
state ResourceOwningRef<Ratekeeper> pSelf(
|
state ActorOwningSelfRef<Ratekeeper> pSelf(
|
||||||
new Ratekeeper(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True)));
|
new Ratekeeper(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True)));
|
||||||
state Ratekeeper& self = *pSelf;
|
state Ratekeeper& self = *pSelf;
|
||||||
state Future<Void> timeout = Void();
|
state Future<Void> timeout = Void();
|
||||||
@ -407,14 +394,10 @@ public:
|
|||||||
// for each SS, select the busiest commit tag from ssTrTagCommitCost
|
// for each SS, select the busiest commit tag from ssTrTagCommitCost
|
||||||
for (auto& [ssId, ssQueueInfo] : self->storageQueueInfo) {
|
for (auto& [ssId, ssQueueInfo] : self->storageQueueInfo) {
|
||||||
// NOTE: In some cases, for unknown reason SS will not respond to the updateCommitCostRequest. Since the
|
// NOTE: In some cases, for unknown reason SS will not respond to the updateCommitCostRequest. Since the
|
||||||
// information is not time-sensitive, place a timeout to avoid stucking RK.
|
// information is not time-sensitive, we do not wait for the replies.
|
||||||
replies.push_back(timeout(self->storageServerInterfaces[ssId].updateCommitCostRequest.getReply(
|
replies.push_back(self->storageServerInterfaces[ssId].updateCommitCostRequest.getReply(
|
||||||
ssQueueInfo.refreshCommitCost(elapsed)),
|
ssQueueInfo.refreshCommitCost(elapsed)));
|
||||||
SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL,
|
|
||||||
Void()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
wait(waitForAll(replies));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}; // class RatekeeperImpl
|
}; // class RatekeeperImpl
|
||||||
|
@ -69,7 +69,6 @@ class ResourceRef {
|
|||||||
protected:
|
protected:
|
||||||
Reference<Resource<T>> resourceRef;
|
Reference<Resource<T>> resourceRef;
|
||||||
|
|
||||||
public:
|
|
||||||
ResourceRef(const Reference<Resource<T>>& ref) : resourceRef(ref) {}
|
ResourceRef(const Reference<Resource<T>>& ref) : resourceRef(ref) {}
|
||||||
ResourceRef(Reference<Resource<T>>&& ref) : resourceRef(std::move(ref)) {}
|
ResourceRef(Reference<Resource<T>>&& ref) : resourceRef(std::move(ref)) {}
|
||||||
ResourceRef& operator=(const Reference<Resource<T>>& ref) {
|
ResourceRef& operator=(const Reference<Resource<T>>& ref) {
|
||||||
@ -81,16 +80,23 @@ public:
|
|||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
T* operator->() { return resourceRef->resource; }
|
virtual ~ResourceRef() {}
|
||||||
T& operator*() {
|
|
||||||
if (resourceRef->resource == nullptr) {
|
public:
|
||||||
|
// Retrieves the resource as a pointer
|
||||||
|
T* operator->() const noexcept { return resourceRef->resource; }
|
||||||
|
|
||||||
|
// Retrieves the resource as a reference
|
||||||
|
T& operator*() const {
|
||||||
|
if (!available()) {
|
||||||
throw internal_error();
|
throw internal_error();
|
||||||
} else {
|
} else {
|
||||||
return *(resourceRef->resource);
|
return *(resourceRef->resource);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool available() const { return resourceRef->resource != nullptr; }
|
// Returns true if the resource is available, i.e. not nullptr
|
||||||
|
bool available() const noexcept { return resourceRef->resource != nullptr; }
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace details
|
} // namespace details
|
||||||
@ -102,12 +108,15 @@ class ResourceOwningRef : public details::ResourceRef<T>, NonCopyable {
|
|||||||
template <typename U>
|
template <typename U>
|
||||||
friend class ResourceWeakRef;
|
friend class ResourceWeakRef;
|
||||||
|
|
||||||
|
template <typename U>
|
||||||
|
friend class ActorWeakSelfRef;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
ResourceOwningRef(T* resource) : details::ResourceRef<T>(makeReference<details::Resource<T>>(resource)) {}
|
ResourceOwningRef(T* resource) : details::ResourceRef<T>(makeReference<details::Resource<T>>(resource)) {}
|
||||||
~ResourceOwningRef() { details::ResourceRef<T>::resourceRef->reset(nullptr); }
|
virtual ~ResourceOwningRef() { details::ResourceRef<T>::resourceRef->reset(nullptr); }
|
||||||
};
|
};
|
||||||
|
|
||||||
// The class that weakly holds a Reference tot he etails::Resource. Destroying the reference will have no impact to the
|
// The class that weakly holds a Reference to the details::Resource. Destroying the reference will have no impact to the
|
||||||
// real object. On the other hand, each time accessing the object requires a verification that the object is still alive
|
// real object. On the other hand, each time accessing the object requires a verification that the object is still alive
|
||||||
template <typename T>
|
template <typename T>
|
||||||
class ResourceWeakRef : public details::ResourceRef<T> {
|
class ResourceWeakRef : public details::ResourceRef<T> {
|
||||||
@ -116,4 +125,31 @@ public:
|
|||||||
ResourceWeakRef(const ResourceWeakRef& ref) : details::ResourceRef<T>(ref.resourceRef) {}
|
ResourceWeakRef(const ResourceWeakRef& ref) : details::ResourceRef<T>(ref.resourceRef) {}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// A unique reference that takes the ownership of the self object. The self object is widely used as the "global"
|
||||||
|
// context of each role.
|
||||||
|
template <typename T>
|
||||||
|
using ActorOwningSelfRef = ResourceOwningRef<T>;
|
||||||
|
|
||||||
|
// A wrapper of ResourceWeakRef, used to forward the widely used `self` pointer from the core ACTOR to other ACTORs. It
|
||||||
|
// will check the resource before returning it. If the resource is not available, an operation_cancelled error will be
|
||||||
|
// thrown to terminate the current ACTOR.
|
||||||
|
template <typename T>
|
||||||
|
class ActorWeakSelfRef : public ResourceWeakRef<T> {
|
||||||
|
public:
|
||||||
|
ActorWeakSelfRef(const ResourceOwningRef<T>& ref) : ResourceWeakRef<T>(ref) {}
|
||||||
|
ActorWeakSelfRef(const ResourceWeakRef<T>& ref) : ResourceWeakRef<T>(ref) {}
|
||||||
|
ActorWeakSelfRef(const ActorWeakSelfRef<T>& ref)
|
||||||
|
: ResourceWeakRef<T>(static_cast<const ResourceWeakRef<T>&>(ref)) {}
|
||||||
|
|
||||||
|
// Retrieves the resource as a pointer, throws operation_cancelled if the resource is not available
|
||||||
|
T* operator->() const {
|
||||||
|
if (!ResourceOwningRef<T>::available()) [[unlikely]]
|
||||||
|
throw operation_cancelled();
|
||||||
|
return ResourceOwningRef<T>::resourceRef->resource;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Gets the reference to the resource, Throws operation_cancelled if the resource is not available
|
||||||
|
T& operator*() const { return *(this->operator->()); }
|
||||||
|
};
|
||||||
|
|
||||||
#endif // FLOW_OWNING_REOSURCE_H
|
#endif // FLOW_OWNING_REOSURCE_H
|
Loading…
x
Reference in New Issue
Block a user