Reorder call to setting up watch future with waiting for data->version to advance to avoid missing potential mutations. Also add tests for and fix the case where reading the value throws a transaction_too_old error.

This commit is contained in:
A.J. Beamon 2020-06-05 11:24:47 -07:00
parent 2f85ee360a
commit c00e6e7ad9

View File

@ -924,10 +924,11 @@ ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req )
g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());
state Version minVersion = data->data().latestVersion;
state Future<Void> watchFuture = data->watches.onChange(req.key);
loop {
try {
state Future<Void> watchFuture = data->watches.onChange(req.key);
state Version latest = data->version.get();
TEST(latest >= minVersion && latest < data->data().latestVersion); // Starting watch loop with latestVersion > data->version
GetValueRequest getReq( req.key, latest, req.debugID );
state Future<Void> getValue = getValueQ( data, getReq ); //we are relying on the delay zero at the top of getValueQ, if removed we need one here
GetValueReply reply = wait( getReq.reply.getFuture() );
@ -937,6 +938,9 @@ ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req )
ASSERT(reply.error.get().code() != error_code_future_version);
throw reply.error.get();
}
if(BUGGIFY) {
throw transaction_too_old();
}
debugMutation("ShardWatchValue", latest, MutationRef(MutationRef::DebugKey, req.key, reply.value.present() ? StringRef( reply.value.get() ) : LiteralStringRef("<null>") ) );
@ -962,8 +966,11 @@ ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req )
// To prevent that, we'll check the key again once the version reaches our minVersion
watchFuture = watchFuture || data->version.whenAtLeast(minVersion);
}
if(BUGGIFY) {
// Simulate a trigger on the watch that results in the loop going around without the value changing
watchFuture = watchFuture || delay(deterministicRandom()->random01());
}
wait(watchFuture);
wait(data->version.whenAtLeast(data->data().latestVersion));
--data->numWatches;
data->watchBytes -= ( req.key.expectedSize() + req.value.expectedSize() + 1000 );
} catch( Error &e ) {
@ -972,9 +979,15 @@ ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req )
throw;
}
} catch( Error &e ) {
if( e.code() != error_code_transaction_too_old )
if( e.code() != error_code_transaction_too_old ) {
throw;
}
TEST(true); // Reading a watched key failed with transaction_too_old
}
watchFuture = data->watches.onChange(req.key);
wait(data->version.whenAtLeast(data->data().latestVersion));
}
} catch (Error& e) {
if(!canReplyWith(e))