mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-14 09:58:50 +08:00
Changes: LogSystem.h, LogSystemPeekCursor.actor.cpp: Add APIs to find the ID of the tLog from which an SS has fetched the latest set of versions. storageserver.actor.cpp: Capture the number of latest set of versions fetched, the time (in seconds) in which those versions were fetched, and the tLog from which they were fetched. Add this information to a TraceLogEvent. Capture how many versions an SS has fetched in the
1438 lines
48 KiB
C++
1438 lines
48 KiB
C++
/*
|
|
* LogSystemPeekCursor.actor.cpp
|
|
*
|
|
* This source file is part of the FoundationDB open source project
|
|
*
|
|
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
#include "fdbserver/LogSystem.h"
|
|
#include "fdbrpc/FailureMonitor.h"
|
|
#include "fdbserver/Knobs.h"
|
|
#include "fdbserver/MutationTracking.h"
|
|
#include "fdbrpc/ReplicationUtils.h"
|
|
#include "flow/actorcompiler.h" // has to be last include
|
|
|
|
ILogSystem::ServerPeekCursor::ServerPeekCursor(Reference<AsyncVar<OptionalInterface<TLogInterface>>> const& interf,
|
|
Tag tag,
|
|
Version begin,
|
|
Version end,
|
|
bool returnIfBlocked,
|
|
bool parallelGetMore)
|
|
: interf(interf), tag(tag), messageVersion(begin), end(end), hasMsg(false),
|
|
rd(results.arena, results.messages, Unversioned()), randomID(deterministicRandom()->randomUniqueID()),
|
|
poppedVersion(0), returnIfBlocked(returnIfBlocked), sequence(0), onlySpilled(false),
|
|
parallelGetMore(parallelGetMore), lastReset(0), slowReplies(0), fastReplies(0), unknownReplies(0),
|
|
resetCheck(Void()) {
|
|
this->results.maxKnownVersion = 0;
|
|
this->results.minKnownCommittedVersion = 0;
|
|
//TraceEvent("SPC_Starting", randomID).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).backtrace();
|
|
}
|
|
|
|
ILogSystem::ServerPeekCursor::ServerPeekCursor(TLogPeekReply const& results,
|
|
LogMessageVersion const& messageVersion,
|
|
LogMessageVersion const& end,
|
|
TagsAndMessage const& message,
|
|
bool hasMsg,
|
|
Version poppedVersion,
|
|
Tag tag)
|
|
: results(results), tag(tag), rd(results.arena, results.messages, Unversioned()), messageVersion(messageVersion),
|
|
end(end), messageAndTags(message), hasMsg(hasMsg), randomID(deterministicRandom()->randomUniqueID()),
|
|
poppedVersion(poppedVersion), returnIfBlocked(false), sequence(0), onlySpilled(false), parallelGetMore(false),
|
|
lastReset(0), slowReplies(0), fastReplies(0), unknownReplies(0), resetCheck(Void()) {
|
|
//TraceEvent("SPC_Clone", randomID);
|
|
this->results.maxKnownVersion = 0;
|
|
this->results.minKnownCommittedVersion = 0;
|
|
if (hasMsg)
|
|
nextMessage();
|
|
|
|
advanceTo(messageVersion);
|
|
}
|
|
|
|
Reference<ILogSystem::IPeekCursor> ILogSystem::ServerPeekCursor::cloneNoMore() {
|
|
return makeReference<ILogSystem::ServerPeekCursor>(
|
|
results, messageVersion, end, messageAndTags, hasMsg, poppedVersion, tag);
|
|
}
|
|
|
|
void ILogSystem::ServerPeekCursor::setProtocolVersion(ProtocolVersion version) {
|
|
rd.setProtocolVersion(version);
|
|
}
|
|
|
|
Arena& ILogSystem::ServerPeekCursor::arena() {
|
|
return results.arena;
|
|
}
|
|
|
|
ArenaReader* ILogSystem::ServerPeekCursor::reader() {
|
|
return &rd;
|
|
}
|
|
|
|
bool ILogSystem::ServerPeekCursor::hasMessage() const {
|
|
//TraceEvent("SPC_HasMessage", randomID).detail("HasMsg", hasMsg);
|
|
return hasMsg;
|
|
}
|
|
|
|
void ILogSystem::ServerPeekCursor::nextMessage() {
|
|
//TraceEvent("SPC_NextMessage", randomID).detail("MessageVersion", messageVersion.toString());
|
|
ASSERT(hasMsg);
|
|
if (rd.empty()) {
|
|
messageVersion.reset(std::min(results.end, end.version));
|
|
hasMsg = false;
|
|
return;
|
|
}
|
|
if (*(int32_t*)rd.peekBytes(4) == VERSION_HEADER) {
|
|
// A version
|
|
int32_t dummy;
|
|
Version ver;
|
|
rd >> dummy >> ver;
|
|
|
|
//TraceEvent("SPC_ProcessSeq", randomID).detail("MessageVersion", messageVersion.toString()).detail("Ver", ver).detail("Tag", tag.toString());
|
|
// ASSERT( ver >= messageVersion.version );
|
|
|
|
messageVersion.reset(ver);
|
|
|
|
if (messageVersion >= end) {
|
|
messageVersion = end;
|
|
hasMsg = false;
|
|
return;
|
|
}
|
|
ASSERT(!rd.empty());
|
|
}
|
|
|
|
messageAndTags.loadFromArena(&rd, &messageVersion.sub);
|
|
DEBUG_TAGS_AND_MESSAGE("ServerPeekCursor", messageVersion.version, messageAndTags.getRawMessage())
|
|
.detail("CursorID", this->randomID);
|
|
// Rewind and consume the header so that reader() starts from the message.
|
|
rd.rewind();
|
|
rd.readBytes(messageAndTags.getHeaderSize());
|
|
hasMsg = true;
|
|
//TraceEvent("SPC_NextMessageB", randomID).detail("MessageVersion", messageVersion.toString());
|
|
}
|
|
|
|
StringRef ILogSystem::ServerPeekCursor::getMessage() {
|
|
//TraceEvent("SPC_GetMessage", randomID);
|
|
StringRef message = messageAndTags.getMessageWithoutTags();
|
|
rd.readBytes(message.size()); // Consumes the message.
|
|
return message;
|
|
}
|
|
|
|
StringRef ILogSystem::ServerPeekCursor::getMessageWithTags() {
|
|
StringRef rawMessage = messageAndTags.getRawMessage();
|
|
rd.readBytes(rawMessage.size() - messageAndTags.getHeaderSize()); // Consumes the message.
|
|
return rawMessage;
|
|
}
|
|
|
|
VectorRef<Tag> ILogSystem::ServerPeekCursor::getTags() const {
|
|
return messageAndTags.tags;
|
|
}
|
|
|
|
void ILogSystem::ServerPeekCursor::advanceTo(LogMessageVersion n) {
|
|
//TraceEvent("SPC_AdvanceTo", randomID).detail("N", n.toString());
|
|
while (messageVersion < n && hasMessage()) {
|
|
getMessage();
|
|
nextMessage();
|
|
}
|
|
|
|
if (hasMessage())
|
|
return;
|
|
|
|
// if( more.isValid() && !more.isReady() ) more.cancel();
|
|
|
|
if (messageVersion < n) {
|
|
messageVersion = n;
|
|
}
|
|
}
|
|
|
|
ACTOR Future<Void> resetChecker(ILogSystem::ServerPeekCursor* self, NetworkAddress addr) {
|
|
self->slowReplies = 0;
|
|
self->unknownReplies = 0;
|
|
self->fastReplies = 0;
|
|
wait(delay(SERVER_KNOBS->PEEK_STATS_INTERVAL));
|
|
TraceEvent("SlowPeekStats", self->randomID)
|
|
.detail("PeerAddress", addr)
|
|
.detail("SlowReplies", self->slowReplies)
|
|
.detail("FastReplies", self->fastReplies)
|
|
.detail("UnknownReplies", self->unknownReplies);
|
|
|
|
if (self->slowReplies >= SERVER_KNOBS->PEEK_STATS_SLOW_AMOUNT &&
|
|
self->slowReplies / double(self->slowReplies + self->fastReplies) >= SERVER_KNOBS->PEEK_STATS_SLOW_RATIO) {
|
|
|
|
TraceEvent("ConnectionResetSlowPeek", self->randomID)
|
|
.detail("PeerAddress", addr)
|
|
.detail("SlowReplies", self->slowReplies)
|
|
.detail("FastReplies", self->fastReplies)
|
|
.detail("UnknownReplies", self->unknownReplies);
|
|
FlowTransport::transport().resetConnection(addr);
|
|
self->lastReset = now();
|
|
}
|
|
return Void();
|
|
}
|
|
|
|
ACTOR Future<TLogPeekReply> recordRequestMetrics(ILogSystem::ServerPeekCursor* self,
|
|
NetworkAddress addr,
|
|
Future<TLogPeekReply> in) {
|
|
try {
|
|
state double startTime = now();
|
|
TLogPeekReply t = wait(in);
|
|
if (now() - self->lastReset > SERVER_KNOBS->PEEK_RESET_INTERVAL) {
|
|
if (now() - startTime > SERVER_KNOBS->PEEK_MAX_LATENCY) {
|
|
if (t.messages.size() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES || SERVER_KNOBS->PEEK_COUNT_SMALL_MESSAGES) {
|
|
if (self->resetCheck.isReady()) {
|
|
self->resetCheck = resetChecker(self, addr);
|
|
}
|
|
self->slowReplies++;
|
|
} else {
|
|
self->unknownReplies++;
|
|
}
|
|
} else {
|
|
self->fastReplies++;
|
|
}
|
|
}
|
|
return t;
|
|
} catch (Error& e) {
|
|
if (e.code() != error_code_broken_promise)
|
|
throw;
|
|
wait(Never()); // never return
|
|
throw internal_error(); // does not happen
|
|
}
|
|
}
|
|
|
|
ACTOR Future<Void> serverPeekParallelGetMore(ILogSystem::ServerPeekCursor* self, TaskPriority taskID) {
|
|
if (!self->interf || self->messageVersion >= self->end) {
|
|
if (self->hasMessage())
|
|
return Void();
|
|
wait(Future<Void>(Never()));
|
|
throw internal_error();
|
|
}
|
|
|
|
if (!self->interfaceChanged.isValid()) {
|
|
self->interfaceChanged = self->interf->onChange();
|
|
}
|
|
|
|
loop {
|
|
state Version expectedBegin = self->messageVersion.version;
|
|
try {
|
|
if (self->parallelGetMore || self->onlySpilled) {
|
|
while (self->futureResults.size() < SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS &&
|
|
self->interf->get().present()) {
|
|
self->futureResults.push_back(recordRequestMetrics(
|
|
self,
|
|
self->interf->get().interf().peekMessages.getEndpoint().getPrimaryAddress(),
|
|
self->interf->get().interf().peekMessages.getReply(
|
|
TLogPeekRequest(self->messageVersion.version,
|
|
self->tag,
|
|
self->returnIfBlocked,
|
|
self->onlySpilled,
|
|
std::make_pair(self->randomID, self->sequence++)),
|
|
taskID)));
|
|
}
|
|
if (self->sequence == std::numeric_limits<decltype(self->sequence)>::max()) {
|
|
throw operation_obsolete();
|
|
}
|
|
} else if (self->futureResults.size() == 0) {
|
|
return Void();
|
|
}
|
|
|
|
if (self->hasMessage())
|
|
return Void();
|
|
|
|
choose {
|
|
when(TLogPeekReply res = wait(self->interf->get().present() ? self->futureResults.front() : Never())) {
|
|
if (res.begin.get() != expectedBegin) {
|
|
throw operation_obsolete();
|
|
}
|
|
expectedBegin = res.end;
|
|
self->futureResults.pop_front();
|
|
self->results = res;
|
|
self->onlySpilled = res.onlySpilled;
|
|
if (res.popped.present())
|
|
self->poppedVersion =
|
|
std::min(std::max(self->poppedVersion, res.popped.get()), self->end.version);
|
|
self->rd = ArenaReader(self->results.arena, self->results.messages, Unversioned());
|
|
LogMessageVersion skipSeq = self->messageVersion;
|
|
self->hasMsg = true;
|
|
self->nextMessage();
|
|
self->advanceTo(skipSeq);
|
|
//TraceEvent("SPC_GetMoreB", self->randomID).detail("Has", self->hasMessage()).detail("End", res.end).detail("Popped", res.popped.present() ? res.popped.get() : 0);
|
|
return Void();
|
|
}
|
|
when(wait(self->interfaceChanged)) {
|
|
self->interfaceChanged = self->interf->onChange();
|
|
self->randomID = deterministicRandom()->randomUniqueID();
|
|
self->sequence = 0;
|
|
self->onlySpilled = false;
|
|
self->futureResults.clear();
|
|
}
|
|
}
|
|
} catch (Error& e) {
|
|
if (e.code() == error_code_end_of_stream) {
|
|
self->end.reset(self->messageVersion.version);
|
|
return Void();
|
|
} else if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
|
|
TraceEvent("PeekCursorTimedOut", self->randomID).error(e);
|
|
// We *should* never get timed_out(), as it means the TLog got stuck while handling a parallel peek,
|
|
// and thus we've likely just wasted 10min.
|
|
// timed_out() is sent by cleanupPeekTrackers as value PEEK_TRACKER_EXPIRATION_TIME
|
|
ASSERT_WE_THINK(e.code() == error_code_operation_obsolete ||
|
|
SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME < 10);
|
|
self->interfaceChanged = self->interf->onChange();
|
|
self->randomID = deterministicRandom()->randomUniqueID();
|
|
self->sequence = 0;
|
|
self->futureResults.clear();
|
|
} else {
|
|
throw e;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
ACTOR Future<Void> serverPeekGetMore(ILogSystem::ServerPeekCursor* self, TaskPriority taskID) {
|
|
if (!self->interf || self->messageVersion >= self->end) {
|
|
wait(Future<Void>(Never()));
|
|
throw internal_error();
|
|
}
|
|
try {
|
|
loop {
|
|
choose {
|
|
when(TLogPeekReply res =
|
|
wait(self->interf->get().present()
|
|
? brokenPromiseToNever(self->interf->get().interf().peekMessages.getReply(
|
|
TLogPeekRequest(self->messageVersion.version,
|
|
self->tag,
|
|
self->returnIfBlocked,
|
|
self->onlySpilled),
|
|
taskID))
|
|
: Never())) {
|
|
self->results = res;
|
|
self->onlySpilled = res.onlySpilled;
|
|
if (res.popped.present())
|
|
self->poppedVersion =
|
|
std::min(std::max(self->poppedVersion, res.popped.get()), self->end.version);
|
|
self->rd = ArenaReader(self->results.arena, self->results.messages, Unversioned());
|
|
LogMessageVersion skipSeq = self->messageVersion;
|
|
self->hasMsg = true;
|
|
self->nextMessage();
|
|
self->advanceTo(skipSeq);
|
|
//TraceEvent("SPC_GetMoreB", self->randomID).detail("Has", self->hasMessage()).detail("End", res.end).detail("Popped", res.popped.present() ? res.popped.get() : 0);
|
|
return Void();
|
|
}
|
|
when(wait(self->interf->onChange())) { self->onlySpilled = false; }
|
|
}
|
|
}
|
|
} catch (Error& e) {
|
|
if (e.code() == error_code_end_of_stream) {
|
|
self->end.reset(self->messageVersion.version);
|
|
return Void();
|
|
}
|
|
throw e;
|
|
}
|
|
}
|
|
|
|
Future<Void> ILogSystem::ServerPeekCursor::getMore(TaskPriority taskID) {
|
|
//TraceEvent("SPC_GetMore", randomID).detail("HasMessage", hasMessage()).detail("More", !more.isValid() || more.isReady()).detail("MessageVersion", messageVersion.toString()).detail("End", end.toString());
|
|
if (hasMessage() && !parallelGetMore)
|
|
return Void();
|
|
if (!more.isValid() || more.isReady()) {
|
|
if (parallelGetMore || onlySpilled || futureResults.size()) {
|
|
more = serverPeekParallelGetMore(this, taskID);
|
|
} else {
|
|
more = serverPeekGetMore(this, taskID);
|
|
}
|
|
}
|
|
return more;
|
|
}
|
|
|
|
ACTOR Future<Void> serverPeekOnFailed(ILogSystem::ServerPeekCursor* self) {
|
|
loop {
|
|
choose {
|
|
when(wait(self->interf->get().present()
|
|
? IFailureMonitor::failureMonitor().onStateEqual(
|
|
self->interf->get().interf().peekMessages.getEndpoint(), FailureStatus())
|
|
: Never())) {
|
|
return Void();
|
|
}
|
|
when(wait(self->interf->onChange())) {}
|
|
}
|
|
}
|
|
}
|
|
|
|
Future<Void> ILogSystem::ServerPeekCursor::onFailed() {
|
|
return serverPeekOnFailed(this);
|
|
}
|
|
|
|
bool ILogSystem::ServerPeekCursor::isActive() const {
|
|
if (!interf->get().present())
|
|
return false;
|
|
if (messageVersion >= end)
|
|
return false;
|
|
return IFailureMonitor::failureMonitor().getState(interf->get().interf().peekMessages.getEndpoint()).isAvailable();
|
|
}
|
|
|
|
bool ILogSystem::ServerPeekCursor::isExhausted() const {
|
|
return messageVersion >= end;
|
|
}
|
|
|
|
const LogMessageVersion& ILogSystem::ServerPeekCursor::version() const {
|
|
return messageVersion;
|
|
} // Call only after nextMessage(). The sequence of the current message, or results.end if nextMessage() has returned
|
|
// false.
|
|
|
|
Version ILogSystem::ServerPeekCursor::getMinKnownCommittedVersion() const {
|
|
return results.minKnownCommittedVersion;
|
|
}
|
|
|
|
Optional<UID> ILogSystem::ServerPeekCursor::getPrimaryPeekLocation() const {
|
|
if (interf->get().present()) {
|
|
return interf->get().id();
|
|
}
|
|
return Optional<UID>();
|
|
}
|
|
|
|
Optional<UID> ILogSystem::ServerPeekCursor::getCurrentPeekLocation() const {
|
|
return ILogSystem::ServerPeekCursor::getPrimaryPeekLocation();
|
|
}
|
|
|
|
Version ILogSystem::ServerPeekCursor::popped() const {
|
|
return poppedVersion;
|
|
}
|
|
|
|
ILogSystem::MergedPeekCursor::MergedPeekCursor(vector<Reference<ILogSystem::IPeekCursor>> const& serverCursors,
|
|
Version begin)
|
|
: serverCursors(serverCursors), bestServer(-1), readQuorum(serverCursors.size()), tag(invalidTag), currentCursor(0),
|
|
hasNextMessage(false), messageVersion(begin), randomID(deterministicRandom()->randomUniqueID()),
|
|
tLogReplicationFactor(0) {
|
|
sortedVersions.resize(serverCursors.size());
|
|
}
|
|
|
|
ILogSystem::MergedPeekCursor::MergedPeekCursor(
|
|
std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> const& logServers,
|
|
int bestServer,
|
|
int readQuorum,
|
|
Tag tag,
|
|
Version begin,
|
|
Version end,
|
|
bool parallelGetMore,
|
|
std::vector<LocalityData> const& tLogLocalities,
|
|
Reference<IReplicationPolicy> const tLogPolicy,
|
|
int tLogReplicationFactor)
|
|
: bestServer(bestServer), readQuorum(readQuorum), tag(tag), currentCursor(0), hasNextMessage(false),
|
|
messageVersion(begin), randomID(deterministicRandom()->randomUniqueID()),
|
|
tLogReplicationFactor(tLogReplicationFactor) {
|
|
if (tLogPolicy) {
|
|
logSet = makeReference<LogSet>();
|
|
logSet->tLogPolicy = tLogPolicy;
|
|
logSet->tLogLocalities = tLogLocalities;
|
|
filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities);
|
|
logSet->updateLocalitySet(logSet->tLogLocalities);
|
|
}
|
|
|
|
for (int i = 0; i < logServers.size(); i++) {
|
|
auto cursor = makeReference<ILogSystem::ServerPeekCursor>(
|
|
logServers[i], tag, begin, end, bestServer >= 0, parallelGetMore);
|
|
//TraceEvent("MPC_Starting", randomID).detail("Cursor", cursor->randomID).detail("End", end);
|
|
serverCursors.push_back(cursor);
|
|
}
|
|
sortedVersions.resize(serverCursors.size());
|
|
}
|
|
|
|
ILogSystem::MergedPeekCursor::MergedPeekCursor(vector<Reference<ILogSystem::IPeekCursor>> const& serverCursors,
|
|
LogMessageVersion const& messageVersion,
|
|
int bestServer,
|
|
int readQuorum,
|
|
Optional<LogMessageVersion> nextVersion,
|
|
Reference<LogSet> logSet,
|
|
int tLogReplicationFactor)
|
|
: serverCursors(serverCursors), bestServer(bestServer), readQuorum(readQuorum), currentCursor(0),
|
|
hasNextMessage(false), messageVersion(messageVersion), nextVersion(nextVersion), logSet(logSet),
|
|
randomID(deterministicRandom()->randomUniqueID()), tLogReplicationFactor(tLogReplicationFactor) {
|
|
sortedVersions.resize(serverCursors.size());
|
|
calcHasMessage();
|
|
}
|
|
|
|
Reference<ILogSystem::IPeekCursor> ILogSystem::MergedPeekCursor::cloneNoMore() {
|
|
vector<Reference<ILogSystem::IPeekCursor>> cursors;
|
|
for (auto it : serverCursors) {
|
|
cursors.push_back(it->cloneNoMore());
|
|
}
|
|
return makeReference<ILogSystem::MergedPeekCursor>(
|
|
cursors, messageVersion, bestServer, readQuorum, nextVersion, logSet, tLogReplicationFactor);
|
|
}
|
|
|
|
void ILogSystem::MergedPeekCursor::setProtocolVersion(ProtocolVersion version) {
|
|
for (auto it : serverCursors)
|
|
if (it->hasMessage())
|
|
it->setProtocolVersion(version);
|
|
}
|
|
|
|
Arena& ILogSystem::MergedPeekCursor::arena() {
|
|
return serverCursors[currentCursor]->arena();
|
|
}
|
|
|
|
ArenaReader* ILogSystem::MergedPeekCursor::reader() {
|
|
return serverCursors[currentCursor]->reader();
|
|
}
|
|
|
|
void ILogSystem::MergedPeekCursor::calcHasMessage() {
|
|
if (bestServer >= 0) {
|
|
if (nextVersion.present())
|
|
serverCursors[bestServer]->advanceTo(nextVersion.get());
|
|
if (serverCursors[bestServer]->hasMessage()) {
|
|
messageVersion = serverCursors[bestServer]->version();
|
|
currentCursor = bestServer;
|
|
hasNextMessage = true;
|
|
|
|
for (auto& c : serverCursors)
|
|
c->advanceTo(messageVersion);
|
|
|
|
return;
|
|
}
|
|
|
|
auto bestVersion = serverCursors[bestServer]->version();
|
|
for (auto& c : serverCursors)
|
|
c->advanceTo(bestVersion);
|
|
}
|
|
|
|
hasNextMessage = false;
|
|
updateMessage(false);
|
|
|
|
if (!hasNextMessage && logSet) {
|
|
updateMessage(true);
|
|
}
|
|
}
|
|
|
|
void ILogSystem::MergedPeekCursor::updateMessage(bool usePolicy) {
|
|
loop {
|
|
bool advancedPast = false;
|
|
sortedVersions.clear();
|
|
for (int i = 0; i < serverCursors.size(); i++) {
|
|
auto& serverCursor = serverCursors[i];
|
|
if (nextVersion.present())
|
|
serverCursor->advanceTo(nextVersion.get());
|
|
sortedVersions.push_back(std::pair<LogMessageVersion, int>(serverCursor->version(), i));
|
|
}
|
|
|
|
if (usePolicy) {
|
|
ASSERT(logSet->tLogPolicy);
|
|
std::sort(sortedVersions.begin(), sortedVersions.end());
|
|
|
|
locations.clear();
|
|
for (auto sortedVersion : sortedVersions) {
|
|
locations.push_back(logSet->logEntryArray[sortedVersion.second]);
|
|
if (locations.size() >= tLogReplicationFactor && logSet->satisfiesPolicy(locations)) {
|
|
messageVersion = sortedVersion.first;
|
|
break;
|
|
}
|
|
}
|
|
} else {
|
|
std::nth_element(sortedVersions.begin(), sortedVersions.end() - readQuorum, sortedVersions.end());
|
|
messageVersion = sortedVersions[sortedVersions.size() - readQuorum].first;
|
|
}
|
|
|
|
for (int i = 0; i < serverCursors.size(); i++) {
|
|
auto& c = serverCursors[i];
|
|
auto start = c->version();
|
|
c->advanceTo(messageVersion);
|
|
if (start <= messageVersion && messageVersion < c->version()) {
|
|
advancedPast = true;
|
|
TEST(true); // Merge peek cursor advanced past desired sequence
|
|
}
|
|
}
|
|
|
|
if (!advancedPast)
|
|
break;
|
|
}
|
|
|
|
for (int i = 0; i < serverCursors.size(); i++) {
|
|
auto& c = serverCursors[i];
|
|
ASSERT_WE_THINK(!c->hasMessage() ||
|
|
c->version() >= messageVersion); // Seems like the loop above makes this unconditionally true
|
|
if (c->version() == messageVersion && c->hasMessage()) {
|
|
hasNextMessage = true;
|
|
currentCursor = i;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
bool ILogSystem::MergedPeekCursor::hasMessage() const {
|
|
return hasNextMessage;
|
|
}
|
|
|
|
void ILogSystem::MergedPeekCursor::nextMessage() {
|
|
nextVersion = version();
|
|
nextVersion.get().sub++;
|
|
serverCursors[currentCursor]->nextMessage();
|
|
calcHasMessage();
|
|
ASSERT(hasMessage() || !version().sub);
|
|
}
|
|
|
|
StringRef ILogSystem::MergedPeekCursor::getMessage() {
|
|
return serverCursors[currentCursor]->getMessage();
|
|
}
|
|
|
|
StringRef ILogSystem::MergedPeekCursor::getMessageWithTags() {
|
|
return serverCursors[currentCursor]->getMessageWithTags();
|
|
}
|
|
|
|
VectorRef<Tag> ILogSystem::MergedPeekCursor::getTags() const {
|
|
return serverCursors[currentCursor]->getTags();
|
|
}
|
|
|
|
void ILogSystem::MergedPeekCursor::advanceTo(LogMessageVersion n) {
|
|
bool canChange = false;
|
|
for (auto& c : serverCursors) {
|
|
if (c->version() < n) {
|
|
canChange = true;
|
|
c->advanceTo(n);
|
|
}
|
|
}
|
|
if (canChange) {
|
|
calcHasMessage();
|
|
}
|
|
}
|
|
|
|
ACTOR Future<Void> mergedPeekGetMore(ILogSystem::MergedPeekCursor* self,
|
|
LogMessageVersion startVersion,
|
|
TaskPriority taskID) {
|
|
loop {
|
|
//TraceEvent("MPC_GetMoreA", self->randomID).detail("Start", startVersion.toString());
|
|
if (self->bestServer >= 0 && self->serverCursors[self->bestServer]->isActive()) {
|
|
ASSERT(!self->serverCursors[self->bestServer]->hasMessage());
|
|
wait(self->serverCursors[self->bestServer]->getMore(taskID) ||
|
|
self->serverCursors[self->bestServer]->onFailed());
|
|
} else {
|
|
vector<Future<Void>> q;
|
|
for (auto& c : self->serverCursors)
|
|
if (!c->hasMessage())
|
|
q.push_back(c->getMore(taskID));
|
|
wait(quorum(q, 1));
|
|
}
|
|
self->calcHasMessage();
|
|
//TraceEvent("MPC_GetMoreB", self->randomID).detail("HasMessage", self->hasMessage()).detail("Start", startVersion.toString()).detail("Seq", self->version().toString());
|
|
if (self->hasMessage() || self->version() > startVersion) {
|
|
return Void();
|
|
}
|
|
}
|
|
}
|
|
|
|
Future<Void> ILogSystem::MergedPeekCursor::getMore(TaskPriority taskID) {
|
|
if (more.isValid() && !more.isReady()) {
|
|
return more;
|
|
}
|
|
|
|
if (!serverCursors.size())
|
|
return Never();
|
|
|
|
auto startVersion = version();
|
|
calcHasMessage();
|
|
if (hasMessage())
|
|
return Void();
|
|
if (nextVersion.present())
|
|
advanceTo(nextVersion.get());
|
|
ASSERT(!hasMessage());
|
|
if (version() > startVersion)
|
|
return Void();
|
|
|
|
more = mergedPeekGetMore(this, startVersion, taskID);
|
|
return more;
|
|
}
|
|
|
|
Future<Void> ILogSystem::MergedPeekCursor::onFailed() {
|
|
ASSERT(false);
|
|
return Never();
|
|
}
|
|
|
|
bool ILogSystem::MergedPeekCursor::isActive() const {
|
|
ASSERT(false);
|
|
return false;
|
|
}
|
|
|
|
bool ILogSystem::MergedPeekCursor::isExhausted() const {
|
|
return serverCursors[currentCursor]->isExhausted();
|
|
}
|
|
|
|
const LogMessageVersion& ILogSystem::MergedPeekCursor::version() const {
|
|
return messageVersion;
|
|
}
|
|
|
|
Version ILogSystem::MergedPeekCursor::getMinKnownCommittedVersion() const {
|
|
return serverCursors[currentCursor]->getMinKnownCommittedVersion();
|
|
}
|
|
|
|
Optional<UID> ILogSystem::MergedPeekCursor::getPrimaryPeekLocation() const {
|
|
if (bestServer >= 0) {
|
|
return serverCursors[bestServer]->getPrimaryPeekLocation();
|
|
}
|
|
return Optional<UID>();
|
|
}
|
|
|
|
Optional<UID> ILogSystem::MergedPeekCursor::getCurrentPeekLocation() const {
|
|
if (currentCursor >= 0) {
|
|
return serverCursors[currentCursor]->getPrimaryPeekLocation();
|
|
}
|
|
return Optional<UID>();
|
|
}
|
|
|
|
Version ILogSystem::MergedPeekCursor::popped() const {
|
|
Version poppedVersion = 0;
|
|
for (auto& c : serverCursors)
|
|
poppedVersion = std::max(poppedVersion, c->popped());
|
|
return poppedVersion;
|
|
}
|
|
|
|
ILogSystem::SetPeekCursor::SetPeekCursor(std::vector<Reference<LogSet>> const& logSets,
|
|
int bestSet,
|
|
int bestServer,
|
|
Tag tag,
|
|
Version begin,
|
|
Version end,
|
|
bool parallelGetMore)
|
|
: logSets(logSets), bestSet(bestSet), bestServer(bestServer), tag(tag), currentCursor(0), currentSet(bestSet),
|
|
hasNextMessage(false), messageVersion(begin), useBestSet(true), randomID(deterministicRandom()->randomUniqueID()) {
|
|
serverCursors.resize(logSets.size());
|
|
int maxServers = 0;
|
|
for (int i = 0; i < logSets.size(); i++) {
|
|
for (int j = 0; j < logSets[i]->logServers.size(); j++) {
|
|
auto cursor = makeReference<ILogSystem::ServerPeekCursor>(
|
|
logSets[i]->logServers[j], tag, begin, end, true, parallelGetMore);
|
|
serverCursors[i].push_back(cursor);
|
|
}
|
|
maxServers = std::max<int>(maxServers, serverCursors[i].size());
|
|
}
|
|
sortedVersions.resize(maxServers);
|
|
}
|
|
|
|
ILogSystem::SetPeekCursor::SetPeekCursor(std::vector<Reference<LogSet>> const& logSets,
|
|
std::vector<std::vector<Reference<IPeekCursor>>> const& serverCursors,
|
|
LogMessageVersion const& messageVersion,
|
|
int bestSet,
|
|
int bestServer,
|
|
Optional<LogMessageVersion> nextVersion,
|
|
bool useBestSet)
|
|
: logSets(logSets), serverCursors(serverCursors), messageVersion(messageVersion), bestSet(bestSet),
|
|
bestServer(bestServer), nextVersion(nextVersion), currentSet(bestSet), currentCursor(0), hasNextMessage(false),
|
|
useBestSet(useBestSet), randomID(deterministicRandom()->randomUniqueID()) {
|
|
int maxServers = 0;
|
|
for (int i = 0; i < logSets.size(); i++) {
|
|
maxServers = std::max<int>(maxServers, serverCursors[i].size());
|
|
}
|
|
sortedVersions.resize(maxServers);
|
|
calcHasMessage();
|
|
}
|
|
|
|
Reference<ILogSystem::IPeekCursor> ILogSystem::SetPeekCursor::cloneNoMore() {
|
|
vector<vector<Reference<ILogSystem::IPeekCursor>>> cursors;
|
|
cursors.resize(logSets.size());
|
|
for (int i = 0; i < logSets.size(); i++) {
|
|
for (int j = 0; j < logSets[i]->logServers.size(); j++) {
|
|
cursors[i].push_back(serverCursors[i][j]->cloneNoMore());
|
|
}
|
|
}
|
|
return makeReference<ILogSystem::SetPeekCursor>(
|
|
logSets, cursors, messageVersion, bestSet, bestServer, nextVersion, useBestSet);
|
|
}
|
|
|
|
void ILogSystem::SetPeekCursor::setProtocolVersion(ProtocolVersion version) {
|
|
for (auto& cursors : serverCursors) {
|
|
for (auto& it : cursors) {
|
|
if (it->hasMessage()) {
|
|
it->setProtocolVersion(version);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
Arena& ILogSystem::SetPeekCursor::arena() {
|
|
return serverCursors[currentSet][currentCursor]->arena();
|
|
}
|
|
|
|
ArenaReader* ILogSystem::SetPeekCursor::reader() {
|
|
return serverCursors[currentSet][currentCursor]->reader();
|
|
}
|
|
|
|
void ILogSystem::SetPeekCursor::calcHasMessage() {
|
|
if (bestSet >= 0 && bestServer >= 0) {
|
|
if (nextVersion.present()) {
|
|
//TraceEvent("LPC_CalcNext").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage).detail("NextVersion", nextVersion.get().toString());
|
|
serverCursors[bestSet][bestServer]->advanceTo(nextVersion.get());
|
|
}
|
|
if (serverCursors[bestSet][bestServer]->hasMessage()) {
|
|
messageVersion = serverCursors[bestSet][bestServer]->version();
|
|
currentSet = bestSet;
|
|
currentCursor = bestServer;
|
|
hasNextMessage = true;
|
|
|
|
//TraceEvent("LPC_Calc1").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage);
|
|
|
|
for (auto& cursors : serverCursors) {
|
|
for (auto& c : cursors) {
|
|
c->advanceTo(messageVersion);
|
|
}
|
|
}
|
|
|
|
return;
|
|
}
|
|
|
|
auto bestVersion = serverCursors[bestSet][bestServer]->version();
|
|
for (auto& cursors : serverCursors) {
|
|
for (auto& c : cursors) {
|
|
c->advanceTo(bestVersion);
|
|
}
|
|
}
|
|
}
|
|
|
|
hasNextMessage = false;
|
|
if (useBestSet) {
|
|
updateMessage(bestSet, false); // Use Quorum logic
|
|
|
|
//TraceEvent("LPC_Calc2").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage);
|
|
if (!hasNextMessage) {
|
|
updateMessage(bestSet, true);
|
|
//TraceEvent("LPC_Calc3").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage);
|
|
}
|
|
} else {
|
|
for (int i = 0; i < logSets.size() && !hasNextMessage; i++) {
|
|
if (i != bestSet) {
|
|
updateMessage(i, false); // Use Quorum logic
|
|
}
|
|
}
|
|
//TraceEvent("LPC_Calc4").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage);
|
|
for (int i = 0; i < logSets.size() && !hasNextMessage; i++) {
|
|
if (i != bestSet) {
|
|
updateMessage(i, true);
|
|
}
|
|
}
|
|
//TraceEvent("LPC_Calc5").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage);
|
|
}
|
|
}
|
|
|
|
void ILogSystem::SetPeekCursor::updateMessage(int logIdx, bool usePolicy) {
|
|
loop {
|
|
bool advancedPast = false;
|
|
sortedVersions.clear();
|
|
for (int i = 0; i < serverCursors[logIdx].size(); i++) {
|
|
auto& serverCursor = serverCursors[logIdx][i];
|
|
if (nextVersion.present())
|
|
serverCursor->advanceTo(nextVersion.get());
|
|
sortedVersions.push_back(std::pair<LogMessageVersion, int>(serverCursor->version(), i));
|
|
//TraceEvent("LPC_Update1").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage).detail("ServerVer", serverCursor->version().toString()).detail("I", i);
|
|
}
|
|
|
|
if (usePolicy) {
|
|
std::sort(sortedVersions.begin(), sortedVersions.end());
|
|
locations.clear();
|
|
for (auto sortedVersion : sortedVersions) {
|
|
locations.push_back(logSets[logIdx]->logEntryArray[sortedVersion.second]);
|
|
if (locations.size() >= logSets[logIdx]->tLogReplicationFactor &&
|
|
logSets[logIdx]->satisfiesPolicy(locations)) {
|
|
messageVersion = sortedVersion.first;
|
|
break;
|
|
}
|
|
}
|
|
} else {
|
|
//(int)oldLogData[i].logServers.size() + 1 - oldLogData[i].tLogReplicationFactor
|
|
std::nth_element(sortedVersions.begin(),
|
|
sortedVersions.end() -
|
|
(logSets[logIdx]->logServers.size() + 1 - logSets[logIdx]->tLogReplicationFactor),
|
|
sortedVersions.end());
|
|
messageVersion = sortedVersions[sortedVersions.size() - (logSets[logIdx]->logServers.size() + 1 -
|
|
logSets[logIdx]->tLogReplicationFactor)]
|
|
.first;
|
|
}
|
|
|
|
for (auto& cursors : serverCursors) {
|
|
for (auto& c : cursors) {
|
|
auto start = c->version();
|
|
c->advanceTo(messageVersion);
|
|
if (start <= messageVersion && messageVersion < c->version()) {
|
|
advancedPast = true;
|
|
TEST(true); // Merge peek cursor with logIdx advanced past desired sequence
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!advancedPast)
|
|
break;
|
|
}
|
|
|
|
for (int i = 0; i < serverCursors[logIdx].size(); i++) {
|
|
auto& c = serverCursors[logIdx][i];
|
|
ASSERT_WE_THINK(!c->hasMessage() ||
|
|
c->version() >= messageVersion); // Seems like the loop above makes this unconditionally true
|
|
if (c->version() == messageVersion && c->hasMessage()) {
|
|
hasNextMessage = true;
|
|
currentSet = logIdx;
|
|
currentCursor = i;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
bool ILogSystem::SetPeekCursor::hasMessage() const {
|
|
return hasNextMessage;
|
|
}
|
|
|
|
void ILogSystem::SetPeekCursor::nextMessage() {
|
|
nextVersion = version();
|
|
nextVersion.get().sub++;
|
|
serverCursors[currentSet][currentCursor]->nextMessage();
|
|
calcHasMessage();
|
|
ASSERT(hasMessage() || !version().sub);
|
|
}
|
|
|
|
StringRef ILogSystem::SetPeekCursor::getMessage() {
|
|
return serverCursors[currentSet][currentCursor]->getMessage();
|
|
}
|
|
|
|
StringRef ILogSystem::SetPeekCursor::getMessageWithTags() {
|
|
return serverCursors[currentSet][currentCursor]->getMessageWithTags();
|
|
}
|
|
|
|
VectorRef<Tag> ILogSystem::SetPeekCursor::getTags() const {
|
|
return serverCursors[currentSet][currentCursor]->getTags();
|
|
}
|
|
|
|
void ILogSystem::SetPeekCursor::advanceTo(LogMessageVersion n) {
|
|
bool canChange = false;
|
|
for (auto& cursors : serverCursors) {
|
|
for (auto& c : cursors) {
|
|
if (c->version() < n) {
|
|
canChange = true;
|
|
c->advanceTo(n);
|
|
}
|
|
}
|
|
}
|
|
if (canChange) {
|
|
calcHasMessage();
|
|
}
|
|
}
|
|
|
|
ACTOR Future<Void> setPeekGetMore(ILogSystem::SetPeekCursor* self,
|
|
LogMessageVersion startVersion,
|
|
TaskPriority taskID) {
|
|
loop {
|
|
//TraceEvent("LPC_GetMore1", self->randomID).detail("Start", startVersion.toString()).detail("Tag", self->tag.toString());
|
|
if (self->bestServer >= 0 && self->bestSet >= 0 &&
|
|
self->serverCursors[self->bestSet][self->bestServer]->isActive()) {
|
|
ASSERT(!self->serverCursors[self->bestSet][self->bestServer]->hasMessage());
|
|
//TraceEvent("LPC_GetMore2", self->randomID).detail("Start", startVersion.toString()).detail("Tag", self->tag.toString());
|
|
wait(self->serverCursors[self->bestSet][self->bestServer]->getMore(taskID) ||
|
|
self->serverCursors[self->bestSet][self->bestServer]->onFailed());
|
|
self->useBestSet = true;
|
|
} else {
|
|
// FIXME: if best set is exhausted, do not peek remote servers
|
|
bool bestSetValid = self->bestSet >= 0;
|
|
if (bestSetValid) {
|
|
self->locations.clear();
|
|
for (int i = 0; i < self->serverCursors[self->bestSet].size(); i++) {
|
|
if (!self->serverCursors[self->bestSet][i]->isActive() &&
|
|
self->serverCursors[self->bestSet][i]->version() <= self->messageVersion) {
|
|
self->locations.push_back(self->logSets[self->bestSet]->logEntryArray[i]);
|
|
}
|
|
}
|
|
bestSetValid = self->locations.size() < self->logSets[self->bestSet]->tLogReplicationFactor ||
|
|
!self->logSets[self->bestSet]->satisfiesPolicy(self->locations);
|
|
}
|
|
if (bestSetValid || self->logSets.size() == 1) {
|
|
if (!self->useBestSet) {
|
|
self->useBestSet = true;
|
|
self->calcHasMessage();
|
|
if (self->hasMessage() || self->version() > startVersion)
|
|
return Void();
|
|
}
|
|
|
|
//TraceEvent("LPC_GetMore3", self->randomID).detail("Start", startVersion.toString()).detail("Tag", self->tag.toString()).detail("BestSetSize", self->serverCursors[self->bestSet].size());
|
|
vector<Future<Void>> q;
|
|
for (auto& c : self->serverCursors[self->bestSet]) {
|
|
if (!c->hasMessage()) {
|
|
q.push_back(c->getMore(taskID));
|
|
if (c->isActive()) {
|
|
q.push_back(c->onFailed());
|
|
}
|
|
}
|
|
}
|
|
wait(quorum(q, 1));
|
|
} else {
|
|
// FIXME: this will peeking way too many cursors when satellites exist, and does not need to peek
|
|
// bestSet cursors since we cannot get anymore data from them
|
|
vector<Future<Void>> q;
|
|
//TraceEvent("LPC_GetMore4", self->randomID).detail("Start", startVersion.toString()).detail("Tag", self->tag.toString());
|
|
for (auto& cursors : self->serverCursors) {
|
|
for (auto& c : cursors) {
|
|
if (!c->hasMessage()) {
|
|
q.push_back(c->getMore(taskID));
|
|
}
|
|
}
|
|
}
|
|
wait(quorum(q, 1));
|
|
self->useBestSet = false;
|
|
}
|
|
}
|
|
self->calcHasMessage();
|
|
//TraceEvent("LPC_GetMoreB", self->randomID).detail("HasMessage", self->hasMessage()).detail("Start", startVersion.toString()).detail("Seq", self->version().toString());
|
|
if (self->hasMessage() || self->version() > startVersion)
|
|
return Void();
|
|
}
|
|
}
|
|
|
|
Future<Void> ILogSystem::SetPeekCursor::getMore(TaskPriority taskID) {
|
|
if (more.isValid() && !more.isReady()) {
|
|
return more;
|
|
}
|
|
|
|
auto startVersion = version();
|
|
calcHasMessage();
|
|
if (hasMessage())
|
|
return Void();
|
|
if (nextVersion.present())
|
|
advanceTo(nextVersion.get());
|
|
ASSERT(!hasMessage());
|
|
if (version() > startVersion)
|
|
return Void();
|
|
|
|
more = setPeekGetMore(this, startVersion, taskID);
|
|
return more;
|
|
}
|
|
|
|
Future<Void> ILogSystem::SetPeekCursor::onFailed() {
|
|
ASSERT(false);
|
|
return Never();
|
|
}
|
|
|
|
bool ILogSystem::SetPeekCursor::isActive() const {
|
|
ASSERT(false);
|
|
return false;
|
|
}
|
|
|
|
bool ILogSystem::SetPeekCursor::isExhausted() const {
|
|
return serverCursors[currentSet][currentCursor]->isExhausted();
|
|
}
|
|
|
|
const LogMessageVersion& ILogSystem::SetPeekCursor::version() const {
|
|
return messageVersion;
|
|
}
|
|
|
|
Version ILogSystem::SetPeekCursor::getMinKnownCommittedVersion() const {
|
|
return serverCursors[currentSet][currentCursor]->getMinKnownCommittedVersion();
|
|
}
|
|
|
|
Optional<UID> ILogSystem::SetPeekCursor::getPrimaryPeekLocation() const {
|
|
if (bestServer >= 0 && bestSet >= 0) {
|
|
return serverCursors[bestSet][bestServer]->getPrimaryPeekLocation();
|
|
}
|
|
return Optional<UID>();
|
|
}
|
|
|
|
Optional<UID> ILogSystem::SetPeekCursor::getCurrentPeekLocation() const {
|
|
if (currentCursor >= 0 && currentSet >= 0) {
|
|
return serverCursors[currentSet][currentCursor]->getPrimaryPeekLocation();
|
|
}
|
|
return Optional<UID>();
|
|
}
|
|
|
|
Version ILogSystem::SetPeekCursor::popped() const {
|
|
Version poppedVersion = 0;
|
|
for (auto& cursors : serverCursors) {
|
|
for (auto& c : cursors) {
|
|
poppedVersion = std::max(poppedVersion, c->popped());
|
|
}
|
|
}
|
|
return poppedVersion;
|
|
}
|
|
|
|
ILogSystem::MultiCursor::MultiCursor(std::vector<Reference<IPeekCursor>> cursors,
|
|
std::vector<LogMessageVersion> epochEnds)
|
|
: cursors(cursors), epochEnds(epochEnds), poppedVersion(0) {
|
|
for (int i = 0; i < std::min<int>(cursors.size(), SERVER_KNOBS->MULTI_CURSOR_PRE_FETCH_LIMIT); i++) {
|
|
cursors[cursors.size() - i - 1]->getMore();
|
|
}
|
|
}
|
|
|
|
Reference<ILogSystem::IPeekCursor> ILogSystem::MultiCursor::cloneNoMore() {
|
|
return cursors.back()->cloneNoMore();
|
|
}
|
|
|
|
void ILogSystem::MultiCursor::setProtocolVersion(ProtocolVersion version) {
|
|
cursors.back()->setProtocolVersion(version);
|
|
}
|
|
|
|
Arena& ILogSystem::MultiCursor::arena() {
|
|
return cursors.back()->arena();
|
|
}
|
|
|
|
ArenaReader* ILogSystem::MultiCursor::reader() {
|
|
return cursors.back()->reader();
|
|
}
|
|
|
|
bool ILogSystem::MultiCursor::hasMessage() const {
|
|
return cursors.back()->hasMessage();
|
|
}
|
|
|
|
void ILogSystem::MultiCursor::nextMessage() {
|
|
cursors.back()->nextMessage();
|
|
}
|
|
|
|
StringRef ILogSystem::MultiCursor::getMessage() {
|
|
return cursors.back()->getMessage();
|
|
}
|
|
|
|
StringRef ILogSystem::MultiCursor::getMessageWithTags() {
|
|
return cursors.back()->getMessageWithTags();
|
|
}
|
|
|
|
VectorRef<Tag> ILogSystem::MultiCursor::getTags() const {
|
|
return cursors.back()->getTags();
|
|
}
|
|
|
|
void ILogSystem::MultiCursor::advanceTo(LogMessageVersion n) {
|
|
while (cursors.size() > 1 && n >= epochEnds.back()) {
|
|
poppedVersion = std::max(poppedVersion, cursors.back()->popped());
|
|
cursors.pop_back();
|
|
epochEnds.pop_back();
|
|
}
|
|
cursors.back()->advanceTo(n);
|
|
}
|
|
|
|
Future<Void> ILogSystem::MultiCursor::getMore(TaskPriority taskID) {
|
|
LogMessageVersion startVersion = cursors.back()->version();
|
|
while (cursors.size() > 1 && cursors.back()->version() >= epochEnds.back()) {
|
|
poppedVersion = std::max(poppedVersion, cursors.back()->popped());
|
|
cursors.pop_back();
|
|
epochEnds.pop_back();
|
|
}
|
|
if (cursors.back()->version() > startVersion) {
|
|
return Void();
|
|
}
|
|
return cursors.back()->getMore(taskID);
|
|
}
|
|
|
|
Future<Void> ILogSystem::MultiCursor::onFailed() {
|
|
return cursors.back()->onFailed();
|
|
}
|
|
|
|
bool ILogSystem::MultiCursor::isActive() const {
|
|
return cursors.back()->isActive();
|
|
}
|
|
|
|
bool ILogSystem::MultiCursor::isExhausted() const {
|
|
return cursors.back()->isExhausted();
|
|
}
|
|
|
|
const LogMessageVersion& ILogSystem::MultiCursor::version() const {
|
|
return cursors.back()->version();
|
|
}
|
|
|
|
Version ILogSystem::MultiCursor::getMinKnownCommittedVersion() const {
|
|
return cursors.back()->getMinKnownCommittedVersion();
|
|
}
|
|
|
|
Optional<UID> ILogSystem::MultiCursor::getPrimaryPeekLocation() const {
|
|
return cursors.back()->getPrimaryPeekLocation();
|
|
}
|
|
|
|
Optional<UID> ILogSystem::MultiCursor::getCurrentPeekLocation() const {
|
|
return cursors.back()->getCurrentPeekLocation();
|
|
}
|
|
|
|
Version ILogSystem::MultiCursor::popped() const {
|
|
return std::max(poppedVersion, cursors.back()->popped());
|
|
}
|
|
|
|
ILogSystem::BufferedCursor::BufferedCursor(std::vector<Reference<IPeekCursor>> cursors,
|
|
Version begin,
|
|
Version end,
|
|
bool withTags,
|
|
bool collectTags,
|
|
bool canDiscardPopped)
|
|
: cursors(cursors), messageVersion(begin), end(end), withTags(withTags), collectTags(collectTags),
|
|
hasNextMessage(false), messageIndex(0), poppedVersion(0), initialPoppedVersion(0),
|
|
canDiscardPopped(canDiscardPopped), knownUnique(false), minKnownCommittedVersion(0),
|
|
randomID(deterministicRandom()->randomUniqueID()) {
|
|
targetQueueSize = SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES / cursors.size();
|
|
messages.reserve(SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES);
|
|
cursorMessages.resize(cursors.size());
|
|
}
|
|
|
|
ILogSystem::BufferedCursor::BufferedCursor(
|
|
std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> const& logServers,
|
|
Tag tag,
|
|
Version begin,
|
|
Version end,
|
|
bool parallelGetMore)
|
|
: messageVersion(begin), end(end), withTags(true), collectTags(false), hasNextMessage(false), messageIndex(0),
|
|
poppedVersion(0), initialPoppedVersion(0), canDiscardPopped(false), knownUnique(true), minKnownCommittedVersion(0),
|
|
randomID(deterministicRandom()->randomUniqueID()) {
|
|
targetQueueSize = SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES / logServers.size();
|
|
messages.reserve(SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES);
|
|
cursorMessages.resize(logServers.size());
|
|
for (int i = 0; i < logServers.size(); i++) {
|
|
auto cursor =
|
|
makeReference<ILogSystem::ServerPeekCursor>(logServers[i], tag, begin, end, false, parallelGetMore);
|
|
cursors.push_back(cursor);
|
|
}
|
|
}
|
|
|
|
void ILogSystem::BufferedCursor::combineMessages() {
|
|
if (!hasNextMessage) {
|
|
return;
|
|
}
|
|
|
|
std::vector<Tag> tags;
|
|
tags.push_back(messages[messageIndex].tags[0]);
|
|
for (int i = messageIndex + 1; i < messages.size() && messages[messageIndex].version == messages[i].version; i++) {
|
|
tags.push_back(messages[i].tags[0]);
|
|
messageIndex = i;
|
|
}
|
|
auto& msg = messages[messageIndex];
|
|
BinaryWriter messageWriter(Unversioned());
|
|
messageWriter << uint32_t(msg.message.size() + sizeof(uint32_t) + sizeof(uint16_t) + tags.size() * sizeof(Tag))
|
|
<< msg.version.sub << uint16_t(tags.size());
|
|
for (auto t : tags) {
|
|
messageWriter << t;
|
|
}
|
|
messageWriter.serializeBytes(msg.message);
|
|
Standalone<StringRef> val = messageWriter.toValue();
|
|
msg.arena = val.arena();
|
|
msg.message = val;
|
|
msg.tags = VectorRef<Tag>();
|
|
for (auto t : tags) {
|
|
msg.tags.push_back(msg.arena, t);
|
|
}
|
|
}
|
|
|
|
Reference<ILogSystem::IPeekCursor> ILogSystem::BufferedCursor::cloneNoMore() {
|
|
ASSERT(false);
|
|
return Reference<ILogSystem::IPeekCursor>();
|
|
}
|
|
|
|
void ILogSystem::BufferedCursor::setProtocolVersion(ProtocolVersion version) {
|
|
for (auto& c : cursors) {
|
|
c->setProtocolVersion(version);
|
|
}
|
|
}
|
|
|
|
Arena& ILogSystem::BufferedCursor::arena() {
|
|
return messages[messageIndex].arena;
|
|
}
|
|
|
|
ArenaReader* ILogSystem::BufferedCursor::reader() {
|
|
ASSERT(false);
|
|
return cursors[0]->reader();
|
|
}
|
|
|
|
bool ILogSystem::BufferedCursor::hasMessage() const {
|
|
return hasNextMessage;
|
|
}
|
|
|
|
void ILogSystem::BufferedCursor::nextMessage() {
|
|
messageIndex++;
|
|
if (messageIndex == messages.size()) {
|
|
hasNextMessage = false;
|
|
}
|
|
if (collectTags) {
|
|
combineMessages();
|
|
}
|
|
}
|
|
|
|
StringRef ILogSystem::BufferedCursor::getMessage() {
|
|
ASSERT(!withTags);
|
|
return messages[messageIndex].message;
|
|
}
|
|
|
|
StringRef ILogSystem::BufferedCursor::getMessageWithTags() {
|
|
ASSERT(withTags);
|
|
return messages[messageIndex].message;
|
|
}
|
|
|
|
VectorRef<Tag> ILogSystem::BufferedCursor::getTags() const {
|
|
ASSERT(withTags);
|
|
return messages[messageIndex].tags;
|
|
}
|
|
|
|
void ILogSystem::BufferedCursor::advanceTo(LogMessageVersion n) {
|
|
ASSERT(false);
|
|
}
|
|
|
|
ACTOR Future<Void> bufferedGetMoreLoader(ILogSystem::BufferedCursor* self,
|
|
Reference<ILogSystem::IPeekCursor> cursor,
|
|
int idx,
|
|
TaskPriority taskID) {
|
|
loop {
|
|
wait(yield());
|
|
if (cursor->version().version >= self->end || self->cursorMessages[idx].size() > self->targetQueueSize) {
|
|
return Void();
|
|
}
|
|
wait(cursor->getMore(taskID));
|
|
self->poppedVersion = std::max(self->poppedVersion, cursor->popped());
|
|
self->minKnownCommittedVersion =
|
|
std::max(self->minKnownCommittedVersion, cursor->getMinKnownCommittedVersion());
|
|
if (self->canDiscardPopped) {
|
|
self->initialPoppedVersion = std::max(self->initialPoppedVersion, cursor->popped());
|
|
}
|
|
if (cursor->version().version >= self->end) {
|
|
return Void();
|
|
}
|
|
while (cursor->hasMessage()) {
|
|
self->cursorMessages[idx].push_back(ILogSystem::BufferedCursor::BufferedMessage(
|
|
cursor->arena(),
|
|
(!self->withTags || self->collectTags) ? cursor->getMessage() : cursor->getMessageWithTags(),
|
|
!self->withTags ? VectorRef<Tag>() : cursor->getTags(),
|
|
cursor->version()));
|
|
cursor->nextMessage();
|
|
}
|
|
}
|
|
}
|
|
|
|
ACTOR Future<Void> bufferedGetMore(ILogSystem::BufferedCursor* self, TaskPriority taskID) {
|
|
if (self->messageVersion.version >= self->end) {
|
|
wait(Future<Void>(Never()));
|
|
throw internal_error();
|
|
}
|
|
|
|
self->messages.clear();
|
|
|
|
std::vector<Future<Void>> loaders;
|
|
loaders.reserve(self->cursors.size());
|
|
|
|
for (int i = 0; i < self->cursors.size(); i++) {
|
|
loaders.push_back(bufferedGetMoreLoader(self, self->cursors[i], i, taskID));
|
|
}
|
|
|
|
state Future<Void> allLoaders = waitForAll(loaders);
|
|
state Version minVersion;
|
|
loop {
|
|
wait(allLoaders || delay(SERVER_KNOBS->DESIRED_GET_MORE_DELAY, taskID));
|
|
minVersion = self->end;
|
|
for (int i = 0; i < self->cursors.size(); i++) {
|
|
auto cursor = self->cursors[i];
|
|
while (cursor->hasMessage()) {
|
|
self->cursorMessages[i].push_back(ILogSystem::BufferedCursor::BufferedMessage(
|
|
cursor->arena(),
|
|
(!self->withTags || self->collectTags) ? cursor->getMessage() : cursor->getMessageWithTags(),
|
|
!self->withTags ? VectorRef<Tag>() : cursor->getTags(),
|
|
cursor->version()));
|
|
cursor->nextMessage();
|
|
}
|
|
minVersion = std::min(minVersion, cursor->version().version);
|
|
}
|
|
if (minVersion > self->messageVersion.version) {
|
|
break;
|
|
}
|
|
if (allLoaders.isReady()) {
|
|
wait(Future<Void>(Never()));
|
|
}
|
|
}
|
|
wait(yield());
|
|
|
|
for (auto& it : self->cursorMessages) {
|
|
while (!it.empty() && it.front().version.version < minVersion) {
|
|
self->messages.push_back(it.front());
|
|
it.pop_front();
|
|
}
|
|
}
|
|
if (self->collectTags || self->knownUnique) {
|
|
std::sort(self->messages.begin(), self->messages.end());
|
|
} else {
|
|
uniquify(self->messages);
|
|
}
|
|
|
|
self->messageVersion = LogMessageVersion(minVersion);
|
|
self->messageIndex = 0;
|
|
self->hasNextMessage = self->messages.size() > 0;
|
|
|
|
if (self->collectTags) {
|
|
self->combineMessages();
|
|
}
|
|
|
|
wait(yield());
|
|
if (self->canDiscardPopped && self->poppedVersion > self->version().version) {
|
|
TraceEvent(SevWarn, "DiscardingPoppedData", self->randomID)
|
|
.detail("Version", self->version().version)
|
|
.detail("Popped", self->poppedVersion);
|
|
self->messageVersion = std::max(self->messageVersion, LogMessageVersion(self->poppedVersion));
|
|
for (auto cursor : self->cursors) {
|
|
cursor->advanceTo(self->messageVersion);
|
|
}
|
|
self->messageIndex = self->messages.size();
|
|
if (self->messages.size() > 0 &&
|
|
self->messages[self->messages.size() - 1].version.version < self->poppedVersion) {
|
|
self->hasNextMessage = false;
|
|
} else {
|
|
auto iter = std::lower_bound(self->messages.begin(),
|
|
self->messages.end(),
|
|
ILogSystem::BufferedCursor::BufferedMessage(self->poppedVersion));
|
|
self->hasNextMessage = iter != self->messages.end();
|
|
if (self->hasNextMessage) {
|
|
self->messageIndex = iter - self->messages.begin();
|
|
}
|
|
}
|
|
}
|
|
if (self->hasNextMessage) {
|
|
self->canDiscardPopped = false;
|
|
}
|
|
return Void();
|
|
}
|
|
|
|
Future<Void> ILogSystem::BufferedCursor::getMore(TaskPriority taskID) {
|
|
if (hasMessage()) {
|
|
return Void();
|
|
}
|
|
|
|
if (!more.isValid() || more.isReady()) {
|
|
more = bufferedGetMore(this, taskID);
|
|
}
|
|
return more;
|
|
}
|
|
|
|
Future<Void> ILogSystem::BufferedCursor::onFailed() {
|
|
ASSERT(false);
|
|
return Never();
|
|
}
|
|
|
|
bool ILogSystem::BufferedCursor::isActive() const {
|
|
ASSERT(false);
|
|
return false;
|
|
}
|
|
|
|
bool ILogSystem::BufferedCursor::isExhausted() const {
|
|
ASSERT(false);
|
|
return false;
|
|
}
|
|
|
|
const LogMessageVersion& ILogSystem::BufferedCursor::version() const {
|
|
if (hasNextMessage) {
|
|
return messages[messageIndex].version;
|
|
}
|
|
return messageVersion;
|
|
}
|
|
|
|
Version ILogSystem::BufferedCursor::getMinKnownCommittedVersion() const {
|
|
return minKnownCommittedVersion;
|
|
}
|
|
|
|
Optional<UID> ILogSystem::BufferedCursor::getPrimaryPeekLocation() const {
|
|
return Optional<UID>();
|
|
}
|
|
|
|
Optional<UID> ILogSystem::BufferedCursor::getCurrentPeekLocation() const {
|
|
return Optional<UID>();
|
|
}
|
|
|
|
Version ILogSystem::BufferedCursor::popped() const {
|
|
if (initialPoppedVersion == poppedVersion) {
|
|
return 0;
|
|
}
|
|
return poppedVersion;
|
|
}
|