Watches could return future_version errors unnecessarily

This commit is contained in:
A.J. Beamon 2020-06-04 17:18:25 -07:00
parent 4f24dd7bd0
commit 2f85ee360a

View File

@ -923,16 +923,18 @@ ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req )
if( req.debugID.present() )
g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());
state Version minVersion = data->data().latestVersion;
loop {
try {
state Version latest = data->data().latestVersion;
state Future<Void> watchFuture = data->watches.onChange(req.key);
state Version latest = data->version.get();
GetValueRequest getReq( req.key, latest, req.debugID );
state Future<Void> getValue = getValueQ( data, getReq ); //we are relying on the delay zero at the top of getValueQ, if removed we need one here
GetValueReply reply = wait( getReq.reply.getFuture() );
//TraceEvent("WatcherCheckValue").detail("Key", req.key ).detail("Value", req.value ).detail("CurrentValue", v ).detail("Ver", latest);
if(reply.error.present()) {
ASSERT(reply.error.get().code() != error_code_future_version);
throw reply.error.get();
}
@ -955,7 +957,13 @@ ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req )
++data->numWatches;
data->watchBytes += ( req.key.expectedSize() + req.value.expectedSize() + 1000 );
try {
wait( watchFuture );
if(latest < minVersion) {
// If the version we read is less than minVersion, then we may fail to be notified of any changes that occur up to or including minVersion
// To prevent that, we'll check the key again once the version reaches our minVersion
watchFuture = watchFuture || data->version.whenAtLeast(minVersion);
}
wait(watchFuture);
wait(data->version.whenAtLeast(data->data().latestVersion));
--data->numWatches;
data->watchBytes -= ( req.key.expectedSize() + req.value.expectedSize() + 1000 );
} catch( Error &e ) {