Remove the runners list from PriorityMultiLock and rely on reference counting in the release handler instead of canceling release handlers. This improves the microbenchmark by 26%.

Steve Atherton 2022-11-11 00:34:03 -08:00
parent 7aa1e23bc6
commit 3b4a467a92
3 changed files with 30 additions and 75 deletions
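
Note (illustrative, not part of the commit): the ownership change can be reduced to standard C++, with std::shared_ptr standing in for flow's Reference/ReferenceCounted. Each release handler co-owns the lock, so shutdown no longer needs a runners list whose handlers are individually cancelled. All names below are hypothetical.

    #include <functional>
    #include <memory>

    // DemoLock stands in for PriorityMultiLock.
    struct DemoLock : std::enable_shared_from_this<DemoLock> {
        int available;
        explicit DemoLock(int concurrency) : available(concurrency) {}

        // Before: the lock tracked a handler per holder and cancelled them in
        // kill(). After: the handler itself holds a strong reference, so there
        // is no list, no cancellation, and the lock outlives every holder.
        std::function<void()> acquire() {
            --available;
            auto self = shared_from_this(); // handler co-owns the lock
            return [self] { ++self->available; };
        }
    };

    int main() {
        auto release = std::make_shared<DemoLock>(8)->acquire();
        // The temporary DemoLock is kept alive solely by `release`.
        release(); // returns the slot; DemoLock dies when `release` is destroyed
    }

Dropping the intrusive runners list removes an allocation, two list operations, and a cancellable handler future per lock acquisition, which is plausibly where the 26% microbenchmark gain comes from.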

View File

@@ -1015,13 +1015,13 @@ public:
     FlowLock serveFetchCheckpointParallelismLock;
-    PriorityMultiLock ssLock;
+    Reference<PriorityMultiLock> ssLock;
     std::vector<int> readPriorityRanks;

     Future<PriorityMultiLock::Lock> getReadLock(const Optional<ReadOptions>& options) {
         int readType = (int)(options.present() ? options.get().type : ReadType::NORMAL);
         readType = std::clamp<int>(readType, 0, readPriorityRanks.size() - 1);
-        return ssLock.lock(readPriorityRanks[readType]);
+        return ssLock->lock(readPriorityRanks[readType]);
     }

     FlowLock serveAuditStorageParallelismLock;
@@ -1302,7 +1302,8 @@ public:
     fetchKeysParallelismFullLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_FULL),
     fetchKeysBytesBudget(SERVER_KNOBS->STORAGE_FETCH_BYTES), fetchKeysBudgetUsed(false),
     serveFetchCheckpointParallelismLock(SERVER_KNOBS->SERVE_FETCH_CHECKPOINT_PARALLELISM),
-    ssLock(SERVER_KNOBS->STORAGE_SERVER_READ_CONCURRENCY, SERVER_KNOBS->STORAGESERVER_READ_PRIORITIES),
+    ssLock(makeReference<PriorityMultiLock>(SERVER_KNOBS->STORAGE_SERVER_READ_CONCURRENCY,
+                                            SERVER_KNOBS->STORAGESERVER_READ_PRIORITIES)),
     serveAuditStorageParallelismLock(SERVER_KNOBS->SERVE_AUDIT_STORAGE_PARALLELISM),
     instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false),
     versionBehind(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), lastBytesInputEBrake(0),
@@ -10159,20 +10160,20 @@ ACTOR Future<Void> metricsCore(StorageServer* self, StorageServerInterface ssi)
     te.detail("StorageEngine", self->storage.getKeyValueStoreType().toString());
     te.detail("Tag", self->tag.toString());
     std::vector<int> rpr = self->readPriorityRanks;
-    te.detail("ReadsActive", self->ssLock.totalRunners());
-    te.detail("ReadsWaiting", self->ssLock.totalWaiters());
+    te.detail("ReadsActive", self->ssLock->totalRunners());
+    te.detail("ReadsWaiting", self->ssLock->totalWaiters());
     int type = (int)ReadType::FETCH;
-    te.detail("ReadFetchActive", self->ssLock.numRunners(rpr[type]));
-    te.detail("ReadFetchWaiting", self->ssLock.numWaiters(rpr[type]));
+    te.detail("ReadFetchActive", self->ssLock->numRunners(rpr[type]));
+    te.detail("ReadFetchWaiting", self->ssLock->numWaiters(rpr[type]));
     type = (int)ReadType::LOW;
-    te.detail("ReadLowActive", self->ssLock.numRunners(rpr[type]));
-    te.detail("ReadLowWaiting", self->ssLock.numWaiters(rpr[type]));
+    te.detail("ReadLowActive", self->ssLock->numRunners(rpr[type]));
+    te.detail("ReadLowWaiting", self->ssLock->numWaiters(rpr[type]));
     type = (int)ReadType::NORMAL;
-    te.detail("ReadNormalActive", self->ssLock.numRunners(rpr[type]));
-    te.detail("ReadNormalWaiting", self->ssLock.numWaiters(rpr[type]));
+    te.detail("ReadNormalActive", self->ssLock->numRunners(rpr[type]));
+    te.detail("ReadNormalWaiting", self->ssLock->numWaiters(rpr[type]));
     type = (int)ReadType::HIGH;
-    te.detail("ReadHighActive", self->ssLock.numRunners(rpr[type]));
-    te.detail("ReadHighWaiting", self->ssLock.numWaiters(rpr[type]));
+    te.detail("ReadHighActive", self->ssLock->numRunners(rpr[type]));
+    te.detail("ReadHighWaiting", self->ssLock->numWaiters(rpr[type]));
     StorageBytes sb = self->storage.getStorageBytes();
     te.detail("KvstoreBytesUsed", sb.used);
     te.detail("KvstoreBytesFree", sb.free);
@@ -10988,7 +10989,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
     // If the storage server dies while something that uses self is still on the stack,
    // we want that actor to complete before we terminate and that memory goes out of scope
-    self.ssLock.kill();
+    self.ssLock->kill();

     state Error err = e;
     if (storageServerTerminated(self, persistentData, err)) {
@@ -11086,7 +11087,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
         throw internal_error();
     } catch (Error& e) {
-        self.ssLock.kill();
+        self.ssLock->kill();

        if (self.byteSampleRecovery.isValid()) {
            self.byteSampleRecovery.cancel();

View File

@@ -46,7 +46,8 @@
 // A multi user lock with a concurrent holder limit where waiters request a lock with a priority
 // id and are granted locks based on a total concurrency and relative weights of the current active
-// priorities. Priority id's must start at 0 and are sequential integers.
+// priorities. Priority id's must start at 0 and are sequential integers. Priority id numbers
+// are not related to the importance of the priority in execution.
 //
 // Scheduling logic
 // Let
@@ -67,10 +68,10 @@
 // The interface is similar to FlowMutex except that lock holders can just drop the lock to release it.
 //
 // Usage:
-//   Lock lock = wait(prioritylock.lock(priorityLevel));
+//   Lock lock = wait(prioritylock.lock(priority_id));
 //   lock.release(); // Explicit release, or
 //   // let lock and all copies of lock go out of scope to release
-class PriorityMultiLock {
+class PriorityMultiLock : public ReferenceCounted<PriorityMultiLock> {
 public:
     // Waiting on the lock returns a Lock, which is really just a Promise<Void>
     // Calling release() is not necessary, it exists in case the Lock holder wants to explicitly release
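
Note (illustrative): the usage comment above is unchanged in spirit; only ownership changes. A minimal Flow-style sketch of the reference-counted form, assuming the actor compiler and using only the calls visible in this commit (makeReference, lock, kill); the actor name and priority id are hypothetical.

    #include "flow/PriorityMultiLock.actor.h"
    #include "flow/actorcompiler.h" // must be the last include

    // Hypothetical worker: takes one slot at the given priority id, does a
    // unit of work, and releases the slot by letting the Lock go out of scope.
    ACTOR Future<Void> doPrioritizedRead(Reference<PriorityMultiLock> pml, int priorityId) {
        PriorityMultiLock::Lock lock = wait(pml->lock(priorityId));
        // ... perform the read while holding the slot ...
        return Void(); // dropping `lock` (and any copies) releases the slot
    }

Shutdown remains explicit: the owner calls pml->kill() as in the storage server hunks above, while in-flight release handlers keep the object itself alive through their references until their holders resolve.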
@@ -142,28 +143,18 @@ public:
         fRunner.cancel();
         available = 0;
-        // Cancel and clean up runners
-        auto r = runners.begin();
-        while (r != runners.end()) {
-            r->handler.cancel();
-            Runner* runner = &*r;
-            r = runners.erase(r);
-            delete runner;
-        }
         waitingPriorities.clear();
         priorities.clear();
     }

     std::string toString() const {
-        std::string s = format("{ ptr=%p concurrency=%d available=%d running=%d waiting=%d runnersList=%d "
+        std::string s = format("{ ptr=%p concurrency=%d available=%d running=%d waiting=%d "
                                "pendingWeights=%d ",
                                this,
                                concurrency,
                                available,
                                concurrency - available,
                                waiting,
-                               runners.size(),
                                totalPendingWeights);

         for (auto& p : priorities) {
@@ -172,11 +163,6 @@ public:
         s += "}";

-        if (concurrency - available != runners.size()) {
-            pml_debug_printf("%s\n", s.c_str());
-            ASSERT_EQ(concurrency - available, runners.size());
-        }
-
         return s;
     }
@@ -241,66 +227,35 @@ private:
     // does not have to iterate over the priorities vector checking priorities without waiters.
     WaitingPrioritiesList waitingPriorities;

-    struct Runner : boost::intrusive::list_base_hook<>, FastAllocated<Runner> {
-        Runner(Priority* p) : priority(p) {
-#if PRIORITYMULTILOCK_DEBUG || !defined(NO_INTELLISENSE)
-            debugID = deterministicRandom()->randomUniqueID();
-#endif
-        }
-
-        Future<Void> handler;
-        Priority* priority;
-#if PRIORITYMULTILOCK_DEBUG || !defined(NO_INTELLISENSE)
-        UID debugID;
-#endif
-    };
-
-    // Current runners list. This is an intrusive list of FastAllocated items so that they can remove themselves
-    // efficiently as they complete. size() will be linear because it's only used in toString() for debugging
-    typedef boost::intrusive::list<Runner, boost::intrusive::constant_time_size<false>> RunnerList;
-    RunnerList runners;
-
     Future<Void> fRunner;
     AsyncTrigger wakeRunner;
     Promise<Void> brokenOnDestruct;

-    ACTOR static Future<Void> handleRelease(PriorityMultiLock* self, Runner* r, Future<Void> holder) {
-        pml_debug_printf("%f handleRelease self=%p id=%s start \n", now(), self, r->debugID.toString().c_str());
+    ACTOR static void handleRelease(Reference<PriorityMultiLock> self, Priority* priority, Future<Void> holder) {
+        pml_debug_printf("%f handleRelease self=%p start\n", now(), self.getPtr());
         try {
             wait(holder);
-            pml_debug_printf("%f handleRelease self=%p id=%s success\n", now(), self, r->debugID.toString().c_str());
+            pml_debug_printf("%f handleRelease self=%p success\n", now(), self.getPtr());
         } catch (Error& e) {
-            pml_debug_printf(
-                "%f handleRelease self=%p id=%s error %s\n", now(), self, r->debugID.toString().c_str(), e.what());
-            if (e.code() == error_code_actor_cancelled) {
-                // self is shutting down so no need to clean up r, this is done in kill()
-                throw;
-            }
+            pml_debug_printf("%f handleRelease self=%p error %s\n", now(), self.getPtr(), e.what());
         }

-        pml_debug_printf("lock release priority %d %s\n", (int)(r->priority->priority), self->toString().c_str());
+        pml_debug_printf("lock release priority %d %s\n", (int)(priority->priority), self->toString().c_str());

-        pml_debug_printf("%f handleRelease self=%p id=%s releasing\n", now(), self, r->debugID.toString().c_str());
+        pml_debug_printf("%f handleRelease self=%p releasing\n", now(), self.getPtr());
         ++self->available;
-        r->priority->runners -= 1;
+        priority->runners -= 1;

-        // Remove r from runners list and delete it
-        self->runners.erase(RunnerList::s_iterator_to(*r));
-        delete r;
-
         // If there are any waiters or if the runners array is getting large, trigger the runner loop
         if (self->waiting > 0) {
             self->wakeRunner.trigger();
         }
-        return Void();
     }

     void addRunner(Lock& lock, Priority* priority) {
         priority->runners += 1;
         --available;
-        Runner* runner = new Runner(priority);
-        runners.push_back(*runner);
-        runner->handler = handleRelease(this, runner, lock.promise.getFuture());
+        handleRelease(Reference<PriorityMultiLock>::addRef(this), priority, lock.promise.getFuture());
     }

     // Current maximum running tasks for the specified priority, which must have waiters
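
Note (illustrative): two details of the new scheme are easy to miss. handleRelease is now an ACTOR returning void, making it fire-and-forget: there is no future to store or cancel, which is exactly what lets the runners list go away, and any error from the holder must be swallowed rather than rethrown. The keep-alive handoff relies on Reference<PriorityMultiLock>::addRef(this), enabled by the new ReferenceCounted base. A reduced sketch with hypothetical names:

    #include "flow/FastRef.h"

    struct KeepAliveDemo : ReferenceCounted<KeepAliveDemo> {
        // Hands a strong reference to a detached callee, as addRunner() does
        // above. While the callee holds `self`, the object cannot be
        // destroyed out from under it.
        void spawn() { detachedWork(Reference<KeepAliveDemo>::addRef(this)); }

        static void detachedWork(Reference<KeepAliveDemo> self) {
            // ... use self; the object lives until `self` is dropped ...
        }
    };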

View File

@@ -40,7 +40,7 @@ ACTOR static Future<Void> benchPriorityMultiLock(benchmark::State* benchState) {
     }

     state int concurrency = priorities.size() * 10;
-    state PriorityMultiLock* pml = new PriorityMultiLock(concurrency, priorities);
+    state Reference<PriorityMultiLock> pml = makeReference<PriorityMultiLock>(concurrency, priorities);

     // Clog the lock by taking n=concurrency locks
     state std::deque<Future<PriorityMultiLock::Lock>> lockFutures;
@@ -78,7 +78,6 @@ ACTOR static Future<Void> benchPriorityMultiLock(benchmark::State* benchState) {
     benchState->SetItemsProcessed(static_cast<long>(benchState->iterations()));
-    delete pml;
     return Void();
 }