diff --git a/fdbserver/DiskQueue.actor.cpp b/fdbserver/DiskQueue.actor.cpp index 1f38dfb8ee..543cb92331 100644 --- a/fdbserver/DiskQueue.actor.cpp +++ b/fdbserver/DiskQueue.actor.cpp @@ -218,7 +218,7 @@ public: void dispose() { shutdown(this, true); } void close() { shutdown(this, false); } - StorageBytes getStorageBytes() { + StorageBytes getStorageBytes() const { int64_t free; int64_t total; @@ -789,7 +789,7 @@ public: { } - virtual location push( StringRef contents ) { + location push(StringRef contents) override { ASSERT( recovered ); uint8_t const* begin = contents.begin(); uint8_t const* end = contents.end(); @@ -807,7 +807,7 @@ public: return endLocation(); } - virtual void pop( location upTo ) { + void pop(location upTo) override { ASSERT( !upTo.hi ); ASSERT( !recovered || upTo.lo <= endLocation() ); @@ -829,14 +829,14 @@ public: } } - virtual Future<Standalone<StringRef>> read(location from, location to, CheckHashes ch) { return read(this, from, to, ch); } - - int getMaxPayload() { - return Page::maxPayload; + Future<Standalone<StringRef>> read(location from, location to, CheckHashes ch) override { + return read(this, from, to, ch); } + int getMaxPayload() const { return Page::maxPayload; } + // Always commit an entire page. Commit overhead is the unused space in a to-be-committed page - virtual int getCommitOverhead() { + int getCommitOverhead() const override { if(!pushedPageCount()) { if(!anyPopped) return 0; @@ -849,7 +849,7 @@ public: return backPage().remainingCapacity(); } - virtual Future<Void> commit() { + Future<Void> commit() override { ASSERT( recovered ); if (!pushedPageCount()) { if (!anyPopped) return Void(); @@ -887,30 +887,30 @@ public: rawQueue->stall(); } - virtual Future<bool> initializeRecovery(location recoverAt) { return initializeRecovery( this, recoverAt ); } - virtual Future<Standalone<StringRef>> readNext( int bytes ) { return readNext(this, bytes); } + Future<bool> initializeRecovery(location recoverAt) override { return initializeRecovery(this, recoverAt); } + Future<Standalone<StringRef>> readNext(int bytes) override { return readNext(this, bytes); } // FIXME: getNextReadLocation should ASSERT( initialized ), but the memory storage engine needs // to be changed to understand the new intiailizeRecovery protocol. - virtual location getNextReadLocation() { return nextReadLocation; } - virtual location getNextCommitLocation() { ASSERT( initialized ); return lastCommittedSeq + sizeof(Page); } - virtual location getNextPushLocation() { ASSERT( initialized ); return endLocation(); } + location getNextReadLocation() const override { return nextReadLocation; } + location getNextCommitLocation() const override { + ASSERT(initialized); + return lastCommittedSeq + sizeof(Page); + } + location getNextPushLocation() const override { + ASSERT(initialized); + return endLocation(); + } - virtual Future<Void> getError() { return rawQueue->getError(); } - virtual Future<Void> onClosed() { return rawQueue->onClosed(); } + Future<Void> getError() override { return rawQueue->getError(); } + Future<Void> onClosed() override { return rawQueue->onClosed(); } - virtual void dispose() { + void dispose() override { TraceEvent("DQDestroy", dbgid).detail("LastPoppedSeq", lastPoppedSeq).detail("PoppedSeq", poppedSeq).detail("NextPageSeq", nextPageSeq).detail("File0Name", rawQueue->files[0].dbgFilename); dispose(this); } - ACTOR static void dispose(DiskQueue* self) { - wait( self->onSafeToDestruct() ); - TraceEvent("DQDestroyDone", self->dbgid).detail("File0Name", self->rawQueue->files[0].dbgFilename); - self->rawQueue->dispose(); - delete self; - } - virtual void close() { + void close() override { TraceEvent("DQClose", dbgid) .detail("LastPoppedSeq", lastPoppedSeq) .detail("PoppedSeq", poppedSeq) @@ -919,6 +919,17 @@ public: .detail("File0Name", rawQueue->files[0].dbgFilename); close(this); } + + StorageBytes getStorageBytes() const override { return rawQueue->getStorageBytes(); } + +private: + ACTOR static void dispose(DiskQueue* self) { + wait(self->onSafeToDestruct()); + TraceEvent("DQDestroyDone", self->dbgid).detail("File0Name", self->rawQueue->files[0].dbgFilename); + self->rawQueue->dispose(); + delete self; + } + ACTOR static void close(DiskQueue* self) { wait( self->onSafeToDestruct() ); TraceEvent("DQCloseDone", self->dbgid).detail("File0Name", self->rawQueue->files[0].dbgFilename); @@ -926,11 +937,6 @@ public: delete self; } - virtual StorageBytes getStorageBytes() { - return rawQueue->getStorageBytes(); - } - -private: #pragma pack(push, 1) struct PageHeader { union { @@ -1399,29 +1405,30 @@ public: Future<bool> initializeRecovery(location recoverAt) { return queue->initializeRecovery(recoverAt); } Future<Standalone<StringRef>> readNext( int bytes ) { return readNext(this, bytes); } - virtual location getNextReadLocation() { return queue->getNextReadLocation(); } + location getNextReadLocation() const override { return queue->getNextReadLocation(); } - virtual Future<Standalone<StringRef>> read( location start, location end, CheckHashes ch ) { return queue->read( start, end, ch ); } - virtual location getNextCommitLocation() { return queue->getNextCommitLocation(); } - virtual location getNextPushLocation() { return queue->getNextPushLocation(); } + Future<Standalone<StringRef>> read(location start, location end, CheckHashes ch) override { + return queue->read(start, end, ch); + } + location getNextCommitLocation() const override { return queue->getNextCommitLocation(); } + location getNextPushLocation() const override { return queue->getNextPushLocation(); } - - virtual location push( StringRef contents ) { + location push(StringRef contents) override { pushed = queue->push(contents); return pushed; } - virtual void pop( location upTo ) { + void pop(location upTo) override { popped = std::max(popped, upTo); ASSERT_WE_THINK(committed >= popped); queue->pop(std::min(committed, popped)); } - virtual int getCommitOverhead() { + int getCommitOverhead() const override { return queue->getCommitOverhead() + (popped > committed ? queue->getMaxPayload() : 0); } - Future<Void> commit() { + Future<Void> commit() override { location pushLocation = pushed; location popLocation = popped; @@ -1444,7 +1451,7 @@ public: return commitFuture; } - virtual StorageBytes getStorageBytes() { return queue->getStorageBytes(); } + StorageBytes getStorageBytes() const override { return queue->getStorageBytes(); } private: DiskQueue *queue; diff --git a/fdbserver/IDiskQueue.h b/fdbserver/IDiskQueue.h index 627a5a44ca..7d86b6e5a2 100644 --- a/fdbserver/IDiskQueue.h +++ b/fdbserver/IDiskQueue.h @@ -68,18 +68,23 @@ public: // Before calling push or commit, the caller *must* perform recovery by calling readNext() until it returns less than the requested number of bytes. // Thereafter it may not be called again. virtual Future<Standalone<StringRef>> readNext( int bytes ) = 0; // Return the next bytes in the queue (beginning, the first time called, with the first unpopped byte) - virtual location getNextReadLocation() = 0; // Returns a location >= the location of all bytes previously returned by readNext(), and <= the location of all bytes subsequently returned - virtual location getNextCommitLocation() = 0; // If commit() were to be called, all buffered writes would be written starting at `location`. - virtual location getNextPushLocation() = 0; // If push() were to be called, the pushed data would be written starting at `location`. + virtual location getNextReadLocation() + const = 0; // Returns a location >= the location of all bytes previously returned by readNext(), and <= the + // location of all bytes subsequently returned + virtual location getNextCommitLocation() + const = 0; // If commit() were to be called, all buffered writes would be written starting at `location`. + virtual location getNextPushLocation() + const = 0; // If push() were to be called, the pushed data would be written starting at `location`. virtual Future<Standalone<StringRef>> read( location start, location end, CheckHashes vc ) = 0; virtual location push( StringRef contents ) = 0; // Appends the given bytes to the byte stream. Returns a location token representing the *end* of the contents. virtual void pop( location upTo ) = 0; // Removes all bytes before the given location token from the byte stream. virtual Future<Void> commit() = 0; // returns when all prior pushes and pops are durable. If commit does not return (due to close or a crash), any prefix of the pushed bytes and any prefix of the popped bytes may be durable. - virtual int getCommitOverhead() = 0; // returns the amount of unused space that would be written by a commit that immediately followed this call + virtual int getCommitOverhead() const = 0; // returns the amount of unused space that would be written by a commit + // that immediately followed this call - virtual StorageBytes getStorageBytes() = 0; + virtual StorageBytes getStorageBytes() const = 0; }; template<> diff --git a/fdbserver/LogSystemDiskQueueAdapter.actor.cpp b/fdbserver/LogSystemDiskQueueAdapter.actor.cpp index b8612e9033..ab10869e74 100644 --- a/fdbserver/LogSystemDiskQueueAdapter.actor.cpp +++ b/fdbserver/LogSystemDiskQueueAdapter.actor.cpp @@ -136,7 +136,7 @@ Future<Standalone<StringRef>> LogSystemDiskQueueAdapter::readNext( int bytes ) { return LogSystemDiskQueueAdapterImpl::readNext(this, bytes); } -IDiskQueue::location LogSystemDiskQueueAdapter::getNextReadLocation() { +IDiskQueue::location LogSystemDiskQueueAdapter::getNextReadLocation() const { return IDiskQueue::location( 0, recoveryQueueLoc ); } diff --git a/fdbserver/LogSystemDiskQueueAdapter.h b/fdbserver/LogSystemDiskQueueAdapter.h index d4a514c4b8..bb3d880723 100644 --- a/fdbserver/LogSystemDiskQueueAdapter.h +++ b/fdbserver/LogSystemDiskQueueAdapter.h @@ -73,23 +73,35 @@ public: Future<CommitMessage> getCommitMessage(); // IClosable interface - virtual Future<Void> getError(); - virtual Future<Void> onClosed(); - virtual void dispose(); - virtual void close(); + Future<Void> getError() override; + Future<Void> onClosed() override; + void dispose() override; + void close() override; // IDiskQueue interface - virtual Future<bool> initializeRecovery(location recoverAt) { return false; } - virtual Future<Standalone<StringRef>> readNext( int bytes ); - virtual IDiskQueue::location getNextReadLocation(); - virtual IDiskQueue::location getNextCommitLocation() { ASSERT(false); throw internal_error(); } - virtual IDiskQueue::location getNextPushLocation() { ASSERT(false); throw internal_error(); } - virtual Future<Standalone<StringRef>> read( location start, location end, CheckHashes ch ) { ASSERT(false); throw internal_error(); } - virtual IDiskQueue::location push( StringRef contents ); - virtual void pop( IDiskQueue::location upTo ); - virtual Future<Void> commit(); - virtual StorageBytes getStorageBytes() { ASSERT(false); throw internal_error(); } - virtual int getCommitOverhead() { return 0; } //SOMEDAY: could this be more accurate? + Future<bool> initializeRecovery(location recoverAt) override { return false; } + Future<Standalone<StringRef>> readNext(int bytes) override; + IDiskQueue::location getNextReadLocation() const override; + IDiskQueue::location getNextCommitLocation() const override { + ASSERT(false); + throw internal_error(); + } + IDiskQueue::location getNextPushLocation() const override { + ASSERT(false); + throw internal_error(); + } + Future<Standalone<StringRef>> read(location start, location end, CheckHashes ch) override { + ASSERT(false); + throw internal_error(); + } + IDiskQueue::location push(StringRef contents) override; + void pop(IDiskQueue::location upTo) override; + Future<Void> commit() override; + StorageBytes getStorageBytes() const override { + ASSERT(false); + throw internal_error(); + } + int getCommitOverhead() const override { return 0; } // SOMEDAY: could this be more accurate? private: Reference<AsyncVar<PeekTxsInfo>> peekLocality;