reduced the size of proxy and tlog interfaces

This commit is contained in:
Evan Tschannen 2020-05-01 16:41:20 -07:00
parent 9e5037291d
commit ca92a39f5d
3 changed files with 57 additions and 16 deletions

View File

@ -40,6 +40,7 @@ struct MasterProxyInterface {
Optional<Key> processId;
bool provisional;
Endpoint base;
RequestStream< struct CommitTransactionRequest > commit;
RequestStream< struct GetReadVersionRequest > getConsistentReadVersion; // Returns a version which (1) is committed, and (2) is >= the latest version reported committed (by a commit response) when this request was sent
// (at some point between when this request is sent and when its response is received, the latest version reported committed)
@ -62,17 +63,34 @@ struct MasterProxyInterface {
template <class Archive>
void serialize(Archive& ar) {
serializer(ar, processId, provisional, commit, getConsistentReadVersion, getKeyServersLocations,
waitFailure, getStorageServerRejoinInfo, getRawCommittedVersion,
txnState, getHealthMetrics, proxySnapReq, exclusionSafetyCheckReq);
serializer(ar, processId, provisional, base);
if( Archive::isDeserializing ) {
commit = RequestStream< struct CommitTransactionRequest >( base.getAdjustedEndpoint(0) );
getConsistentReadVersion = RequestStream< struct GetReadVersionRequest >( base.getAdjustedEndpoint(1) );
getKeyServersLocations = RequestStream< struct GetKeyServerLocationsRequest >( base.getAdjustedEndpoint(2) );
getStorageServerRejoinInfo = RequestStream< struct GetStorageServerRejoinInfoRequest >( base.getAdjustedEndpoint(3) );
waitFailure = RequestStream<ReplyPromise<Void>>( base.getAdjustedEndpoint(4) );
getRawCommittedVersion = RequestStream< struct GetRawCommittedVersionRequest >( base.getAdjustedEndpoint(5) );
txnState = RequestStream< struct TxnStateRequest >( base.getAdjustedEndpoint(6) );
getHealthMetrics = RequestStream< struct GetHealthMetricsRequest >( base.getAdjustedEndpoint(7) );
proxySnapReq = RequestStream< struct ProxySnapRequest >( base.getAdjustedEndpoint(8) );
exclusionSafetyCheckReq = RequestStream< struct ExclusionSafetyCheckRequest >( base.getAdjustedEndpoint(9) );
}
}
void initEndpoints() {
getConsistentReadVersion.getEndpoint(TaskPriority::ReadSocket);
getRawCommittedVersion.getEndpoint(TaskPriority::ProxyGetRawCommittedVersion);
commit.getEndpoint(TaskPriority::ReadSocket);
getStorageServerRejoinInfo.getEndpoint(TaskPriority::ProxyStorageRejoin);
getKeyServersLocations.getEndpoint(TaskPriority::ReadSocket); //priority lowered to TaskPriority::DefaultEndpoint on the proxy
std::vector<std::pair<FlowReceiver*, TaskPriority>> streams;
streams.push_back(commit.getReceiver(TaskPriority::ReadSocket));
streams.push_back(getConsistentReadVersion.getReceiver(TaskPriority::ReadSocket));
streams.push_back(getKeyServersLocations.getReceiver(TaskPriority::ReadSocket)); //priority lowered to TaskPriority::DefaultEndpoint on the proxy
streams.push_back(getStorageServerRejoinInfo.getReceiver(TaskPriority::ProxyStorageRejoin));
streams.push_back(waitFailure.getReceiver());
streams.push_back(getRawCommittedVersion.getReceiver(TaskPriority::ProxyGetRawCommittedVersion));
streams.push_back(txnState.getReceiver());
streams.push_back(getHealthMetrics.getReceiver());
streams.push_back(proxySnapReq.getReceiver());
streams.push_back(exclusionSafetyCheckReq.getReceiver());
base = FlowTransport::transport().addEndpoints(streams);
}
};

View File

@ -36,6 +36,8 @@ struct TLogInterface {
LocalityData filteredLocality;
UID uniqueID;
UID sharedTLogID;
Endpoint base;
RequestStream< struct TLogPeekRequest > peekMessages;
RequestStream< struct TLogPopRequest > popMessages;
@ -59,12 +61,21 @@ struct TLogInterface {
bool operator == ( TLogInterface const& r ) const { return id() == r.id(); }
NetworkAddress address() const { return peekMessages.getEndpoint().getPrimaryAddress(); }
Optional<NetworkAddress> secondaryAddress() const { return peekMessages.getEndpoint().addresses.secondaryAddress; }
void initEndpoints() {
getQueuingMetrics.getEndpoint( TaskPriority::TLogQueuingMetrics );
popMessages.getEndpoint( TaskPriority::TLogPop );
peekMessages.getEndpoint( TaskPriority::TLogPeek );
confirmRunning.getEndpoint( TaskPriority::TLogConfirmRunning );
commit.getEndpoint( TaskPriority::TLogCommit );
std::vector<std::pair<FlowReceiver*, TaskPriority>> streams;
streams.push_back(peekMessages.getReceiver(TaskPriority::TLogPeek));
streams.push_back(popMessages.getReceiver(TaskPriority::TLogPop));
streams.push_back(commit.getReceiver(TaskPriority::TLogCommit));
streams.push_back(lock.getReceiver());
streams.push_back(getQueuingMetrics.getReceiver(TaskPriority::TLogQueuingMetrics));
streams.push_back(confirmRunning.getReceiver(TaskPriority::TLogConfirmRunning));
streams.push_back(waitFailure.getReceiver());
streams.push_back(recoveryFinished.getReceiver());
streams.push_back(disablePopRequest.getReceiver());
streams.push_back(enablePopRequest.getReceiver());
streams.push_back(snapRequest.getReceiver());
base = FlowTransport::transport().addEndpoints(streams);
}
template <class Ar>
@ -72,9 +83,20 @@ struct TLogInterface {
if constexpr (!is_fb_function<Ar>) {
ASSERT(ar.isDeserializing || uniqueID != UID());
}
serializer(ar, uniqueID, sharedTLogID, filteredLocality, peekMessages, popMessages
, commit, lock, getQueuingMetrics, confirmRunning, waitFailure, recoveryFinished
, disablePopRequest, enablePopRequest, snapRequest);
serializer(ar, uniqueID, sharedTLogID, filteredLocality, base);
if( Ar::isDeserializing ) {
peekMessages = RequestStream< struct TLogPeekRequest >( base.getAdjustedEndpoint(0) );
popMessages = RequestStream< struct TLogPopRequest >( base.getAdjustedEndpoint(1) );
commit = RequestStream< struct TLogCommitRequest >( base.getAdjustedEndpoint(2) );
lock = RequestStream< ReplyPromise< struct TLogLockResult > >( base.getAdjustedEndpoint(3) );
getQueuingMetrics = RequestStream< struct TLogQueuingMetricsRequest >( base.getAdjustedEndpoint(4) );
confirmRunning = RequestStream< struct TLogConfirmRunningRequest >( base.getAdjustedEndpoint(5) );
waitFailure = RequestStream< ReplyPromise<Void> >( base.getAdjustedEndpoint(6) );
recoveryFinished = RequestStream< struct TLogRecoveryFinishedRequest >( base.getAdjustedEndpoint(7) );
disablePopRequest = RequestStream< struct TLogDisablePopRequest >( base.getAdjustedEndpoint(8) );
enablePopRequest = RequestStream< struct TLogEnablePopRequest >( base.getAdjustedEndpoint(9) );
snapRequest = RequestStream< struct TLogSnapRequest >( base.getAdjustedEndpoint(10) );
}
}
};

View File

@ -1276,6 +1276,7 @@ ACTOR Future<Void> workerServer(
}
when( InitializeMasterProxyRequest req = waitNext(interf.masterProxy.getFuture()) ) {
MasterProxyInterface recruited;
recruited.processId = locality.processId();
recruited.provisional = false;
recruited.initEndpoints();