/*
 * LogSystemDiskQueueAdapter.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "fdbserver/IDiskQueue.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/LogSystemDiskQueueAdapter.h"
#include "fdbserver/Knobs.h"
#include "flow/actorcompiler.h" // has to be last include

// Out-of-line implementation of LogSystemDiskQueueAdapter::readNext. The ACTOR reaches into the
// adapter's state through `self`.
class LogSystemDiskQueueAdapterImpl {
public:
	// Recreates self->cursor so that it peeks the txs data starting at self->recoveryLoc, and re-arms
	// self->localityChanged to match the chosen strategy.
	//
	// peekType selects the strategy (callers pass peekTypeSwitches % 3, or 0 after a locality change):
	//   0 - peek using the primary locality from self->peekLocality,
	//   1 - peek with no locality (tagLocalityInvalid / invalidVersion),
	//   2 - peek using the secondary locality from self->peekLocality.
	// When there is no peekLocality, strategies 0 and 2 fall back to the no-locality arguments (as before)
	// and localityChanged becomes Never() instead of dereferencing the null reference.
	// canDiscardPopped is forwarded unchanged as the last argument of peekTxs.
	static void resetCursor(LogSystemDiskQueueAdapter* self, int peekType, bool canDiscardPopped) {
		if (peekType == 1 || !self->peekLocality) {
			self->cursor = self->logSystem->peekTxs(
			    UID(), self->recoveryLoc, tagLocalityInvalid, invalidVersion, canDiscardPopped);
			// Nothing locality-dependent is being peeked, so locality changes are not interesting.
			self->localityChanged = Never();
			return;
		}

		const auto& info = self->peekLocality->get();
		self->cursor = self->logSystem->peekTxs(UID(),
		                                        self->recoveryLoc,
		                                        peekType == 2 ? info.secondaryLocality : info.primaryLocality,
		                                        info.knownCommittedVersion,
		                                        canDiscardPopped);
		self->localityChanged = self->peekLocality->onChange();
	}

	// Returns up to `bytes` bytes of recovered txs data, peeking the log system until at least `bytes`
	// bytes are buffered in self->recoveryQueue or the end of the log system is reached. A result shorter
	// than `bytes` (possibly empty) is returned only when the end has been reached.
	//
	// Throws disk_adapter_reset() after rewinding the adapter when the cursor reports popped data (or, in
	// simulation, via BUGGIFY before any data has been discarded). The caller is expected to restart
	// reading from the new location.
	ACTOR static Future<Standalone<StringRef>> readNext(LogSystemDiskQueueAdapter* self, int bytes) {
		while (self->recoveryQueueDataSize < bytes) {
			// The log system is released once the end has been reached (below). Nothing further can be
			// peeked after that, so return whatever is still buffered instead of dereferencing a null
			// log system.
			if (!self->logSystem) {
				break;
			}

			if (self->recoveryLoc == self->logSystem->getEnd()) {
				// Recovery will be complete once the current recoveryQueue is consumed, so we no longer need
				// self->logSystem
				TraceEvent("PeekNextEnd")
				    .detail("Queue", self->recoveryQueue.size())
				    .detail("Bytes", bytes)
				    .detail("Loc", self->recoveryLoc)
				    .detail("End", self->logSystem->getEnd());
				self->logSystem.clear();
				break;
			}

			if (!self->cursor->hasMessage()) {
				// Wait for more data. If it does not arrive in time, rotate the peek strategy
				// (peekTypeSwitches % 3): the first switch happens after DISK_QUEUE_ADAPTER_MIN_SWITCH_TIME,
				// later ones after DISK_QUEUE_ADAPTER_MAX_SWITCH_TIME.
				loop {
					choose {
						when(wait(self->cursor->getMore())) { break; }
						when(wait(self->localityChanged)) {
							// A locality change always restarts the peek with the primary locality,
							// whatever strategy peekTypeSwitches currently selects.
							LogSystemDiskQueueAdapterImpl::resetCursor(self, 0, self->totalRecoveredBytes == 0);
						}
						when(wait(delay(self->peekTypeSwitches == 0
						                    ? SERVER_KNOBS->DISK_QUEUE_ADAPTER_MIN_SWITCH_TIME
						                    : SERVER_KNOBS->DISK_QUEUE_ADAPTER_MAX_SWITCH_TIME))) {
							self->peekTypeSwitches++;
							LogSystemDiskQueueAdapterImpl::resetCursor(
							    self, self->peekTypeSwitches % 3, self->totalRecoveredBytes == 0);
						}
					}
				}
				TraceEvent("PeekNextGetMore")
				    .detail("Total", self->totalRecoveredBytes)
				    .detail("Queue", self->recoveryQueue.size())
				    .detail("Bytes", bytes)
				    .detail("Loc", self->recoveryLoc)
				    .detail("End", self->logSystem->getEnd())
				    .detail("HasMessage", self->cursor->hasMessage())
				    .detail("Version", self->cursor->version().version);

				// In simulation, occasionally force a reset if none has happened yet, to exercise the path.
				bool buggify = !self->hasDiscardedData && BUGGIFY_WITH_PROB(0.01);
				if (self->cursor->popped() != 0 || buggify) {
					TraceEvent(SevWarnAlways, "DiskQueueAdapterReset")
					    .detail("Version", self->cursor->popped())
					    .detail("PeekTypeSwitch", self->peekTypeSwitches % 3);
					TEST(true); // disk adapter reset
					// Rewind to the popped version if there is one, otherwise to the start location.
					if (self->cursor->popped() != 0) {
						self->recoveryLoc = self->cursor->popped();
					} else {
						self->recoveryLoc = self->startLoc;
					}

					// Discard everything recovered so far and restart from the new location with the
					// current peek strategy.
					self->recoveryQueue.clear();
					self->recoveryQueueDataSize = 0;
					self->recoveryQueueLoc = self->recoveryLoc;
					self->totalRecoveredBytes = 0;
					LogSystemDiskQueueAdapterImpl::resetCursor(self, self->peekTypeSwitches % 3, true);
					self->hasDiscardedData = true;
					throw disk_adapter_reset();
				}

				// Remember where the buffered data begins, for getNextReadLocation().
				if (self->recoveryQueueDataSize == 0) {
					self->recoveryQueueLoc = self->recoveryLoc;
				}
				if (!self->cursor->hasMessage()) {
					self->recoveryLoc = self->cursor->version().version;
					wait(yield());
					continue;
				}
			}

			self->recoveryQueue.push_back(Standalone<StringRef>(self->cursor->getMessage(), self->cursor->arena()));
			self->recoveryQueueDataSize += self->recoveryQueue.back().size();
			self->totalRecoveredBytes += self->recoveryQueue.back().size();
			self->cursor->nextMessage();
			if (!self->cursor->hasMessage())
				self->recoveryLoc = self->cursor->version().version;

			//TraceEvent("PeekNextResults").detail("From", self->recoveryLoc).detail("Queue", self->recoveryQueue.size()).detail("Bytes", bytes).detail("Has", self->cursor->hasMessage()).detail("End", self->logSystem->getEnd());
		}

		// Merge the buffered messages into a single contiguous buffer so it can be sliced below.
		if (self->recoveryQueue.size() > 1) {
			self->recoveryQueue[0] = concatenate(self->recoveryQueue.begin(), self->recoveryQueue.end());
			self->recoveryQueue.resize(1);
		}

		if (self->recoveryQueueDataSize == 0)
			return Standalone<StringRef>();

		ASSERT(self->recoveryQueue[0].size() == self->recoveryQueueDataSize);

		//TraceEvent("PeekNextReturn").detail("Bytes", bytes).detail("QueueSize", self->recoveryQueue.size());
		bytes = std::min(bytes, self->recoveryQueue[0].size());
		// The result shares the buffer's arena; the remainder stays buffered for the next call.
		Standalone<StringRef> result(self->recoveryQueue[0].substr(0, bytes), self->recoveryQueue[0].arena());
		self->recoveryQueue[0].contents() = self->recoveryQueue[0].substr(bytes);
		self->recoveryQueueDataSize = self->recoveryQueue[0].size();
		if (self->recoveryQueue[0].size() == 0) {
			self->recoveryQueue.clear();
		}
		return result;
	}
};

// Returns an empty result when recovery is disabled; otherwise reads recovered txs data from the log system.
Future<Standalone<StringRef>> LogSystemDiskQueueAdapter::readNext(int bytes) {
	if (!enableRecovery)
		return Standalone<StringRef>();
	return LogSystemDiskQueueAdapterImpl::readNext(this, bytes);
}

// Reports recoveryQueueLoc, which is the recovery location recorded when the currently buffered recovery
// data started, as the low word of the location. The high word is always 0.
IDiskQueue::location LogSystemDiskQueueAdapter::getNextReadLocation() const {
	return IDiskQueue::location(0, recoveryQueueLoc);
}

// Buffers `contents` in pushedData, packed into blocks whose capacity is LOG_SYSTEM_PUSHED_DATA_BLOCK_SIZE.
// Nothing is handed off until commit(). Returns location(0, nextCommit).
IDiskQueue::location LogSystemDiskQueueAdapter::push(StringRef contents) {
	while (contents.size()) {
		// Free space left in the current block (0 if there is no block yet).
		int remainder = pushedData.size() == 0 ? 0 : pushedData.back().capacity() - pushedData.back().size();

		if (remainder == 0) {
			VectorRef<uint8_t> block;
			block.reserve(pushedData.arena(), SERVER_KNOBS->LOG_SYSTEM_PUSHED_DATA_BLOCK_SIZE);
			pushedData.push_back(pushedData.arena(), block);
			remainder = block.capacity();
		}

		const int chunk = std::min(remainder, contents.size());
		pushedData.back().append(pushedData.arena(), contents.begin(), chunk);
		contents = contents.substr(chunk);
	}

	return IDiskQueue::location(0, nextCommit);
}

// Records the highest location popped so far; it is reported as popTo by the next commit().
void LogSystemDiskQueueAdapter::pop(location upTo) {
	ASSERT(upTo.hi == 0);
	poppedUpTo = std::max(upTo.lo, poppedUpTo);
}

// Hands everything pushed since the last commit, together with the pop location, to the oldest
// outstanding getCommitMessage() promise. Requires at least one such promise to be pending.
// The returned future becomes ready when the receiver fulfills the message's `acknowledge` promise.
Future<Void> LogSystemDiskQueueAdapter::commit() {
	ASSERT(!commitMessages.empty());

	auto promise = commitMessages.front();
	commitMessages.pop_front();

	CommitMessage cm;
	cm.messages = this->pushedData;
	this->pushedData = Standalone<VectorRef<VectorRef<uint8_t>>>();
	cm.popTo = poppedUpTo;
	promise.send(cm);

	return cm.acknowledge.getFuture();
}

// This adapter never reports an error of its own.
Future<Void> LogSystemDiskQueueAdapter::getError() const {
	return Void();
}

// Closing is immediate, so this is always ready.
Future<Void> LogSystemDiskQueueAdapter::onClosed() const {
	return Void();
}

// Destroys the adapter; it must not be used afterwards.
void LogSystemDiskQueueAdapter::dispose() {
	delete this;
}

// Destroys the adapter; it must not be used afterwards.
void LogSystemDiskQueueAdapter::close() {
	delete this;
}

// Registers interest in the next commit. Promises are fulfilled by commit() in FIFO order.
Future<LogSystemDiskQueueAdapter::CommitMessage> LogSystemDiskQueueAdapter::getCommitMessage() {
	Promise<CommitMessage> pcm;
	commitMessages.push_back(pcm);
	return pcm.getFuture();
}

// Allocates a new adapter over `logSystem`. The trailing `true` is presumably the constructor's
// recovery-enable flag (see LogSystemDiskQueueAdapter.h). The caller owns the returned pointer and
// releases it with dispose() or close().
LogSystemDiskQueueAdapter* openDiskQueueAdapter(Reference<ILogSystem> logSystem,
                                                Reference<AsyncVar<PeekTxsInfo>> peekLocality,
                                                Version txsPoppedVersion) {
	return new LogSystemDiskQueueAdapter(logSystem, peekLocality, txsPoppedVersion, true);
}