partial refactor to support multiple threads per client library

This commit is contained in:
Russell Sears 2021-02-03 19:58:37 +00:00
parent b0f8784bf1
commit 87aeafb294
2 changed files with 67 additions and 35 deletions

View File

@ -685,10 +685,10 @@ void MultiVersionTransaction::reset() {
}
// MultiVersionDatabase
MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi *api, std::string clusterFilePath, Reference<IDatabase> db, bool openConnectors) : dbState(new DatabaseState()) {
MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi *api, int threadIdx, std::string clusterFilePath, Reference<IDatabase> db, bool openConnectors) : dbState(new DatabaseState()), threadIdx(threadIdx) {
dbState->db = db;
dbState->dbVar->set(db);
if(!openConnectors) {
dbState->currentClientIndex = 0;
}
@ -701,7 +701,7 @@ MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi *api, std::string clu
dbState->currentClientIndex = -1;
}
api->runOnExternalClients([this, clusterFilePath](Reference<ClientInfo> client) {
api->runOnExternalClients(threadIdx, [this, clusterFilePath](Reference<ClientInfo> client) {
dbState->addConnection(client, clusterFilePath);
});
@ -714,7 +714,7 @@ MultiVersionDatabase::~MultiVersionDatabase() {
}
Reference<IDatabase> MultiVersionDatabase::debugCreateFromExistingDatabase(Reference<IDatabase> db) {
return Reference<IDatabase>(new MultiVersionDatabase(MultiVersionApi::api, "", db, false));
return Reference<IDatabase>(new MultiVersionDatabase(MultiVersionApi::api, 0, "", db, false)); // XXX better choice of thread to run this database on?
}
Reference<ITransaction> MultiVersionDatabase::createTransaction() {
@ -908,26 +908,34 @@ bool MultiVersionApi::apiVersionAtLeast(int minVersion) {
return MultiVersionApi::api->apiVersion >= minVersion || MultiVersionApi::api->apiVersion < 0;
}
void MultiVersionApi::runOnExternalClientsAllThreads(std::function<void(Reference<ClientInfo>)> func, bool runOnFailedClients) {
for (int i = 0; i < tidCount; i++) {
runOnExternalClients(i, func, runOnFailedClients);
}
}
// runOnFailedClients should be used cautiously. Some failed clients may not have successfully loaded all symbols.
void MultiVersionApi::runOnExternalClients(std::function<void(Reference<ClientInfo>)> func, bool runOnFailedClients) {
void MultiVersionApi::runOnExternalClients(int threadIdx, std::function<void(Reference<ClientInfo>)> func, bool runOnFailedClients) {
bool newFailure = false;
auto c = externalClients.begin();
while(c != externalClients.end()) {
auto client = c->second[threadIdx];
try {
if(!c->second->failed || runOnFailedClients) { // TODO: Should we ignore some failures?
func(c->second);
if(!client->failed || runOnFailedClients) { // TODO: Should we ignore some failures?
func(client);
}
}
catch(Error &e) {
if(e.code() == error_code_external_client_already_loaded) {
TraceEvent(SevInfo, "ExternalClientAlreadyLoaded").error(e).detail("LibPath", c->second->libPath);
// XXX report libPath that was configured; not the temp file name
TraceEvent(SevInfo, "ExternalClientAlreadyLoaded").error(e).detail("LibPath", client->libPath);
c = externalClients.erase(c);
continue;
}
else {
TraceEvent(SevWarnAlways, "ExternalClientFailure").error(e).detail("LibPath", c->second->libPath);
c->second->failed = true;
TraceEvent(SevWarnAlways, "ExternalClientFailure").error(e).detail("LibPath", client->libPath);
client->failed = true;
newFailure = true;
}
}
@ -1003,9 +1011,11 @@ void MultiVersionApi::addExternalLibrary(std::string path) {
throw invalid_option(); // SOMEDAY: it might be good to allow clients to be added after the network is setup
}
if(externalClients.count(filename) == 0) {
// xxx external client init info
if(externalClientDescriptions.count(filename) == 0) {
TraceEvent("AddingExternalClient").detail("LibraryPath", filename);
externalClients[filename] = Reference<ClientInfo>(new ClientInfo(new DLApi(path), path));
externalClientDescriptions.emplace(std::make_pair(filename, ClientDesc(path, true))); //[filename] = { path, true };
}
}
@ -1020,9 +1030,10 @@ void MultiVersionApi::addExternalLibraryDirectory(std::string path) {
for(auto filename : files) {
std::string lib = abspath(joinPath(path, filename));
if(externalClients.count(filename) == 0) {
// xxx external client init info.
if(externalClientDescriptions.count(filename) == 0) {
TraceEvent("AddingExternalClient").detail("LibraryPath", filename);
externalClients[filename] = Reference<ClientInfo>(new ClientInfo(new DLApi(lib), lib));
externalClientDescriptions.emplace(std::make_pair(filename, ClientDesc(lib, true))); //Reference<ClientInfo>(new ClientDesc(new DLApi(lib), lib));
}
}
}
@ -1040,13 +1051,13 @@ void MultiVersionApi::setSupportedClientVersions(Standalone<StringRef> versions)
MutexHolder holder(lock);
ASSERT(networkSetup);
// This option must be set on the main thread because it modifes structures that can be used concurrently by the main thread
// This option must be set on the main thread because it modifies structures that can be used concurrently by the main thread
onMainThreadVoid([this, versions](){
localClient->api->setNetworkOption(FDBNetworkOptions::SUPPORTED_CLIENT_VERSIONS, versions);
}, NULL);
if(!bypassMultiClientApi) {
runOnExternalClients([versions](Reference<ClientInfo> client) {
runOnExternalClientsAllThreads([versions](Reference<ClientInfo> client) {
client->api->setNetworkOption(FDBNetworkOptions::SUPPORTED_CLIENT_VERSIONS, versions);
});
}
@ -1106,7 +1117,7 @@ void MultiVersionApi::setNetworkOptionInternal(FDBNetworkOptions::Option option,
if(!bypassMultiClientApi) {
if(networkSetup) {
runOnExternalClients(
runOnExternalClientsAllThreads(
[option, value](Reference<ClientInfo> client) { client->api->setNetworkOption(option, value); });
}
else {
@ -1145,14 +1156,14 @@ void MultiVersionApi::setupNetwork() {
localClient->loadProtocolVersion();
if(!bypassMultiClientApi) {
runOnExternalClients([this](Reference<ClientInfo> client) {
runOnExternalClientsAllThreads([this](Reference<ClientInfo> client) {
TraceEvent("InitializingExternalClient").detail("LibraryPath", client->libPath);
client->api->selectApiVersion(apiVersion);
client->loadProtocolVersion();
});
MutexHolder holder(lock);
runOnExternalClients([this, transportId](Reference<ClientInfo> client) {
runOnExternalClientsAllThreads([this, transportId](Reference<ClientInfo> client) {
for(auto option : options) {
client->api->setNetworkOption(option.first, option.second.castTo<StringRef>());
}
@ -1193,7 +1204,7 @@ void MultiVersionApi::runNetwork() {
std::vector<THREAD_HANDLE> handles;
if(!bypassMultiClientApi) {
runOnExternalClients([&handles](Reference<ClientInfo> client) {
runOnExternalClientsAllThreads([&handles](Reference<ClientInfo> client) {
if(client->external) {
handles.push_back(g_network->startThread(&runNetworkThread, client.getPtr()));
}
@ -1218,7 +1229,7 @@ void MultiVersionApi::stopNetwork() {
localClient->api->stopNetwork();
if(!bypassMultiClientApi) {
runOnExternalClients([](Reference<ClientInfo> client) {
runOnExternalClientsAllThreads([](Reference<ClientInfo> client) {
client->api->stopNetwork();
}, true);
}
@ -1235,7 +1246,7 @@ void MultiVersionApi::addNetworkThreadCompletionHook(void (*hook)(void*), void *
localClient->api->addNetworkThreadCompletionHook(hook, hookParameter);
if(!bypassMultiClientApi) {
runOnExternalClients([hook, hookParameter](Reference<ClientInfo> client) {
runOnExternalClientsAllThreads([hook, hookParameter](Reference<ClientInfo> client) {
client->api->addNetworkThreadCompletionHook(hook, hookParameter);
});
}
@ -1247,12 +1258,17 @@ Reference<IDatabase> MultiVersionApi::createDatabase(const char *clusterFilePath
lock.leave();
throw network_not_setup();
}
int threadIdx = nextTid;
nextTid = (nextTid + 1) % tidCount;
lock.leave();
std::string clusterFile(clusterFilePath);
if(localClientDisabled) {
return Reference<IDatabase>(new MultiVersionDatabase(this, clusterFile, Reference<IDatabase>()));
return Reference<IDatabase>(new MultiVersionDatabase(this, threadIdx, clusterFile, Reference<IDatabase>()));
}
ASSERT(tidCount == 1); // threadCount must be one if local client is enabled.
auto db = localClient->api->createDatabase(clusterFilePath);
if(bypassMultiClientApi) {
@ -1260,9 +1276,9 @@ Reference<IDatabase> MultiVersionApi::createDatabase(const char *clusterFilePath
}
else {
for(auto it : externalClients) {
TraceEvent("CreatingDatabaseOnExternalClient").detail("LibraryPath", it.second->libPath).detail("Failed", it.second->failed);
TraceEvent("CreatingDatabaseOnExternalClient").detail("LibraryPath", it.first/*[threadIdx]->libPath*/).detail("Failed", it.second[threadIdx]->failed);
}
return Reference<IDatabase>(new MultiVersionDatabase(this, clusterFile, db));
return Reference<IDatabase>(new MultiVersionDatabase(this, threadIdx, clusterFile, db));
}
}
@ -1270,7 +1286,9 @@ void MultiVersionApi::updateSupportedVersions() {
if(networkSetup) {
Standalone<VectorRef<uint8_t>> versionStr;
runOnExternalClients([&versionStr](Reference<ClientInfo> client){
// not mutating the client, so just call on one instance of each client version.
// thread 0 always exists.
runOnExternalClients(0, [&versionStr](Reference<ClientInfo> client){
const char *ver = client->api->getClientVersion();
versionStr.append(versionStr.arena(), (uint8_t*)ver, (int)strlen(ver));
versionStr.append(versionStr.arena(), (uint8_t*)";", 1);

View File

@ -278,17 +278,24 @@ private:
std::vector<std::pair<FDBTransactionOptions::Option, Optional<Standalone<StringRef>>>> persistentOptions;
};
struct ClientInfo : ThreadSafeReferenceCounted<ClientInfo> {
struct ClientDesc {
std::string const libPath;
bool const external;
ClientDesc(std::string libPath, bool external) : libPath(libPath), external(external) { }
};
struct ClientInfo : ClientDesc, ThreadSafeReferenceCounted<ClientInfo> {
ProtocolVersion protocolVersion;
IClientApi *api;
std::string libPath;
bool external;
// std::string libPath;
// bool external;
bool failed;
std::vector<std::pair<void (*)(void*), void*>> threadCompletionHooks;
ClientInfo() : protocolVersion(0), api(NULL), external(false), failed(true) {}
ClientInfo(IClientApi *api) : protocolVersion(0), api(api), libPath("internal"), external(false), failed(false) {}
ClientInfo(IClientApi *api, std::string libPath) : protocolVersion(0), api(api), libPath(libPath), external(true), failed(false) {}
ClientInfo() : ClientDesc(std::string(), false), protocolVersion(0), api(NULL), failed(true) {}
ClientInfo(IClientApi *api) : ClientDesc("internal", false), protocolVersion(0), api(api), failed(false) {}
ClientInfo(IClientApi *api, std::string libPath) : ClientDesc(libPath, true), protocolVersion(0), api(api), failed(false) {}
void loadProtocolVersion();
bool canReplace(Reference<ClientInfo> other) const;
@ -298,7 +305,7 @@ class MultiVersionApi;
class MultiVersionDatabase : public IDatabase, ThreadSafeReferenceCounted<MultiVersionDatabase> {
public:
MultiVersionDatabase(MultiVersionApi *api, std::string clusterFilePath, Reference<IDatabase> db, bool openConnectors=true);
MultiVersionDatabase(MultiVersionApi *api, int threadIdx, std::string clusterFilePath, Reference<IDatabase> db, bool openConnectors=true);
~MultiVersionDatabase();
Reference<ITransaction> createTransaction() override;
@ -361,6 +368,7 @@ private:
};
const Reference<DatabaseState> dbState;
const int threadIdx;
friend class MultiVersionTransaction;
};
@ -379,7 +387,8 @@ public:
static MultiVersionApi* api;
Reference<ClientInfo> getLocalClient();
void runOnExternalClients(std::function<void(Reference<ClientInfo>)>, bool runOnFailedClients=false);
void runOnExternalClients(int threadId, std::function<void(Reference<ClientInfo>)>, bool runOnFailedClients=false);
void runOnExternalClientsAllThreads(std::function<void(Reference<ClientInfo>)>, bool runOnFailedClients=false);
void updateSupportedVersions();
@ -403,7 +412,8 @@ private:
void setNetworkOptionInternal(FDBNetworkOptions::Option option, Optional<StringRef> value);
Reference<ClientInfo> localClient;
std::map<std::string, Reference<ClientInfo>> externalClients;
std::map<std::string, ClientDesc> externalClientDescriptions;
std::map<std::string, std::vector<Reference<ClientInfo>>> externalClients;
bool networkStartSetup;
volatile bool networkSetup;
@ -411,6 +421,10 @@ private:
volatile bool externalClient;
int apiVersion;
// XXX fixme
int nextTid = 0;
const int tidCount = 1;
Mutex lock;
std::vector<std::pair<FDBNetworkOptions::Option, Optional<Standalone<StringRef>>>> options;
std::map<FDBNetworkOptions::Option, std::set<Standalone<StringRef>>> setEnvOptions;