From 0a03b190da14e0365aefbb94e814f903940d6477 Mon Sep 17 00:00:00 2001
From: Jingyu Zhou <jingyuzhou@gmail.com>
Date: Fri, 15 Apr 2022 09:35:41 -0700
Subject: [PATCH] Fix multiple PeekStream requests to log routers

There is a bug in how a log router handles streaming read:
* Log router has a `logRouterPeekStream` actor A running.
* Remote tlog detects some problem and starts another streaming connection (maybe just reuse the connection?)
* Log router now has a new `logRouterPeekStream` actor B running.
* B runs and found that popped version > reqBegin, so `LogRouterPeekPopped` . This is because A is still running and changed the popped version.
* A ends with `TLogPeekStreamEnd operation_obsolete`
* B become stuck at `wait(req.reply.onReady() && store(reply.rep, future)`, because the future was sent `Never()`.

As a result, the remote tlog can no longer retrieve data from this log router.

Fix by killing the `logRouterPeekStream` B.
---
 fdbserver/LogRouter.actor.cpp | 29 ++++++++++++++++++-----------
 1 file changed, 18 insertions(+), 11 deletions(-)

diff --git a/fdbserver/LogRouter.actor.cpp b/fdbserver/LogRouter.actor.cpp
index e595dc315f..8684fa5263 100644
--- a/fdbserver/LogRouter.actor.cpp
+++ b/fdbserver/LogRouter.actor.cpp
@@ -18,21 +18,24 @@
  * limitations under the License.
  */
 
-#include "flow/ActorCollection.h"
-#include "fdbclient/NativeAPI.actor.h"
-#include "fdbrpc/Stats.h"
-#include "fdbserver/WorkerInterface.actor.h"
-#include "fdbserver/WaitFailure.h"
-#include "fdbserver/Knobs.h"
-#include "fdbserver/ServerDBInfo.h"
-#include "fdbserver/LogSystem.h"
-#include "fdbclient/SystemData.h"
-#include "fdbserver/ApplyMetadataMutation.h"
-#include "fdbserver/RecoveryState.h"
 #include "fdbclient/Atomic.h"
+#include "fdbclient/NativeAPI.actor.h"
+#include "fdbclient/SystemData.h"
+#include "fdbrpc/Stats.h"
+#include "fdbserver/ApplyMetadataMutation.h"
+#include "fdbserver/Knobs.h"
+#include "fdbserver/LogSystem.h"
+#include "fdbserver/WaitFailure.h"
+#include "fdbserver/WorkerInterface.actor.h"
+#include "fdbserver/RecoveryState.h"
+#include "fdbserver/ServerDBInfo.h"
+#include "fdbserver/TLogInterface.h"
+#include "flow/ActorCollection.h"
 #include "flow/Arena.h"
 #include "flow/Histogram.h"
 #include "flow/TDMetric.actor.h"
+#include "flow/network.h"
+
 #include "flow/actorcompiler.h" // This must be the last #include.
 
 struct LogRouterData {
@@ -526,6 +529,10 @@ Future<Void> logRouterPeekMessages(PromiseType replyPromise,
 		    .detail("Begin", reqBegin)
 		    .detail("Popped", poppedVer)
 		    .detail("Start", self->startVersion);
+		if (std::is_same<PromiseType, Promise<TLogPeekReply>>::value) {
+			// kills logRouterPeekStream actor, otherwise that actor becomes stuck
+			throw operation_obsolete();
+		}
 		replyPromise.send(Never());
 		if (reqSequence.present()) {
 			auto& trackerData = self->peekTracker[peekId];