/* * LogSystemDiskQueueAdapter.actor.cpp * * This source file is part of the FoundationDB open source project * * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "fdbserver/IDiskQueue.h" #include "fdbserver/LogSystem.h" #include "fdbserver/LogSystemDiskQueueAdapter.h" #include "fdbserver/Knobs.h" #include "flow/actorcompiler.h" // has to be last include class LogSystemDiskQueueAdapterImpl { public: ACTOR static Future> readNext( LogSystemDiskQueueAdapter* self, int bytes ) { while (self->recoveryQueueDataSize < bytes) { if (self->recoveryLoc == self->logSystem->getEnd()) { // Recovery will be complete once the current recoveryQueue is consumed, so we no longer need self->logSystem TraceEvent("PeekNextEnd").detail("Queue", self->recoveryQueue.size()).detail("Bytes", bytes).detail("Loc", self->recoveryLoc).detail("End", self->logSystem->getEnd()); self->logSystem.clear(); break; } if(!self->cursor->hasMessage()) { loop { choose { when( wait( self->cursor->getMore() ) ) { break; } when( wait( self->localityChanged ) ) { self->cursor = self->logSystem->peekTxs( UID(), self->recoveryLoc, self->peekLocality ? self->peekLocality->get().primaryLocality : tagLocalityInvalid, self->peekLocality ? self->peekLocality->get().knownCommittedVersion : invalidVersion ); self->localityChanged = self->peekLocality->onChange(); } when( wait( delay(self->peekTypeSwitches==0 ? SERVER_KNOBS->DISK_QUEUE_ADAPTER_MIN_SWITCH_TIME : SERVER_KNOBS->DISK_QUEUE_ADAPTER_MAX_SWITCH_TIME)) ) { self->peekTypeSwitches++; if(self->peekTypeSwitches%3==1) { self->cursor = self->logSystem->peekTxs( UID(), self->recoveryLoc, tagLocalityInvalid, invalidVersion ); self->localityChanged = Never(); } else if(self->peekTypeSwitches%3==2) { self->cursor = self->logSystem->peekTxs( UID(), self->recoveryLoc, self->peekLocality ? self->peekLocality->get().secondaryLocality : tagLocalityInvalid, self->peekLocality ? self->peekLocality->get().knownCommittedVersion : invalidVersion ); self->localityChanged = self->peekLocality->onChange(); } else { self->cursor = self->logSystem->peekTxs( UID(), self->recoveryLoc, self->peekLocality ? self->peekLocality->get().primaryLocality : tagLocalityInvalid, self->peekLocality ? self->peekLocality->get().knownCommittedVersion : invalidVersion ); self->localityChanged = self->peekLocality->onChange(); } } } } TraceEvent("PeekNextGetMore").detail("Queue", self->recoveryQueue.size()).detail("Bytes", bytes).detail("Loc", self->recoveryLoc).detail("End", self->logSystem->getEnd()); if(self->recoveryQueueDataSize == 0) { self->recoveryQueueLoc = self->recoveryLoc; } if(!self->cursor->hasMessage()) { self->recoveryLoc = self->cursor->version().version; continue; } } self->recoveryQueue.push_back( Standalone(self->cursor->getMessage(), self->cursor->arena()) ); self->recoveryQueueDataSize += self->recoveryQueue.back().size(); self->cursor->nextMessage(); if(!self->cursor->hasMessage()) self->recoveryLoc = self->cursor->version().version; //TraceEvent("PeekNextResults").detail("From", self->recoveryLoc).detail("Queue", self->recoveryQueue.size()).detail("Bytes", bytes).detail("Has", self->cursor->hasMessage()).detail("End", self->logSystem->getEnd()); } if(self->recoveryQueue.size() > 1) { self->recoveryQueue[0] = concatenate(self->recoveryQueue.begin(), self->recoveryQueue.end()); self->recoveryQueue.resize(1); } if(self->recoveryQueueDataSize == 0) return Standalone(); ASSERT(self->recoveryQueue[0].size() == self->recoveryQueueDataSize); //TraceEvent("PeekNextReturn").detail("Bytes", bytes).detail("QueueSize", self->recoveryQueue.size()); bytes = std::min(bytes, self->recoveryQueue[0].size()); Standalone result( self->recoveryQueue[0].substr(0,bytes), self->recoveryQueue[0].arena() ); self->recoveryQueue[0].contents() = self->recoveryQueue[0].substr(bytes); self->recoveryQueueDataSize = self->recoveryQueue[0].size(); if(self->recoveryQueue[0].size() == 0) { self->recoveryQueue.clear(); } return result; } }; Future> LogSystemDiskQueueAdapter::readNext( int bytes ) { if (!enableRecovery) return Standalone(); return LogSystemDiskQueueAdapterImpl::readNext(this, bytes); } IDiskQueue::location LogSystemDiskQueueAdapter::getNextReadLocation() { return IDiskQueue::location( 0, recoveryQueueLoc ); } IDiskQueue::location LogSystemDiskQueueAdapter::push( StringRef contents ) { while(contents.size()) { int remainder = pushedData.size() == 0 ? 0 : pushedData.back().capacity() - pushedData.back().size(); if(remainder == 0) { VectorRef block; block.reserve(pushedData.arena(), SERVER_KNOBS->LOG_SYSTEM_PUSHED_DATA_BLOCK_SIZE); pushedData.push_back(pushedData.arena(), block); remainder = block.capacity(); } pushedData.back().append(pushedData.arena(), contents.begin(), std::min(remainder, contents.size())); contents = contents.substr(std::min(remainder, contents.size())); } return IDiskQueue::location( 0, nextCommit ); } void LogSystemDiskQueueAdapter::pop( location upTo ) { ASSERT( upTo.hi == 0 ); poppedUpTo = std::max( upTo.lo, poppedUpTo ); } Future LogSystemDiskQueueAdapter::commit() { ASSERT( !commitMessages.empty() ); auto promise = commitMessages.front(); commitMessages.pop_front(); CommitMessage cm; cm.messages = this->pushedData; this->pushedData = Standalone>>(); cm.popTo = poppedUpTo; promise.send(cm); return cm.acknowledge.getFuture(); } Future LogSystemDiskQueueAdapter::getError() { return Void(); } Future LogSystemDiskQueueAdapter::onClosed() { return Void(); } void LogSystemDiskQueueAdapter::dispose() { delete this; } void LogSystemDiskQueueAdapter::close() { delete this; } Future LogSystemDiskQueueAdapter::getCommitMessage() { Promise pcm; commitMessages.push_back( pcm ); return pcm.getFuture(); } LogSystemDiskQueueAdapter* openDiskQueueAdapter( Reference logSystem, Reference> peekLocality ) { return new LogSystemDiskQueueAdapter( logSystem, peekLocality ); }