working implementation

This commit is contained in:
Evan Tschannen 2020-04-12 22:18:51 -07:00
parent 0c2e8b9462
commit ff5543b579
4 changed files with 47 additions and 26 deletions

View File

@ -51,6 +51,7 @@ class EndpointMap : NonCopyable {
public:
EndpointMap();
void insert( NetworkMessageReceiver* r, Endpoint::Token& token, TaskPriority priority );
const Endpoint& insert( NetworkAddressList localAddresses, std::vector<std::pair<FlowReceiver*, TaskPriority>> const& streams );
NetworkMessageReceiver* get( Endpoint::Token const& token );
TaskPriority getPriority( Endpoint::Token const& token );
void remove( Endpoint::Token const& token, NetworkMessageReceiver* r );
@ -95,6 +96,19 @@ void EndpointMap::insert( NetworkMessageReceiver* r, Endpoint::Token& token, Tas
data[index].receiver = r;
}
const Endpoint& EndpointMap::insert( NetworkAddressList localAddresses, std::vector<std::pair<FlowReceiver*, TaskPriority>> const& streams ) {
UID base = deterministicRandom()->randomUniqueID();
int oldSize = data.size();
data.resize( oldSize+streams.size() );
for(int i=0; i<streams.size(); i++) {
int index = oldSize+i;
streams[i].first->setEndpoint( Endpoint( localAddresses, UID( base.first() | TOKEN_STREAM_FLAG, (base.second()&0xffffffff00000000LL) | index) ) );
data[index].token() = Endpoint::Token( base.first() | TOKEN_STREAM_FLAG, (base.second()&0xffffffff00000000LL) | static_cast<uint32_t>(streams[i].second) );
data[index].receiver = (NetworkMessageReceiver*) streams[i].first;
}
return streams[0].first->getEndpoint(TaskPriority::DefaultEndpoint);
}
NetworkMessageReceiver* EndpointMap::get( Endpoint::Token const& token ) {
uint32_t index = token.second();
if ( index < data.size() && data[index].token().first() == token.first() && ((data[index].token().second()&0xffffffff00000000LL)|index)==token.second() )
@ -1146,10 +1160,8 @@ void FlowTransport::removePeerReference(const Endpoint& endpoint, bool isStream)
}
}
void FlowTransport::addEndpoint( Endpoint& endpoint, NetworkMessageReceiver* receiver, TaskPriority taskID, bool randomizeEndpoint ) {
if(randomizeEndpoint) {
endpoint.token = deterministicRandom()->randomUniqueID();
}
void FlowTransport::addEndpoint( Endpoint& endpoint, NetworkMessageReceiver* receiver, TaskPriority taskID ) {
endpoint.token = deterministicRandom()->randomUniqueID();
if (receiver->isStream()) {
endpoint.addresses = self->localAddresses;
endpoint.token = UID( endpoint.token.first() | TOKEN_STREAM_FLAG, endpoint.token.second() );
@ -1160,6 +1172,10 @@ void FlowTransport::addEndpoint( Endpoint& endpoint, NetworkMessageReceiver* rec
self->endpoints.insert( receiver, endpoint.token, taskID );
}
const Endpoint& FlowTransport::addEndpoints( std::vector<std::pair<FlowReceiver*, TaskPriority>> const& streams ) {
return self->endpoints.insert( self->localAddresses, streams );
}
void FlowTransport::removeEndpoint( const Endpoint& endpoint, NetworkMessageReceiver* receiver ) {
self->endpoints.remove(endpoint.token, receiver);
}

View File

@ -65,7 +65,9 @@ public:
}
Endpoint getAdjustedEndpoint( uint32_t index ) {
return Endpoint( addresses, UID(token.first(), (token.second()&0xffffffff00000000LL) | index) );
uint32_t newIndex = token.second();
newIndex += index;
return Endpoint( addresses, UID(token.first(), (token.second()&0xffffffff00000000LL) | newIndex) );
}
bool operator == (Endpoint const& r) const {
@ -183,9 +185,11 @@ public:
void removePeerReference(const Endpoint&, bool isStream);
// Signal that a peer connection is no longer being used
void addEndpoint( Endpoint& endpoint, NetworkMessageReceiver*, TaskPriority taskID, bool randomizeEndpoint );
void addEndpoint( Endpoint& endpoint, NetworkMessageReceiver*, TaskPriority taskID );
// Sets endpoint to be a new local endpoint which delivers messages to the given receiver
const Endpoint& addEndpoints( std::vector<std::pair<struct FlowReceiver*, TaskPriority>> const& streams );
void removeEndpoint( const Endpoint&, NetworkMessageReceiver* );
// The given local endpoint no longer delivers messages to the given receiver or uses resources

View File

@ -28,7 +28,7 @@
#include "fdbrpc/FailureMonitor.h"
#include "fdbrpc/networksender.actor.h"
struct FlowReceiver : private NetworkMessageReceiver {
struct FlowReceiver : public NetworkMessageReceiver {
// Common endpoint code for NetSAV<> and NetNotifiedQueue<>
FlowReceiver() : m_isLocalEndpoint(false), m_stream(false) {
@ -55,17 +55,15 @@ struct FlowReceiver : private NetworkMessageReceiver {
const Endpoint& getEndpoint(TaskPriority taskID) {
if (!endpoint.isValid()) {
m_isLocalEndpoint = true;
FlowTransport::transport().addEndpoint(endpoint, this, taskID, true);
FlowTransport::transport().addEndpoint(endpoint, this, taskID);
}
return endpoint;
}
const Endpoint& initEndpoint(Endpoint base, TaskPriority taskID) {
void setEndpoint(Endpoint const& e) {
ASSERT(!endpoint.isValid());
m_isLocalEndpoint = true;
endpoint = base;
FlowTransport::transport().addEndpoint(endpoint, this, taskID, false);
return endpoint;
endpoint = e;
}
void makeWellKnownEndpoint(Endpoint::Token token, TaskPriority taskID) {
@ -382,8 +380,6 @@ public:
//queue = (NetNotifiedQueue<T>*)0xdeadbeef;
}
Endpoint initEndpoint(Endpoint base, uint32_t taskID = TaskPriority::DefaultEndpoint) { return queue->initEndpoint(base, taskID); }
Endpoint getEndpoint(TaskPriority taskID = TaskPriority::DefaultEndpoint) const { return queue->getEndpoint(taskID); }
void makeWellKnownEndpoint(Endpoint::Token token, TaskPriority taskID) {
queue->makeWellKnownEndpoint(token, taskID);
@ -393,6 +389,10 @@ public:
bool isEmpty() const { return !queue->isReady(); }
uint32_t size() const { return queue->size(); }
std::pair<FlowReceiver*, TaskPriority> getReceiver( TaskPriority taskID = TaskPriority::DefaultEndpoint ) {
return std::make_pair((FlowReceiver*)queue, taskID);
}
private:
NetNotifiedQueue<T>* queue;
};

View File

@ -50,22 +50,23 @@ struct MasterInterface {
ASSERT(ar.protocolVersion().isValid());
}
serializer(ar, locality, base);
if( Ar::isDeserializing ) {
waitFailure = RequestStream< ReplyPromise<Void> >( base.getAdjustedEndpoint(1) );
tlogRejoin = RequestStream< struct TLogRejoinRequest >( base.getAdjustedEndpoint(2) );
changeCoordinators = RequestStream< struct ChangeCoordinatorsRequest >( base.getAdjustedEndpoint(3) );
getCommitVersion = RequestStream< struct GetCommitVersionRequest >( base.getAdjustedEndpoint(4) );
notifyBackupWorkerDone = RequestStream<struct BackupWorkerDoneRequest>( base.getAdjustedEndpoint(5) );
if( Archive::isDeserializing ) {
waitFailure = RequestStream< ReplyPromise<Void> >( base.getAdjustedEndpoint(0) );
tlogRejoin = RequestStream< struct TLogRejoinRequest >( base.getAdjustedEndpoint(1) );
changeCoordinators = RequestStream< struct ChangeCoordinatorsRequest >( base.getAdjustedEndpoint(2) );
getCommitVersion = RequestStream< struct GetCommitVersionRequest >( base.getAdjustedEndpoint(3) );
notifyBackupWorkerDone = RequestStream<struct BackupWorkerDoneRequest>( base.getAdjustedEndpoint(4) );
}
}
void initEndpoints() {
base = Endpoint( g_network->getLocalAddresses(), deterministicRandom()->randomUniqueID() );
waitFailure.initEndpoint( base.getAdjustedEndpoint(1) );
tlogRejoin.initEndpoint( base.getAdjustedEndpoint(2), TaskPriority::MasterTLogRejoin );
changeCoordinators.initEndpoint( base.getAdjustedEndpoint(3) );
getCommitVersion.initEndpoint( base.getAdjustedEndpoint(4), TaskPriority::GetConsistentReadVersion );
notifyBackupWorkerDone.initEndpoint( base.getAdjustedEndpoint(5) );
std::vector<std::pair<FlowReceiver*, TaskPriority>> streams;
streams.push_back(waitFailure.getReceiver());
streams.push_back(tlogRejoin.getReceiver(TaskPriority::MasterTLogRejoin));
streams.push_back(changeCoordinators.getReceiver());
streams.push_back(getCommitVersion.getReceiver(TaskPriority::GetConsistentReadVersion));
streams.push_back(notifyBackupWorkerDone.getReceiver());
base = FlowTransport::transport().addEndpoints(streams);
}
};