diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index f03f2e5e78..20f337dd5e 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -2452,7 +2452,16 @@ ACTOR Future<Void> localChangeFeedStream(StorageServer* data, } } } catch (Error& e) { - TraceEvent(SevError, "LocalChangeFeedError", data->thisServerID).error(e).detail("CFID", rangeID.printable()); + if (e.code() == error_code_unknown_change_feed) { + TEST(true); // CF was moved away, no more local data to merge with + // Send endVersion so local stream is effectively done. We couldn't have send that already, because that + // would mean the stream would have finished without error + results.send(MutationsAndVersionRef(end, invalidVersion)); + } else { + TraceEvent(SevError, "LocalChangeFeedError", data->thisServerID) + .error(e) + .detail("CFID", rangeID.printable()); + } throw; } } @@ -4975,12 +4984,12 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data, wait(yield()); } } catch (Error& e) { - TraceEvent(SevDebug, "FetchChangeFeedError", data->thisServerID) - .errorUnsuppressed(e) - .detail("RangeID", rangeId.printable()) - .detail("Range", range.toString()) - .detail("EndVersion", endVersion); if (e.code() != error_code_end_of_stream) { + TraceEvent(SevDebug, "FetchChangeFeedError", data->thisServerID) + .errorUnsuppressed(e) + .detail("RangeID", rangeId.printable()) + .detail("Range", range.toString()) + .detail("EndVersion", endVersion); throw; } }