completely ignore messages to unknown endpoint

This commit is contained in:
Markus Pilman 2021-03-11 12:34:07 -07:00
parent 335df9b052
commit 4bee976b37
3 changed files with 18 additions and 8 deletions

View File

@ -166,8 +166,13 @@ NetworkMessageReceiver* EndpointMap::get(Endpoint::Token const& token) {
TaskPriority EndpointMap::getPriority(Endpoint::Token const& token) { TaskPriority EndpointMap::getPriority(Endpoint::Token const& token) {
uint32_t index = token.second(); uint32_t index = token.second();
if (index < data.size() && data[index].token().first() == token.first() && if (index < data.size() && data[index].token().first() == token.first() &&
((data[index].token().second() & 0xffffffff00000000LL) | index) == token.second()) ((data[index].token().second() & 0xffffffff00000000LL) | index) == token.second()) {
auto res = static_cast<TaskPriority>(data[index].token().second());
// we don't allow this priority to be "misused" for other stuff as we won't even
// attempt to find an endpoint if UnknownEndpoint is returned here
ASSERT(res != TaskPriority::UnknownEndpoint);
return static_cast<TaskPriority>(data[index].token().second()); return static_cast<TaskPriority>(data[index].token().second());
}
return TaskPriority::UnknownEndpoint; return TaskPriority::UnknownEndpoint;
} }
@ -894,6 +899,12 @@ static bool checkCompatible(const PeerCompatibilityPolicy& policy, ProtocolVersi
ACTOR static void deliver(TransportData* self, Endpoint destination, ArenaReader reader, bool inReadSocket) { ACTOR static void deliver(TransportData* self, Endpoint destination, ArenaReader reader, bool inReadSocket) {
TaskPriority priority = self->endpoints.getPriority(destination.token); TaskPriority priority = self->endpoints.getPriority(destination.token);
if (priority == TaskPriority::UnknownEndpoint && (destination.token.first() & TOKEN_STREAM_FLAG) == 0) {
// we ignore packets to unknown endpoints if they're not going to a stream anyways, so we can just
// return here. The main place where this seems to happen is if a ReplyPromise is not waited on
// long enough.
return;
}
if (priority < TaskPriority::ReadSocket || !inReadSocket) { if (priority < TaskPriority::ReadSocket || !inReadSocket) {
wait(delay(0, priority)); wait(delay(0, priority));
} else { } else {
@ -940,9 +951,6 @@ ACTOR static void deliver(TransportData* self, Endpoint destination, ArenaReader
} }
} }
} }
if (inReadSocket)
g_network->setCurrentTask(TaskPriority::ReadSocket);
} }
static void scanPackets(TransportData* transport, static void scanPackets(TransportData* transport,

View File

@ -52,6 +52,7 @@ struct FlowReceiver : public NetworkMessageReceiver {
// If already a remote endpoint, returns that. Otherwise makes this // If already a remote endpoint, returns that. Otherwise makes this
// a local endpoint and returns that. // a local endpoint and returns that.
const Endpoint& getEndpoint(TaskPriority taskID) { const Endpoint& getEndpoint(TaskPriority taskID) {
ASSERT(taskID != TaskPriority::UnknownEndpoint);
if (!endpoint.isValid()) { if (!endpoint.isValid()) {
m_isLocalEndpoint = true; m_isLocalEndpoint = true;
FlowTransport::transport().addEndpoint(endpoint, this, taskID); FlowTransport::transport().addEndpoint(endpoint, this, taskID);

View File

@ -1477,10 +1477,6 @@ void Net2::run() {
while (!ready.empty()) { while (!ready.empty()) {
++countTasks; ++countTasks;
currentTaskID = ready.top().taskID; currentTaskID = ready.top().taskID;
if (currentTaskID < minTaskID) {
trackAtPriority(currentTaskID, taskBegin);
minTaskID = currentTaskID;
}
priorityMetric = static_cast<int64_t>(currentTaskID); priorityMetric = static_cast<int64_t>(currentTaskID);
Task* task = ready.top().task; Task* task = ready.top().task;
ready.pop(); ready.pop();
@ -1493,6 +1489,11 @@ void Net2::run() {
TraceEvent(SevError, "TaskError").error(unknown_error()); TraceEvent(SevError, "TaskError").error(unknown_error());
} }
if (currentTaskID < minTaskID) {
trackAtPriority(currentTaskID, taskBegin);
minTaskID = currentTaskID;
}
double tscNow = timestampCounter(); double tscNow = timestampCounter();
double newTaskBegin = timer_monotonic(); double newTaskBegin = timer_monotonic();
if (check_yield(TaskPriority::Max, tscNow)) { if (check_yield(TaskPriority::Max, tscNow)) {