/* * TLogServer.actor.cpp * * This source file is part of the FoundationDB open source project * * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "flow/Hash3.h" #include "flow/UnitTest.h" #include "fdbclient/NativeAPI.actor.h" #include "fdbclient/Notified.h" #include "fdbclient/KeyRangeMap.h" #include "fdbclient/RunTransaction.actor.h" #include "fdbclient/SystemData.h" #include "fdbserver/WorkerInterface.actor.h" #include "fdbserver/TLogInterface.h" #include "fdbserver/Knobs.h" #include "fdbserver/IKeyValueStore.h" #include "flow/ActorCollection.h" #include "fdbrpc/FailureMonitor.h" #include "fdbserver/IDiskQueue.h" #include "fdbrpc/sim_validation.h" #include "fdbrpc/simulator.h" #include "fdbrpc/Stats.h" #include "fdbserver/ServerDBInfo.h" #include "fdbserver/LogSystem.h" #include "fdbserver/WaitFailure.h" #include "fdbserver/RecoveryState.h" #include "fdbserver/FDBExecHelper.actor.h" #include "flow/actorcompiler.h" // This must be the last #include. using std::max; using std::min; namespace oldTLog_6_0 { struct TLogQueueEntryRef { UID id; Version version; Version knownCommittedVersion; StringRef messages; TLogQueueEntryRef() : version(0), knownCommittedVersion(0) {} TLogQueueEntryRef(Arena& a, TLogQueueEntryRef const& from) : id(from.id), version(from.version), knownCommittedVersion(from.knownCommittedVersion), messages(a, from.messages) {} template void serialize(Ar& ar) { serializer(ar, version, messages, knownCommittedVersion, id); } size_t expectedSize() const { return messages.expectedSize(); } }; struct AlternativeTLogQueueEntryRef { UID id; Version version; Version knownCommittedVersion; std::vector* alternativeMessages; AlternativeTLogQueueEntryRef() : version(0), knownCommittedVersion(0), alternativeMessages(nullptr) {} template void serialize(Ar& ar) { ASSERT(!ar.isDeserializing && alternativeMessages); uint32_t msgSize = expectedSize(); serializer(ar, version, msgSize); for (auto& msg : *alternativeMessages) { ar.serializeBytes(msg.message); } serializer(ar, knownCommittedVersion, id); } uint32_t expectedSize() const { uint32_t msgSize = 0; for (auto& msg : *alternativeMessages) { msgSize += msg.message.size(); } return msgSize; } }; typedef Standalone TLogQueueEntry; struct LogData; struct TLogData; struct TLogQueue final : public IClosable { public: TLogQueue(IDiskQueue* queue, UID dbgid) : queue(queue), dbgid(dbgid) {} // Each packet in the queue is // uint32_t payloadSize // uint8_t payload[payloadSize] (begins with uint64_t protocolVersion via IncludeVersion) // uint8_t validFlag // TLogQueue is a durable queue of TLogQueueEntry objects with an interface similar to IDiskQueue // TLogQueue pushes (but not commits) are atomic - after commit fails to return, a prefix of entire calls to push // are durable. This is // implemented on top of the weaker guarantee of IDiskQueue::commit (that a prefix of bytes is durable) using // validFlag and by padding any incomplete packet with zeros after recovery. // Before calling push, pop, or commit, the user must call readNext() until it throws // end_of_stream(). It may not be called again thereafter. Future readNext(TLogData* tLog) { return readNext(this, tLog); } template void push(T const& qe, Reference logData); void pop(Version upTo, Reference logData); Future commit() { return queue->commit(); } // Implements IClosable Future getError() const override { return queue->getError(); } Future onClosed() const override { return queue->onClosed(); } void dispose() override { queue->dispose(); delete this; } void close() override { queue->close(); delete this; } private: IDiskQueue* queue; UID dbgid; void updateVersionSizes(const TLogQueueEntry& result, TLogData* tLog); ACTOR static Future readNext(TLogQueue* self, TLogData* tLog) { state TLogQueueEntry result; state int zeroFillSize = 0; loop { Standalone h = wait(self->queue->readNext(sizeof(uint32_t))); if (h.size() != sizeof(uint32_t)) { if (h.size()) { TEST(true); // Zero fill within size field int payloadSize = 0; memcpy(&payloadSize, h.begin(), h.size()); zeroFillSize = sizeof(uint32_t) - h.size(); // zero fill the size itself zeroFillSize += payloadSize + 1; // and then the contents and valid flag } break; } state uint32_t payloadSize = *(uint32_t*)h.begin(); ASSERT(payloadSize < (100 << 20)); Standalone e = wait(self->queue->readNext(payloadSize + 1)); if (e.size() != payloadSize + 1) { TEST(true); // Zero fill within payload zeroFillSize = payloadSize + 1 - e.size(); break; } if (e[payloadSize]) { Arena a = e.arena(); ArenaReader ar(a, e.substr(0, payloadSize), IncludeVersion()); ar >> result; self->updateVersionSizes(result, tLog); return result; } } if (zeroFillSize) { TEST(true); // Fixing a partial commit at the end of the tlog queue for (int i = 0; i < zeroFillSize; i++) self->queue->push(StringRef((const uint8_t*)"", 1)); } throw end_of_stream(); } }; ////// Persistence format (for self->persistentData) // Immutable keys static const KeyValueRef persistFormat(LiteralStringRef("Format"), LiteralStringRef("FoundationDB/LogServer/2/4")); static const KeyRangeRef persistFormatReadableRange(LiteralStringRef("FoundationDB/LogServer/2/3"), LiteralStringRef("FoundationDB/LogServer/2/5")); static const KeyRangeRef persistRecoveryCountKeys = KeyRangeRef(LiteralStringRef("DbRecoveryCount/"), LiteralStringRef("DbRecoveryCount0")); // Updated on updatePersistentData() static const KeyRangeRef persistCurrentVersionKeys = KeyRangeRef(LiteralStringRef("version/"), LiteralStringRef("version0")); static const KeyRangeRef persistKnownCommittedVersionKeys = KeyRangeRef(LiteralStringRef("knownCommitted/"), LiteralStringRef("knownCommitted0")); static const KeyRangeRef persistLocalityKeys = KeyRangeRef(LiteralStringRef("Locality/"), LiteralStringRef("Locality0")); static const KeyRangeRef persistLogRouterTagsKeys = KeyRangeRef(LiteralStringRef("LogRouterTags/"), LiteralStringRef("LogRouterTags0")); static const KeyRangeRef persistTxsTagsKeys = KeyRangeRef(LiteralStringRef("TxsTags/"), LiteralStringRef("TxsTags0")); static const KeyRange persistTagMessagesKeys = prefixRange(LiteralStringRef("TagMsg/")); static const KeyRange persistTagPoppedKeys = prefixRange(LiteralStringRef("TagPop/")); static Key persistTagMessagesKey(UID id, Tag tag, Version version) { BinaryWriter wr(Unversioned()); wr.serializeBytes(persistTagMessagesKeys.begin); wr << id; wr << tag; wr << bigEndian64(version); return wr.toValue(); } static Key persistTagPoppedKey(UID id, Tag tag) { BinaryWriter wr(Unversioned()); wr.serializeBytes(persistTagPoppedKeys.begin); wr << id; wr << tag; return wr.toValue(); } static Value persistTagPoppedValue(Version popped) { return BinaryWriter::toValue(popped, Unversioned()); } static Tag decodeTagPoppedKey(KeyRef id, KeyRef key) { Tag s; BinaryReader rd(key.removePrefix(persistTagPoppedKeys.begin).removePrefix(id), Unversioned()); rd >> s; return s; } static Version decodeTagPoppedValue(ValueRef value) { return BinaryReader::fromStringRef(value, Unversioned()); } static StringRef stripTagMessagesKey(StringRef key) { return key.substr(sizeof(UID) + sizeof(Tag) + persistTagMessagesKeys.begin.size()); } static Version decodeTagMessagesKey(StringRef key) { return bigEndian64(BinaryReader::fromStringRef(stripTagMessagesKey(key), Unversioned())); } struct TLogData : NonCopyable { AsyncTrigger newLogData; Deque queueOrder; std::map> id_data; UID dbgid; UID workerID; IKeyValueStore* persistentData; IDiskQueue* rawPersistentQueue; TLogQueue* persistentQueue; int64_t diskQueueCommitBytes; AsyncVar largeDiskQueueCommitBytes; // becomes true when diskQueueCommitBytes is greater than MAX_QUEUE_COMMIT_BYTES Reference const> dbInfo; Database cx; NotifiedVersion queueCommitEnd; Version queueCommitBegin; int64_t instanceID; int64_t bytesInput; int64_t bytesDurable; int64_t targetVolatileBytes; // The number of bytes of mutations this TLog should hold in memory before spilling. int64_t overheadBytesInput; int64_t overheadBytesDurable; int activePeekStreams = 0; WorkerCache tlogCache; PromiseStream> sharedActors; Promise terminated; FlowLock concurrentLogRouterReads; FlowLock persistentDataCommitLock; bool ignorePopRequest; // ignore pop request from storage servers double ignorePopDeadline; // time until which the ignorePopRequest will be // honored std::string ignorePopUid; // callers that set ignorePopRequest will set this // extra state, used to validate the ownership of // the set and for callers that unset will // be able to match it up std::string dataFolder; // folder where data is stored Reference> degraded; std::vector tempTagMessages; TLogData(UID dbgid, UID workerID, IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference const> dbInfo, Reference> degraded, std::string folder) : dbgid(dbgid), workerID(workerID), persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)), diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), dbInfo(dbInfo), queueCommitEnd(0), queueCommitBegin(0), instanceID(deterministicRandom()->randomUniqueID().first()), bytesInput(0), bytesDurable(0), targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), overheadBytesDurable(0), concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS), ignorePopRequest(false), dataFolder(folder), degraded(degraded) { cx = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True); } }; struct LogData : NonCopyable, public ReferenceCounted { struct TagData : NonCopyable, public ReferenceCounted { std::deque> versionMessages; bool nothingPersistent; // true means tag is *known* to have no messages in persistentData. false means nothing. bool poppedRecently; // `popped` has changed since last updatePersistentData Version popped; // see popped version tracking contract below bool unpoppedRecovered; Tag tag; TagData(Tag tag, Version popped, bool nothingPersistent, bool poppedRecently, bool unpoppedRecovered) : nothingPersistent(nothingPersistent), poppedRecently(poppedRecently), popped(popped), unpoppedRecovered(unpoppedRecovered), tag(tag) {} TagData(TagData&& r) noexcept : versionMessages(std::move(r.versionMessages)), nothingPersistent(r.nothingPersistent), poppedRecently(r.poppedRecently), popped(r.popped), unpoppedRecovered(r.unpoppedRecovered), tag(r.tag) {} void operator=(TagData&& r) noexcept { versionMessages = std::move(r.versionMessages); nothingPersistent = r.nothingPersistent; poppedRecently = r.poppedRecently; popped = r.popped; tag = r.tag; unpoppedRecovered = r.unpoppedRecovered; } // Erase messages not needed to update *from* versions >= before (thus, messages with toversion <= before) ACTOR Future eraseMessagesBefore(TagData* self, Version before, TLogData* tlogData, Reference logData, TaskPriority taskID) { while (!self->versionMessages.empty() && self->versionMessages.front().first < before) { Version version = self->versionMessages.front().first; std::pair& sizes = logData->version_sizes[version]; int64_t messagesErased = 0; while (!self->versionMessages.empty() && self->versionMessages.front().first == version) { auto const& m = self->versionMessages.front(); ++messagesErased; if (self->tag.locality != tagLocalityTxs && self->tag != txsTag) { sizes.first -= m.second.expectedSize(); } else { sizes.second -= m.second.expectedSize(); } self->versionMessages.pop_front(); } int64_t bytesErased = messagesErased * SERVER_KNOBS->VERSION_MESSAGES_ENTRY_BYTES_WITH_OVERHEAD; logData->bytesDurable += bytesErased; tlogData->bytesDurable += bytesErased; tlogData->overheadBytesDurable += bytesErased; wait(yield(taskID)); } return Void(); } Future eraseMessagesBefore(Version before, TLogData* tlogData, Reference logData, TaskPriority taskID) { return eraseMessagesBefore(this, before, tlogData, logData, taskID); } }; Map versionLocation; // For the version of each entry that was push()ed, the end location of the serialized bytes /* Popped version tracking contract needed by log system to implement ILogCursor::popped(): - Log server tracks for each (possible) tag a popped_version Impl: TagData::popped (in memory) and persistTagPoppedKeys (in persistentData) - popped_version(tag) is <= the maximum version for which log server (or a predecessor) is ever asked to pop the tag Impl: Only increased by tLogPop() in response to either a pop request or recovery from a predecessor - popped_version(tag) is > the maximum version for which log server is unable to peek messages due to previous pops (on this server or a predecessor) Impl: Increased by tLogPop() atomically with erasing messages from memory; persisted by updatePersistentData() atomically with erasing messages from store; messages are not erased from queue where popped_version is not persisted - LockTLogReply returns all tags which either have messages, or which have nonzero popped_versions Impl: tag_data is present for all such tags - peek(tag, v) returns the popped_version for tag if that is greater than v Impl: Check tag_data->popped (after all waits) */ AsyncTrigger stopCommit; bool stopped, initialized; DBRecoveryCount recoveryCount; VersionMetricHandle persistentDataVersion, persistentDataDurableVersion; // The last version number in the portion of the log (written|durable) to // persistentData NotifiedVersion version, queueCommittedVersion; Version queueCommittingVersion; Version knownCommittedVersion, durableKnownCommittedVersion, minKnownCommittedVersion; // Track lastUpdate time for parallel peek and detect stall on tLogs struct PeekTrackerData { std::map>> sequence_version; double lastUpdate; Tag tag; double lastLogged; int64_t totalPeeks; int64_t replyBytes; int64_t duplicatePeeks; double queueTime; double queueMax; double blockTime; double blockMax; double workTime; double workMax; int64_t unblockedPeeks; double idleTime; double idleMax; PeekTrackerData() : lastUpdate(0) { resetMetrics(); } void resetMetrics() { lastLogged = now(); totalPeeks = 0; replyBytes = 0; duplicatePeeks = 0; queueTime = 0; queueMax = 0; blockTime = 0; blockMax = 0; workTime = 0; workMax = 0; unblockedPeeks = 0; idleTime = 0; idleMax = 0; } }; std::map peekTracker; Deque>>> messageBlocks; std::vector>> tag_data; // tag.locality | tag.id int unpoppedRecoveredTags; Reference getTagData(Tag tag) { int idx = tag.toTagDataIndex(); if (idx >= tag_data.size()) { tag_data.resize(idx + 1); } if (tag.id >= tag_data[idx].size()) { tag_data[idx].resize(tag.id + 1); } return tag_data[idx][tag.id]; } // only callable after getTagData returns a null reference Reference createTagData(Tag tag, Version popped, bool nothingPersistent, bool poppedRecently, bool unpoppedRecovered) { if (tag.locality != tagLocalityLogRouter && tag.locality != tagLocalityTxs && tag != txsTag && allTags.size() && !allTags.count(tag) && popped <= recoveredAt) { popped = recoveredAt + 1; } Reference newTagData = Reference(new TagData(tag, popped, nothingPersistent, poppedRecently, unpoppedRecovered)); tag_data[tag.toTagDataIndex()][tag.id] = newTagData; return newTagData; } Map> version_sizes; CounterCollection cc; Counter bytesInput; Counter bytesDurable; UID logId; Version newPersistentDataVersion; Future removed; PromiseStream> addActor; TLogData* tLogData; Promise recoveryComplete, committingQueue; Version unrecoveredBefore, recoveredAt; Reference>> logSystem; Tag remoteTag; bool isPrimary; int logRouterTags; Version logRouterPoppedVersion, logRouterPopToVersion; int8_t locality; UID recruitmentID; std::set allTags; Future terminated; FlowLock execOpLock; bool execOpCommitInProgress; int txsTags; std::map toBePopped; // map of Tag->Version for all the pops // that came when ignorePopRequest was set explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, int txsTags, UID recruitmentID, std::vector tags, std::string context) : stopped(false), initialized(false), queueCommittingVersion(0), knownCommittedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0), unpoppedRecoveredTags(0), cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), logId(interf.id()), newPersistentDataVersion(invalidVersion), tLogData(tLogData), unrecoveredBefore(1), recoveredAt(1), logSystem(new AsyncVar>()), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), logRouterPoppedVersion(0), logRouterPopToVersion(0), locality(tagLocalityInvalid), recruitmentID(recruitmentID), allTags(tags.begin(), tags.end()), terminated(tLogData->terminated.getFuture()), execOpCommitInProgress(false), txsTags(txsTags) { startRole(Role::TRANSACTION_LOG, interf.id(), tLogData->workerID, { { "SharedTLog", tLogData->dbgid.shortString() } }, context); addActor.send(traceRole(Role::TRANSACTION_LOG, interf.id())); persistentDataVersion.init(LiteralStringRef("TLog.PersistentDataVersion"), cc.id); persistentDataDurableVersion.init(LiteralStringRef("TLog.PersistentDataDurableVersion"), cc.id); version.initMetric(LiteralStringRef("TLog.Version"), cc.id); queueCommittedVersion.initMetric(LiteralStringRef("TLog.QueueCommittedVersion"), cc.id); specialCounter(cc, "Version", [this]() { return this->version.get(); }); specialCounter(cc, "QueueCommittedVersion", [this]() { return this->queueCommittedVersion.get(); }); specialCounter(cc, "PersistentDataVersion", [this]() { return this->persistentDataVersion; }); specialCounter(cc, "PersistentDataDurableVersion", [this]() { return this->persistentDataDurableVersion; }); specialCounter(cc, "KnownCommittedVersion", [this]() { return this->knownCommittedVersion; }); specialCounter(cc, "QueuePoppedVersion", [this]() { return this->persistentDataDurableVersion; }); specialCounter(cc, "SharedBytesInput", [tLogData]() { return tLogData->bytesInput; }); specialCounter(cc, "SharedBytesDurable", [tLogData]() { return tLogData->bytesDurable; }); specialCounter(cc, "SharedOverheadBytesInput", [tLogData]() { return tLogData->overheadBytesInput; }); specialCounter(cc, "SharedOverheadBytesDurable", [tLogData]() { return tLogData->overheadBytesDurable; }); specialCounter( cc, "KvstoreBytesUsed", [tLogData]() { return tLogData->persistentData->getStorageBytes().used; }); specialCounter( cc, "KvstoreBytesFree", [tLogData]() { return tLogData->persistentData->getStorageBytes().free; }); specialCounter(cc, "KvstoreBytesAvailable", [tLogData]() { return tLogData->persistentData->getStorageBytes().available; }); specialCounter( cc, "KvstoreBytesTotal", [tLogData]() { return tLogData->persistentData->getStorageBytes().total; }); specialCounter( cc, "QueueDiskBytesUsed", [tLogData]() { return tLogData->rawPersistentQueue->getStorageBytes().used; }); specialCounter( cc, "QueueDiskBytesFree", [tLogData]() { return tLogData->rawPersistentQueue->getStorageBytes().free; }); specialCounter(cc, "QueueDiskBytesAvailable", [tLogData]() { return tLogData->rawPersistentQueue->getStorageBytes().available; }); specialCounter( cc, "QueueDiskBytesTotal", [tLogData]() { return tLogData->rawPersistentQueue->getStorageBytes().total; }); specialCounter(cc, "ActivePeekStreams", [tLogData]() { return tLogData->activePeekStreams; }); } ~LogData() { endRole(Role::TRANSACTION_LOG, logId, "Error", true); if (!terminated.isReady()) { tLogData->bytesDurable += bytesInput.getValue() - bytesDurable.getValue(); TraceEvent("TLogBytesWhenRemoved", logId) .detail("SharedBytesInput", tLogData->bytesInput) .detail("SharedBytesDurable", tLogData->bytesDurable) .detail("LocalBytesInput", bytesInput.getValue()) .detail("LocalBytesDurable", bytesDurable.getValue()); ASSERT_ABORT(tLogData->bytesDurable <= tLogData->bytesInput); Key logIdKey = BinaryWriter::toValue(logId, Unversioned()); tLogData->persistentData->clear(singleKeyRange(logIdKey.withPrefix(persistCurrentVersionKeys.begin))); tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistKnownCommittedVersionKeys.begin))); tLogData->persistentData->clear(singleKeyRange(logIdKey.withPrefix(persistLocalityKeys.begin))); tLogData->persistentData->clear(singleKeyRange(logIdKey.withPrefix(persistLogRouterTagsKeys.begin))); tLogData->persistentData->clear(singleKeyRange(logIdKey.withPrefix(persistTxsTagsKeys.begin))); tLogData->persistentData->clear(singleKeyRange(logIdKey.withPrefix(persistRecoveryCountKeys.begin))); Key msgKey = logIdKey.withPrefix(persistTagMessagesKeys.begin); tLogData->persistentData->clear(KeyRangeRef(msgKey, strinc(msgKey))); Key poppedKey = logIdKey.withPrefix(persistTagPoppedKeys.begin); tLogData->persistentData->clear(KeyRangeRef(poppedKey, strinc(poppedKey))); } for (auto it = peekTracker.begin(); it != peekTracker.end(); ++it) { for (auto seq : it->second.sequence_version) { if (!seq.second.isSet()) { seq.second.sendError(operation_obsolete()); } } } } LogEpoch epoch() const { return recoveryCount; } }; template void TLogQueue::push(T const& qe, Reference logData) { BinaryWriter wr(Unversioned()); // outer framing is not versioned wr << uint32_t(0); IncludeVersion(ProtocolVersion::withTLogQueueEntryRef()).write(wr); // payload is versioned wr << qe; wr << uint8_t(1); *(uint32_t*)wr.getData() = wr.getLength() - sizeof(uint32_t) - sizeof(uint8_t); auto loc = queue->push(wr.toValue()); //TraceEvent("TLogQueueVersionWritten", dbgid).detail("Size", wr.getLength() - sizeof(uint32_t) - sizeof(uint8_t)).detail("Loc", loc); logData->versionLocation[qe.version] = loc; } void TLogQueue::pop(Version upTo, Reference logData) { // Keep only the given and all subsequent version numbers // Find the first version >= upTo auto v = logData->versionLocation.lower_bound(upTo); if (v == logData->versionLocation.begin()) return; if (v == logData->versionLocation.end()) { v = logData->versionLocation.lastItem(); } else { v.decrementNonEnd(); } queue->pop(v->value); logData->versionLocation.erase(logData->versionLocation.begin(), v); // ... and then we erase that previous version and all prior versions } void TLogQueue::updateVersionSizes(const TLogQueueEntry& result, TLogData* tLog) { auto it = tLog->id_data.find(result.id); if (it != tLog->id_data.end()) { it->second->versionLocation[result.version] = queue->getNextReadLocation(); } } ACTOR Future tLogLock(TLogData* self, ReplyPromise reply, Reference logData) { state Version stopVersion = logData->version.get(); TEST(true); // TLog stopped by recovering master TEST(logData->stopped); // logData already stopped TEST(!logData->stopped); // logData not yet stopped TraceEvent("TLogStop", logData->logId) .detail("Ver", stopVersion) .detail("IsStopped", logData->stopped) .detail("QueueCommitted", logData->queueCommittedVersion.get()); logData->stopped = true; if (!logData->recoveryComplete.isSet()) { logData->recoveryComplete.sendError(end_of_stream()); } // Lock once the current version has been committed wait(logData->queueCommittedVersion.whenAtLeast(stopVersion)); ASSERT(stopVersion == logData->version.get()); TLogLockResult result; result.end = stopVersion; result.knownCommittedVersion = logData->knownCommittedVersion; TraceEvent("TLogStop2", self->dbgid) .detail("LogId", logData->logId) .detail("Ver", stopVersion) .detail("IsStopped", logData->stopped) .detail("QueueCommitted", logData->queueCommittedVersion.get()) .detail("KnownCommitted", result.knownCommittedVersion); reply.send(result); return Void(); } void updatePersistentPopped(TLogData* self, Reference logData, Reference data) { if (!data->poppedRecently) return; self->persistentData->set( KeyValueRef(persistTagPoppedKey(logData->logId, data->tag), persistTagPoppedValue(data->popped))); data->poppedRecently = false; if (data->nothingPersistent) return; self->persistentData->clear(KeyRangeRef(persistTagMessagesKey(logData->logId, data->tag, Version(0)), persistTagMessagesKey(logData->logId, data->tag, data->popped))); if (data->popped > logData->persistentDataVersion) data->nothingPersistent = true; } ACTOR Future updatePersistentData(TLogData* self, Reference logData, Version newPersistentDataVersion) { // PERSIST: Changes self->persistentDataVersion and writes and commits the relevant changes ASSERT(newPersistentDataVersion <= logData->version.get()); ASSERT(newPersistentDataVersion <= logData->queueCommittedVersion.get()); ASSERT(newPersistentDataVersion > logData->persistentDataVersion); ASSERT(logData->persistentDataVersion == logData->persistentDataDurableVersion); //TraceEvent("UpdatePersistentData", self->dbgid).detail("Seq", newPersistentDataSeq); state bool anyData = false; // For all existing tags state int tagLocality = 0; state int tagId = 0; for (tagLocality = 0; tagLocality < logData->tag_data.size(); tagLocality++) { for (tagId = 0; tagId < logData->tag_data[tagLocality].size(); tagId++) { state Reference tagData = logData->tag_data[tagLocality][tagId]; if (tagData) { state Version currentVersion = 0; // Clear recently popped versions from persistentData if necessary updatePersistentPopped(self, logData, tagData); // Transfer unpopped messages with version numbers less than newPersistentDataVersion to persistentData state std::deque>::iterator msg = tagData->versionMessages.begin(); while (msg != tagData->versionMessages.end() && msg->first <= newPersistentDataVersion) { currentVersion = msg->first; anyData = true; tagData->nothingPersistent = false; BinaryWriter wr(Unversioned()); for (; msg != tagData->versionMessages.end() && msg->first == currentVersion; ++msg) wr << msg->second.toStringRef(); self->persistentData->set( KeyValueRef(persistTagMessagesKey(logData->logId, tagData->tag, currentVersion), wr.toValue())); Future f = yield(TaskPriority::UpdateStorage); if (!f.isReady()) { wait(f); msg = std::upper_bound(tagData->versionMessages.begin(), tagData->versionMessages.end(), std::make_pair(currentVersion, LengthPrefixedStringRef()), [](const auto& l, const auto& r) { return l.first < r.first; }); } } wait(yield(TaskPriority::UpdateStorage)); } } } self->persistentData->set( KeyValueRef(BinaryWriter::toValue(logData->logId, Unversioned()).withPrefix(persistCurrentVersionKeys.begin), BinaryWriter::toValue(newPersistentDataVersion, Unversioned()))); self->persistentData->set(KeyValueRef( BinaryWriter::toValue(logData->logId, Unversioned()).withPrefix(persistKnownCommittedVersionKeys.begin), BinaryWriter::toValue(logData->knownCommittedVersion, Unversioned()))); logData->persistentDataVersion = newPersistentDataVersion; wait(self->persistentData->commit()); // SOMEDAY: This seems to be running pretty often, should we slow it down??? wait(delay(0, TaskPriority::UpdateStorage)); // Now that the changes we made to persistentData are durable, erase the data we moved from memory and the queue, // increase bytesDurable accordingly, and update persistentDataDurableVersion. TEST(anyData); // TLog moved data to persistentData logData->persistentDataDurableVersion = newPersistentDataVersion; for (tagLocality = 0; tagLocality < logData->tag_data.size(); tagLocality++) { for (tagId = 0; tagId < logData->tag_data[tagLocality].size(); tagId++) { if (logData->tag_data[tagLocality][tagId]) { wait(logData->tag_data[tagLocality][tagId]->eraseMessagesBefore( newPersistentDataVersion + 1, self, logData, TaskPriority::UpdateStorage)); wait(yield(TaskPriority::UpdateStorage)); } } } logData->version_sizes.erase(logData->version_sizes.begin(), logData->version_sizes.lower_bound(logData->persistentDataDurableVersion)); wait(yield(TaskPriority::UpdateStorage)); while (!logData->messageBlocks.empty() && logData->messageBlocks.front().first <= newPersistentDataVersion) { int64_t bytesErased = int64_t(logData->messageBlocks.front().second.size()) * SERVER_KNOBS->TLOG_MESSAGE_BLOCK_OVERHEAD_FACTOR; logData->bytesDurable += bytesErased; self->bytesDurable += bytesErased; logData->messageBlocks.pop_front(); wait(yield(TaskPriority::UpdateStorage)); } if (logData->bytesDurable.getValue() > logData->bytesInput.getValue() || self->bytesDurable > self->bytesInput) { TraceEvent(SevError, "BytesDurableTooLarge", logData->logId) .detail("SharedBytesInput", self->bytesInput) .detail("SharedBytesDurable", self->bytesDurable) .detail("LocalBytesInput", logData->bytesInput.getValue()) .detail("LocalBytesDurable", logData->bytesDurable.getValue()); } ASSERT(logData->bytesDurable.getValue() <= logData->bytesInput.getValue()); ASSERT(self->bytesDurable <= self->bytesInput); if (self->queueCommitEnd.get() > 0) self->persistentQueue->pop( newPersistentDataVersion + 1, logData); // SOMEDAY: this can cause a slow task (~0.5ms), presumably from erasing too many versions. Should // we limit the number of versions cleared at a time? return Void(); } ACTOR Future tLogPopCore(TLogData* self, Tag inputTag, Version to, Reference logData) { if (self->ignorePopRequest) { TraceEvent(SevDebug, "IgnoringPopRequest").detail("IgnorePopDeadline", self->ignorePopDeadline); if (logData->toBePopped.find(inputTag) == logData->toBePopped.end() || to > logData->toBePopped[inputTag]) { logData->toBePopped[inputTag] = to; } // add the pop to the toBePopped map TraceEvent(SevDebug, "IgnoringPopRequest") .detail("IgnorePopDeadline", self->ignorePopDeadline) .detail("Tag", inputTag.toString()) .detail("Version", to); return Void(); } state Version upTo = to; int8_t tagLocality = inputTag.locality; if (isPseudoLocality(tagLocality)) { if (logData->logSystem->get().isValid()) { upTo = logData->logSystem->get()->popPseudoLocalityTag(inputTag, to); tagLocality = tagLocalityLogRouter; } else { TraceEvent(SevWarn, "TLogPopNoLogSystem", self->dbgid) .detail("Locality", tagLocality) .detail("Version", upTo); return Void(); } } state Tag tag(tagLocality, inputTag.id); auto tagData = logData->getTagData(tag); if (!tagData) { tagData = logData->createTagData(tag, upTo, true, true, false); } else if (upTo > tagData->popped) { tagData->popped = upTo; tagData->poppedRecently = true; if (tagData->unpoppedRecovered && upTo > logData->recoveredAt) { tagData->unpoppedRecovered = false; logData->unpoppedRecoveredTags--; TraceEvent("TLogPoppedTag", logData->logId) .detail("Tags", logData->unpoppedRecoveredTags) .detail("Tag", tag.toString()) .detail("DurableKCVer", logData->durableKnownCommittedVersion) .detail("RecoveredAt", logData->recoveredAt); if (logData->unpoppedRecoveredTags == 0 && logData->durableKnownCommittedVersion >= logData->recoveredAt && logData->recoveryComplete.canBeSet()) { logData->recoveryComplete.send(Void()); } } if (upTo > logData->persistentDataDurableVersion) wait(tagData->eraseMessagesBefore(upTo, self, logData, TaskPriority::TLogPop)); //TraceEvent("TLogPop", self->dbgid).detail("Tag", tag.toString()).detail("To", upTo); } return Void(); } ACTOR Future tLogPop(TLogData* self, TLogPopRequest req, Reference logData) { // timeout check for ignorePopRequest if (self->ignorePopRequest && (g_network->now() > self->ignorePopDeadline)) { TraceEvent("EnableTLogPlayAllIgnoredPops").log(); // use toBePopped and issue all the pops state std::map::iterator it; state std::vector> ignoredPops; self->ignorePopRequest = false; self->ignorePopUid = ""; self->ignorePopDeadline = 0.0; for (it = logData->toBePopped.begin(); it != logData->toBePopped.end(); it++) { TraceEvent("PlayIgnoredPop").detail("Tag", it->first.toString()).detail("Version", it->second); ignoredPops.push_back(tLogPopCore(self, it->first, it->second, logData)); } logData->toBePopped.clear(); wait(waitForAll(ignoredPops)); TraceEvent("ResetIgnorePopRequest") .detail("Now", g_network->now()) .detail("IgnorePopRequest", self->ignorePopRequest) .detail("IgnorePopDeadline", self->ignorePopDeadline); } wait(tLogPopCore(self, req.tag, req.to, logData)); req.reply.send(Void()); return Void(); } // This function (and updatePersistentData, which is called by this function) run at a low priority and can soak up all // CPU resources. For this reason, they employ aggressive use of yields to avoid causing slow tasks that could introduce // latencies for more important work (e.g. commits). ACTOR Future updateStorage(TLogData* self) { while (self->queueOrder.size() && !self->id_data.count(self->queueOrder.front())) { self->queueOrder.pop_front(); } if (!self->queueOrder.size()) { wait(delay(BUGGIFY ? SERVER_KNOBS->BUGGIFY_TLOG_STORAGE_MIN_UPDATE_INTERVAL : SERVER_KNOBS->TLOG_STORAGE_MIN_UPDATE_INTERVAL, TaskPriority::UpdateStorage)); return Void(); } state Reference logData = self->id_data[self->queueOrder.front()]; state Version nextVersion = 0; state int totalSize = 0; state FlowLock::Releaser commitLockReleaser; // FIXME: This policy for calculating the cache pop version could end up popping recent data in the remote DC after // two consecutive recoveries. // It also does not protect against spilling the cache tag directly, so it is theoretically possible to spill this // tag; which is not intended to ever happen. Optional cachePopVersion; for (auto& it : self->id_data) { if (!it.second->stopped) { if (it.second->version.get() - it.second->unrecoveredBefore > SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT + SERVER_KNOBS->MAX_CACHE_VERSIONS) { cachePopVersion = it.second->version.get() - SERVER_KNOBS->MAX_CACHE_VERSIONS; } break; } } if (cachePopVersion.present()) { state std::vector> cachePopFutures; for (auto& it : self->id_data) { cachePopFutures.push_back(tLogPop(self, TLogPopRequest(cachePopVersion.get(), 0, cacheTag), it.second)); } wait(waitForAll(cachePopFutures)); } if (logData->stopped) { if (self->bytesInput - self->bytesDurable >= self->targetVolatileBytes) { while (logData->persistentDataDurableVersion != logData->version.get()) { totalSize = 0; Map>::iterator sizeItr = logData->version_sizes.begin(); nextVersion = logData->version.get(); while (totalSize < SERVER_KNOBS->UPDATE_STORAGE_BYTE_LIMIT && sizeItr != logData->version_sizes.end()) { totalSize += sizeItr->value.first + sizeItr->value.second; ++sizeItr; nextVersion = sizeItr == logData->version_sizes.end() ? logData->version.get() : sizeItr->key; } wait(logData->queueCommittedVersion.whenAtLeast(nextVersion)); if (logData->queueCommittedVersion.get() == std::numeric_limits::max()) { return Void(); } wait(delay(0, TaskPriority::UpdateStorage)); //TraceEvent("TlogUpdatePersist", self->dbgid).detail("LogId", logData->logId).detail("NextVersion", nextVersion).detail("Version", logData->version.get()).detail("PersistentDataDurableVer", logData->persistentDataDurableVersion).detail("QueueCommitVer", logData->queueCommittedVersion.get()).detail("PersistDataVer", logData->persistentDataVersion); if (nextVersion > logData->persistentDataVersion) { wait(self->persistentDataCommitLock.take()); commitLockReleaser = FlowLock::Releaser(self->persistentDataCommitLock); wait(updatePersistentData(self, logData, nextVersion)); commitLockReleaser.release(); } else { wait(delay(BUGGIFY ? SERVER_KNOBS->BUGGIFY_TLOG_STORAGE_MIN_UPDATE_INTERVAL : SERVER_KNOBS->TLOG_STORAGE_MIN_UPDATE_INTERVAL, TaskPriority::UpdateStorage)); } if (logData->removed.isReady()) { break; } } if (logData->persistentDataDurableVersion == logData->version.get()) { self->queueOrder.pop_front(); } wait(delay(0.0, TaskPriority::UpdateStorage)); } else { wait(delay(BUGGIFY ? SERVER_KNOBS->BUGGIFY_TLOG_STORAGE_MIN_UPDATE_INTERVAL : SERVER_KNOBS->TLOG_STORAGE_MIN_UPDATE_INTERVAL, TaskPriority::UpdateStorage)); } } else if (logData->initialized) { ASSERT(self->queueOrder.size() == 1); if (logData->version_sizes.empty()) { nextVersion = logData->version.get(); } else { Map>::iterator sizeItr = logData->version_sizes.begin(); while (totalSize < SERVER_KNOBS->UPDATE_STORAGE_BYTE_LIMIT && sizeItr != logData->version_sizes.end() && (logData->bytesInput.getValue() - logData->bytesDurable.getValue() - totalSize >= self->targetVolatileBytes || sizeItr->value.first == 0)) { totalSize += sizeItr->value.first + sizeItr->value.second; ++sizeItr; nextVersion = sizeItr == logData->version_sizes.end() ? logData->version.get() : sizeItr->key; } } //TraceEvent("UpdateStorageVer", logData->logId).detail("NextVersion", nextVersion).detail("PersistentDataVersion", logData->persistentDataVersion).detail("TotalSize", totalSize); wait(logData->queueCommittedVersion.whenAtLeast(nextVersion)); if (logData->queueCommittedVersion.get() == std::numeric_limits::max()) { return Void(); } wait(delay(0, TaskPriority::UpdateStorage)); if (nextVersion > logData->persistentDataVersion) { wait(self->persistentDataCommitLock.take()); commitLockReleaser = FlowLock::Releaser(self->persistentDataCommitLock); wait(updatePersistentData(self, logData, nextVersion)); commitLockReleaser.release(); } if (totalSize < SERVER_KNOBS->UPDATE_STORAGE_BYTE_LIMIT) { wait(delay(BUGGIFY ? SERVER_KNOBS->BUGGIFY_TLOG_STORAGE_MIN_UPDATE_INTERVAL : SERVER_KNOBS->TLOG_STORAGE_MIN_UPDATE_INTERVAL, TaskPriority::UpdateStorage)); } else { // recovery wants to commit to persistant data when updatePersistentData is not active, this delay ensures // that immediately after updatePersist returns another one has not been started yet. wait(delay(0.0, TaskPriority::UpdateStorage)); } } else { wait(delay(BUGGIFY ? SERVER_KNOBS->BUGGIFY_TLOG_STORAGE_MIN_UPDATE_INTERVAL : SERVER_KNOBS->TLOG_STORAGE_MIN_UPDATE_INTERVAL, TaskPriority::UpdateStorage)); } return Void(); } ACTOR Future updateStorageLoop(TLogData* self) { wait(delay(0, TaskPriority::UpdateStorage)); loop { wait(updateStorage(self)); } } void commitMessages(TLogData* self, Reference logData, Version version, const std::vector& taggedMessages) { // SOMEDAY: This method of copying messages is reasonably memory efficient, but it's still a lot of bytes copied. // Find a way to do the memory allocation right as we receive the messages in the network layer. int64_t addedBytes = 0; int64_t overheadBytes = 0; int expectedBytes = 0; int txsBytes = 0; if (!taggedMessages.size()) { return; } int msgSize = 0; for (auto& i : taggedMessages) { msgSize += i.message.size(); } // Grab the last block in the blocks list so we can share its arena // We pop all of the elements of it to create a "fresh" vector that starts at the end of the previous vector Standalone> block; if (logData->messageBlocks.empty()) { block = Standalone>(); block.reserve(block.arena(), std::max(SERVER_KNOBS->TLOG_MESSAGE_BLOCK_BYTES, msgSize)); } else { block = logData->messageBlocks.back().second; } block.pop_front(block.size()); for (auto& msg : taggedMessages) { if (msg.message.size() > block.capacity() - block.size()) { logData->messageBlocks.emplace_back(version, block); addedBytes += int64_t(block.size()) * SERVER_KNOBS->TLOG_MESSAGE_BLOCK_OVERHEAD_FACTOR; block = Standalone>(); block.reserve(block.arena(), std::max(SERVER_KNOBS->TLOG_MESSAGE_BLOCK_BYTES, msgSize)); } block.append(block.arena(), msg.message.begin(), msg.message.size()); for (auto tag : msg.tags) { if (logData->locality == tagLocalitySatellite) { if (!(tag.locality == tagLocalityTxs || tag.locality == tagLocalityLogRouter || tag == txsTag)) { continue; } } else if (!(logData->locality == tagLocalitySpecial || logData->locality == tag.locality || tag.locality < 0)) { continue; } if (tag.locality == tagLocalityLogRouter) { if (!logData->logRouterTags) { continue; } tag.id = tag.id % logData->logRouterTags; } if (tag.locality == tagLocalityTxs) { if (logData->txsTags > 0) { tag.id = tag.id % logData->txsTags; } else { tag = txsTag; } } Reference tagData = logData->getTagData(tag); if (!tagData) { tagData = logData->createTagData(tag, 0, true, true, false); } if (version >= tagData->popped) { tagData->versionMessages.emplace_back( version, LengthPrefixedStringRef((uint32_t*)(block.end() - msg.message.size()))); if (tagData->versionMessages.back().second.expectedSize() > SERVER_KNOBS->MAX_MESSAGE_SIZE) { TraceEvent(SevWarnAlways, "LargeMessage") .detail("Size", tagData->versionMessages.back().second.expectedSize()); } if (tag.locality != tagLocalityTxs && tag != txsTag) { expectedBytes += tagData->versionMessages.back().second.expectedSize(); } else { txsBytes += tagData->versionMessages.back().second.expectedSize(); } // The factor of VERSION_MESSAGES_OVERHEAD is intended to be an overestimate of the actual memory used // to store this data in a std::deque. In practice, this number is probably something like 528/512 // ~= 1.03, but this could vary based on the implementation. There will also be a fixed overhead per // std::deque, but its size should be trivial relative to the size of the TLog queue and can be thought // of as increasing the capacity of the queue slightly. overheadBytes += SERVER_KNOBS->VERSION_MESSAGES_ENTRY_BYTES_WITH_OVERHEAD; } } msgSize -= msg.message.size(); } logData->messageBlocks.emplace_back(version, block); addedBytes += int64_t(block.size()) * SERVER_KNOBS->TLOG_MESSAGE_BLOCK_OVERHEAD_FACTOR; addedBytes += overheadBytes; logData->version_sizes[version] = std::make_pair(expectedBytes, txsBytes); logData->bytesInput += addedBytes; self->bytesInput += addedBytes; self->overheadBytesInput += overheadBytes; //TraceEvent("TLogPushed", self->dbgid).detail("Bytes", addedBytes).detail("MessageBytes", messages.size()).detail("Tags", tags.size()).detail("ExpectedBytes", expectedBytes).detail("MCount", mCount).detail("TCount", tCount); } void commitMessages(TLogData* self, Reference logData, Version version, Arena arena, StringRef messages) { ArenaReader rd(arena, messages, Unversioned()); self->tempTagMessages.clear(); while (!rd.empty()) { TagsAndMessage tagsAndMsg; tagsAndMsg.loadFromArena(&rd, nullptr); self->tempTagMessages.push_back(std::move(tagsAndMsg)); } commitMessages(self, logData, version, self->tempTagMessages); } Version poppedVersion(Reference self, Tag tag) { auto tagData = self->getTagData(tag); if (!tagData) { if (tag == txsTag || tag.locality == tagLocalityTxs) { return 0; } return self->recoveredAt + 1; } return tagData->popped; } std::deque>& getVersionMessages(Reference self, Tag tag) { auto tagData = self->getTagData(tag); if (!tagData) { static std::deque> empty; return empty; } return tagData->versionMessages; }; void peekMessagesFromMemory(Reference self, Tag tag, Version begin, BinaryWriter& messages, Version& endVersion) { ASSERT(!messages.getLength()); auto& deque = getVersionMessages(self, tag); //TraceEvent("TLogPeekMem", self->dbgid).detail("Tag", req.tag1).detail("PDS", self->persistentDataSequence).detail("PDDS", self->persistentDataDurableSequence).detail("Oldest", map1.empty() ? 0 : map1.begin()->key ).detail("OldestMsgCount", map1.empty() ? 0 : map1.begin()->value.size()); begin = std::max(begin, self->persistentDataDurableVersion + 1); auto it = std::lower_bound(deque.begin(), deque.end(), std::make_pair(begin, LengthPrefixedStringRef()), [](const auto& l, const auto& r) { return l.first < r.first; }); Version currentVersion = -1; for (; it != deque.end(); ++it) { if (it->first != currentVersion) { if (messages.getLength() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) { endVersion = currentVersion + 1; //TraceEvent("TLogPeekMessagesReached2", self->dbgid); break; } currentVersion = it->first; messages << VERSION_HEADER << currentVersion; } messages << it->second.toStringRef(); } } // Common logics to peek TLog and create TLogPeekReply that serves both streaming peek or normal peek request ACTOR template Future tLogPeekMessages(PromiseType replyPromise, TLogData* self, Reference logData, Version reqBegin, Tag reqTag, bool reqReturnIfBlocked = false, bool reqOnlySpilled = false, Optional> reqSequence = Optional>()) { state BinaryWriter messages(Unversioned()); state BinaryWriter messages2(Unversioned()); state int sequence = -1; state UID peekId; state double queueStart = now(); if (reqTag.locality == tagLocalityTxs && reqTag.id >= logData->txsTags && logData->txsTags > 0) { reqTag.id = reqTag.id % logData->txsTags; } if (reqSequence.present()) { try { peekId = reqSequence.get().first; sequence = reqSequence.get().second; if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && logData->peekTracker.find(peekId) == logData->peekTracker.end()) { throw operation_obsolete(); } auto& trackerData = logData->peekTracker[peekId]; if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) { trackerData.tag = reqTag; trackerData.sequence_version[0].send(std::make_pair(reqBegin, reqOnlySpilled)); } auto seqBegin = trackerData.sequence_version.begin(); while (trackerData.sequence_version.size() && seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) { if (seqBegin->second.canBeSet()) { seqBegin->second.sendError(operation_obsolete()); } trackerData.sequence_version.erase(seqBegin); seqBegin = trackerData.sequence_version.begin(); } if (trackerData.sequence_version.size() && sequence < seqBegin->first) { throw operation_obsolete(); } Future> fPrevPeekData = trackerData.sequence_version[sequence].getFuture(); if (fPrevPeekData.isReady()) { trackerData.unblockedPeeks++; double t = now() - trackerData.lastUpdate; if (t > trackerData.idleMax) trackerData.idleMax = t; trackerData.idleTime += t; } trackerData.lastUpdate = now(); std::pair prevPeekData = wait(fPrevPeekData); reqBegin = std::max(prevPeekData.first, reqBegin); reqOnlySpilled = prevPeekData.second; wait(yield()); } catch (Error& e) { if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) { replyPromise.sendError(e); return Void(); } else { throw; } } } state double blockStart = now(); if (reqReturnIfBlocked && logData->version.get() < reqBegin) { replyPromise.sendError(end_of_stream()); if (reqSequence.present()) { auto& trackerData = logData->peekTracker[peekId]; auto& sequenceData = trackerData.sequence_version[sequence + 1]; if (!sequenceData.isSet()) { sequenceData.send(std::make_pair(reqBegin, reqOnlySpilled)); } } return Void(); } //TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2); // Wait until we have something to return that the caller doesn't already have if (logData->version.get() < reqBegin) { wait(logData->version.whenAtLeast(reqBegin)); wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); } if (logData->locality != tagLocalitySatellite && reqTag.locality == tagLocalityLogRouter) { wait(self->concurrentLogRouterReads.take()); state FlowLock::Releaser globalReleaser(self->concurrentLogRouterReads); wait(delay(0.0, TaskPriority::Low)); } if (reqBegin <= logData->persistentDataDurableVersion && reqTag.locality != tagLocalityTxs && reqTag != txsTag) { // Reading spilled data will almost always imply that the storage server is >5s behind the rest // of the cluster. We shouldn't prioritize spending CPU on helping this server catch up // slightly faster over keeping the rest of the cluster operating normally. // txsTag is only ever peeked on recovery, and we would still wish to prioritize requests // that impact recovery duration. wait(delay(0, TaskPriority::TLogSpilledPeekReply)); } state double workStart = now(); Version poppedVer = poppedVersion(logData, reqTag); if (poppedVer > reqBegin) { TLogPeekReply rep; rep.maxKnownVersion = logData->version.get(); rep.minKnownCommittedVersion = logData->minKnownCommittedVersion; rep.popped = poppedVer; rep.end = poppedVer; rep.onlySpilled = false; if (reqSequence.present()) { auto& trackerData = logData->peekTracker[peekId]; auto& sequenceData = trackerData.sequence_version[sequence + 1]; trackerData.lastUpdate = now(); if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) { replyPromise.sendError(operation_obsolete()); if (!sequenceData.isSet()) sequenceData.sendError(operation_obsolete()); return Void(); } if (sequenceData.isSet()) { if (sequenceData.getFuture().get().first != rep.end) { TEST(true); // tlog peek second attempt ended at a different version replyPromise.sendError(operation_obsolete()); return Void(); } } else { sequenceData.send(std::make_pair(rep.end, rep.onlySpilled)); } rep.begin = reqBegin; } replyPromise.send(rep); return Void(); } state Version endVersion = logData->version.get() + 1; state bool onlySpilled = false; // grab messages from disk //TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2); if (reqBegin <= logData->persistentDataDurableVersion) { // Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We // may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if // an initial attempt to read from disk results in insufficient data and the required data is no longer in // memory SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the // result? if (reqOnlySpilled) { endVersion = logData->persistentDataDurableVersion + 1; } else { peekMessagesFromMemory(logData, reqTag, reqBegin, messages2, endVersion); } RangeResult kvs = wait(self->persistentData->readRange( KeyRangeRef(persistTagMessagesKey(logData->logId, reqTag, reqBegin), persistTagMessagesKey(logData->logId, reqTag, logData->persistentDataDurableVersion + 1)), SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES)); //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().address).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); for (auto& kv : kvs) { auto ver = decodeTagMessagesKey(kv.key); messages << VERSION_HEADER << ver; messages.serializeBytes(kv.value); } if (kvs.expectedSize() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) { endVersion = decodeTagMessagesKey(kvs.end()[-1].key) + 1; onlySpilled = true; } else { messages.serializeBytes(messages2.toValue()); } } else { peekMessagesFromMemory(logData, reqTag, reqBegin, messages, endVersion); //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().address).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); } TLogPeekReply reply; reply.maxKnownVersion = logData->version.get(); reply.minKnownCommittedVersion = logData->minKnownCommittedVersion; reply.messages = StringRef(reply.arena, messages.toValue()); reply.end = endVersion; reply.onlySpilled = onlySpilled; //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", replyPromise.getEndpoint().address); if (reqSequence.present()) { auto& trackerData = logData->peekTracker[peekId]; trackerData.lastUpdate = now(); double queueT = blockStart - queueStart; double blockT = workStart - blockStart; double workT = now() - workStart; trackerData.totalPeeks++; trackerData.replyBytes += reply.messages.size(); if (queueT > trackerData.queueMax) trackerData.queueMax = queueT; if (blockT > trackerData.blockMax) trackerData.blockMax = blockT; if (workT > trackerData.workMax) trackerData.workMax = workT; trackerData.queueTime += queueT; trackerData.blockTime += blockT; trackerData.workTime += workT; auto& sequenceData = trackerData.sequence_version[sequence + 1]; if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) { replyPromise.sendError(operation_obsolete()); if (!sequenceData.isSet()) sequenceData.sendError(operation_obsolete()); return Void(); } if (sequenceData.isSet()) { trackerData.duplicatePeeks++; if (sequenceData.getFuture().get().first != reply.end) { TEST(true); // tlog peek second attempt ended at a different version (2) replyPromise.sendError(operation_obsolete()); return Void(); } } else { sequenceData.send(std::make_pair(reply.end, reply.onlySpilled)); } reply.begin = reqBegin; } replyPromise.send(reply); return Void(); } // This actor keep pushing TLogPeekStreamReply until it's removed from the cluster or should recover ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference logData) { self->activePeekStreams++; state Version begin = req.begin; state bool onlySpilled = false; req.reply.setByteLimit(std::min(SERVER_KNOBS->MAXIMUM_PEEK_BYTES, req.limitBytes)); loop { state TLogPeekStreamReply reply; state Promise promise; state Future future(promise.getFuture()); try { wait(req.reply.onReady() && store(reply.rep, future) && tLogPeekMessages(promise, self, logData, begin, req.tag, req.returnIfBlocked, onlySpilled)); reply.rep.begin = begin; req.reply.send(reply); begin = reply.rep.end; onlySpilled = reply.rep.onlySpilled; if (reply.rep.end > logData->version.get()) { wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); } else { wait(delay(0, g_network->getCurrentTask())); } } catch (Error& e) { self->activePeekStreams--; TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId) .errorUnsuppressed(e) .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress()); if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) { req.reply.sendError(e); return Void(); } else { throw; } } } } ACTOR Future doQueueCommit(TLogData* self, Reference logData, std::vector> missingFinalCommit) { state Version ver = logData->version.get(); state Version commitNumber = self->queueCommitBegin + 1; state Version knownCommittedVersion = logData->knownCommittedVersion; self->queueCommitBegin = commitNumber; logData->queueCommittingVersion = ver; g_network->setCurrentTask(TaskPriority::TLogCommitReply); Future c = self->persistentQueue->commit(); self->diskQueueCommitBytes = 0; self->largeDiskQueueCommitBytes.set(false); wait(ioDegradedOrTimeoutError( c, SERVER_KNOBS->MAX_STORAGE_COMMIT_TIME, self->degraded, SERVER_KNOBS->TLOG_DEGRADED_DURATION)); if (g_network->isSimulated() && !g_simulator.speedUpSimulation && BUGGIFY_WITH_PROB(0.0001)) { wait(delay(6.0)); } wait(self->queueCommitEnd.whenAtLeast(commitNumber - 1)); // Calling check_yield instead of yield to avoid a destruction ordering problem in simulation if (g_network->check_yield(g_network->getCurrentTask())) { wait(delay(0, g_network->getCurrentTask())); } ASSERT(ver > logData->queueCommittedVersion.get()); logData->durableKnownCommittedVersion = knownCommittedVersion; if (logData->unpoppedRecoveredTags == 0 && knownCommittedVersion >= logData->recoveredAt && logData->recoveryComplete.canBeSet()) { TraceEvent("TLogRecoveryComplete", logData->logId) .detail("Tags", logData->unpoppedRecoveredTags) .detail("DurableKCVer", logData->durableKnownCommittedVersion) .detail("RecoveredAt", logData->recoveredAt); logData->recoveryComplete.send(Void()); } //TraceEvent("TLogCommitDurable", self->dbgid).detail("Version", ver); if (logData->logSystem->get() && (!logData->isPrimary || logData->logRouterPoppedVersion < logData->logRouterPopToVersion)) { logData->logRouterPoppedVersion = ver; logData->logSystem->get()->pop(ver, logData->remoteTag, knownCommittedVersion, logData->locality); } logData->queueCommittedVersion.set(ver); self->queueCommitEnd.set(commitNumber); for (auto& it : missingFinalCommit) { TraceEvent("TLogCommitMissingFinalCommit", self->dbgid) .detail("LogId", logData->logId) .detail("Version", it->version.get()) .detail("QueueVer", it->queueCommittedVersion.get()); TEST(true); // A TLog was replaced before having a chance to commit its queue it->queueCommittedVersion.set(it->version.get()); } return Void(); } ACTOR Future commitQueue(TLogData* self) { state Reference logData; state std::vector> missingFinalCommit; loop { int foundCount = 0; for (auto it : self->id_data) { if (!it.second->stopped) { logData = it.second; foundCount++; } else if (it.second->version.get() > std::max(it.second->queueCommittingVersion, it.second->queueCommittedVersion.get())) { missingFinalCommit.push_back(it.second); } } ASSERT(foundCount < 2); if (!foundCount) { wait(self->newLogData.onTrigger()); continue; } TraceEvent("CommitQueueNewLog", self->dbgid) .detail("LogId", logData->logId) .detail("Version", logData->version.get()) .detail("Committing", logData->queueCommittingVersion) .detail("Commmitted", logData->queueCommittedVersion.get()); if (logData->committingQueue.canBeSet()) { logData->committingQueue.send(Void()); } loop { if (logData->stopped && logData->version.get() == std::max(logData->queueCommittingVersion, logData->queueCommittedVersion.get())) { wait(logData->queueCommittedVersion.whenAtLeast(logData->version.get())); break; } choose { when(wait(logData->version.whenAtLeast( std::max(logData->queueCommittingVersion, logData->queueCommittedVersion.get()) + 1))) { while (self->queueCommitBegin != self->queueCommitEnd.get() && !self->largeDiskQueueCommitBytes.get()) { wait(self->queueCommitEnd.whenAtLeast(self->queueCommitBegin) || self->largeDiskQueueCommitBytes.onChange()); } if (logData->queueCommittedVersion.get() == std::numeric_limits::max()) { break; } self->sharedActors.send(doQueueCommit(self, logData, missingFinalCommit)); missingFinalCommit.clear(); } when(wait(self->newLogData.onTrigger())) {} } } } } ACTOR Future tLogCommit(TLogData* self, TLogCommitRequest req, Reference logData, PromiseStream warningCollectorInput) { state Optional tlogDebugID; if (req.debugID.present()) { tlogDebugID = nondeterministicRandom()->randomUniqueID(); g_traceBatch.addAttach("CommitAttachID", req.debugID.get().first(), tlogDebugID.get().first()); g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.BeforeWaitForVersion"); } logData->minKnownCommittedVersion = std::max(logData->minKnownCommittedVersion, req.minKnownCommittedVersion); wait(logData->version.whenAtLeast(req.prevVersion)); // Calling check_yield instead of yield to avoid a destruction ordering problem in simulation if (g_network->check_yield(g_network->getCurrentTask())) { wait(delay(0, g_network->getCurrentTask())); } state double waitStartT = 0; while (self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_HARD_LIMIT_BYTES && !logData->stopped) { if (now() - waitStartT >= 1) { TraceEvent(SevWarn, "TLogUpdateLag", logData->logId) .detail("Version", logData->version.get()) .detail("PersistentDataVersion", logData->persistentDataVersion) .detail("PersistentDataDurableVersion", logData->persistentDataDurableVersion); waitStartT = now(); } wait(delayJittered(.005, TaskPriority::TLogCommit)); } if (logData->stopped) { req.reply.sendError(tlog_stopped()); return Void(); } if (logData->version.get() == req.prevVersion) { // Not a duplicate (check relies on critical section between here self->version.set() below!) if (req.debugID.present()) g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.Before"); //TraceEvent("TLogCommit", logData->logId).detail("Version", req.version); commitMessages(self, logData, req.version, req.arena, req.messages); logData->knownCommittedVersion = std::max(logData->knownCommittedVersion, req.knownCommittedVersion); TLogQueueEntryRef qe; // Log the changes to the persistent queue, to be committed by commitQueue() qe.version = req.version; qe.knownCommittedVersion = logData->knownCommittedVersion; qe.messages = req.messages; qe.id = logData->logId; self->persistentQueue->push(qe, logData); self->diskQueueCommitBytes += qe.expectedSize(); if (self->diskQueueCommitBytes > SERVER_KNOBS->MAX_QUEUE_COMMIT_BYTES) { self->largeDiskQueueCommitBytes.set(true); } // Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors logData->version.set(req.version); if (req.debugID.present()) g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.AfterTLogCommit"); } // Send replies only once all prior messages have been received and committed. state Future stopped = logData->stopCommit.onTrigger(); wait( timeoutWarning(logData->queueCommittedVersion.whenAtLeast(req.version) || stopped, 0.1, warningCollectorInput)); if (stopped.isReady()) { ASSERT(logData->stopped); req.reply.sendError(tlog_stopped()); return Void(); } if (req.debugID.present()) g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.After"); req.reply.send(logData->durableKnownCommittedVersion); return Void(); } ACTOR Future initPersistentState(TLogData* self, Reference logData) { wait(self->persistentDataCommitLock.take()); state FlowLock::Releaser commitLockReleaser(self->persistentDataCommitLock); // PERSIST: Initial setup of persistentData for a brand new tLog for a new database state IKeyValueStore* storage = self->persistentData; wait(ioTimeoutError(storage->init(), SERVER_KNOBS->TLOG_MAX_CREATE_DURATION)); storage->set(persistFormat); storage->set( KeyValueRef(BinaryWriter::toValue(logData->logId, Unversioned()).withPrefix(persistCurrentVersionKeys.begin), BinaryWriter::toValue(logData->version.get(), Unversioned()))); storage->set(KeyValueRef( BinaryWriter::toValue(logData->logId, Unversioned()).withPrefix(persistKnownCommittedVersionKeys.begin), BinaryWriter::toValue(logData->knownCommittedVersion, Unversioned()))); storage->set(KeyValueRef(BinaryWriter::toValue(logData->logId, Unversioned()).withPrefix(persistLocalityKeys.begin), BinaryWriter::toValue(logData->locality, Unversioned()))); storage->set( KeyValueRef(BinaryWriter::toValue(logData->logId, Unversioned()).withPrefix(persistLogRouterTagsKeys.begin), BinaryWriter::toValue(logData->logRouterTags, Unversioned()))); storage->set(KeyValueRef(BinaryWriter::toValue(logData->logId, Unversioned()).withPrefix(persistTxsTagsKeys.begin), BinaryWriter::toValue(logData->txsTags, Unversioned()))); storage->set( KeyValueRef(BinaryWriter::toValue(logData->logId, Unversioned()).withPrefix(persistRecoveryCountKeys.begin), BinaryWriter::toValue(logData->recoveryCount, Unversioned()))); for (auto tag : logData->allTags) { ASSERT(!logData->getTagData(tag)); logData->createTagData(tag, 0, true, true, true); updatePersistentPopped(self, logData, logData->getTagData(tag)); } TraceEvent("TLogInitCommit", logData->logId).log(); wait(ioTimeoutError(self->persistentData->commit(), SERVER_KNOBS->TLOG_MAX_CREATE_DURATION)); return Void(); } ACTOR Future rejoinClusterController(TLogData* self, TLogInterface tli, DBRecoveryCount recoveryCount, Future registerWithCC, bool isPrimary) { state LifetimeToken lastMasterLifetime; loop { auto const& inf = self->dbInfo->get(); bool isDisplaced = !std::count(inf.priorCommittedLogServers.begin(), inf.priorCommittedLogServers.end(), tli.id()); if (isPrimary) { isDisplaced = isDisplaced && inf.recoveryCount >= recoveryCount && inf.recoveryState != RecoveryState::UNINITIALIZED; } else { isDisplaced = isDisplaced && ((inf.recoveryCount > recoveryCount && inf.recoveryState != RecoveryState::UNINITIALIZED) || (inf.recoveryCount == recoveryCount && inf.recoveryState == RecoveryState::FULLY_RECOVERED)); } isDisplaced = isDisplaced && !inf.logSystemConfig.hasTLog(tli.id()); if (isDisplaced) { TraceEvent("TLogDisplaced", tli.id()) .detail("Reason", "DBInfoDoesNotContain") .detail("RecoveryCount", recoveryCount) .detail("InfRecoveryCount", inf.recoveryCount) .detail("RecoveryState", (int)inf.recoveryState) .detail("LogSysConf", describe(inf.logSystemConfig.tLogs)) .detail("PriorLogs", describe(inf.priorCommittedLogServers)) .detail("OldLogGens", inf.logSystemConfig.oldTLogs.size()); if (BUGGIFY) wait(delay(SERVER_KNOBS->BUGGIFY_WORKER_REMOVED_MAX_LAG * deterministicRandom()->random01())); throw worker_removed(); } if (registerWithCC.isReady()) { if (!lastMasterLifetime.isEqual(self->dbInfo->get().masterLifetime)) { // The TLogRejoinRequest is needed to establish communications with a new master, which doesn't have our // TLogInterface TLogRejoinRequest req(tli); TraceEvent("TLogRejoining", tli.id()) .detail("ClusterController", self->dbInfo->get().clusterInterface.id()) .detail("DbInfoMasterLifeTime", self->dbInfo->get().masterLifetime.toString()) .detail("LastMasterLifeTime", lastMasterLifetime.toString()); choose { when(TLogRejoinReply rep = wait( brokenPromiseToNever(self->dbInfo->get().clusterInterface.tlogRejoin.getReply(req)))) { if (rep.masterIsRecovered) lastMasterLifetime = self->dbInfo->get().masterLifetime; } when(wait(self->dbInfo->onChange())) {} } } else { wait(self->dbInfo->onChange()); } } else { wait(registerWithCC || self->dbInfo->onChange()); } } } ACTOR Future respondToRecovered(TLogInterface tli, Promise recoveryComplete) { state bool finishedRecovery = true; try { wait(recoveryComplete.getFuture()); } catch (Error& e) { if (e.code() != error_code_end_of_stream) { throw; } finishedRecovery = false; } TraceEvent("TLogRespondToRecovered", tli.id()).detail("Finished", finishedRecovery); loop { TLogRecoveryFinishedRequest req = waitNext(tli.recoveryFinished.getFuture()); if (finishedRecovery) { req.reply.send(Void()); } else { req.reply.send(Never()); } } } ACTOR Future cleanupPeekTrackers(LogData* logData) { loop { double minTimeUntilExpiration = SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME; auto it = logData->peekTracker.begin(); while (it != logData->peekTracker.end()) { double timeUntilExpiration = it->second.lastUpdate + SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME - now(); if (timeUntilExpiration < 1.0e-6) { for (auto seq : it->second.sequence_version) { if (!seq.second.isSet()) { seq.second.sendError(timed_out()); } } it = logData->peekTracker.erase(it); } else { minTimeUntilExpiration = std::min(minTimeUntilExpiration, timeUntilExpiration); ++it; } } wait(delay(minTimeUntilExpiration)); } } ACTOR Future logPeekTrackers(LogData* logData) { loop { int64_t logThreshold = 1; if (logData->peekTracker.size() > SERVER_KNOBS->PEEK_LOGGING_AMOUNT) { std::vector peekCounts; peekCounts.reserve(logData->peekTracker.size()); for (auto& it : logData->peekTracker) { peekCounts.push_back(it.second.totalPeeks); } size_t pivot = peekCounts.size() - SERVER_KNOBS->PEEK_LOGGING_AMOUNT; std::nth_element(peekCounts.begin(), peekCounts.begin() + pivot, peekCounts.end()); logThreshold = std::max(1, peekCounts[pivot]); } int logCount = 0; for (auto& it : logData->peekTracker) { if (it.second.totalPeeks >= logThreshold) { logCount++; TraceEvent("PeekMetrics", logData->logId) .detail("Tag", it.second.tag.toString()) .detail("Elapsed", now() - it.second.lastLogged) .detail("MeanReplyBytes", it.second.replyBytes / it.second.totalPeeks) .detail("TotalPeeks", it.second.totalPeeks) .detail("UnblockedPeeks", it.second.unblockedPeeks) .detail("DuplicatePeeks", it.second.duplicatePeeks) .detail("Sequence", it.second.sequence_version.size() ? it.second.sequence_version.begin()->first : -1) .detail("IdleSeconds", it.second.idleTime) .detail("IdleMax", it.second.idleMax) .detail("QueueSeconds", it.second.queueTime) .detail("QueueMax", it.second.queueMax) .detail("BlockSeconds", it.second.blockTime) .detail("BlockMax", it.second.blockMax) .detail("WorkSeconds", it.second.workTime) .detail("WorkMax", it.second.workMax); it.second.resetMetrics(); } } wait(delay(SERVER_KNOBS->PEEK_LOGGING_DELAY * std::max(1, logCount))); } } void getQueuingMetrics(TLogData* self, Reference logData, TLogQueuingMetricsRequest const& req) { TLogQueuingMetricsReply reply; reply.localTime = now(); reply.instanceID = self->instanceID; reply.bytesInput = self->bytesInput; reply.bytesDurable = self->bytesDurable; reply.storageBytes = self->persistentData->getStorageBytes(); // FIXME: Add the knownCommittedVersion to this message and change ratekeeper to use that version. reply.v = logData->durableKnownCommittedVersion; req.reply.send(reply); } ACTOR Future tLogSnapCreate(TLogSnapRequest snapReq, TLogData* self, Reference logData) { if (self->ignorePopUid != snapReq.snapUID.toString()) { snapReq.reply.sendError(operation_failed()); return Void(); } ExecCmdValueString snapArg(snapReq.snapPayload); try { int err = wait(execHelper(&snapArg, snapReq.snapUID, self->dataFolder, snapReq.role.toString())); std::string uidStr = snapReq.snapUID.toString(); TraceEvent("ExecTraceTLog") .detail("Uid", uidStr) .detail("Status", err) .detail("Role", snapReq.role) .detail("Value", self->dataFolder) .detail("ExecPayload", snapReq.snapPayload) .detail("PersistentDataVersion", logData->persistentDataVersion) .detail("PersistentDatadurableVersion", logData->persistentDataDurableVersion) .detail("QueueCommittedVersion", logData->queueCommittedVersion.get()) .detail("Version", logData->version.get()); if (err != 0) { throw operation_failed(); } snapReq.reply.send(Void()); } catch (Error& e) { TraceEvent("TLogSnapCreateError").errorUnsuppressed(e); if (e.code() != error_code_operation_cancelled) { snapReq.reply.sendError(e); } else { throw e; } } return Void(); } ACTOR Future tLogEnablePopReq(TLogEnablePopRequest enablePopReq, TLogData* self, Reference logData) { if (self->ignorePopUid != enablePopReq.snapUID.toString()) { TraceEvent(SevWarn, "TLogPopDisableEnableUidMismatch") .detail("IgnorePopUid", self->ignorePopUid) .detail("UidStr", enablePopReq.snapUID.toString()); enablePopReq.reply.sendError(operation_failed()); return Void(); } TraceEvent("EnableTLogPlayAllIgnoredPops2").log(); // use toBePopped and issue all the pops std::map::iterator it; std::vector> ignoredPops; self->ignorePopRequest = false; self->ignorePopDeadline = 0.0; self->ignorePopUid = ""; for (it = logData->toBePopped.begin(); it != logData->toBePopped.end(); it++) { TraceEvent("PlayIgnoredPop").detail("Tag", it->first.toString()).detail("Version", it->second); ignoredPops.push_back(tLogPopCore(self, it->first, it->second, logData)); } TraceEvent("TLogExecCmdPopEnable") .detail("UidStr", enablePopReq.snapUID.toString()) .detail("IgnorePopUid", self->ignorePopUid) .detail("IgnporePopRequest", self->ignorePopRequest) .detail("IgnporePopDeadline", self->ignorePopDeadline) .detail("PersistentDataVersion", logData->persistentDataVersion) .detail("PersistentDatadurableVersion", logData->persistentDataDurableVersion) .detail("QueueCommittedVersion", logData->queueCommittedVersion.get()) .detail("Version", logData->version.get()); wait(waitForAll(ignoredPops)); logData->toBePopped.clear(); enablePopReq.reply.send(Void()); return Void(); } ACTOR Future serveTLogInterface(TLogData* self, TLogInterface tli, Reference logData, PromiseStream warningCollectorInput) { state Future dbInfoChange = Void(); loop choose { when(wait(dbInfoChange)) { dbInfoChange = self->dbInfo->onChange(); bool found = false; if (self->dbInfo->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS) { for (auto& logs : self->dbInfo->get().logSystemConfig.tLogs) { if (std::count(logs.tLogs.begin(), logs.tLogs.end(), logData->logId)) { found = true; break; } } } if (found && self->dbInfo->get().logSystemConfig.recruitmentID == logData->recruitmentID) { logData->logSystem->set(ILogSystem::fromServerDBInfo(self->dbgid, self->dbInfo->get())); if (!logData->isPrimary) { logData->logSystem->get()->pop(logData->logRouterPoppedVersion, logData->remoteTag, logData->durableKnownCommittedVersion, logData->locality); } if (!logData->isPrimary && logData->stopped) { TraceEvent("TLogAlreadyStopped", self->dbgid).log(); logData->removed = logData->removed && logData->logSystem->get()->endEpoch(); } } else { logData->logSystem->set(Reference()); } } when(TLogPeekRequest req = waitNext(tli.peekMessages.getFuture())) { logData->addActor.send(tLogPeekMessages( req.reply, self, logData, req.begin, req.tag, req.returnIfBlocked, req.onlySpilled, req.sequence)); } when(TLogPeekStreamRequest req = waitNext(tli.peekStreamMessages.getFuture())) { TraceEvent(SevDebug, "TLogPeekStream", logData->logId) .detail("Token", tli.peekStreamMessages.getEndpoint().token); logData->addActor.send(tLogPeekStream(self, req, logData)); } when(TLogPopRequest req = waitNext(tli.popMessages.getFuture())) { logData->addActor.send(tLogPop(self, req, logData)); } when(TLogCommitRequest req = waitNext(tli.commit.getFuture())) { //TraceEvent("TLogCommitReq", logData->logId).detail("Ver", req.version).detail("PrevVer", req.prevVersion).detail("LogVer", logData->version.get()); ASSERT(logData->isPrimary); TEST(logData->stopped); // TLogCommitRequest while stopped if (!logData->stopped) logData->addActor.send(tLogCommit(self, req, logData, warningCollectorInput)); else req.reply.sendError(tlog_stopped()); } when(ReplyPromise reply = waitNext(tli.lock.getFuture())) { logData->addActor.send(tLogLock(self, reply, logData)); } when(TLogQueuingMetricsRequest req = waitNext(tli.getQueuingMetrics.getFuture())) { getQueuingMetrics(self, logData, req); } when(TLogConfirmRunningRequest req = waitNext(tli.confirmRunning.getFuture())) { if (req.debugID.present()) { UID tlogDebugID = nondeterministicRandom()->randomUniqueID(); g_traceBatch.addAttach("TransactionAttachID", req.debugID.get().first(), tlogDebugID.first()); g_traceBatch.addEvent("TransactionDebug", tlogDebugID.first(), "TLogServer.TLogConfirmRunningRequest"); } if (!logData->stopped) req.reply.send(Void()); else req.reply.sendError(tlog_stopped()); } when(TLogDisablePopRequest req = waitNext(tli.disablePopRequest.getFuture())) { if (self->ignorePopUid != "") { TraceEvent(SevWarn, "TLogPopDisableonDisable") .detail("IgnorePopUid", self->ignorePopUid) .detail("UidStr", req.snapUID.toString()) .detail("PersistentDataVersion", logData->persistentDataVersion) .detail("PersistentDatadurableVersion", logData->persistentDataDurableVersion) .detail("QueueCommittedVersion", logData->queueCommittedVersion.get()) .detail("Version", logData->version.get()); req.reply.sendError(operation_failed()); } else { // FIXME: As part of reverting snapshot V1, make ignorePopUid a UID instead of string self->ignorePopRequest = true; self->ignorePopUid = req.snapUID.toString(); self->ignorePopDeadline = g_network->now() + SERVER_KNOBS->TLOG_IGNORE_POP_AUTO_ENABLE_DELAY; req.reply.send(Void()); } } when(TLogEnablePopRequest enablePopReq = waitNext(tli.enablePopRequest.getFuture())) { logData->addActor.send(tLogEnablePopReq(enablePopReq, self, logData)); } when(TLogSnapRequest snapReq = waitNext(tli.snapRequest.getFuture())) { logData->addActor.send(tLogSnapCreate(snapReq, self, logData)); } } } void removeLog(TLogData* self, Reference logData) { TraceEvent("TLogRemoved", logData->logId) .detail("Input", logData->bytesInput.getValue()) .detail("Durable", logData->bytesDurable.getValue()); logData->stopped = true; if (!logData->recoveryComplete.isSet()) { logData->recoveryComplete.sendError(end_of_stream()); } logData->addActor = PromiseStream>(); // there could be items still in the promise stream if one of the // actors threw an error immediately self->id_data.erase(logData->logId); if (self->id_data.size()) { return; } else { throw worker_removed(); } if (logData->queueCommittingVersion == 0) { // If the removed tlog never attempted a queue commit, the update storage loop could become stuck waiting for // queueCommittedVersion to advance. logData->queueCommittedVersion.set(std::numeric_limits::max()); } } ACTOR Future pullAsyncData(TLogData* self, Reference logData, std::vector tags, Version beginVersion, Optional endVersion, bool poppedIsKnownCommitted) { state Future dbInfoChange = Void(); state Reference r; state Version tagAt = beginVersion; state Version lastVer = 0; while (!endVersion.present() || logData->version.get() < endVersion.get()) { loop { choose { when(wait(r ? r->getMore(TaskPriority::TLogCommit) : Never())) { break; } when(wait(dbInfoChange)) { if (logData->logSystem->get()) { r = logData->logSystem->get()->peek(logData->logId, tagAt, endVersion, tags, true); } else { r = Reference(); } dbInfoChange = logData->logSystem->onChange(); } } } state double waitStartT = 0; while (self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_HARD_LIMIT_BYTES && !logData->stopped) { if (now() - waitStartT >= 1) { TraceEvent(SevWarn, "TLogUpdateLag", logData->logId) .detail("Version", logData->version.get()) .detail("PersistentDataVersion", logData->persistentDataVersion) .detail("PersistentDataDurableVersion", logData->persistentDataDurableVersion); waitStartT = now(); } wait(delayJittered(.005, TaskPriority::TLogCommit)); } state Version ver = 0; state std::vector messages; loop { state bool foundMessage = r->hasMessage(); if (!foundMessage || r->version().version != ver) { ASSERT(r->version().version > lastVer); if (ver) { if (logData->stopped || (endVersion.present() && ver > endVersion.get())) { return Void(); } if (poppedIsKnownCommitted) { logData->knownCommittedVersion = std::max(logData->knownCommittedVersion, r->popped()); logData->minKnownCommittedVersion = std::max(logData->minKnownCommittedVersion, r->getMinKnownCommittedVersion()); } commitMessages(self, logData, ver, messages); if (self->terminated.isSet()) { return Void(); } // Log the changes to the persistent queue, to be committed by commitQueue() AlternativeTLogQueueEntryRef qe; qe.version = ver; qe.knownCommittedVersion = logData->knownCommittedVersion; qe.alternativeMessages = &messages; qe.id = logData->logId; self->persistentQueue->push(qe, logData); self->diskQueueCommitBytes += qe.expectedSize(); if (self->diskQueueCommitBytes > SERVER_KNOBS->MAX_QUEUE_COMMIT_BYTES) { self->largeDiskQueueCommitBytes.set(true); } // Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages // actors logData->version.set(ver); wait(yield(TaskPriority::TLogCommit)); } lastVer = ver; ver = r->version().version; messages.clear(); if (!foundMessage) { ver--; if (ver > logData->version.get()) { if (logData->stopped || (endVersion.present() && ver > endVersion.get())) { return Void(); } if (poppedIsKnownCommitted) { logData->knownCommittedVersion = std::max(logData->knownCommittedVersion, r->popped()); logData->minKnownCommittedVersion = std::max(logData->minKnownCommittedVersion, r->getMinKnownCommittedVersion()); } if (self->terminated.isSet()) { return Void(); } // Log the changes to the persistent queue, to be committed by commitQueue() TLogQueueEntryRef qe; qe.version = ver; qe.knownCommittedVersion = logData->knownCommittedVersion; qe.messages = StringRef(); qe.id = logData->logId; self->persistentQueue->push(qe, logData); self->diskQueueCommitBytes += qe.expectedSize(); if (self->diskQueueCommitBytes > SERVER_KNOBS->MAX_QUEUE_COMMIT_BYTES) { self->largeDiskQueueCommitBytes.set(true); } // Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages // actors logData->version.set(ver); wait(yield(TaskPriority::TLogCommit)); } break; } } messages.emplace_back(r->getMessageWithTags(), r->getTags()); r->nextMessage(); } tagAt = std::max(r->version().version, logData->version.get() + 1); } return Void(); } ACTOR Future tLogCore(TLogData* self, Reference logData, TLogInterface tli, bool pulledRecoveryVersions) { if (logData->removed.isReady()) { wait(delay(0)); // to avoid iterator invalidation in restorePersistentState when removed is already ready ASSERT(logData->removed.isError()); if (logData->removed.getError().code() != error_code_worker_removed) { throw logData->removed.getError(); } removeLog(self, logData); return Void(); } state PromiseStream warningCollectorInput; state Future warningCollector = timeoutWarningCollector(warningCollectorInput.getFuture(), 1.0, "TLogQueueCommitSlow", self->dbgid); state Future error = actorCollection(logData->addActor.getFuture()); logData->addActor.send(waitFailureServer(tli.waitFailure.getFuture())); logData->addActor.send(logData->removed); // FIXME: update tlogMetrics to include new information, or possibly only have one copy for the shared instance logData->addActor.send(traceCounters("TLogMetrics", logData->logId, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &logData->cc, logData->logId.toString() + "/TLogMetrics")); logData->addActor.send(serveTLogInterface(self, tli, logData, warningCollectorInput)); logData->addActor.send(cleanupPeekTrackers(logData.getPtr())); logData->addActor.send(logPeekTrackers(logData.getPtr())); if (!logData->isPrimary) { std::vector tags; tags.push_back(logData->remoteTag); logData->addActor.send( pullAsyncData(self, logData, tags, pulledRecoveryVersions ? logData->recoveredAt + 1 : logData->unrecoveredBefore, Optional(), true)); } try { wait(error); throw internal_error(); } catch (Error& e) { if (e.code() != error_code_worker_removed) throw; removeLog(self, logData); return Void(); } } ACTOR Future checkEmptyQueue(TLogData* self) { TraceEvent("TLogCheckEmptyQueueBegin", self->dbgid).log(); try { TLogQueueEntry r = wait(self->persistentQueue->readNext(self)); throw internal_error(); } catch (Error& e) { if (e.code() != error_code_end_of_stream) throw; TraceEvent("TLogCheckEmptyQueueEnd", self->dbgid).log(); return Void(); } } ACTOR Future checkRecovered(TLogData* self) { TraceEvent("TLogCheckRecoveredBegin", self->dbgid).log(); Optional v = wait(self->persistentData->readValue(StringRef())); TraceEvent("TLogCheckRecoveredEnd", self->dbgid).log(); return Void(); } ACTOR Future restorePersistentState(TLogData* self, LocalityData locality, Promise oldLog, Promise recovered, PromiseStream tlogRequests) { state double startt = now(); state Reference logData; state KeyRange tagKeys; // PERSIST: Read basic state from persistentData; replay persistentQueue but don't erase it TraceEvent("TLogRestorePersistentState", self->dbgid).log(); state IKeyValueStore* storage = self->persistentData; wait(storage->init()); state Future> fFormat = storage->readValue(persistFormat.key); state Future fVers = storage->readRange(persistCurrentVersionKeys); state Future fKnownCommitted = storage->readRange(persistKnownCommittedVersionKeys); state Future fLocality = storage->readRange(persistLocalityKeys); state Future fLogRouterTags = storage->readRange(persistLogRouterTagsKeys); state Future fTxsTags = storage->readRange(persistTxsTagsKeys); state Future fRecoverCounts = storage->readRange(persistRecoveryCountKeys); // FIXME: metadata in queue? wait(waitForAll(std::vector{ fFormat })); wait(waitForAll(std::vector{ fVers, fKnownCommitted, fLocality, fLogRouterTags, fTxsTags, fRecoverCounts })); if (fFormat.get().present() && !persistFormatReadableRange.contains(fFormat.get().get())) { // FIXME: remove when we no longer need to test upgrades from 4.X releases if (g_network->isSimulated()) { TraceEvent("ElapsedTime").detail("SimTime", now()).detail("RealTime", 0).detail("RandomUnseed", 0); flushAndExit(0); } TraceEvent(SevError, "UnsupportedDBFormat", self->dbgid) .detail("Format", fFormat.get().get()) .detail("Expected", persistFormat.value.toString()); throw worker_recovery_failed(); } if (!fFormat.get().present()) { RangeResult v = wait(self->persistentData->readRange(KeyRangeRef(StringRef(), LiteralStringRef("\xff")), 1)); if (!v.size()) { TEST(true); // The DB is completely empty, so it was never initialized. Delete it. throw worker_removed(); } else { // This should never happen TraceEvent(SevError, "NoDBFormatKey", self->dbgid).detail("FirstKey", v[0].key); ASSERT(false); throw worker_recovery_failed(); } } state std::vector>> removed; if (fFormat.get().get() == LiteralStringRef("FoundationDB/LogServer/2/3")) { // FIXME: need for upgrades from 5.X to 6.0, remove once this upgrade path is no longer needed if (recovered.canBeSet()) recovered.send(Void()); oldLog.send(Void()); while (!tlogRequests.isEmpty()) { tlogRequests.getFuture().pop().reply.sendError(recruitment_failed()); } wait(oldTLog_4_6::tLog( self->persistentData, self->rawPersistentQueue, self->dbInfo, locality, self->dbgid, self->workerID)); throw internal_error(); } ASSERT(fVers.get().size() == fRecoverCounts.get().size()); state std::map id_locality; for (auto it : fLocality.get()) { id_locality[BinaryReader::fromStringRef(it.key.removePrefix(persistLocalityKeys.begin), Unversioned())] = BinaryReader::fromStringRef(it.value, Unversioned()); } state std::map id_logRouterTags; for (auto it : fLogRouterTags.get()) { id_logRouterTags[BinaryReader::fromStringRef(it.key.removePrefix(persistLogRouterTagsKeys.begin), Unversioned())] = BinaryReader::fromStringRef(it.value, Unversioned()); } state std::map id_txsTags; for (auto it : fTxsTags.get()) { id_txsTags[BinaryReader::fromStringRef(it.key.removePrefix(persistTxsTagsKeys.begin), Unversioned())] = BinaryReader::fromStringRef(it.value, Unversioned()); } state std::map id_knownCommitted; for (auto it : fKnownCommitted.get()) { id_knownCommitted[BinaryReader::fromStringRef(it.key.removePrefix(persistKnownCommittedVersionKeys.begin), Unversioned())] = BinaryReader::fromStringRef(it.value, Unversioned()); } state int idx = 0; state Promise registerWithCC; state std::map id_interf; for (idx = 0; idx < fVers.get().size(); idx++) { state KeyRef rawId = fVers.get()[idx].key.removePrefix(persistCurrentVersionKeys.begin); UID id1 = BinaryReader::fromStringRef(rawId, Unversioned()); UID id2 = BinaryReader::fromStringRef( fRecoverCounts.get()[idx].key.removePrefix(persistRecoveryCountKeys.begin), Unversioned()); ASSERT(id1 == id2); TLogInterface recruited(id1, self->dbgid, locality); recruited.initEndpoints(); DUMPTOKEN(recruited.peekMessages); DUMPTOKEN(recruited.peekStreamMessages); DUMPTOKEN(recruited.popMessages); DUMPTOKEN(recruited.commit); DUMPTOKEN(recruited.lock); DUMPTOKEN(recruited.getQueuingMetrics); DUMPTOKEN(recruited.confirmRunning); DUMPTOKEN(recruited.waitFailure); DUMPTOKEN(recruited.recoveryFinished); DUMPTOKEN(recruited.disablePopRequest); DUMPTOKEN(recruited.enablePopRequest); DUMPTOKEN(recruited.snapRequest); // We do not need the remoteTag, because we will not be loading any additional data logData = Reference(new LogData(self, recruited, Tag(), true, id_logRouterTags[id1], id_txsTags[id1], UID(), std::vector(), "Restored")); logData->locality = id_locality[id1]; logData->stopped = true; self->id_data[id1] = logData; id_interf[id1] = recruited; logData->knownCommittedVersion = id_knownCommitted[id1]; Version ver = BinaryReader::fromStringRef(fVers.get()[idx].value, Unversioned()); logData->persistentDataVersion = ver; logData->persistentDataDurableVersion = ver; logData->version.set(ver); logData->recoveryCount = BinaryReader::fromStringRef(fRecoverCounts.get()[idx].value, Unversioned()); logData->removed = rejoinClusterController(self, recruited, logData->recoveryCount, registerWithCC.getFuture(), false); removed.push_back(errorOr(logData->removed)); TraceEvent("TLogRestorePersistentStateVer", id1).detail("Ver", ver); // Restore popped keys. Pop operations that took place after the last (committed) updatePersistentDataVersion // might be lost, but that is fine because we will get the corresponding data back, too. tagKeys = prefixRange(rawId.withPrefix(persistTagPoppedKeys.begin)); loop { if (logData->removed.isReady()) break; RangeResult data = wait(self->persistentData->readRange(tagKeys, BUGGIFY ? 3 : 1 << 30, 1 << 20)); if (!data.size()) break; ((KeyRangeRef&)tagKeys) = KeyRangeRef(keyAfter(data.back().key, tagKeys.arena()), tagKeys.end); for (auto& kv : data) { Tag tag = decodeTagPoppedKey(rawId, kv.key); Version popped = decodeTagPoppedValue(kv.value); TraceEvent("TLogRestorePopped", logData->logId).detail("Tag", tag.toString()).detail("To", popped); auto tagData = logData->getTagData(tag); ASSERT(!tagData); logData->createTagData(tag, popped, false, false, false); } } } state Future allRemoved = waitForAll(removed); state UID lastId = UID(1, 1); // initialized so it will not compare equal to a default UID state double recoverMemoryLimit = SERVER_KNOBS->TLOG_RECOVER_MEMORY_LIMIT; if (BUGGIFY) recoverMemoryLimit = std::max(SERVER_KNOBS->BUGGIFY_RECOVER_MEMORY_LIMIT, SERVER_KNOBS->TLOG_SPILL_THRESHOLD); try { loop { if (allRemoved.isReady()) { TEST(true); // all tlogs removed during queue recovery throw worker_removed(); } choose { when(TLogQueueEntry qe = wait(self->persistentQueue->readNext(self))) { if (!self->queueOrder.size() || self->queueOrder.back() != qe.id) self->queueOrder.push_back(qe.id); if (qe.id != lastId) { lastId = qe.id; auto it = self->id_data.find(qe.id); if (it != self->id_data.end()) { logData = it->second; } else { logData = Reference(); } } //TraceEvent("TLogRecoveredQE", self->dbgid).detail("LogId", qe.id).detail("Ver", qe.version).detail("MessageBytes", qe.messages.size()).detail("Tags", qe.tags.size()) // .detail("Tag0", qe.tags.size() ? qe.tags[0].tag : invalidTag).detail("Version", // logData->version.get()); if (logData) { logData->knownCommittedVersion = std::max(logData->knownCommittedVersion, qe.knownCommittedVersion); if (qe.version > logData->version.get()) { commitMessages(self, logData, qe.version, qe.arena(), qe.messages); logData->version.set(qe.version); logData->queueCommittedVersion.set(qe.version); while (self->bytesInput - self->bytesDurable >= recoverMemoryLimit) { TEST(true); // Flush excess data during TLog queue recovery TraceEvent("FlushLargeQueueDuringRecovery", self->dbgid) .detail("BytesInput", self->bytesInput) .detail("BytesDurable", self->bytesDurable) .detail("Version", logData->version.get()) .detail("PVer", logData->persistentDataVersion); choose { when(wait(updateStorage(self))) {} when(wait(allRemoved)) { throw worker_removed(); } } } } } } when(wait(allRemoved)) { throw worker_removed(); } } } } catch (Error& e) { if (e.code() != error_code_end_of_stream) throw; } TraceEvent("TLogRestorePersistentStateDone", self->dbgid).detail("Took", now() - startt); TEST(now() - startt >= 1.0); // TLog recovery took more than 1 second for (auto it : self->id_data) { if (it.second->queueCommittedVersion.get() == 0) { TraceEvent("TLogZeroVersion", self->dbgid).detail("LogId", it.first); it.second->queueCommittedVersion.set(it.second->version.get()); } it.second->recoveryComplete.sendError(end_of_stream()); self->sharedActors.send(tLogCore(self, it.second, id_interf[it.first], false)); } if (registerWithCC.canBeSet()) registerWithCC.send(Void()); return Void(); } bool tlogTerminated(TLogData* self, IKeyValueStore* persistentData, TLogQueue* persistentQueue, Error const& e) { // Dispose the IKVS (destroying its data permanently) only if this shutdown is definitely permanent. Otherwise just // close it. if (e.code() == error_code_worker_removed || e.code() == error_code_recruitment_failed) { persistentData->dispose(); persistentQueue->dispose(); } else { persistentData->close(); persistentQueue->close(); } if (e.code() == error_code_worker_removed || e.code() == error_code_recruitment_failed || e.code() == error_code_file_not_found) { TraceEvent("TLogTerminated", self->dbgid).errorUnsuppressed(e); return true; } else return false; } ACTOR Future updateLogSystem(TLogData* self, Reference logData, LogSystemConfig recoverFrom, Reference>> logSystem) { loop { bool found = false; if (self->dbInfo->get().logSystemConfig.recruitmentID == logData->recruitmentID) { if (self->dbInfo->get().logSystemConfig.isNextGenerationOf(recoverFrom)) { logSystem->set(ILogSystem::fromOldLogSystemConfig( logData->logId, self->dbInfo->get().myLocality, self->dbInfo->get().logSystemConfig)); found = true; } else if (self->dbInfo->get().logSystemConfig.isEqualIds(recoverFrom)) { logSystem->set(ILogSystem::fromLogSystemConfig( logData->logId, self->dbInfo->get().myLocality, self->dbInfo->get().logSystemConfig, false, true)); found = true; } else if (self->dbInfo->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS) { logSystem->set(ILogSystem::fromLogSystemConfig( logData->logId, self->dbInfo->get().myLocality, self->dbInfo->get().logSystemConfig, true)); found = true; } } if (!found) { logSystem->set(Reference()); } else { logData->logSystem->get()->pop(logData->logRouterPoppedVersion, logData->remoteTag, logData->durableKnownCommittedVersion, logData->locality); } TraceEvent("TLogUpdate", self->dbgid) .detail("LogId", logData->logId) .detail("RecruitmentID", logData->recruitmentID) .detail("DbRecruitmentID", self->dbInfo->get().logSystemConfig.recruitmentID) .detail("RecoverFrom", recoverFrom.toString()) .detail("DbInfo", self->dbInfo->get().logSystemConfig.toString()) .detail("Found", found) .detail("LogSystem", (bool)logSystem->get()) .detail("RecoveryState", (int)self->dbInfo->get().recoveryState); for (auto it : self->dbInfo->get().logSystemConfig.oldTLogs) { TraceEvent("TLogUpdateOld", self->dbgid).detail("LogId", logData->logId).detail("DbInfo", it.toString()); } wait(self->dbInfo->onChange()); } } ACTOR Future tLogStart(TLogData* self, InitializeTLogRequest req, LocalityData locality) { state TLogInterface recruited(self->dbgid, locality); recruited.initEndpoints(); DUMPTOKEN(recruited.peekMessages); DUMPTOKEN(recruited.peekStreamMessages); DUMPTOKEN(recruited.popMessages); DUMPTOKEN(recruited.commit); DUMPTOKEN(recruited.lock); DUMPTOKEN(recruited.getQueuingMetrics); DUMPTOKEN(recruited.confirmRunning); DUMPTOKEN(recruited.waitFailure); DUMPTOKEN(recruited.recoveryFinished); DUMPTOKEN(recruited.disablePopRequest); DUMPTOKEN(recruited.enablePopRequest); DUMPTOKEN(recruited.snapRequest); for (auto it : self->id_data) { if (!it.second->stopped) { TraceEvent("TLogStoppedByNewRecruitment", self->dbgid) .detail("StoppedId", it.first.toString()) .detail("RecruitedId", recruited.id()) .detail("EndEpoch", it.second->logSystem->get().getPtr() != 0); if (!it.second->isPrimary && it.second->logSystem->get()) { it.second->removed = it.second->removed && it.second->logSystem->get()->endEpoch(); } if (it.second->committingQueue.canBeSet()) { it.second->committingQueue.sendError(worker_removed()); } } it.second->stopped = true; if (!it.second->recoveryComplete.isSet()) { it.second->recoveryComplete.sendError(end_of_stream()); } it.second->stopCommit.trigger(); } bool recovering = (req.recoverFrom.logSystemType == LogSystemType::tagPartitioned); state Reference logData = Reference(new LogData(self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.txsTags, req.recruitmentID, req.allTags, recovering ? "Recovered" : "Recruited")); self->id_data[recruited.id()] = logData; logData->locality = req.locality; logData->recoveryCount = req.epoch; logData->removed = rejoinClusterController(self, recruited, req.epoch, Future(Void()), req.isPrimary); self->queueOrder.push_back(recruited.id()); TraceEvent("TLogStart", logData->logId).log(); state Future updater; state bool pulledRecoveryVersions = false; try { if (logData->removed.isReady()) { throw logData->removed.getError(); } if (recovering) { logData->unrecoveredBefore = req.startVersion; logData->recoveredAt = req.recoverAt; logData->knownCommittedVersion = req.startVersion - 1; logData->persistentDataVersion = logData->unrecoveredBefore - 1; logData->persistentDataDurableVersion = logData->unrecoveredBefore - 1; logData->queueCommittedVersion.set(logData->unrecoveredBefore - 1); logData->version.set(logData->unrecoveredBefore - 1); logData->unpoppedRecoveredTags = req.allTags.size(); wait(initPersistentState(self, logData) || logData->removed); TraceEvent("TLogRecover", self->dbgid) .detail("LogId", logData->logId) .detail("At", req.recoverAt) .detail("Known", req.knownCommittedVersion) .detail("Unrecovered", logData->unrecoveredBefore) .detail("Tags", describe(req.recoverTags)) .detail("Locality", req.locality) .detail("LogRouterTags", logData->logRouterTags); if (logData->recoveryComplete.isSet()) { throw worker_removed(); } updater = updateLogSystem(self, logData, req.recoverFrom, logData->logSystem); logData->initialized = true; self->newLogData.trigger(); if ((req.isPrimary || req.recoverFrom.logRouterTags == 0) && !logData->stopped && logData->unrecoveredBefore <= req.recoverAt) { if (req.recoverFrom.logRouterTags > 0 && req.locality != tagLocalitySatellite) { logData->logRouterPopToVersion = req.recoverAt; std::vector tags; tags.push_back(logData->remoteTag); wait(pullAsyncData(self, logData, tags, logData->unrecoveredBefore, req.recoverAt, true) || logData->removed); } else if (!req.recoverTags.empty()) { ASSERT(logData->unrecoveredBefore > req.knownCommittedVersion); wait(pullAsyncData( self, logData, req.recoverTags, req.knownCommittedVersion + 1, req.recoverAt, false) || logData->removed); } pulledRecoveryVersions = true; logData->knownCommittedVersion = req.recoverAt; } if ((req.isPrimary || req.recoverFrom.logRouterTags == 0) && logData->version.get() < req.recoverAt && !logData->stopped) { // Log the changes to the persistent queue, to be committed by commitQueue() TLogQueueEntryRef qe; qe.version = req.recoverAt; qe.knownCommittedVersion = logData->knownCommittedVersion; qe.messages = StringRef(); qe.id = logData->logId; self->persistentQueue->push(qe, logData); self->diskQueueCommitBytes += qe.expectedSize(); if (self->diskQueueCommitBytes > SERVER_KNOBS->MAX_QUEUE_COMMIT_BYTES) { self->largeDiskQueueCommitBytes.set(true); } logData->version.set(req.recoverAt); } if (logData->recoveryComplete.isSet()) { throw worker_removed(); } logData->addActor.send(respondToRecovered(recruited, logData->recoveryComplete)); } else { // Brand new tlog, initialization has already been done by caller wait(initPersistentState(self, logData) || logData->removed); if (logData->recoveryComplete.isSet()) { throw worker_removed(); } logData->initialized = true; self->newLogData.trigger(); logData->recoveryComplete.send(Void()); } wait(logData->committingQueue.getFuture() || logData->removed); } catch (Error& e) { req.reply.sendError(recruitment_failed()); if (e.code() != error_code_worker_removed) { throw; } wait(delay(0.0)); // if multiple recruitment requests were already in the promise stream make sure they are all // started before any are removed removeLog(self, logData); return Void(); } req.reply.send(recruited); TraceEvent("TLogReady", logData->logId) .detail("AllTags", describe(req.allTags)) .detail("Locality", logData->locality); updater = Void(); wait(tLogCore(self, logData, recruited, pulledRecoveryVersions)); return Void(); } ACTOR Future startSpillingInTenSeconds(TLogData* self, UID tlogId, Reference> activeSharedTLog) { wait(delay(10)); if (activeSharedTLog->get() != tlogId) { // TODO: This should fully spill, but currently doing so will cause us to no longer update poppedVersion // and QuietDatabase will hang thinking our TLog is behind. self->targetVolatileBytes = SERVER_KNOBS->REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT * 2; } return Void(); } // New tLog (if !recoverFrom.size()) or restore from network ACTOR Future tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference const> db, LocalityData locality, PromiseStream tlogRequests, UID tlogId, UID workerID, bool restoreFromDisk, Promise oldLog, Promise recovered, std::string folder, Reference> degraded, Reference> activeSharedTLog) { state TLogData self(tlogId, workerID, persistentData, persistentQueue, db, degraded, folder); state Future error = actorCollection(self.sharedActors.getFuture()); TraceEvent("SharedTlog", tlogId).detail("Version", "6.0"); try { if (restoreFromDisk) { wait(restorePersistentState(&self, locality, oldLog, recovered, tlogRequests)); } else { wait(ioTimeoutError(checkEmptyQueue(&self) && checkRecovered(&self), SERVER_KNOBS->TLOG_MAX_CREATE_DURATION)); } // Disk errors need a chance to kill this actor. wait(delay(0.000001)); if (recovered.canBeSet()) recovered.send(Void()); self.sharedActors.send(commitQueue(&self)); self.sharedActors.send(updateStorageLoop(&self)); state Future activeSharedChange = Void(); loop { choose { when(InitializeTLogRequest req = waitNext(tlogRequests.getFuture())) { if (!self.tlogCache.exists(req.recruitmentID)) { self.tlogCache.set(req.recruitmentID, req.reply.getFuture()); self.sharedActors.send( self.tlogCache.removeOnReady(req.recruitmentID, tLogStart(&self, req, locality))); } else { forwardPromise(req.reply, self.tlogCache.get(req.recruitmentID)); } } when(wait(error)) { throw internal_error(); } when(wait(activeSharedChange)) { if (activeSharedTLog->get() == tlogId) { self.targetVolatileBytes = SERVER_KNOBS->TLOG_SPILL_THRESHOLD; } else { self.sharedActors.send(startSpillingInTenSeconds(&self, tlogId, activeSharedTLog)); } activeSharedChange = activeSharedTLog->onChange(); } } } } catch (Error& e) { self.terminated.send(Void()); TraceEvent("TLogError", tlogId).errorUnsuppressed(e); if (recovered.canBeSet()) recovered.send(Void()); while (!tlogRequests.isEmpty()) { tlogRequests.getFuture().pop().reply.sendError(recruitment_failed()); } for (auto& it : self.id_data) { if (!it.second->recoveryComplete.isSet()) { it.second->recoveryComplete.sendError(end_of_stream()); } } if (tlogTerminated(&self, persistentData, self.persistentQueue, e)) { return Void(); } else { throw; } } } // UNIT TESTS struct DequeAllocatorStats { static int64_t allocatedBytes; }; int64_t DequeAllocatorStats::allocatedBytes = 0; template struct DequeAllocator : std::allocator { template struct rebind { typedef DequeAllocator other; }; DequeAllocator() {} template DequeAllocator(DequeAllocator const& u) : std::allocator(u) {} T* allocate(std::size_t n, std::allocator::const_pointer hint = 0) { DequeAllocatorStats::allocatedBytes += n * sizeof(T); // fprintf(stderr, "Allocating %lld objects for %lld bytes (total allocated: %lld)\n", n, n * sizeof(T), // DequeAllocatorStats::allocatedBytes); return std::allocator::allocate(n, hint); } void deallocate(T* p, std::size_t n) { DequeAllocatorStats::allocatedBytes -= n * sizeof(T); // fprintf(stderr, "Deallocating %lld objects for %lld bytes (total allocated: %lld)\n", n, n * sizeof(T), // DequeAllocatorStats::allocatedBytes); return std::allocator::deallocate(p, n); } }; TEST_CASE("Lfdbserver/tlogserver/VersionMessagesOverheadFactor") { typedef std::pair TestType; // type used by versionMessages for (int i = 1; i < 9; ++i) { for (int j = 0; j < 20; ++j) { DequeAllocatorStats::allocatedBytes = 0; DequeAllocator allocator; std::deque> d(allocator); int numElements = deterministicRandom()->randomInt(pow(10, i - 1), pow(10, i)); for (int k = 0; k < numElements; ++k) { d.push_back(TestType()); } int removedElements = 0; // deterministicRandom()->randomInt(0, numElements); // FIXME: the overhead factor // does not accurately account for removal! for (int k = 0; k < removedElements; ++k) { d.pop_front(); } int64_t dequeBytes = DequeAllocatorStats::allocatedBytes + sizeof(std::deque); int64_t insertedBytes = (numElements - removedElements) * sizeof(TestType); double overheadFactor = std::max(insertedBytes, dequeBytes - 10000) / insertedBytes; // We subtract 10K here as an estimated upper bound for the fixed cost of an std::deque // fprintf(stderr, "%d elements (%d inserted, %d removed):\n", numElements-removedElements, numElements, // removedElements); fprintf(stderr, "Allocated %lld bytes to store %lld bytes (%lf overhead factor)\n", // dequeBytes, insertedBytes, overheadFactor); ASSERT(overheadFactor * 1024 <= SERVER_KNOBS->VERSION_MESSAGES_OVERHEAD_FACTOR_1024THS); } } return Void(); } } // namespace oldTLog_6_0