mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-16 02:42:23 +08:00
Merge pull request #2242 from alexmiller-apple/fix-10min-stall-again
Fix the 10min multi-region recovery stall again
This commit is contained in:
commit
6c0b934dda
@ -348,14 +348,14 @@ ACTOR Future<Void> logRouterPeekMessages( LogRouterData* self, TLogPeekRequest r
|
|||||||
// The peek cursor and this comparison need to agree about the maximum number of in-flight requests.
|
// The peek cursor and this comparison need to agree about the maximum number of in-flight requests.
|
||||||
while(trackerData.sequence_version.size() && seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) {
|
while(trackerData.sequence_version.size() && seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) {
|
||||||
if(seqBegin->second.canBeSet()) {
|
if(seqBegin->second.canBeSet()) {
|
||||||
seqBegin->second.sendError(timed_out());
|
seqBegin->second.sendError(operation_obsolete());
|
||||||
}
|
}
|
||||||
trackerData.sequence_version.erase(seqBegin);
|
trackerData.sequence_version.erase(seqBegin);
|
||||||
seqBegin = trackerData.sequence_version.begin();
|
seqBegin = trackerData.sequence_version.begin();
|
||||||
}
|
}
|
||||||
|
|
||||||
if(trackerData.sequence_version.size() && sequence < seqBegin->first) {
|
if(trackerData.sequence_version.size() && sequence < seqBegin->first) {
|
||||||
throw timed_out();
|
throw operation_obsolete();
|
||||||
}
|
}
|
||||||
|
|
||||||
trackerData.lastUpdate = now();
|
trackerData.lastUpdate = now();
|
||||||
@ -364,8 +364,8 @@ ACTOR Future<Void> logRouterPeekMessages( LogRouterData* self, TLogPeekRequest r
|
|||||||
req.onlySpilled = prevPeekData.second;
|
req.onlySpilled = prevPeekData.second;
|
||||||
wait(yield());
|
wait(yield());
|
||||||
} catch( Error &e ) {
|
} catch( Error &e ) {
|
||||||
if(e.code() == error_code_timed_out) {
|
if(e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
|
||||||
req.reply.sendError(timed_out());
|
req.reply.sendError(e);
|
||||||
return Void();
|
return Void();
|
||||||
} else {
|
} else {
|
||||||
throw;
|
throw;
|
||||||
@ -425,15 +425,15 @@ ACTOR Future<Void> logRouterPeekMessages( LogRouterData* self, TLogPeekRequest r
|
|||||||
trackerData.lastUpdate = now();
|
trackerData.lastUpdate = now();
|
||||||
auto& sequenceData = trackerData.sequence_version[sequence+1];
|
auto& sequenceData = trackerData.sequence_version[sequence+1];
|
||||||
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
|
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
|
||||||
req.reply.sendError(timed_out());
|
req.reply.sendError(operation_obsolete());
|
||||||
if(!sequenceData.isSet())
|
if(!sequenceData.isSet())
|
||||||
sequenceData.sendError(timed_out());
|
sequenceData.sendError(operation_obsolete());
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
if(sequenceData.isSet()) {
|
if(sequenceData.isSet()) {
|
||||||
if(sequenceData.getFuture().get().first != reply.end) {
|
if(sequenceData.getFuture().get().first != reply.end) {
|
||||||
TEST(true); //tlog peek second attempt ended at a different version
|
TEST(true); //tlog peek second attempt ended at a different version
|
||||||
req.reply.sendError(timed_out());
|
req.reply.sendError(operation_obsolete());
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -151,11 +151,8 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
|
|||||||
self->futureResults.push_back( brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, self->onlySpilled, std::make_pair(self->randomID, self->sequence++)), taskID) ) );
|
self->futureResults.push_back( brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, self->onlySpilled, std::make_pair(self->randomID, self->sequence++)), taskID) ) );
|
||||||
}
|
}
|
||||||
if (self->sequence == std::numeric_limits<decltype(self->sequence)>::max()) {
|
if (self->sequence == std::numeric_limits<decltype(self->sequence)>::max()) {
|
||||||
throw timed_out();
|
throw operation_obsolete();
|
||||||
}
|
}
|
||||||
} else if (self->futureResults.size() == 1) {
|
|
||||||
self->randomID = deterministicRandom()->randomUniqueID();
|
|
||||||
self->sequence = 0;
|
|
||||||
} else if (self->futureResults.size() == 0) {
|
} else if (self->futureResults.size() == 0) {
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
@ -166,7 +163,7 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
|
|||||||
choose {
|
choose {
|
||||||
when( TLogPeekReply res = wait( self->interf->get().present() ? self->futureResults.front() : Never() ) ) {
|
when( TLogPeekReply res = wait( self->interf->get().present() ? self->futureResults.front() : Never() ) ) {
|
||||||
if(res.begin.get() != expectedBegin) {
|
if(res.begin.get() != expectedBegin) {
|
||||||
throw timed_out();
|
throw operation_obsolete();
|
||||||
}
|
}
|
||||||
expectedBegin = res.end;
|
expectedBegin = res.end;
|
||||||
self->futureResults.pop_front();
|
self->futureResults.pop_front();
|
||||||
@ -194,8 +191,11 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
|
|||||||
if(e.code() == error_code_end_of_stream) {
|
if(e.code() == error_code_end_of_stream) {
|
||||||
self->end.reset( self->messageVersion.version );
|
self->end.reset( self->messageVersion.version );
|
||||||
return Void();
|
return Void();
|
||||||
} else if(e.code() == error_code_timed_out) {
|
} else if(e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
|
||||||
TraceEvent("PeekCursorTimedOut", self->randomID);
|
TraceEvent("PeekCursorTimedOut", self->randomID).error(e);
|
||||||
|
// We *should* never get timed_out(), as it means the TLog got stuck while handling a parallel peek,
|
||||||
|
// and thus we've likely just wasted 10min.
|
||||||
|
ASSERT_WE_THINK(e.code() == error_code_operation_obsolete || SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME < 10);
|
||||||
self->interfaceChanged = self->interf->onChange();
|
self->interfaceChanged = self->interf->onChange();
|
||||||
self->randomID = deterministicRandom()->randomUniqueID();
|
self->randomID = deterministicRandom()->randomUniqueID();
|
||||||
self->sequence = 0;
|
self->sequence = 0;
|
||||||
|
@ -876,18 +876,18 @@ namespace oldTLog_4_6 {
|
|||||||
peekId = req.sequence.get().first;
|
peekId = req.sequence.get().first;
|
||||||
sequence = req.sequence.get().second;
|
sequence = req.sequence.get().second;
|
||||||
if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && self->peekTracker.find(peekId) == self->peekTracker.end()) {
|
if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && self->peekTracker.find(peekId) == self->peekTracker.end()) {
|
||||||
throw timed_out();
|
throw operation_obsolete();
|
||||||
}
|
}
|
||||||
if(sequence > 0) {
|
if(sequence > 0) {
|
||||||
auto& trackerData = self->peekTracker[peekId];
|
auto& trackerData = self->peekTracker[peekId];
|
||||||
trackerData.lastUpdate = now();
|
trackerData.lastUpdate = now();
|
||||||
Version ver = wait(trackerData.sequence_version[sequence].getFuture());
|
Version ver = wait(trackerData.sequence_version[sequence].getFuture());
|
||||||
req.begin = ver;
|
req.begin = std::max(ver, req.begin);
|
||||||
wait(yield());
|
wait(yield());
|
||||||
}
|
}
|
||||||
} catch( Error &e ) {
|
} catch( Error &e ) {
|
||||||
if(e.code() == error_code_timed_out) {
|
if(e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
|
||||||
req.reply.sendError(timed_out());
|
req.reply.sendError(e);
|
||||||
return Void();
|
return Void();
|
||||||
} else {
|
} else {
|
||||||
throw;
|
throw;
|
||||||
@ -923,15 +923,15 @@ namespace oldTLog_4_6 {
|
|||||||
auto& sequenceData = trackerData.sequence_version[sequence+1];
|
auto& sequenceData = trackerData.sequence_version[sequence+1];
|
||||||
trackerData.lastUpdate = now();
|
trackerData.lastUpdate = now();
|
||||||
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
|
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
|
||||||
req.reply.sendError(timed_out());
|
req.reply.sendError(operation_obsolete());
|
||||||
if (!sequenceData.isSet())
|
if (!sequenceData.isSet())
|
||||||
sequenceData.sendError(timed_out());
|
sequenceData.sendError(operation_obsolete());
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
if(sequenceData.isSet()) {
|
if(sequenceData.isSet()) {
|
||||||
if(sequenceData.getFuture().get() != rep.end) {
|
if(sequenceData.getFuture().get() != rep.end) {
|
||||||
TEST(true); //tlog peek second attempt ended at a different version
|
TEST(true); //tlog peek second attempt ended at a different version
|
||||||
req.reply.sendError(timed_out());
|
req.reply.sendError(operation_obsolete());
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -1002,7 +1002,7 @@ namespace oldTLog_4_6 {
|
|||||||
if(sequenceData.isSet()) {
|
if(sequenceData.isSet()) {
|
||||||
if(sequenceData.getFuture().get() != reply.end) {
|
if(sequenceData.getFuture().get() != reply.end) {
|
||||||
TEST(true); //tlog peek second attempt ended at a different version
|
TEST(true); //tlog peek second attempt ended at a different version
|
||||||
req.reply.sendError(timed_out());
|
req.reply.sendError(operation_obsolete());
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -1052,7 +1052,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
|
|||||||
peekId = req.sequence.get().first;
|
peekId = req.sequence.get().first;
|
||||||
sequence = req.sequence.get().second;
|
sequence = req.sequence.get().second;
|
||||||
if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && logData->peekTracker.find(peekId) == logData->peekTracker.end()) {
|
if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && logData->peekTracker.find(peekId) == logData->peekTracker.end()) {
|
||||||
throw timed_out();
|
throw operation_obsolete();
|
||||||
}
|
}
|
||||||
auto& trackerData = logData->peekTracker[peekId];
|
auto& trackerData = logData->peekTracker[peekId];
|
||||||
if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
|
if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
|
||||||
@ -1061,24 +1061,24 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
|
|||||||
auto seqBegin = trackerData.sequence_version.begin();
|
auto seqBegin = trackerData.sequence_version.begin();
|
||||||
while(trackerData.sequence_version.size() && seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) {
|
while(trackerData.sequence_version.size() && seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) {
|
||||||
if(seqBegin->second.canBeSet()) {
|
if(seqBegin->second.canBeSet()) {
|
||||||
seqBegin->second.sendError(timed_out());
|
seqBegin->second.sendError(operation_obsolete());
|
||||||
}
|
}
|
||||||
trackerData.sequence_version.erase(seqBegin);
|
trackerData.sequence_version.erase(seqBegin);
|
||||||
seqBegin = trackerData.sequence_version.begin();
|
seqBegin = trackerData.sequence_version.begin();
|
||||||
}
|
}
|
||||||
|
|
||||||
if(trackerData.sequence_version.size() && sequence < seqBegin->first) {
|
if(trackerData.sequence_version.size() && sequence < seqBegin->first) {
|
||||||
throw timed_out();
|
throw operation_obsolete();
|
||||||
}
|
}
|
||||||
|
|
||||||
trackerData.lastUpdate = now();
|
trackerData.lastUpdate = now();
|
||||||
std::pair<Version, bool> prevPeekData = wait(trackerData.sequence_version[sequence].getFuture());
|
std::pair<Version, bool> prevPeekData = wait(trackerData.sequence_version[sequence].getFuture());
|
||||||
req.begin = prevPeekData.first;
|
req.begin = std::max(prevPeekData.first, req.begin);
|
||||||
req.onlySpilled = prevPeekData.second;
|
req.onlySpilled = prevPeekData.second;
|
||||||
wait(yield());
|
wait(yield());
|
||||||
} catch( Error &e ) {
|
} catch( Error &e ) {
|
||||||
if(e.code() == error_code_timed_out) {
|
if(e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
|
||||||
req.reply.sendError(timed_out());
|
req.reply.sendError(e);
|
||||||
return Void();
|
return Void();
|
||||||
} else {
|
} else {
|
||||||
throw;
|
throw;
|
||||||
@ -1134,15 +1134,15 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
|
|||||||
auto& sequenceData = trackerData.sequence_version[sequence+1];
|
auto& sequenceData = trackerData.sequence_version[sequence+1];
|
||||||
trackerData.lastUpdate = now();
|
trackerData.lastUpdate = now();
|
||||||
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
|
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
|
||||||
req.reply.sendError(timed_out());
|
req.reply.sendError(operation_obsolete());
|
||||||
if (!sequenceData.isSet())
|
if (!sequenceData.isSet())
|
||||||
sequenceData.sendError(timed_out());
|
sequenceData.sendError(operation_obsolete());
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
if(sequenceData.isSet()) {
|
if(sequenceData.isSet()) {
|
||||||
if(sequenceData.getFuture().get().first != rep.end) {
|
if(sequenceData.getFuture().get().first != rep.end) {
|
||||||
TEST(true); //tlog peek second attempt ended at a different version
|
TEST(true); //tlog peek second attempt ended at a different version
|
||||||
req.reply.sendError(timed_out());
|
req.reply.sendError(operation_obsolete());
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -1210,15 +1210,15 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
|
|||||||
trackerData.lastUpdate = now();
|
trackerData.lastUpdate = now();
|
||||||
auto& sequenceData = trackerData.sequence_version[sequence+1];
|
auto& sequenceData = trackerData.sequence_version[sequence+1];
|
||||||
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
|
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
|
||||||
req.reply.sendError(timed_out());
|
req.reply.sendError(operation_obsolete());
|
||||||
if(!sequenceData.isSet())
|
if(!sequenceData.isSet())
|
||||||
sequenceData.sendError(timed_out());
|
sequenceData.sendError(operation_obsolete());
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
if(sequenceData.isSet()) {
|
if(sequenceData.isSet()) {
|
||||||
if(sequenceData.getFuture().get().first != reply.end) {
|
if(sequenceData.getFuture().get().first != reply.end) {
|
||||||
TEST(true); //tlog peek second attempt ended at a different version
|
TEST(true); //tlog peek second attempt ended at a different version
|
||||||
req.reply.sendError(timed_out());
|
req.reply.sendError(operation_obsolete());
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -1356,6 +1356,9 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
|
|||||||
try {
|
try {
|
||||||
peekId = req.sequence.get().first;
|
peekId = req.sequence.get().first;
|
||||||
sequence = req.sequence.get().second;
|
sequence = req.sequence.get().second;
|
||||||
|
if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && logData->peekTracker.find(peekId) == logData->peekTracker.end()) {
|
||||||
|
throw operation_obsolete();
|
||||||
|
}
|
||||||
auto& trackerData = logData->peekTracker[peekId];
|
auto& trackerData = logData->peekTracker[peekId];
|
||||||
if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
|
if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
|
||||||
trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled));
|
trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled));
|
||||||
@ -1364,24 +1367,24 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
|
|||||||
// The peek cursor and this comparison need to agree about the maximum number of in-flight requests.
|
// The peek cursor and this comparison need to agree about the maximum number of in-flight requests.
|
||||||
while(trackerData.sequence_version.size() && seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) {
|
while(trackerData.sequence_version.size() && seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) {
|
||||||
if(seqBegin->second.canBeSet()) {
|
if(seqBegin->second.canBeSet()) {
|
||||||
seqBegin->second.sendError(timed_out());
|
seqBegin->second.sendError(operation_obsolete());
|
||||||
}
|
}
|
||||||
trackerData.sequence_version.erase(seqBegin);
|
trackerData.sequence_version.erase(seqBegin);
|
||||||
seqBegin = trackerData.sequence_version.begin();
|
seqBegin = trackerData.sequence_version.begin();
|
||||||
}
|
}
|
||||||
|
|
||||||
if(trackerData.sequence_version.size() && sequence < seqBegin->first) {
|
if(trackerData.sequence_version.size() && sequence < seqBegin->first) {
|
||||||
throw timed_out();
|
throw operation_obsolete();
|
||||||
}
|
}
|
||||||
|
|
||||||
trackerData.lastUpdate = now();
|
trackerData.lastUpdate = now();
|
||||||
std::pair<Version, bool> prevPeekData = wait(trackerData.sequence_version[sequence].getFuture());
|
std::pair<Version, bool> prevPeekData = wait(trackerData.sequence_version[sequence].getFuture());
|
||||||
req.begin = prevPeekData.first;
|
req.begin = std::max(prevPeekData.first, req.begin);
|
||||||
req.onlySpilled = prevPeekData.second;
|
req.onlySpilled = prevPeekData.second;
|
||||||
wait(yield());
|
wait(yield());
|
||||||
} catch( Error &e ) {
|
} catch( Error &e ) {
|
||||||
if(e.code() == error_code_timed_out) {
|
if(e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
|
||||||
req.reply.sendError(timed_out());
|
req.reply.sendError(e);
|
||||||
return Void();
|
return Void();
|
||||||
} else {
|
} else {
|
||||||
throw;
|
throw;
|
||||||
@ -1437,15 +1440,15 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
|
|||||||
auto& sequenceData = trackerData.sequence_version[sequence+1];
|
auto& sequenceData = trackerData.sequence_version[sequence+1];
|
||||||
trackerData.lastUpdate = now();
|
trackerData.lastUpdate = now();
|
||||||
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
|
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
|
||||||
req.reply.sendError(timed_out());
|
req.reply.sendError(operation_obsolete());
|
||||||
if (!sequenceData.isSet())
|
if (!sequenceData.isSet())
|
||||||
sequenceData.sendError(timed_out());
|
sequenceData.sendError(operation_obsolete());
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
if(sequenceData.isSet()) {
|
if(sequenceData.isSet()) {
|
||||||
if(sequenceData.getFuture().get().first != rep.end) {
|
if(sequenceData.getFuture().get().first != rep.end) {
|
||||||
TEST(true); //tlog peek second attempt ended at a different version
|
TEST(true); //tlog peek second attempt ended at a different version
|
||||||
req.reply.sendError(timed_out());
|
req.reply.sendError(operation_obsolete());
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -1601,15 +1604,15 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
|
|||||||
trackerData.lastUpdate = now();
|
trackerData.lastUpdate = now();
|
||||||
auto& sequenceData = trackerData.sequence_version[sequence+1];
|
auto& sequenceData = trackerData.sequence_version[sequence+1];
|
||||||
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
|
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
|
||||||
req.reply.sendError(timed_out());
|
req.reply.sendError(operation_obsolete());
|
||||||
if(!sequenceData.isSet())
|
if(!sequenceData.isSet())
|
||||||
sequenceData.sendError(timed_out());
|
sequenceData.sendError(operation_obsolete());
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
if(sequenceData.isSet()) {
|
if(sequenceData.isSet()) {
|
||||||
if(sequenceData.getFuture().get().first != reply.end) {
|
if(sequenceData.getFuture().get().first != reply.end) {
|
||||||
TEST(true); //tlog peek second attempt ended at a different version
|
TEST(true); //tlog peek second attempt ended at a different version
|
||||||
req.reply.sendError(timed_out());
|
req.reply.sendError(operation_obsolete());
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -1374,7 +1374,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
|
|||||||
peekId = req.sequence.get().first;
|
peekId = req.sequence.get().first;
|
||||||
sequence = req.sequence.get().second;
|
sequence = req.sequence.get().second;
|
||||||
if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && logData->peekTracker.find(peekId) == logData->peekTracker.end()) {
|
if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && logData->peekTracker.find(peekId) == logData->peekTracker.end()) {
|
||||||
throw timed_out();
|
throw operation_obsolete();
|
||||||
}
|
}
|
||||||
auto& trackerData = logData->peekTracker[peekId];
|
auto& trackerData = logData->peekTracker[peekId];
|
||||||
if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
|
if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
|
||||||
@ -1384,24 +1384,24 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
|
|||||||
// The peek cursor and this comparison need to agree about the maximum number of in-flight requests.
|
// The peek cursor and this comparison need to agree about the maximum number of in-flight requests.
|
||||||
while(trackerData.sequence_version.size() && seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) {
|
while(trackerData.sequence_version.size() && seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) {
|
||||||
if(seqBegin->second.canBeSet()) {
|
if(seqBegin->second.canBeSet()) {
|
||||||
seqBegin->second.sendError(timed_out());
|
seqBegin->second.sendError(operation_obsolete());
|
||||||
}
|
}
|
||||||
trackerData.sequence_version.erase(seqBegin);
|
trackerData.sequence_version.erase(seqBegin);
|
||||||
seqBegin = trackerData.sequence_version.begin();
|
seqBegin = trackerData.sequence_version.begin();
|
||||||
}
|
}
|
||||||
|
|
||||||
if(trackerData.sequence_version.size() && sequence < seqBegin->first) {
|
if(trackerData.sequence_version.size() && sequence < seqBegin->first) {
|
||||||
throw timed_out();
|
throw operation_obsolete();
|
||||||
}
|
}
|
||||||
|
|
||||||
trackerData.lastUpdate = now();
|
trackerData.lastUpdate = now();
|
||||||
std::pair<Version, bool> prevPeekData = wait(trackerData.sequence_version[sequence].getFuture());
|
std::pair<Version, bool> prevPeekData = wait(trackerData.sequence_version[sequence].getFuture());
|
||||||
req.begin = prevPeekData.first;
|
req.begin = std::max(prevPeekData.first, req.begin);
|
||||||
req.onlySpilled = prevPeekData.second;
|
req.onlySpilled = prevPeekData.second;
|
||||||
wait(yield());
|
wait(yield());
|
||||||
} catch( Error &e ) {
|
} catch( Error &e ) {
|
||||||
if(e.code() == error_code_timed_out) {
|
if(e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
|
||||||
req.reply.sendError(timed_out());
|
req.reply.sendError(e);
|
||||||
return Void();
|
return Void();
|
||||||
} else {
|
} else {
|
||||||
throw;
|
throw;
|
||||||
@ -1414,6 +1414,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
|
|||||||
if(req.sequence.present()) {
|
if(req.sequence.present()) {
|
||||||
auto& trackerData = logData->peekTracker[peekId];
|
auto& trackerData = logData->peekTracker[peekId];
|
||||||
auto& sequenceData = trackerData.sequence_version[sequence+1];
|
auto& sequenceData = trackerData.sequence_version[sequence+1];
|
||||||
|
trackerData.lastUpdate = now();
|
||||||
if (!sequenceData.isSet()) {
|
if (!sequenceData.isSet()) {
|
||||||
sequenceData.send(std::make_pair(req.begin, req.onlySpilled));
|
sequenceData.send(std::make_pair(req.begin, req.onlySpilled));
|
||||||
}
|
}
|
||||||
@ -1457,15 +1458,15 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
|
|||||||
auto& sequenceData = trackerData.sequence_version[sequence+1];
|
auto& sequenceData = trackerData.sequence_version[sequence+1];
|
||||||
trackerData.lastUpdate = now();
|
trackerData.lastUpdate = now();
|
||||||
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
|
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
|
||||||
req.reply.sendError(timed_out());
|
req.reply.sendError(operation_obsolete());
|
||||||
if (!sequenceData.isSet())
|
if (!sequenceData.isSet())
|
||||||
sequenceData.sendError(timed_out());
|
sequenceData.sendError(operation_obsolete());
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
if(sequenceData.isSet()) {
|
if(sequenceData.isSet()) {
|
||||||
if(sequenceData.getFuture().get().first != rep.end) {
|
if(sequenceData.getFuture().get().first != rep.end) {
|
||||||
TEST(true); //tlog peek second attempt ended at a different version
|
TEST(true); //tlog peek second attempt ended at a different version
|
||||||
req.reply.sendError(timed_out());
|
req.reply.sendError(operation_obsolete());
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -1618,18 +1619,23 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
|
|||||||
|
|
||||||
if(req.sequence.present()) {
|
if(req.sequence.present()) {
|
||||||
auto& trackerData = logData->peekTracker[peekId];
|
auto& trackerData = logData->peekTracker[peekId];
|
||||||
trackerData.lastUpdate = now();
|
|
||||||
auto& sequenceData = trackerData.sequence_version[sequence+1];
|
auto& sequenceData = trackerData.sequence_version[sequence+1];
|
||||||
|
trackerData.lastUpdate = now();
|
||||||
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
|
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
|
||||||
req.reply.sendError(timed_out());
|
req.reply.sendError(operation_obsolete());
|
||||||
if(!sequenceData.isSet())
|
if(!sequenceData.isSet()) {
|
||||||
sequenceData.sendError(timed_out());
|
// It would technically be more correct to .send({req.begin, req.onlySpilled}), as the next
|
||||||
|
// request might still be in the window of active requests, but LogSystemPeekCursor will
|
||||||
|
// throw away all future responses upon getting an operation_obsolete(), so computing a
|
||||||
|
// response will probably be a waste of CPU.
|
||||||
|
sequenceData.sendError(operation_obsolete());
|
||||||
|
}
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
if(sequenceData.isSet()) {
|
if(sequenceData.isSet()) {
|
||||||
if(sequenceData.getFuture().get().first != reply.end) {
|
if(sequenceData.getFuture().get().first != reply.end) {
|
||||||
TEST(true); //tlog peek second attempt ended at a different version
|
TEST(true); //tlog peek second attempt ended at a different version
|
||||||
req.reply.sendError(timed_out());
|
req.reply.sendError(operation_obsolete());
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -34,6 +34,7 @@ ERROR( success, 0, "Success" )
|
|||||||
ERROR( end_of_stream, 1, "End of stream" )
|
ERROR( end_of_stream, 1, "End of stream" )
|
||||||
ERROR( operation_failed, 1000, "Operation failed")
|
ERROR( operation_failed, 1000, "Operation failed")
|
||||||
ERROR( wrong_shard_server, 1001, "Shard is not available from this server")
|
ERROR( wrong_shard_server, 1001, "Shard is not available from this server")
|
||||||
|
ERROR( operation_obsolete, 1002, "Operation result no longer necessary")
|
||||||
ERROR( timed_out, 1004, "Operation timed out" )
|
ERROR( timed_out, 1004, "Operation timed out" )
|
||||||
ERROR( coordinated_state_conflict, 1005, "Conflict occurred while changing coordination information" )
|
ERROR( coordinated_state_conflict, 1005, "Conflict occurred while changing coordination information" )
|
||||||
ERROR( all_alternatives_failed, 1006, "All alternatives failed" )
|
ERROR( all_alternatives_failed, 1006, "All alternatives failed" )
|
||||||
|
Loading…
x
Reference in New Issue
Block a user