Mirror of https://github.com/apple/foundationdb.git (synced 2025-05-25 17:00:05 +08:00)
Blob connection provider test (#8478)

* Refactoring test blob metadata creation
* Implementing BlobConnectionProviderTest
* createRandomTestBlobMetadata supports blobstore and works outside simulation
This commit is contained in:
parent c6adb3a98c
commit 4d3553481f
fdbclient/BlobMetadataUtils.cpp (new file, 109 lines)
@@ -0,0 +1,109 @@
/*
 * BlobMetadataUtils.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "fdbclient/BlobMetadataUtils.h"

#include "fmt/format.h"
#include "flow/IRandom.h"
#include "flow/flow.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/S3BlobStore.h"

std::string buildPartitionPath(const std::string& url, const std::string& partition) {
    ASSERT(!partition.empty());
    ASSERT(partition.front() != '/');
    ASSERT(partition.back() == '/');
    StringRef u(url);
    if (u.startsWith("file://"_sr)) {
        ASSERT(u.endsWith("/"_sr));
        return url + partition;
    } else if (u.startsWith("blobstore://"_sr)) {
        std::string resource;
        std::string lastOpenError;
        S3BlobStoreEndpoint::ParametersT backupParams;

        std::string urlCopy = url;

        Reference<S3BlobStoreEndpoint> bstore =
            S3BlobStoreEndpoint::fromString(url, {}, &resource, &lastOpenError, &backupParams);

        ASSERT(!resource.empty());
        ASSERT(resource.back() != '/');
        size_t resourceStart = url.find(resource);
        ASSERT(resourceStart != std::string::npos);

        return urlCopy.insert(resourceStart + resource.size(), "/" + partition);
    } else {
        // FIXME: support azure
        throw backup_invalid_url();
    }
}

// FIXME: make this (more) deterministic outside of simulation for FDBPerfKmsConnector
Standalone<BlobMetadataDetailsRef> createRandomTestBlobMetadata(const std::string& baseUrl,
                                                                BlobMetadataDomainId domainId,
                                                                BlobMetadataDomainName domainName) {
    Standalone<BlobMetadataDetailsRef> metadata;
    metadata.domainId = domainId;
    metadata.arena().dependsOn(domainName.arena());
    metadata.domainName = domainName;
    // 0 == no partition, 1 == suffix partitioned, 2 == storage location partitioned
    int type = deterministicRandom()->randomInt(0, 3);
    int partitionCount = (type == 0) ? 0 : deterministicRandom()->randomInt(2, 12);
    TraceEvent ev(SevDebug, "SimBlobMetadata");
    ev.detail("DomainId", domainId).detail("TypeNum", type).detail("PartitionCount", partitionCount);
    if (type == 0) {
        // single storage location
        std::string partition = std::to_string(domainId) + "/";
        metadata.base = StringRef(metadata.arena(), buildPartitionPath(baseUrl, partition));
        ev.detail("Base", metadata.base);
    }
    if (type == 1) {
        // simulate hash prefixing in s3
        metadata.base = StringRef(metadata.arena(), baseUrl);
        ev.detail("Base", metadata.base);
        for (int i = 0; i < partitionCount; i++) {
            metadata.partitions.push_back_deep(metadata.arena(),
                                               deterministicRandom()->randomUniqueID().shortString() + "-" +
                                                   std::to_string(domainId) + "/");
            ev.detail("P" + std::to_string(i), metadata.partitions.back());
        }
    }
    if (type == 2) {
        // simulate separate storage location per partition
        for (int i = 0; i < partitionCount; i++) {
            std::string partition = std::to_string(domainId) + "_" + std::to_string(i) + "/";
            metadata.partitions.push_back_deep(metadata.arena(), buildPartitionPath(baseUrl, partition));
            ev.detail("P" + std::to_string(i), metadata.partitions.back());
        }
    }

    // set random refresh + expire time
    if (deterministicRandom()->coinflip()) {
        metadata.refreshAt = now() + deterministicRandom()->random01() * CLIENT_KNOBS->BLOB_METADATA_REFRESH_INTERVAL;
        metadata.expireAt =
            metadata.refreshAt + deterministicRandom()->random01() * CLIENT_KNOBS->BLOB_METADATA_REFRESH_INTERVAL;
    } else {
        metadata.refreshAt = std::numeric_limits<double>::max();
        metadata.expireAt = metadata.refreshAt;
    }

    return metadata;
}
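Note: a minimal standalone sketch (not part of this commit) of the path construction buildPartitionPath performs above. The file:// branch is plain concatenation onto a base URL that already ends in '/'; the blobstore:// branch inserts "/<partition>" directly after the URL's resource while keeping any query parameters, where the real code obtains the resource via S3BlobStoreEndpoint::fromString. The helper names and example URLs below are made up for illustration.

#include <cassert>
#include <string>

// file:// case: the partition directory is simply appended to the base URL.
std::string filePartitionPath(const std::string& baseUrl, const std::string& partition) {
    assert(!baseUrl.empty() && baseUrl.back() == '/');
    assert(!partition.empty() && partition.front() != '/' && partition.back() == '/');
    return baseUrl + partition;
}

// blobstore:// case: insert "/<partition>" right after the resource, preserving
// whatever follows it (e.g. "?sc=0"). The resource is assumed to be pre-extracted.
std::string blobstorePartitionPath(const std::string& url,
                                   const std::string& resource,
                                   const std::string& partition) {
    std::string result = url;
    size_t resourceStart = result.find(resource);
    assert(resourceStart != std::string::npos);
    result.insert(resourceStart + resource.size(), "/" + partition);
    return result;
}

int main() {
    assert(filePartitionPath("file://fdbblob/", "5/") == "file://fdbblob/5/");
    assert(blobstorePartitionPath("blobstore://host:443/bucket?sc=0", "bucket", "5/") ==
           "blobstore://host:443/bucket/5/?sc=0");
    return 0;
}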
@@ -281,6 +281,7 @@ void ClientKnobs::initialize(Randomize randomize) {
    // Blob granules
    init( BG_MAX_GRANULE_PARALLELISM, 10 );
    init( BG_TOO_MANY_GRANULES, 10000 );
+   init( BLOB_METADATA_REFRESH_INTERVAL, 3600 ); if ( randomize && BUGGIFY ) { BLOB_METADATA_REFRESH_INTERVAL = deterministicRandom()->randomInt(5, 120); }

    init( CHANGE_QUORUM_BAD_STATE_RETRY_TIMES, 3 );
    init( CHANGE_QUORUM_BAD_STATE_RETRY_DELAY, 2.0 );
@@ -1002,8 +1002,6 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
    // Blob Metadata
    init( BLOB_METADATA_CACHE_TTL, isSimulated ? 120 : 24 * 60 * 60 );
    if ( randomize && BUGGIFY) { BLOB_METADATA_CACHE_TTL = deterministicRandom()->randomInt(50, 100); }
-   init( BLOB_METADATA_REFRESH_INTERVAL, isSimulated ? 60 : 60 * 60 );
-   if ( randomize && BUGGIFY) { BLOB_METADATA_REFRESH_INTERVAL = deterministicRandom()->randomInt(5, 120); }

    // HTTP KMS Connector
    init( REST_KMS_CONNECTOR_KMS_DISCOVERY_URL_MODE, "file");
@@ -91,4 +91,8 @@ struct BlobMetadataDetailsRef {
    }
};

+Standalone<BlobMetadataDetailsRef> createRandomTestBlobMetadata(const std::string& baseUrl,
+                                                                BlobMetadataDomainId domainId,
+                                                                BlobMetadataDomainName domainName);
+
#endif
@@ -272,6 +272,7 @@ public:
    // Blob Granules
    int BG_MAX_GRANULE_PARALLELISM;
    int BG_TOO_MANY_GRANULES;
+   int64_t BLOB_METADATA_REFRESH_INTERVAL;

    // The coordinator key/value in storage server might be inconsistent to the value stored in the cluster file.
    // This might happen when a recovery is happening together with a cluster controller coordinator key change.
@@ -979,7 +979,6 @@ public:

    // Blob metadata
    int64_t BLOB_METADATA_CACHE_TTL;
-   int64_t BLOB_METADATA_REFRESH_INTERVAL;

    // HTTP KMS Connector
    std::string REST_KMS_CONNECTOR_KMS_DISCOVERY_URL_MODE;
fdbserver/BlobConnectionProviderTest.actor.cpp (new file, 202 lines)
@@ -0,0 +1,202 @@
/*
 * BlobConnectionProviderTest.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "fdbclient/BlobConnectionProvider.h"

#include "flow/UnitTest.h"
#include "fdbserver/Knobs.h"
#include "flow/actorcompiler.h" // has to be last include

void forceLinkBlobConnectionProviderTests() {}

struct ConnectionProviderTestSettings {
    uint32_t numProviders;
    uint32_t filesPerProvider;
    uint32_t maxFileMemory;
    uint32_t maxFileSize;
    uint32_t threads;
    bool uniformProviderChoice;
    double readWriteSplit;

    double runtime;

    int writeOps;
    int readOps;

    ConnectionProviderTestSettings() {
        numProviders = deterministicRandom()->randomSkewedUInt32(1, 1000);
        filesPerProvider =
            1 + std::min((uint32_t)100, deterministicRandom()->randomSkewedUInt32(10, 10000) / numProviders);
        maxFileMemory = 1024 * 1024 * 1024;
        maxFileSize = maxFileMemory / (numProviders * filesPerProvider);
        maxFileSize = deterministicRandom()->randomSkewedUInt32(8, std::min((uint32_t)(16 * 1024 * 1024), maxFileSize));
        threads = deterministicRandom()->randomInt(16, 128);

        uniformProviderChoice = deterministicRandom()->coinflip();
        readWriteSplit = deterministicRandom()->randomInt(1, 10) / 10.0;

        runtime = 60.0;

        writeOps = 0;
        readOps = 0;
    }
};

struct ProviderTestData {
    Reference<BlobConnectionProvider> provider;
    std::vector<std::pair<std::string, Value>> data;
    std::unordered_set<std::string> usedNames;

    ProviderTestData() {}
    explicit ProviderTestData(Reference<BlobConnectionProvider> provider) : provider(provider) {}
};

ACTOR Future<Void> createObject(ConnectionProviderTestSettings* settings, ProviderTestData* provider) {
    // pick object name before wait so no collisions between concurrent writes
    std::string objName;
    loop {
        objName = deterministicRandom()->randomAlphaNumeric(12);
        if (provider->usedNames.insert(objName).second) {
            break;
        }
    }

    int randomDataSize = deterministicRandom()->randomInt(1, settings->maxFileSize);
    state Value data = makeString(randomDataSize);
    deterministicRandom()->randomBytes(mutateString(data), randomDataSize);

    state Reference<BackupContainerFileSystem> bstore;
    state std::string fullPath;
    std::tie(bstore, fullPath) = provider->provider->createForWrite(objName);

    state Reference<IBackupFile> file = wait(bstore->writeFile(fullPath));
    wait(file->append(data.begin(), data.size()));
    wait(file->finish());

    // after write, put in the readable list
    provider->data.push_back({ fullPath, data });

    return Void();
}

ACTOR Future<Void> readAndVerifyObject(ProviderTestData* provider, std::string objFullPath, Value expectedData) {
    Reference<BackupContainerFileSystem> bstore = provider->provider->getForRead(objFullPath);
    state Reference<IAsyncFile> reader = wait(bstore->readFile(objFullPath));

    state Value actualData = makeString(expectedData.size());
    int readSize = wait(reader->read(mutateString(actualData), expectedData.size(), 0));
    ASSERT_EQ(expectedData.size(), readSize);
    ASSERT(expectedData == actualData);

    return Void();
}

Future<Void> deleteObject(ProviderTestData* provider, std::string objFullPath) {
    Reference<BackupContainerFileSystem> bstore = provider->provider->getForRead(objFullPath);
    return bstore->deleteFile(objFullPath);
}

ACTOR Future<Void> workerThread(ConnectionProviderTestSettings* settings, std::vector<ProviderTestData>* providers) {
    state double endTime = now() + settings->runtime;
    try {
        while (now() < endTime) {
            // randomly pick provider
            int providerIdx;
            if (settings->uniformProviderChoice) {
                providerIdx = deterministicRandom()->randomInt(0, providers->size());
            } else {
                providerIdx = deterministicRandom()->randomSkewedUInt32(0, providers->size());
            }
            ProviderTestData* provider = &(*providers)[providerIdx];

            // randomly pick create or read
            bool doWrite = deterministicRandom()->random01() < settings->readWriteSplit;
            if (provider->usedNames.size() < settings->filesPerProvider && (provider->data.empty() || doWrite)) {
                // create an object
                wait(createObject(settings, provider));
                settings->writeOps++;
            } else if (!provider->data.empty()) {
                // read a random object
                auto& readInfo = provider->data[deterministicRandom()->randomInt(0, provider->data.size())];
                wait(readAndVerifyObject(provider, readInfo.first, readInfo.second));
                settings->readOps++;
            } else {
                // other threads are creating files up to filesPerProvider limit, but none finished yet. Just wait
                wait(delay(0.1));
            }
        }

        return Void();
    } catch (Error& e) {
        fmt::print("WorkerThread Unexpected Error {0}\n", e.name());
        throw e;
    }
}

ACTOR Future<Void> checkAndCleanUp(ProviderTestData* provider) {
    state int i;
    ASSERT(provider->usedNames.size() == provider->data.size());

    for (i = 0; i < provider->data.size(); i++) {
        auto& readInfo = provider->data[i];
        wait(readAndVerifyObject(provider, readInfo.first, readInfo.second));
        wait(deleteObject(provider, provider->data[i].first));
    }

    return Void();
}

// maybe this should be a workload instead?
TEST_CASE("/fdbserver/blob/connectionprovider") {
    state ConnectionProviderTestSettings settings;

    state std::vector<ProviderTestData> providers;
    providers.reserve(settings.numProviders);
    for (int i = 0; i < settings.numProviders; i++) {
        std::string nameStr = std::to_string(i);
        BlobMetadataDomainName name(nameStr);
        auto metadata = createRandomTestBlobMetadata(SERVER_KNOBS->BG_URL, i, name);
        providers.emplace_back(BlobConnectionProvider::newBlobConnectionProvider(metadata));
    }
    fmt::print("BlobConnectionProviderTest\n");

    state std::vector<Future<Void>> futures;
    futures.reserve(settings.threads);
    for (int i = 0; i < settings.threads; i++) {
        futures.push_back(workerThread(&settings, &providers));
    }

    wait(waitForAll(futures));

    fmt::print("BlobConnectionProviderTest workload phase complete with {0} files and {1} reads\n",
               settings.writeOps,
               settings.readOps);

    futures.clear();
    futures.reserve(providers.size());
    for (int i = 0; i < providers.size(); i++) {
        futures.push_back(checkAndCleanUp(&providers[i]));
    }

    wait(waitForAll(futures));

    fmt::print("BlobConnectionProviderTest check and cleanup phase complete\n");
    return Void();
}
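Note: the ConnectionProviderTestSettings constructor above keeps the total amount of generated test data bounded: the per-object size limit is a 1 GiB budget divided by (providers × files per provider), then capped at 16 MiB. A standalone sketch of that arithmetic with assumed, non-random counts (the real test draws them with randomSkewedUInt32):

#include <algorithm>
#include <cstdint>
#include <cstdio>

int main() {
    // Assumed draws standing in for randomSkewedUInt32(1, 1000) and (10, 10000).
    uint32_t numProviders = 250;
    uint32_t filesPerProvider = 1 + std::min(100u, 5000u / numProviders); // at most 100 files each
    uint32_t maxFileMemory = 1024u * 1024u * 1024u;                       // 1 GiB budget overall
    uint32_t maxFileSize = maxFileMemory / (numProviders * filesPerProvider);
    maxFileSize = std::min(16u * 1024u * 1024u, maxFileSize);             // cap one object at 16 MiB
    std::printf("%u providers x %u files -> up to %u bytes per object\n",
                numProviders, filesPerProvider, maxFileSize);
    return 0;
}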
@@ -625,7 +625,7 @@ bool isBlobMetadataEligibleForRefresh(const BlobMetadataDetailsRef& blobMetadata
    if (BUGGIFY_WITH_PROB(0.01)) {
        return true;
    }
-   int64_t nextRefreshCycleTS = currTS + SERVER_KNOBS->BLOB_METADATA_REFRESH_INTERVAL;
+   int64_t nextRefreshCycleTS = currTS + CLIENT_KNOBS->BLOB_METADATA_REFRESH_INTERVAL;
    return nextRefreshCycleTS > blobMetadata.expireAt || nextRefreshCycleTS > blobMetadata.refreshAt;
}
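Note: the predicate above asks whether waiting one more refresh interval (now read from CLIENT_KNOBS rather than SERVER_KNOBS) would overshoot either the metadata's refreshAt or expireAt deadline. A standalone restatement with assumed timestamps, not the FDB code itself:

#include <cassert>
#include <cstdint>

bool eligibleForRefresh(int64_t currTS, int64_t refreshInterval, double expireAt, double refreshAt) {
    // Refresh now if the next refresh cycle would land past either deadline.
    int64_t nextRefreshCycleTS = currTS + refreshInterval;
    return nextRefreshCycleTS > expireAt || nextRefreshCycleTS > refreshAt;
}

int main() {
    // Next cycle at 2000 + 3600 = 5600 is still before both deadlines: not eligible yet.
    assert(!eligibleForRefresh(2000, 3600, /*expireAt*/ 10000.0, /*refreshAt*/ 7000.0));
    // Next cycle at 4000 + 3600 = 7600 would pass refreshAt (7000): eligible.
    assert(eligibleForRefresh(4000, 3600, 10000.0, 7000.0));
    return 0;
}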
@@ -895,7 +895,7 @@ ACTOR Future<Void> encryptKeyProxyServer(EncryptKeyProxyInterface ekpInterface,
                                             TaskPriority::Worker);

    self->blobMetadataRefresher = recurring([&]() { refreshBlobMetadata(self, kmsConnectorInf); },
-                                           SERVER_KNOBS->BLOB_METADATA_REFRESH_INTERVAL,
+                                           CLIENT_KNOBS->BLOB_METADATA_REFRESH_INTERVAL,
                                            TaskPriority::Worker);

    try {
@@ -192,61 +192,6 @@ ACTOR Future<Void> ekLookupByDomainIds(Reference<SimKmsConnectorContext> ctx,
    success ? req.reply.send(rep) : req.reply.sendError(encrypt_key_not_found());
    return Void();
}
-// TODO: switch this to use bg_url instead of hardcoding file://fdbblob, so it works as FDBPerfKmsConnector
-// FIXME: make this (more) deterministic outside of simulation for FDBPerfKmsConnector
-static Standalone<BlobMetadataDetailsRef> createBlobMetadata(BlobMetadataDomainId domainId,
-                                                             BlobMetadataDomainName domainName) {
-    Standalone<BlobMetadataDetailsRef> metadata;
-    metadata.domainId = domainId;
-    metadata.arena().dependsOn(domainName.arena());
-    metadata.domainName = domainName;
-    // 0 == no partition, 1 == suffix partitioned, 2 == storage location partitioned
-    int type = deterministicRandom()->randomInt(0, 3);
-    int partitionCount = (type == 0) ? 0 : deterministicRandom()->randomInt(2, 12);
-    fmt::print("SimBlobMetadata ({})\n", domainId);
-    TraceEvent ev(SevDebug, "SimBlobMetadata");
-    ev.detail("DomainId", domainId).detail("TypeNum", type).detail("PartitionCount", partitionCount);
-    if (type == 0) {
-        // single storage location
-        metadata.base = StringRef(metadata.arena(), "file://fdbblob/" + std::to_string(domainId) + "/");
-        fmt::print(" {}\n", metadata.base.get().printable());
-        ev.detail("Base", metadata.base);
-    }
-    if (type == 1) {
-        // simulate hash prefixing in s3
-        metadata.base = StringRef(metadata.arena(), "file://fdbblob/"_sr);
-        ev.detail("Base", metadata.base);
-        fmt::print(" {} ({})\n", metadata.base.get().printable(), partitionCount);
-        for (int i = 0; i < partitionCount; i++) {
-            metadata.partitions.push_back_deep(metadata.arena(),
-                                               deterministicRandom()->randomUniqueID().shortString() + "-" +
-                                                   std::to_string(domainId) + "/");
-            fmt::print(" {}\n", metadata.partitions.back().printable());
-            ev.detail("P" + std::to_string(i), metadata.partitions.back());
-        }
-    }
-    if (type == 2) {
-        // simulate separate storage location per partition
-        for (int i = 0; i < partitionCount; i++) {
-            metadata.partitions.push_back_deep(
-                metadata.arena(), "file://fdbblob" + std::to_string(domainId) + "_" + std::to_string(i) + "/");
-            fmt::print(" {}\n", metadata.partitions.back().printable());
-            ev.detail("P" + std::to_string(i), metadata.partitions.back());
-        }
-    }
-
-    // set random refresh + expire time
-    if (deterministicRandom()->coinflip()) {
-        metadata.refreshAt = now() + deterministicRandom()->random01() * SERVER_KNOBS->BLOB_METADATA_REFRESH_INTERVAL;
-        metadata.expireAt =
-            metadata.refreshAt + deterministicRandom()->random01() * SERVER_KNOBS->BLOB_METADATA_REFRESH_INTERVAL;
-    } else {
-        metadata.refreshAt = std::numeric_limits<double>::max();
-        metadata.expireAt = metadata.refreshAt;
-    }
-
-    return metadata;
-}
-
ACTOR Future<Void> blobMetadataLookup(KmsConnectorInterface interf, KmsConnBlobMetadataReq req) {
    state KmsConnBlobMetadataRep rep;
@@ -261,7 +206,9 @@ ACTOR Future<Void> blobMetadataLookup(KmsConnectorInterface interf, KmsConnBlobM
        if (it == simBlobMetadataStore.end()) {
            // construct new blob metadata
            it = simBlobMetadataStore
-                     .insert({ domainInfo.domainId, createBlobMetadata(domainInfo.domainId, domainInfo.domainName) })
+                     .insert({ domainInfo.domainId,
+                               createRandomTestBlobMetadata(
+                                   SERVER_KNOBS->BG_URL, domainInfo.domainId, domainInfo.domainName) })
                     .first;
        } else if (now() >= it->second.expireAt) {
            // update random refresh and expire time
@@ -44,6 +44,7 @@ void forceLinkRESTKmsConnectorTest();
void forceLinkCompressionUtilsTest();
void forceLinkAtomicTests();
void forceLinkIdempotencyIdTests();
+void forceLinkBlobConnectionProviderTests();

struct UnitTestWorkload : TestWorkload {
    static constexpr auto NAME = "UnitTests";
@@ -104,6 +105,7 @@ struct UnitTestWorkload : TestWorkload {
        forceLinkCompressionUtilsTest();
        forceLinkAtomicTests();
        forceLinkIdempotencyIdTests();
+       forceLinkBlobConnectionProviderTests();
    }

    Future<Void> setup(Database const& cx) override {
@@ -7,4 +7,4 @@ startDelay = 0
[[test.workload]]
testName = 'UnitTests'
maxTestCases = 0
-testsMatching = '/GrvProxyTransactionTagThrottler/Fifo'
+testsMatching = '/'