Merge pull request #5193 from sfc-gh-etschannen/fix-reply-promise-stream-local-delivery

fix: reply promise streams could receive messages out of order when using local delivery
This commit is contained in:
Evan Tschannen 2021-07-16 11:16:05 -07:00 committed by GitHub
commit e1f27e1cc9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 15 additions and 14 deletions

View File

@ -919,11 +919,11 @@ ACTOR static void deliver(TransportData* self,
TaskPriority priority, TaskPriority priority,
ArenaReader reader, ArenaReader reader,
bool inReadSocket) { bool inReadSocket) {
// We want to run the task at the right priority. If the priority // We want to run the task at the right priority. If the priority is higher than the current priority (which is
// is higher than the current priority (which is ReadSocket) we // ReadSocket) we can just upgrade. Otherwise we'll context switch so that we don't block other tasks that might run
// can just upgrade. Otherwise we'll context switch so that we // with a higher priority. ReplyPromiseStream needs to guarantee that messages are received in the order they were
// don't block other tasks that might run with a higher priority. // sent, so even in the case of local delivery those messages need to skip this delay.
if (priority < TaskPriority::ReadSocket || !inReadSocket) { if (priority < TaskPriority::ReadSocket || (priority != TaskPriority::NoDeliverDelay && !inReadSocket)) {
wait(delay(0, priority)); wait(delay(0, priority));
} else { } else {
g_network->setCurrentTask(priority); g_network->setCurrentTask(priority);

View File

@ -361,7 +361,7 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
FlowTransport::transport().sendUnreliable( FlowTransport::transport().sendUnreliable(
SerializeSource<ErrorOr<AcknowledgementReply>>( SerializeSource<ErrorOr<AcknowledgementReply>>(
AcknowledgementReply(acknowledgements.bytesAcknowledged)), AcknowledgementReply(acknowledgements.bytesAcknowledged)),
acknowledgements.getEndpoint(TaskPriority::ReadSocket), acknowledgements.getEndpoint(TaskPriority::NoDeliverDelay),
false); false);
} }
} }
@ -378,7 +378,7 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
acknowledgements.bytesAcknowledged += res.expectedSize(); acknowledgements.bytesAcknowledged += res.expectedSize();
FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<AcknowledgementReply>>( FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<AcknowledgementReply>>(
AcknowledgementReply(acknowledgements.bytesAcknowledged)), AcknowledgementReply(acknowledgements.bytesAcknowledged)),
acknowledgements.getEndpoint(TaskPriority::ReadSocket), acknowledgements.getEndpoint(TaskPriority::NoDeliverDelay),
false); false);
} }
return res; return res;
@ -389,13 +389,13 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
// Notify the server that a client is not using this ReplyPromiseStream anymore // Notify the server that a client is not using this ReplyPromiseStream anymore
FlowTransport::transport().sendUnreliable( FlowTransport::transport().sendUnreliable(
SerializeSource<ErrorOr<AcknowledgementReply>>(operation_obsolete()), SerializeSource<ErrorOr<AcknowledgementReply>>(operation_obsolete()),
acknowledgements.getEndpoint(TaskPriority::ReadSocket), acknowledgements.getEndpoint(TaskPriority::NoDeliverDelay),
false); false);
} }
if (isRemoteEndpoint() && !sentError && !acknowledgements.failures.isReady()) { if (isRemoteEndpoint() && !sentError && !acknowledgements.failures.isReady()) {
// The ReplyPromiseStream was cancelled before sending an error, so the storage server must have died // The ReplyPromiseStream was cancelled before sending an error, so the storage server must have died
FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<EnsureTable<T>>>(broken_promise()), FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<EnsureTable<T>>>(broken_promise()),
getEndpoint(TaskPriority::ReadSocket), getEndpoint(TaskPriority::NoDeliverDelay),
false); false);
} }
} }
@ -406,7 +406,7 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
template <class T> template <class T>
class ReplyPromiseStream { class ReplyPromiseStream {
public: public:
// The endpoints of a ReplyPromiseStream must be initialized at TaskPriority::ReadSocket, because with lower priorities a // The endpoints of a ReplyPromiseStream must be initialized at TaskPriority::NoDeliverDelay, because a
// delay(0) in FlowTransport deliver can cause out of order delivery. // delay(0) in FlowTransport deliver can cause out of order delivery.
// stream.send( request ) // stream.send( request )
@ -416,7 +416,7 @@ public:
void send(U&& value) const { void send(U&& value) const {
if (queue->isRemoteEndpoint()) { if (queue->isRemoteEndpoint()) {
if (!queue->acknowledgements.getRawEndpoint().isValid()) { if (!queue->acknowledgements.getRawEndpoint().isValid()) {
value.acknowledgeToken = queue->acknowledgements.getEndpoint(TaskPriority::ReadSocket).token; value.acknowledgeToken = queue->acknowledgements.getEndpoint(TaskPriority::NoDeliverDelay).token;
} }
queue->acknowledgements.bytesSent += value.expectedSize(); queue->acknowledgements.bytesSent += value.expectedSize();
FlowTransport::transport().sendUnreliable( FlowTransport::transport().sendUnreliable(
@ -477,7 +477,7 @@ public:
errors->delPromiseRef(); errors->delPromiseRef();
} }
const Endpoint& getEndpoint() const { return queue->getEndpoint(TaskPriority::ReadSocket); } const Endpoint& getEndpoint() const { return queue->getEndpoint(TaskPriority::NoDeliverDelay); }
bool operator==(const ReplyPromiseStream<T>& rhs) const { return queue == rhs.queue; } bool operator==(const ReplyPromiseStream<T>& rhs) const { return queue == rhs.queue; }
bool isEmpty() const { return !queue->isReady(); } bool isEmpty() const { return !queue->isReady(); }

View File

@ -3891,7 +3891,7 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
if (ver != invalidVersion && ver > data->version.get()) { if (ver != invalidVersion && ver > data->version.get()) {
// TODO(alexmiller): Update to version tracking. // TODO(alexmiller): Update to version tracking.
DEBUG_KEY_RANGE("SSUpdate", ver, KeyRangeRef()); // DEBUG_KEY_RANGE("SSUpdate", ver, KeyRangeRef());
data->mutableData().createNewVersion(ver); data->mutableData().createNewVersion(ver);
if (data->otherError.getFuture().isReady()) if (data->otherError.getFuture().isReady())
@ -4179,7 +4179,7 @@ bool StorageServerDisk::makeVersionMutationsDurable(Version& prevStorageVersion,
VerUpdateRef const& v = u->second; VerUpdateRef const& v = u->second;
ASSERT(v.version > prevStorageVersion && v.version <= newStorageVersion); ASSERT(v.version > prevStorageVersion && v.version <= newStorageVersion);
// TODO(alexmiller): Update to version tracking. // TODO(alexmiller): Update to version tracking.
DEBUG_KEY_RANGE("makeVersionMutationsDurable", v.version, KeyRangeRef()); // DEBUG_KEY_RANGE("makeVersionMutationsDurable", v.version, KeyRangeRef());
writeMutations(v.mutations, v.version, "makeVersionDurable"); writeMutations(v.mutations, v.version, "makeVersionDurable");
for (const auto& m : v.mutations) for (const auto& m : v.mutations)
bytesLeft -= mvccStorageBytes(m); bytesLeft -= mvccStorageBytes(m);

View File

@ -45,6 +45,7 @@ enum class TaskPriority {
WriteSocket = 10000, WriteSocket = 10000,
PollEIO = 9900, PollEIO = 9900,
DiskIOComplete = 9150, DiskIOComplete = 9150,
NoDeliverDelay = 9100,
LoadBalancedEndpoint = 9000, LoadBalancedEndpoint = 9000,
ReadSocket = 9000, ReadSocket = 9000,
AcceptSocket = 8950, AcceptSocket = 8950,