Fix multiple PeekStream requests to log routers

There is a bug in how a log router handles streaming read:
* Log router has a `logRouterPeekStream` actor A running.
* Remote tlog detects some problem and starts another streaming connection (maybe just reuse the connection?)
* Log router now has a new `logRouterPeekStream` actor B running.
* B runs and found that popped version > reqBegin, so `LogRouterPeekPopped` . This is because A is still running and changed the popped version.
* A ends with `TLogPeekStreamEnd operation_obsolete`
* B become stuck at `wait(req.reply.onReady() && store(reply.rep, future)`, because the future was sent `Never()`.

As a result, the remote tlog can no longer retrieve data from this log router.

Fix by killing the `logRouterPeekStream` B.
This commit is contained in:
Jingyu Zhou 2022-04-15 09:35:41 -07:00
parent eb436ec14b
commit 0a03b190da

View File

@ -18,21 +18,24 @@
* limitations under the License. * limitations under the License.
*/ */
#include "flow/ActorCollection.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbrpc/Stats.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/LogSystem.h"
#include "fdbclient/SystemData.h"
#include "fdbserver/ApplyMetadataMutation.h"
#include "fdbserver/RecoveryState.h"
#include "fdbclient/Atomic.h" #include "fdbclient/Atomic.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/SystemData.h"
#include "fdbrpc/Stats.h"
#include "fdbserver/ApplyMetadataMutation.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/TLogInterface.h"
#include "flow/ActorCollection.h"
#include "flow/Arena.h" #include "flow/Arena.h"
#include "flow/Histogram.h" #include "flow/Histogram.h"
#include "flow/TDMetric.actor.h" #include "flow/TDMetric.actor.h"
#include "flow/network.h"
#include "flow/actorcompiler.h" // This must be the last #include. #include "flow/actorcompiler.h" // This must be the last #include.
struct LogRouterData { struct LogRouterData {
@ -526,6 +529,10 @@ Future<Void> logRouterPeekMessages(PromiseType replyPromise,
.detail("Begin", reqBegin) .detail("Begin", reqBegin)
.detail("Popped", poppedVer) .detail("Popped", poppedVer)
.detail("Start", self->startVersion); .detail("Start", self->startVersion);
if (std::is_same<PromiseType, Promise<TLogPeekReply>>::value) {
// kills logRouterPeekStream actor, otherwise that actor becomes stuck
throw operation_obsolete();
}
replyPromise.send(Never()); replyPromise.send(Never());
if (reqSequence.present()) { if (reqSequence.present()) {
auto& trackerData = self->peekTracker[peekId]; auto& trackerData = self->peekTracker[peekId];