Mirror of https://github.com/apple/foundationdb.git
Merge branch 'master' into feature-remote-logs
# Conflicts:
#	tests/fast/SidebandWithStatus.txt
#	tests/rare/LargeApiCorrectnessStatus.txt
#	tests/slow/DDBalanceAndRemoveStatus.txt
This commit is contained in: commit 3ec45d38a0
Changed paths:
  fdbbackup/
  fdbclient/
    BackupContainer.actor.cpp
    DatabaseBackupAgent.actor.cpp
    FileBackupAgent.actor.cpp
    Knobs.cpp
    Knobs.h
    NativeAPI.actor.cpp
    StatusClient.actor.cpp
    TaskBucket.h
    vexillographer/
  fdbmonitor/
  fdbrpc/
  fdbserver/
  flow/
  tests/
    fast/
    rare/
    slow/
    status/
@@ -1837,16 +1837,16 @@ ACTOR Future<Void> expireBackupData(const char *name, std::string destinationCon
 ACTOR Future<Void> deleteBackupContainer(const char *name, std::string destinationContainer) {
 	try {
 		state Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer);
+		state int numDeleted = 0;
+		state Future<Void> done = c->deleteContainer(&numDeleted);
 
 		loop {
 			choose {
-				when ( Void _ = wait(c->deleteContainer()) ) {
+				when ( Void _ = wait(done) ) {
 					printf("The entire container has been deleted.\n");
 					break;
 				}
 				when ( Void _ = wait(delay(3)) ) {
-					int numDeleted = 0;
-					c->deleteContainer(&numDeleted);
 					printf("%d files have been deleted so far...\n", numDeleted);
 				}
 			}
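The reworked deleteBackupContainer starts one long-running delete whose file counter is polled every three seconds until the delete future fires. A standalone sketch of the same progress-polling shape, using std::future/wait_for in place of flow's choose/when (names and timings are illustrative, not the real API):

    #include <atomic>
    #include <chrono>
    #include <cstdio>
    #include <future>
    #include <thread>

    int main() {
        std::atomic<int> numDeleted{0};
        // Stand-in for c->deleteContainer(&numDeleted): one job updating a shared counter.
        std::future<void> done = std::async(std::launch::async, [&] {
            for(int i = 0; i < 10; ++i) {
                ++numDeleted;
                std::this_thread::sleep_for(std::chrono::milliseconds(50));
            }
        });
        // Stand-in for the choose { when(done) ... when(delay(3)) ... } loop.
        while(done.wait_for(std::chrono::seconds(3)) != std::future_status::ready)
            printf("%d files have been deleted so far...\n", numDeleted.load());
        printf("The entire container has been deleted.\n");
    }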
@@ -149,36 +149,42 @@ public:
 		return writeFile(format("ranges/%s/range,%lld,%s,%d", rangeVersionFolderString(version).c_str(), version, g_random->randomUniqueID().toString().c_str(), blockSize));
 	}
 
-	static RangeFile pathToRangeFile(std::string path, int64_t size) {
+	static bool pathToRangeFile(RangeFile &out, std::string path, int64_t size) {
 		std::string name = basename(path);
 		RangeFile f;
 		f.fileName = path;
 		f.fileSize = size;
 		int len;
-		if(sscanf(name.c_str(), "range,%lld,%*[^,],%u%n", &f.version, &f.blockSize, &len) == 2 && len == name.size())
-			return f;
-		throw restore_unknown_file_type();
+		if(sscanf(name.c_str(), "range,%lld,%*[^,],%u%n", &f.version, &f.blockSize, &len) == 2 && len == name.size()) {
+			out = f;
+			return true;
+		}
+		return false;
 	}
 
-	static LogFile pathToLogFile(std::string path, int64_t size) {
+	static bool pathToLogFile(LogFile &out, std::string path, int64_t size) {
 		std::string name = basename(path);
 		LogFile f;
 		f.fileName = path;
 		f.fileSize = size;
 		int len;
-		if(sscanf(name.c_str(), "log,%lld,%lld,%*[^,],%u%n", &f.beginVersion, &f.endVersion, &f.blockSize, &len) == 3 && len == name.size())
-			return f;
-		throw restore_unknown_file_type();
+		if(sscanf(name.c_str(), "log,%lld,%lld,%*[^,],%u%n", &f.beginVersion, &f.endVersion, &f.blockSize, &len) == 3 && len == name.size()) {
+			out = f;
+			return true;
+		}
+		return false;
 	}
 
-	static KeyspaceSnapshotFile pathToKeyspaceSnapshotFile(std::string path) {
+	static bool pathToKeyspaceSnapshotFile(KeyspaceSnapshotFile &out, std::string path) {
 		std::string name = basename(path);
 		KeyspaceSnapshotFile f;
 		f.fileName = path;
 		int len;
-		if(sscanf(name.c_str(), "snapshot,%lld,%lld,%lld%n", &f.beginVersion, &f.endVersion, &f.totalSize, &len) == 3 && len == name.size())
-			return f;
-		throw restore_unknown_file_type();
+		if(sscanf(name.c_str(), "snapshot,%lld,%lld,%lld%n", &f.beginVersion, &f.endVersion, &f.totalSize, &len) == 3 && len == name.size()) {
+			out = f;
+			return true;
+		}
+		return false;
 	}
 
 	// TODO: Do this more efficiently, as the range file list for a snapshot could potentially be hundreds of megabytes.
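These parsers now report unrecognized file names by returning false instead of throwing restore_unknown_file_type(), so listing code can silently skip foreign files while dumpFileList can still choose to throw. A self-contained sketch of the new calling convention (the struct and basename handling are simplified stand-ins, not the real headers):

    #include <cstdint>
    #include <cstdio>
    #include <string>

    // Illustrative stand-in for the RangeFile struct in the diff.
    struct RangeFile { std::string fileName; int64_t fileSize = 0; long long version = 0; unsigned blockSize = 0; };

    // Same parsing rule as the diff: accept only names matching "range,<version>,<uid>,<blockSize>".
    static bool pathToRangeFile(RangeFile &out, const std::string &path, int64_t size) {
        std::string name = path.substr(path.find_last_of('/') + 1);  // basename() substitute
        RangeFile f;
        f.fileName = path;
        f.fileSize = size;
        int len = 0;
        if(sscanf(name.c_str(), "range,%lld,%*[^,],%u%n", &f.version, &f.blockSize, &len) == 2 && len == (int)name.size()) {
            out = f;
            return true;
        }
        return false;
    }

    int main() {
        RangeFile rf;
        // Parses: returns true and fills rf.
        bool ok1 = pathToRangeFile(rf, "ranges/0/range,42,deadbeef,1024", 100);
        // A foreign file is now skipped (false) instead of throwing.
        bool ok2 = pathToRangeFile(rf, "ranges/0/notes.txt", 100);
        printf("%d %d\n", ok1, ok2);  // 1 0
    }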
@@ -236,14 +242,18 @@ public:
 
 		Version minVer = std::numeric_limits<Version>::max();
 		Version maxVer = std::numeric_limits<Version>::min();
+		RangeFile rf;
 
 		for(auto &f : fileNames) {
-			array.push_back(f);
-			RangeFile rf = pathToRangeFile(f, 0);
-			if(rf.version < minVer)
-				minVer = rf.version;
-			if(rf.version > maxVer)
-				maxVer = rf.version;
+			if(pathToRangeFile(rf, f, 0)) {
+				array.push_back(f);
+				if(rf.version < minVer)
+					minVer = rf.version;
+				if(rf.version > maxVer)
+					maxVer = rf.version;
+			}
+			else
+				throw restore_unknown_file_type();
 		}
 
 		doc.create("totalBytes") = totalBytes;
@@ -267,9 +277,9 @@ public:
 	Future<std::vector<LogFile>> listLogFiles(Version beginVersion = std::numeric_limits<Version>::min(), Version endVersion = std::numeric_limits<Version>::max()) {
 		return map(listFiles("logs/"), [=](const FilesAndSizesT &files) {
 			std::vector<LogFile> results;
+			LogFile lf;
 			for(auto &f : files) {
-				LogFile lf = pathToLogFile(f.first, f.second);
-				if(lf.endVersion > beginVersion && lf.beginVersion < endVersion)
+				if(pathToLogFile(lf, f.first, f.second) && lf.endVersion > beginVersion && lf.beginVersion < endVersion)
 					results.push_back(lf);
 			}
 			std::sort(results.begin(), results.end());
@@ -279,9 +289,9 @@ public:
 	Future<std::vector<RangeFile>> listRangeFiles(Version beginVersion = std::numeric_limits<Version>::min(), Version endVersion = std::numeric_limits<Version>::max()) {
 		return map(listFiles("ranges/"), [=](const FilesAndSizesT &files) {
 			std::vector<RangeFile> results;
+			RangeFile rf;
 			for(auto &f : files) {
-				RangeFile rf = pathToRangeFile(f.first, f.second);
-				if(rf.version >= beginVersion && rf.version <= endVersion)
+				if(pathToRangeFile(rf, f.first, f.second) && rf.version >= beginVersion && rf.version <= endVersion)
 					results.push_back(rf);
 			}
 			std::sort(results.begin(), results.end());
@@ -291,9 +301,9 @@ public:
 	Future<std::vector<KeyspaceSnapshotFile>> listKeyspaceSnapshots(Version beginVersion = std::numeric_limits<Version>::min(), Version endVersion = std::numeric_limits<Version>::max()) {
 		return map(listFiles("snapshots/"), [=](const FilesAndSizesT &files) {
 			std::vector<KeyspaceSnapshotFile> results;
+			KeyspaceSnapshotFile sf;
 			for(auto &f : files) {
-				KeyspaceSnapshotFile sf = pathToKeyspaceSnapshotFile(f.first);
-				if(sf.endVersion > beginVersion && sf.beginVersion < endVersion)
+				if(pathToKeyspaceSnapshotFile(sf, f.first) && sf.endVersion > beginVersion && sf.beginVersion < endVersion)
 					results.push_back(sf);
 			}
 			std::sort(results.begin(), results.end());
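All three listings now filter against a version window: log and snapshot files use an interval-overlap test (endVersion > beginVersion && beginVersion < endVersion) while range files use inclusive point containment. A small sketch of the two predicates, assuming log files cover [fileBegin, fileEnd) and each range file sits at a single version:

    #include <cassert>

    typedef long long Version;

    // Overlap test used for log and snapshot files: partial overlap counts, touching does not.
    bool logOverlaps(Version fileBegin, Version fileEnd, Version queryBegin, Version queryEnd) {
        return fileEnd > queryBegin && fileBegin < queryEnd;
    }

    // Point containment used for range files: endpoints are inclusive.
    bool rangeContained(Version fileVersion, Version queryBegin, Version queryEnd) {
        return fileVersion >= queryBegin && fileVersion <= queryEnd;
    }

    int main() {
        assert( logOverlaps(10, 20, 15, 30));   // partial overlap counts
        assert(!logOverlaps(10, 20, 20, 30));   // touching at the boundary does not
        assert( rangeContained(20, 20, 30));    // endpoints are inclusive for range files
    }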
@@ -724,8 +734,8 @@ public:
 	// TODO: If there is a need, this can be made faster by discovering common prefixes and listing levels of the folder structure in parallel.
 	ACTOR static Future<Void> deleteContainer_impl(Reference<BackupContainerBlobStore> bc, int *pNumDeleted) {
 		state PromiseStream<BlobStoreEndpoint::ListResult> resultStream;
-		state Future<Void> done = bc->m_bstore->listBucketStream(BACKUP_BUCKET, resultStream, bc->m_name + "/");
-		state std::vector<Future<Void>> deleteFutures;
+		state Future<Void> done = bc->m_bstore->listBucketStream(BACKUP_BUCKET, resultStream, bc->m_name + "/", '/', std::numeric_limits<int>::max());
+		state std::list<Future<Void>> deleteFutures;
 		loop {
 			choose {
 				when(Void _ = wait(done)) {
@@ -740,11 +750,20 @@ public:
 						return Void();
 					}));
 				}
+
+				while(deleteFutures.size() > CLIENT_KNOBS->BLOBSTORE_CONCURRENT_REQUESTS) {
+					Void _ = wait(deleteFutures.front());
+					deleteFutures.pop_front();
+				}
 			}
 		}
 	}
 
-	Void _ = wait(waitForAll(deleteFutures));
+	while(deleteFutures.size() > 0) {
+		Void _ = wait(deleteFutures.front());
+		deleteFutures.pop_front();
+	}
 
 	return Void();
 }
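Replacing waitForAll over a vector with a std::list drained from the front turns the delete into a sliding window: at most BLOBSTORE_CONCURRENT_REQUESTS deletes are outstanding, and completed futures are discarded as new ones are queued. The same idea in standard C++, with std::future standing in for flow's Future (illustrative; flow's wait() is cooperative, not thread-blocking):

    #include <future>
    #include <list>

    // Keep at most `limit` async jobs outstanding; drain the oldest before issuing more.
    void runBounded(int totalJobs, size_t limit) {
        std::list<std::future<void>> inFlight;
        for(int i = 0; i < totalJobs; ++i) {
            inFlight.push_back(std::async(std::launch::async, []{ /* do one delete */ }));
            while(inFlight.size() > limit) {
                inFlight.front().wait();  // block on the oldest job
                inFlight.pop_front();
            }
        }
        while(!inFlight.empty()) {  // drain the tail
            inFlight.front().wait();
            inFlight.pop_front();
        }
    }

    int main() { runBounded(10, 3); }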
@@ -1445,8 +1445,8 @@ public:
 			tr3.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
 			tr3.setOption(FDBTransactionOptions::LOCK_AWARE);
 			Version destVersion = wait(tr3.getReadVersion());
+			TraceEvent("DBA_switchover_version_upgrade").detail("src", commitVersion).detail("dest", destVersion);
 			if (destVersion <= commitVersion) {
-				TraceEvent("DBA_switchover_version_upgrade").detail("src", commitVersion).detail("dest", destVersion);
 				TEST(true); // Forcing dest backup cluster to higher version
 				tr3.set(minRequiredCommitVersionKey, BinaryWriter::toValue(commitVersion+1, Unversioned()));
 				Void _ = wait(tr3.commit());
@@ -1549,12 +1549,20 @@ public:
 				if (lastApplied.present()) {
 					Version current = tr->getReadVersion().get();
 					Version applied = BinaryReader::fromStringRef<Version>(lastApplied.get(), Unversioned());
+					TraceEvent("DBA_abort_version_upgrade").detail("src", applied).detail("dest", current);
 					if (current <= applied) {
-						TraceEvent("DBA_abort_version_upgrade").detail("src", applied).detail("dest", current);
 						TEST(true); // Upgrading version of local database.
 						// The +1 is because we want to make sure that a versionstamped operation can't reuse
 						// the same version as an already-applied transaction.
 						tr->set(minRequiredCommitVersionKey, BinaryWriter::toValue(applied+1, Unversioned()));
+					} else {
+						// We need to enforce that the read we did of the applyMutationsBeginKey is the most
+						// recent and up to date value, as the proxy might have accepted a commit previously
+						// queued by dumpData after our read. Transactions that don't have write conflict ranges
+						// have a no-op commit(), as they become snapshot transactions to which we don't promise
+						// strict serializability. Therefore, we add an arbitrary write conflict range to
+						// request the strict serializability guarantee that is required.
+						tr->addWriteConflictRange(singleKeyRange(minRequiredCommitVersionKey));
 					}
 				}
 				Void _ = wait(tr->commit());
@@ -674,6 +674,86 @@ namespace fileBackup {
 		return Void();
 	}
 
+	ACTOR static Future<Void> abortOldBackup(FileBackupAgent* backupAgent, Reference<ReadYourWritesTransaction> tr, std::string tagName) {
+		tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+		tr->setOption(FDBTransactionOptions::LOCK_AWARE);
+
+		state Subspace tagNames = backupAgent->subspace.get(BackupAgentBase::keyTagName);
+		Optional<Value> uidStr = wait(tr->get(tagNames.pack(Key(tagName))));
+		if (!uidStr.present()) {
+			TraceEvent(SevError, "BackupTagMissing").detail("tagName", tagName.c_str());
+			throw backup_error();
+		}
+		state UID uid = BinaryReader::fromStringRef<UID>(uidStr.get(), Unversioned());
+
+		state Subspace statusSpace = backupAgent->subspace.get(BackupAgentBase::keyStates).get(uid.toString());
+		state Subspace globalConfig = backupAgent->subspace.get(BackupAgentBase::keyConfig).get(uid.toString());
+		state Subspace newConfigSpace = uidPrefixKey(LiteralStringRef("uid->config/").withPrefix(fileBackupPrefixRange.begin), uid);
+
+		Optional<Value> statusStr = wait(tr->get(statusSpace.pack(FileBackupAgent::keyStateStatus)));
+		state EBackupState status = !statusStr.present() ? FileBackupAgent::STATE_NEVERRAN : BackupAgentBase::getState(statusStr.get().toString());
+
+		if (!backupAgent->isRunnable(status)) {
+			throw backup_unneeded();
+		}
+
+		TraceEvent(SevInfo, "FileBackupAbortOld")
+				.detail("tagName", tagName.c_str())
+				.detail("status", BackupAgentBase::getStateText(status));
+
+		// Clearing the folder id will prevent future tasks from executing
+		tr->clear(singleKeyRange(StringRef(globalConfig.pack(FileBackupAgent::keyFolderId))));
+
+		Key configPath = uidPrefixKey(logRangesRange.begin, uid);
+		Key logsPath = uidPrefixKey(backupLogKeys.begin, uid);
+
+		tr->clear(KeyRangeRef(configPath, strinc(configPath)));
+		tr->clear(KeyRangeRef(logsPath, strinc(logsPath)));
+		tr->clear(newConfigSpace.range());
+
+		Key statusKey = StringRef(statusSpace.pack(FileBackupAgent::keyStateStatus));
+		tr->set(statusKey, StringRef(FileBackupAgent::getStateText(BackupAgentBase::STATE_ABORTED)));
+
+		return Void();
+	}
+
+	struct AbortOldBackupTask : TaskFuncBase {
+		static StringRef name;
+		ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
+			state FileBackupAgent backupAgent;
+			state std::string tagName = task->params[BackupAgentBase::keyConfigBackupTag].toString();
+
+			TEST(true);  // Canceling old backup task
+
+			TraceEvent(SevInfo, "FileBackupCancelOldTask")
+					.detail("task", printable(task->params[Task::reservedTaskParamKeyType]))
+					.detail("tagName", tagName);
+			Void _ = wait(abortOldBackup(&backupAgent, tr, tagName));
+
+			Void _ = wait(taskBucket->finish(tr, task));
+			return Void();
+		}
+
+		virtual StringRef getName() const {
+			TraceEvent(SevError, "FileBackupError").detail("cause", "AbortOldBackupTaskFunc::name() should never be called");
+			ASSERT(false);
+			return StringRef();
+		}
+
+		Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return Future<Void>(Void()); };
+		Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
+	};
+	StringRef AbortOldBackupTask::name = LiteralStringRef("abort_legacy_backup");
+	REGISTER_TASKFUNC(AbortOldBackupTask);
+	REGISTER_TASKFUNC_ALIAS(AbortOldBackupTask, file_backup_diff_logs);
+	REGISTER_TASKFUNC_ALIAS(AbortOldBackupTask, file_backup_log_range);
+	REGISTER_TASKFUNC_ALIAS(AbortOldBackupTask, file_backup_logs);
+	REGISTER_TASKFUNC_ALIAS(AbortOldBackupTask, file_backup_range);
+	REGISTER_TASKFUNC_ALIAS(AbortOldBackupTask, file_backup_restorable);
+	REGISTER_TASKFUNC_ALIAS(AbortOldBackupTask, file_finish_full_backup);
+	REGISTER_TASKFUNC_ALIAS(AbortOldBackupTask, file_finished_full_backup);
+	REGISTER_TASKFUNC_ALIAS(AbortOldBackupTask, file_start_full_backup);
+
 	std::function<void(Reference<Task>)> NOP_SETUP_TASK_FN = [](Reference<Task> task) { /* NOP */ };
 	ACTOR static Future<Key> addBackupTask(StringRef name,
 											uint32_t version,
@@ -1011,7 +1091,7 @@ namespace fileBackup {
 		}
 
 	};
-	StringRef BackupRangeTaskFunc::name = LiteralStringRef("file_backup_range");
+	StringRef BackupRangeTaskFunc::name = LiteralStringRef("file_backup_write_range");
 	const uint32_t BackupRangeTaskFunc::version = 1;
 	REGISTER_TASKFUNC(BackupRangeTaskFunc);
@@ -1237,11 +1317,11 @@ namespace fileBackup {
 			// timeElapsed is between 0 and 1 and represents what portion of the shards we should have completed by now
 			double timeElapsed;
 			if(snapshotTargetEndVersion > snapshotBeginVersion)
-				timeElapsed = std::max(1.0, (double)(nextDispatchVersion - snapshotBeginVersion) / (snapshotTargetEndVersion - snapshotBeginVersion));
+				timeElapsed = std::min(1.0, (double)(nextDispatchVersion - snapshotBeginVersion) / (snapshotTargetEndVersion - snapshotBeginVersion));
 			else
 				timeElapsed = 1.0;
 
-			state int countExpectedShardsDone = std::min<int>(countAllShards, countAllShards * timeElapsed);
+			state int countExpectedShardsDone = countAllShards * timeElapsed;
 			state int countShardsToDispatch = std::max<int>(0, countExpectedShardsDone - countShardsDone);
 
 			TraceEvent("FileBackupSnapshotDispatchStats")
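The one-character fix above matters: std::max(1.0, fraction) pins timeElapsed at or above 1, so countExpectedShardsDone always equalled countAllShards and the whole snapshot was dispatched immediately; std::min(1.0, fraction) caps progress at 100%, which also makes the old std::min<int> clamp unnecessary. Worked numbers:

    #include <algorithm>
    #include <cstdio>

    int main() {
        double begin = 0, end = 1000, next = 250;   // 25% of the snapshot window has elapsed
        int countAllShards = 80;

        double fraction = (next - begin) / (end - begin);   // 0.25
        double oldElapsed = std::max(1.0, fraction);        // 1.0  -> bug: always "fully elapsed"
        double newElapsed = std::min(1.0, fraction);        // 0.25 -> intended pacing

        printf("old: %d shards expected\n", (int)(countAllShards * oldElapsed));  // 80
        printf("new: %d shards expected\n", (int)(countAllShards * newElapsed));  // 20
    }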
@@ -1435,7 +1515,7 @@ namespace fileBackup {
 		}
 
 	};
-	StringRef BackupSnapshotDispatchTask::name = LiteralStringRef("file_backup_snapshot_dispatch");
+	StringRef BackupSnapshotDispatchTask::name = LiteralStringRef("file_backup_dispatch_ranges");
 	const uint32_t BackupSnapshotDispatchTask::version = 1;
 	REGISTER_TASKFUNC(BackupSnapshotDispatchTask);
@@ -1634,7 +1714,7 @@ namespace fileBackup {
 		}
 	};
 
-	StringRef BackupLogRangeTaskFunc::name = LiteralStringRef("file_backup_log_range");
+	StringRef BackupLogRangeTaskFunc::name = LiteralStringRef("file_backup_write_logs");
 	const uint32_t BackupLogRangeTaskFunc::version = 1;
 	REGISTER_TASKFUNC(BackupLogRangeTaskFunc);
@@ -1722,18 +1802,18 @@ namespace fileBackup {
 		Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return Void(); };
 		Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
 	};
-	StringRef BackupLogsDispatchTask::name = LiteralStringRef("file_backup_logs");
+	StringRef BackupLogsDispatchTask::name = LiteralStringRef("file_backup_dispatch_logs");
 	const uint32_t BackupLogsDispatchTask::version = 1;
 	REGISTER_TASKFUNC(BackupLogsDispatchTask);
 
-	struct FinishedFullBackupTaskFunc : BackupTaskFuncBase {
+	struct FileBackupFinishedTask : BackupTaskFuncBase {
 		static StringRef name;
 		static const uint32_t version;
 
 		StringRef getName() const { return name; };
 
 		ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
-			Void _ = wait(checkTaskVersion(tr->getDatabase(), task, FinishedFullBackupTaskFunc::name, FinishedFullBackupTaskFunc::version));
+			Void _ = wait(checkTaskVersion(tr->getDatabase(), task, FileBackupFinishedTask::name, FileBackupFinishedTask::version));
 
 			state BackupConfig backup(task);
 			state UID uid = backup.getUid();
@@ -1754,8 +1834,8 @@ namespace fileBackup {
 		}
 
 		ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
-			Key key = wait(addBackupTask(FinishedFullBackupTaskFunc::name,
-										FinishedFullBackupTaskFunc::version,
+			Key key = wait(addBackupTask(FileBackupFinishedTask::name,
+										FileBackupFinishedTask::version,
 										tr, taskBucket, completionKey,
 										BackupConfig(parentTask), waitFor));
 			return key;
@@ -1764,13 +1844,9 @@ namespace fileBackup {
 		Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return Void(); };
 		Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
 	};
-	StringRef FinishedFullBackupTaskFunc::name = LiteralStringRef("file_finished_full_backup");
-	const uint32_t FinishedFullBackupTaskFunc::version = 1;
-	REGISTER_TASKFUNC(FinishedFullBackupTaskFunc);
-
-	// TODO: Register a task that will finish/delete any tasks of these types:
-	//LiteralStringRef("file_backup_diff_logs");
-	//LiteralStringRef("file_finish_full_backup");
+	StringRef FileBackupFinishedTask::name = LiteralStringRef("file_backup_finished");
+	const uint32_t FileBackupFinishedTask::version = 1;
+	REGISTER_TASKFUNC(FileBackupFinishedTask);
 
 	struct BackupSnapshotManifest : BackupTaskFuncBase {
 		static StringRef name;
@@ -1918,7 +1994,7 @@ namespace fileBackup {
 		Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _execute(cx, tb, fb, task); };
 		Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
 	};
-	StringRef BackupSnapshotManifest::name = LiteralStringRef("file_backup_snapshot_manifest");
+	StringRef BackupSnapshotManifest::name = LiteralStringRef("file_backup_write_snapshot_manifest");
 	const uint32_t BackupSnapshotManifest::version = 1;
 	REGISTER_TASKFUNC(BackupSnapshotManifest);
@@ -1978,7 +2054,7 @@ namespace fileBackup {
 
 			// If a clean stop is requested, the log and snapshot tasks will quit after the backup is restorable, then the following
 			// task will clean up and set the completed state.
-			Key _ = wait(FinishedFullBackupTaskFunc::addTask(tr, taskBucket, task, TaskCompletionKey::noSignal(), backupFinished));
+			Key _ = wait(FileBackupFinishedTask::addTask(tr, taskBucket, task, TaskCompletionKey::noSignal(), backupFinished));
 
 			Void _ = wait(taskBucket->finish(tr, task));
 			return Void();
@@ -1998,7 +2074,7 @@ namespace fileBackup {
 		Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _execute(cx, tb, fb, task); };
 		Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
 	};
-	StringRef StartFullBackupTaskFunc::name = LiteralStringRef("file_start_full_backup");
+	StringRef StartFullBackupTaskFunc::name = LiteralStringRef("file_backup_start");
 	const uint32_t StartFullBackupTaskFunc::version = 1;
 	REGISTER_TASKFUNC(StartFullBackupTaskFunc);
@@ -145,9 +145,11 @@ ClientKnobs::ClientKnobs(bool randomize) {
 	init( BLOBSTORE_CONNECT_TIMEOUT, 10 );
 	init( BLOBSTORE_MAX_CONNECTION_LIFE, 120 );
 	init( BLOBSTORE_REQUEST_TRIES, 10 );
-	init( BLOBSTORE_REQUEST_TIMEOUT, 30 );
+	init( BLOBSTORE_REQUEST_TIMEOUT, 60 );
 
+	init( BLOBSTORE_CONCURRENT_UPLOADS, BACKUP_TASKS_PER_AGENT*2 );
+	init( BLOBSTORE_CONCURRENT_LISTS, 20 );
+	init( BLOBSTORE_CONCURRENT_REQUESTS, BLOBSTORE_CONCURRENT_UPLOADS + BLOBSTORE_CONCURRENT_LISTS + 5);
+
 	init( BLOBSTORE_CONCURRENT_WRITES_PER_FILE, 5 );
 	init( BLOBSTORE_CONCURRENT_READS_PER_FILE, 3 );
@@ -159,7 +161,6 @@ ClientKnobs::ClientKnobs(bool randomize) {
 
 	// These are basically unlimited by default but can be used to reduce blob IO if needed
 	init( BLOBSTORE_REQUESTS_PER_SECOND, 200 );
-	init( BLOBSTORE_CONCURRENT_REQUESTS, (BACKUP_TASKS_PER_AGENT*2)+5 );
 	init( BLOBSTORE_MAX_SEND_BYTES_PER_SECOND, 1e9 );
 	init( BLOBSTORE_MAX_RECV_BYTES_PER_SECOND, 1e9 );
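The request budget is now derived from the operation-specific limits instead of a standalone constant. A quick check of the arithmetic, assuming BACKUP_TASKS_PER_AGENT defaults to 10 (an assumption for illustration; the real default lives in Knobs.cpp):

    #include <cstdio>

    int main() {
        int BACKUP_TASKS_PER_AGENT = 10;          // assumed default for this sketch
        int uploads = BACKUP_TASKS_PER_AGENT * 2; // 20
        int lists = 20;
        int requests = uploads + lists + 5;       // 45: every upload and list fits, plus headroom
        printf("concurrent_requests budget: %d\n", requests);
    }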
@@ -155,6 +155,7 @@ public:
 	int BLOBSTORE_MULTIPART_MAX_PART_SIZE;
 	int BLOBSTORE_MULTIPART_MIN_PART_SIZE;
 	int BLOBSTORE_CONCURRENT_UPLOADS;
+	int BLOBSTORE_CONCURRENT_LISTS;
 	int BLOBSTORE_CONCURRENT_WRITES_PER_FILE;
 	int BLOBSTORE_CONCURRENT_READS_PER_FILE;
 	int BLOBSTORE_READ_BLOCK_SIZE;
@@ -490,7 +490,7 @@ ACTOR static Future<Void> monitorClientInfo( Reference<AsyncVar<Optional<Cluster
 
 		ClusterConnectionString fileConnectionString;
 		if (ccf && !ccf->fileContentsUpToDate(fileConnectionString)) {
-			req.issues = LiteralStringRef("unable_to_write_cluster_file");
+			req.issues = LiteralStringRef("incorrect_cluster_file_contents");
 			if(ccf->canGetFilename()) {
 				TraceEvent(SevWarnAlways, "IncorrectClusterFileContents").detail("Filename", ccf->getFilename())
 					.detail("ConnectionStringFromFile", fileConnectionString.toString())
@@ -305,12 +305,13 @@ ACTOR Future<StatusObject> clientStatusFetcher(Reference<ClusterConnectionFile>
 	statusObj["cluster_file"] = statusObjClusterFile;
 
 	if (!contentsUpToDate){
-		std::string description = "Cluster file is not up to date.\nIt contains the connection string: ";
+		std::string description = "Cluster file contents do not match current cluster connection string.";
+		description += "\nThe file contains the connection string: ";
 		description += ClusterConnectionFile(f->getFilename()).getConnectionString().toString().c_str();
-		description += "\nThe original connection string is: ";
+		description += "\nThe current connection string is: ";
 		description += f->getConnectionString().toString().c_str();
-		description += "\nThis must mean that file permissions or other platform issues have prevented the file from being updated. To change coordinators without manual intervention, the cluster file and its containing folder must be writable by all servers and clients. If a majority of the coordinators referenced by the old connection string are lost, the database will stop working until the correct cluster file is distributed to all processes.";
-		messages->push_back(makeMessage("inconsistent_cluster_file", description.c_str()));
+		description += "\nVerify cluster file is writable and has not been overwritten externally. To change coordinators without manual intervention, the cluster file and its containing folder must be writable by all servers and clients. If a majority of the coordinators referenced by the old connection string are lost, the database will stop working until the correct cluster file is distributed to all processes.";
+		messages->push_back(makeMessage("incorrect_cluster_file_contents", description.c_str()));
 	}
 
 	return statusObj;
@@ -391,6 +391,7 @@ struct TaskFuncBase : IDispatched<TaskFuncBase, Standalone<StringRef>, std::func
 	};
 };
 #define REGISTER_TASKFUNC(TaskFunc) REGISTER_FACTORY(TaskFuncBase, TaskFunc, name)
+#define REGISTER_TASKFUNC_ALIAS(TaskFunc, Alias) REGISTER_DISPATCHED_ALIAS(TaskFunc, Alias, TaskFunc::name, LiteralStringRef(#Alias))
 
 struct TaskCompletionKey {
 	Future<Key> get(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket);
@@ -168,7 +168,6 @@ namespace vexillographer
 
         private static void writePredicateClass(TextWriter outFile, Scope scope, IEnumerable<Option> options)
         {
-            string className = scope.ToString() + "s";
             outFile.WriteLine(
                 @"package com.apple.foundationdb;
@@ -88,7 +88,7 @@ namespace vexillographer
             }
 
             IEnumerable<Option> options;
-            int result = parseOptions(args[0], out options);
+            int result = parseOptions(args[0], out options, args[1]);
             if (result != 0)
                 return result;
@@ -117,7 +117,7 @@ namespace vexillographer
                 Environment.GetCommandLineArgs()[0]);
         }
 
-        private static int parseOptions(string path, out IEnumerable<Option> options)
+        private static int parseOptions(string path, out IEnumerable<Option> options, string binding)
         {
 
             var list = new List<Option>();
@@ -132,18 +132,28 @@ namespace vexillographer
                 {
                     var paramTypeStr = oDoc.AttributeOrNull("paramType");
                     ParamType p = paramTypeStr == null ? ParamType.None : (ParamType)Enum.Parse(typeof(ParamType), paramTypeStr);
-                    bool hidden = oDoc.AttributeOrNull("hidden") == "true";
-
-                    list.Add(new Option
-                    {
-                        scope = s,
-                        name = oDoc.AttributeNonNull("name"),
-                        code = int.Parse(oDoc.AttributeNonNull("code")),
-                        paramType = p,
-                        paramDesc = oDoc.AttributeOrNull("paramDescription"),
-                        comment = oDoc.AttributeOrNull("description"),
-                        hidden = hidden
-                    });
+                    bool hidden = oDoc.AttributeOrNull("hidden") == "true";
+                    string disableOn = oDoc.AttributeOrNull("disableOn");
+                    bool disabled = false;
+                    if(disableOn != null)
+                    {
+                        string[] disabledBindings = disableOn.Split(',');
+                        disabled = disabledBindings.Contains(binding);
+                    }
+
+                    if (!disabled)
+                    {
+                        list.Add(new Option
+                        {
+                            scope = s,
+                            name = oDoc.AttributeNonNull("name"),
+                            code = int.Parse(oDoc.AttributeNonNull("code")),
+                            paramType = p,
+                            paramDesc = oDoc.AttributeOrNull("paramDescription"),
+                            comment = oDoc.AttributeOrNull("description"),
+                            hidden = hidden
+                        });
+                    }
                 }
             }
             options = list;
@@ -655,62 +655,64 @@ void load_conf(const char* confpath, uid_t &uid, gid_t &gid, sigset_t* mask, fdb
 	ini.SetUnicode();
 
 	SI_Error err = ini.LoadFile(confpath);
-	if (err<0) {
+	bool loadedConf = err >= 0;
+	if (!loadedConf) {
 		log_msg(SevError, "Unable to load configuration file %s (SI_Error: %d, errno: %d)\n", confpath, err, errno);
-		return;
 	}
 
-	uid_t _uid;
-	gid_t _gid;
+	if(loadedConf) {
+		uid_t _uid;
+		gid_t _gid;
 
-	const char* user = ini.GetValue("fdbmonitor", "user", NULL);
-	const char* group = ini.GetValue("fdbmonitor", "group", NULL);
+		const char* user = ini.GetValue("fdbmonitor", "user", NULL);
+		const char* group = ini.GetValue("fdbmonitor", "group", NULL);
 
-	if (user) {
-		errno = 0;
-		struct passwd* pw = getpwnam(user);
-		if (!pw) {
-			log_err( "getpwnam", errno, "Unable to lookup user %s", user );
-			return;
-		}
-		_uid = pw->pw_uid;
-	} else
-		_uid = geteuid();
+		if (user) {
+			errno = 0;
+			struct passwd* pw = getpwnam(user);
+			if (!pw) {
+				log_err( "getpwnam", errno, "Unable to lookup user %s", user );
+				return;
+			}
+			_uid = pw->pw_uid;
+		} else
+			_uid = geteuid();
 
-	if (group) {
-		errno = 0;
-		struct group* gr = getgrnam(group);
-		if (!gr) {
-			log_err( "getgrnam", errno, "Unable to lookup group %s", group );
-			return;
-		}
-		_gid = gr->gr_gid;
-	} else
-		_gid = getegid();
+		if (group) {
+			errno = 0;
+			struct group* gr = getgrnam(group);
+			if (!gr) {
+				log_err( "getgrnam", errno, "Unable to lookup group %s", group );
+				return;
+			}
+			_gid = gr->gr_gid;
+		} else
+			_gid = getegid();
 
-	/* Any change to uid or gid requires the process to be restarted to take effect */
-	if (uid != _uid || gid != _gid) {
-		std::vector<uint64_t> kill_ids;
-		for (auto i : id_pid) {
-			if(id_command[i.first]->kill_on_configuration_change) {
-				kill_ids.push_back(i.first);
-			}
-		}
-		for (auto i : kill_ids) {
-			kill_process(i);
-			delete id_command[i];
-			id_command.erase(i);
-		}
-	}
+		/* Any change to uid or gid requires the process to be restarted to take effect */
+		if (uid != _uid || gid != _gid) {
+			std::vector<uint64_t> kill_ids;
+			for (auto i : id_pid) {
+				if(id_command[i.first]->kill_on_configuration_change) {
+					kill_ids.push_back(i.first);
+				}
+			}
+			for (auto i : kill_ids) {
+				kill_process(i);
+				delete id_command[i];
+				id_command.erase(i);
+			}
+		}
 
-	uid = _uid;
-	gid = _gid;
+		uid = _uid;
+		gid = _gid;
+	}
 
 	std::list<uint64_t> kill_ids;
 	std::list<std::pair<uint64_t, Command*>> start_ids;
 
 	for (auto i : id_pid) {
-		if (ini.GetSectionSize(id_command[i.first]->ssection.c_str()) == -1) {
+		if (!loadedConf || ini.GetSectionSize(id_command[i.first]->ssection.c_str()) == -1) {
 			/* Server on this port no longer configured; deconfigure it and kill it if required */
 			log_msg(SevInfo, "Deconfigured %s\n", id_command[i.first]->ssection.c_str());
|
||||
|
||||
/* We've handled deconfigured sections, now look for newly
|
||||
configured sections */
|
||||
CSimpleIniA::TNamesDepend sections;
|
||||
ini.GetAllSections(sections);
|
||||
for (auto i : sections) {
|
||||
if (auto dot = strrchr(i.pItem, '.')) {
|
||||
char* strtol_end;
|
||||
if(loadedConf) {
|
||||
CSimpleIniA::TNamesDepend sections;
|
||||
ini.GetAllSections(sections);
|
||||
for (auto i : sections) {
|
||||
if (auto dot = strrchr(i.pItem, '.')) {
|
||||
char* strtol_end;
|
||||
|
||||
uint64_t id = strtoull(dot + 1, &strtol_end, 10);
|
||||
uint64_t id = strtoull(dot + 1, &strtol_end, 10);
|
||||
|
||||
if (*strtol_end != '\0' || !(id > 0)) {
|
||||
log_msg(SevError, "Found bogus id in %s\n", i.pItem);
|
||||
} else {
|
||||
if (!id_pid.count(id)) {
|
||||
/* Found something we haven't yet started */
|
||||
Command *cmd;
|
||||
if (*strtol_end != '\0' || !(id > 0)) {
|
||||
log_msg(SevError, "Found bogus id in %s\n", i.pItem);
|
||||
} else {
|
||||
if (!id_pid.count(id)) {
|
||||
/* Found something we haven't yet started */
|
||||
Command *cmd;
|
||||
|
||||
auto itr = id_command.find(id);
|
||||
if(itr != id_command.end()) {
|
||||
cmd = itr->second;
|
||||
}
|
||||
else {
|
||||
std::string section(i.pItem, dot - i.pItem);
|
||||
cmd = new Command(ini, section, id, rfds, maxfd);
|
||||
id_command[id] = cmd;
|
||||
}
|
||||
auto itr = id_command.find(id);
|
||||
if(itr != id_command.end()) {
|
||||
cmd = itr->second;
|
||||
}
|
||||
else {
|
||||
std::string section(i.pItem, dot - i.pItem);
|
||||
cmd = new Command(ini, section, id, rfds, maxfd);
|
||||
id_command[id] = cmd;
|
||||
}
|
||||
|
||||
if(cmd->fork_retry_time <= timer()) {
|
||||
log_msg(SevInfo, "Starting %s\n", i.pItem);
|
||||
start_process(cmd, id, uid, gid, 0, mask);
|
||||
if(cmd->fork_retry_time <= timer()) {
|
||||
log_msg(SevInfo, "Starting %s\n", i.pItem);
|
||||
start_process(cmd, id, uid, gid, 0, mask);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -819,6 +823,48 @@ void read_child_output( Command* cmd, int pipe_idx, fdb_fd_set fds ) {
 }
 
 #ifdef __APPLE__
+void watch_conf_dir( int kq, int* confd_fd, std::string confdir ) {
+	struct kevent ev;
+	std::string original = confdir;
+
+	while(true) {
+		/* If already watching, drop it and close */
+		if ( *confd_fd >= 0 ) {
+			EV_SET( &ev, *confd_fd, EVFILT_VNODE, EV_DELETE, NOTE_WRITE, 0, NULL );
+			kevent( kq, &ev, 1, NULL, 0, NULL );
+			close( *confd_fd );
+		}
+
+		confdir = original;
+		std::string child = confdir;
+
+		/* Find the nearest existing ancestor */
+		while( (*confd_fd = open( confdir.c_str(), O_EVTONLY )) < 0 && errno == ENOENT ) {
+			child = confdir;
+			confdir = parentDirectory(confdir);
+		}
+
+		if ( *confd_fd >= 0 ) {
+			EV_SET( &ev, *confd_fd, EVFILT_VNODE, EV_ADD | EV_CLEAR, NOTE_WRITE, 0, NULL );
+			kevent( kq, &ev, 1, NULL, 0, NULL );
+
+			/* If our child appeared since we last tested it, start over from the beginning */
+			if ( confdir != child && (access(child.c_str(), F_OK) == 0 || errno != ENOENT) ) {
+				continue;
+			}
+
+			if(confdir != original) {
+				log_msg(SevInfo, "Watching parent directory of missing directory %s\n", child.c_str());
+			}
+			else {
+				log_msg(SevInfo, "Watching conf dir %s\n", confdir.c_str());
+			}
+		}
+
+		return;
+	}
+}
+
 void watch_conf_file( int kq, int* conff_fd, const char* confpath ) {
 	struct kevent ev;
@@ -839,14 +885,12 @@ void watch_conf_file( int kq, int* conff_fd, const char* confpath ) {
 #endif
 
 #ifdef __linux__
-void fdbmon_stat(const char *path, struct stat *path_stat, bool is_link) {
-	int result = is_link ? lstat(path, path_stat) : stat(path, path_stat);
-	if(result) {
-		log_err(is_link ? "lstat" : "stat", errno, "Unable to stat %s", path);
-		exit(1);
-	}
+int fdbmon_stat(const char *path, struct stat *path_stat, bool is_link) {
+	return is_link ? lstat(path, path_stat) : stat(path, path_stat);
 }
 
 /* Sets watches to track changes to all symlinks on a path.
 * Also sets a watch on the last existing ancestor of a path if the full path doesn't exist. */
 std::unordered_map<int, std::unordered_set<std::string>> set_watches(std::string path, int ifd) {
 	std::unordered_map<int, std::unordered_set<std::string>> additional_watch_wds;
 	struct stat path_stat;
@@ -855,33 +899,67 @@ std::unordered_map<int, std::unordered_set<std::string>> set_watches(std::string
 		return additional_watch_wds;
 
 	int idx = 1;
-	while(idx != std::string::npos) {
+	bool exists = true;
+
+	/* Check each level of the path, setting a watch on any symlinks.
+	 * Stop checking once we get to a part of the path that doesn't exist.
+	 * If we encounter a non-existing path, watch the closest existing ancestor. */
+	while(idx != std::string::npos && exists) {
 		idx = path.find_first_of('/', idx+1);
 		std::string subpath = path.substr(0, idx);
 
 		int level = 0;
 		while(true) {
-			if(level++ == 100) {
-				log_msg(SevError, "Too many nested symlinks in path %s\n", path.c_str());
-				exit(1);
+			/* Check path existence */
+			int result = fdbmon_stat(subpath.c_str(), &path_stat, true);
+			if(result != 0) {
+				if(errno == ENOENT) {
+					exists = false;
+				}
+				else {
+					log_err("lstat", errno, "Unable to stat %s", path.c_str());
+					exit(1);
+				}
 			}
 
-			fdbmon_stat(subpath.c_str(), &path_stat, true);
-			if(!S_ISLNK(path_stat.st_mode)) {
-				break;
+			if(exists) {
+				/* Don't do anything for existing non-links */
+				if(!S_ISLNK(path_stat.st_mode)) {
+					break;
+				}
+				else if(level++ == 100) {
+					log_msg(SevError, "Too many nested symlinks in path %s\n", path.c_str());
+					exit(1);
+				}
 			}
 
 			std::string parent = parentDirectory(subpath);
 
 			/* Watch the parent directory of the current path for changes */
 			int wd = inotify_add_watch(ifd, parent.c_str(), IN_CREATE | IN_MOVED_TO);
 			if (wd < 0) {
 				log_err("inotify_add_watch", errno, "Unable to add watch to parent directory %s", parent.c_str());
 				exit(1);
 			}
 
-			log_msg(SevInfo, "Watching parent directory of symlink %s (%d)\n", subpath.c_str(), wd);
-			additional_watch_wds[wd].insert(subpath.substr(parent.size()+1));
+			if(exists) {
+				log_msg(SevInfo, "Watching parent directory of symlink %s (%d)\n", subpath.c_str(), wd);
+				additional_watch_wds[wd].insert(subpath.substr(parent.size()+1));
+			}
+			else {
+				/* If the subpath has appeared since we set the watch, we should cancel it and resume traversing the path */
+				int result = fdbmon_stat(subpath.c_str(), &path_stat, true);
+				if(result == 0 || errno != ENOENT) {
+					inotify_rm_watch(ifd, wd);
+					continue;
+				}
+
+				log_msg(SevInfo, "Watching parent directory of missing directory %s (%d)\n", subpath.c_str(), wd);
+				additional_watch_wds[wd].insert(subpath.substr(parent.size()+1));
+				break;
+			}
 
 			/* Follow the symlink */
 			char buf[PATH_MAX+1];
 			ssize_t len = readlink(subpath.c_str(), buf, PATH_MAX);
 			if(len < 0) {
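fdbmon_stat now returns the raw stat result so callers can tell "path missing" apart from real failures, as set_watches does above. The pattern in isolation (a sketch; log_err is replaced by perror here):

    #include <cerrno>
    #include <cstdio>
    #include <cstdlib>
    #include <sys/stat.h>

    // Treat ENOENT as a normal "missing" answer and any other lstat failure as fatal.
    bool pathExists(const char *path, struct stat *st) {
        if(lstat(path, st) == 0)
            return true;
        if(errno == ENOENT)
            return false;           // missing: caller can watch the nearest existing ancestor
        perror("lstat");            // anything else (EACCES, ELOOP, ...) is unexpected
        exit(1);
    }

    int main() {
        struct stat st;
        printf("/etc exists: %d\n", pathExists("/etc", &st));
    }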
@@ -1088,13 +1166,12 @@ int main(int argc, char** argv) {
 	EV_SET( &ev, SIGCHLD, EVFILT_SIGNAL, EV_ADD, 0, 0, NULL);
 	kevent( kq, &ev, 1, NULL, 0, NULL );
 
-	int confd_fd = open(confdir.c_str(), O_EVTONLY);
+	int confd_fd = -1;
+	int conff_fd = -1;
 
 	// Watch the directory holding the configuration file
-	EV_SET( &ev, confd_fd, EVFILT_VNODE, EV_ADD | EV_CLEAR, NOTE_WRITE, 0, NULL );
-	kevent( kq, &ev, 1, NULL, 0, NULL );
+	watch_conf_dir( kq, &confd_fd, confdir );
 
-	int conff_fd = -1;
 #endif
 
 #ifdef __linux__
@@ -1131,28 +1208,43 @@ int main(int argc, char** argv) {
 		if (reload) {
 			reload = false;
 #ifdef __linux__
+			/* Remove existing watches on conf file and directory */
+			if(confdir_wd >= 0 && inotify_rm_watch(ifd, confdir_wd) < 0) {
+				log_msg(SevInfo, "Could not remove inotify conf dir watch, continuing...\n");
+			}
+			if(conffile_wd >= 0 && inotify_rm_watch(ifd, conffile_wd) < 0) {
+				log_msg(SevInfo, "Could not remove inotify conf file watch, continuing...\n");
+			}
+
+			/* Create new watches */
 			conffile_wd = inotify_add_watch(ifd, confpath.c_str(), IN_CLOSE_WRITE);
 			if (conffile_wd < 0) {
-				log_err("inotify_add_watch", errno, "Unable to set watch on configuration file %s", confpath.c_str());
-				exit(1); // Deleting the conf file causes fdbmonitor to terminate
+				if(errno != ENOENT) {
+					log_err("inotify_add_watch", errno, "Unable to set watch on configuration file %s", confpath.c_str());
+					exit(1);
+				}
+				else {
+					log_msg(SevInfo, "Conf file has been deleted %s\n", confpath.c_str());
+				}
 			} else {
-				log_msg(SevInfo, "Watching config file %s\n", confpath.c_str());
+				log_msg(SevInfo, "Watching conf file %s\n", confpath.c_str());
 			}
 
 			confdir_wd = inotify_add_watch(ifd, confdir.c_str(), IN_CLOSE_WRITE | IN_MOVED_TO);
 			if (confdir_wd < 0) {
-				log_err("inotify_add_watch", errno, "Unable to set watch on configuration file parent directory %s", confdir.c_str());
-				exit(1);
+				if(errno != ENOENT) {
+					log_err("inotify_add_watch", errno, "Unable to set watch on configuration file parent directory %s", confdir.c_str());
+					exit(1);
+				}
+				else {
+					reload_additional_watches = true;
+					log_msg(SevInfo, "Conf dir has been deleted %s\n", confdir.c_str());
+				}
 			} else {
-				log_msg(SevInfo, "Watching config dir %s (%d)\n", confdir.c_str(), confdir_wd);
+				log_msg(SevInfo, "Watching conf dir %s (%d)\n", confdir.c_str(), confdir_wd);
 			}
 
 			/* Reload watches on symlinks and/or the oldest existing ancestor */
 			if(reload_additional_watches) {
 				additional_watch_wds = set_watches(_confpath, ifd);
 			}
@@ -1162,6 +1254,7 @@ int main(int argc, char** argv) {
 #elif defined(__APPLE__)
 			load_conf( confpath.c_str(), uid, gid, &normal_mask, watched_fds, &maxfd );
 			watch_conf_file( kq, &conff_fd, confpath.c_str() );
+			watch_conf_dir( kq, &confd_fd, confdir );
 #endif
 		}
@@ -59,6 +59,7 @@ BlobStoreEndpoint::BlobKnobs::BlobKnobs() {
 	multipart_max_part_size = CLIENT_KNOBS->BLOBSTORE_MULTIPART_MAX_PART_SIZE;
 	multipart_min_part_size = CLIENT_KNOBS->BLOBSTORE_MULTIPART_MIN_PART_SIZE;
 	concurrent_uploads = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_UPLOADS;
+	concurrent_lists = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_LISTS;
 	concurrent_reads_per_file = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_READS_PER_FILE;
 	concurrent_writes_per_file = CLIENT_KNOBS->BLOBSTORE_CONCURRENT_WRITES_PER_FILE;
 	read_block_size = CLIENT_KNOBS->BLOBSTORE_READ_BLOCK_SIZE;
@@ -80,6 +81,7 @@ bool BlobStoreEndpoint::BlobKnobs::set(StringRef name, int value) {
 	TRY_PARAM(multipart_max_part_size, maxps);
 	TRY_PARAM(multipart_min_part_size, minps);
 	TRY_PARAM(concurrent_uploads, cu);
+	TRY_PARAM(concurrent_lists, cl);
 	TRY_PARAM(concurrent_reads_per_file, crpf);
 	TRY_PARAM(concurrent_writes_per_file, cwpf);
 	TRY_PARAM(read_block_size, rbs);
@@ -106,6 +108,7 @@ std::string BlobStoreEndpoint::BlobKnobs::getURLParameters() const {
 	_CHECK_PARAM(multipart_max_part_size, maxps);
 	_CHECK_PARAM(multipart_min_part_size, minps);
 	_CHECK_PARAM(concurrent_uploads, cu);
+	_CHECK_PARAM(concurrent_lists, cl);
 	_CHECK_PARAM(concurrent_reads_per_file, crpf);
 	_CHECK_PARAM(concurrent_writes_per_file, cwpf);
 	_CHECK_PARAM(read_block_size, rbs);
@@ -417,7 +420,6 @@ ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<BlobStoreEndpoi
 			// This must be done AFTER the connection is ready because if credentials are coming from disk they are refreshed
 			// when a new connection is established and setAuthHeaders() would need the updated secret.
 			bstore->setAuthHeaders(verb, resource, headers);
-
 			remoteAddress = rconn.conn->getPeerAddress();
 			Void _ = wait(bstore->requestRate->getAllowance(1));
 			state Reference<HTTP::Response> r = wait(timeoutError(HTTP::doRequest(rconn.conn, verb, resource, headers, &contentCopy, contentLen, bstore->sendRate, &bstore->s_stats.bytes_sent, bstore->recvRate), bstore->knobs.request_timeout));
@@ -478,16 +480,18 @@ ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<BlobStoreEndpoi
 			}
 
 			if(retryable) {
-				// If retrying, obey the Retry-After response header if present.
-				auto iRetryAfter = r->headers.find("Retry-After");
-				if(iRetryAfter != r->headers.end()) {
-					event.detail("RetryAfterHeader", iRetryAfter->second);
-					char *pEnd;
-					double retryAfter = strtod(iRetryAfter->second.c_str(), &pEnd);
-					if(*pEnd) // If there were other characters then don't trust the parsed value, use a probably safe value of 5 minutes.
-						retryAfter = 300;
-					// Update delay
-					delay = std::max(delay, retryAfter);
+				// If r is valid then obey the Retry-After response header if present.
+				if(r) {
+					auto iRetryAfter = r->headers.find("Retry-After");
+					if(iRetryAfter != r->headers.end()) {
+						event.detail("RetryAfterHeader", iRetryAfter->second);
+						char *pEnd;
+						double retryAfter = strtod(iRetryAfter->second.c_str(), &pEnd);
+						if(*pEnd) // If there were other characters then don't trust the parsed value, use a probably safe value of 5 minutes.
+							retryAfter = 300;
+						// Update delay
+						delay = std::max(delay, retryAfter);
+					}
 				}
 
 				// Log the delay then wait.
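The Retry-After handling is unchanged apart from the null-guard on r; the parse itself already tolerates non-numeric values (such as an HTTP-date) by falling back to five minutes. The same parse in isolation (a hypothetical helper, not part of the patch):

    #include <cstdio>
    #include <cstdlib>
    #include <string>

    // Parse a Retry-After header value; fall back to 300 seconds if it isn't a clean number.
    double parseRetryAfter(const std::string *headerValue) {
        if(headerValue == nullptr)      // no response / no header: caller keeps its own delay
            return 0;
        char *pEnd;
        double retryAfter = strtod(headerValue->c_str(), &pEnd);
        if(*pEnd)                       // trailing junk (e.g. an HTTP-date): don't trust it
            retryAfter = 300;
        return retryAfter;
    }

    int main() {
        std::string h1 = "120", h2 = "Fri, 31 Dec 1999 23:59:59 GMT";
        printf("%g %g\n", parseRetryAfter(&h1), parseRetryAfter(&h2));  // 120 300
    }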
@@ -498,7 +502,7 @@ ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<BlobStoreEndpoi
 			// We can't retry, so throw something.
 
 			// This error code means the authentication header was not accepted, likely the account or key is wrong.
-			if(r->code == 406)
+			if(r && r->code == 406)
 				throw http_not_accepted();
 
 			throw http_request_failed();
@@ -526,8 +530,12 @@ ACTOR Future<Void> listBucketStream_impl(Reference<BlobStoreEndpoint> bstore, st
 	state std::vector<Future<Void>> subLists;
 
 	while(more) {
+		Void _ = wait(bstore->concurrentLists.take());
+		state FlowLock::Releaser listReleaser(bstore->concurrentLists, 1);
+
 		HTTP::Headers headers;
 		Reference<HTTP::Response> r = wait(bstore->doRequest("GET", resource + HTTP::urlEncode(lastFile), headers, NULL, 0, {200}));
+		listReleaser.release();
 
 		try {
 			BlobStoreEndpoint::ListResult result;
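The take()/Releaser pair above is flow's semaphore idiom: hold a concurrentLists permit only for the HTTP round trip, releasing it explicitly before the response is parsed. A blocking-thread analogue of FlowLock and FlowLock::Releaser (a sketch; flow's take() returns a Future instead of blocking):

    #include <condition_variable>
    #include <mutex>

    // Minimal stand-in for flow's FlowLock: take() blocks until a permit is free.
    class PermitLock {
        std::mutex m;
        std::condition_variable cv;
        int permits;
    public:
        explicit PermitLock(int n) : permits(n) {}
        void take()    { std::unique_lock<std::mutex> l(m); cv.wait(l, [&]{ return permits > 0; }); --permits; }
        void release() { { std::lock_guard<std::mutex> l(m); ++permits; } cv.notify_one(); }
    };

    // Counterpart of FlowLock::Releaser: releases early via release(), or on scope exit.
    class Releaser {
        PermitLock *lock;
    public:
        explicit Releaser(PermitLock &l) : lock(&l) {}
        void release() { if(lock) { lock->release(); lock = nullptr; } }
        ~Releaser() { release(); }
    };

    PermitLock concurrentLists(20);

    void listOnce() {
        concurrentLists.take();
        Releaser r(concurrentLists);
        // ... issue the HTTP list request while holding a permit ...
        r.release();  // free the permit before parsing the (possibly large) response
    }

    int main() { listOnce(); }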
@@ -535,11 +543,13 @@ ACTOR Future<Void> listBucketStream_impl(Reference<BlobStoreEndpoint> bstore, st
 			json_spirit::mValue json;
 			json_spirit::read_string(r->content, json);
 			JSONDoc doc(json);
 
 			std::string isTruncated;
+			if (!doc.tryGet("truncated", more)) {
 				doc.get("ListBucketResult.IsTruncated", isTruncated);
 				more = isTruncated == "false" ? false : true;
+			}
 
 			if (doc.has("results")) {
 				for (auto &jsonObject : doc.at("results").get_array()) {
 					JSONDoc objectDoc(jsonObject);
@@ -549,6 +559,7 @@ ACTOR Future<Void> listBucketStream_impl(Reference<BlobStoreEndpoint> bstore, st
 					result.objects.push_back(std::move(object));
 				}
 			}
+
 			if(doc.has("ListBucketResult.Contents")) {
 				if (doc.at("ListBucketResult.Contents").type() == json_spirit::array_type) {
 					for (auto &jsonObject : doc.at("ListBucketResult.Contents").get_array()) {
@@ -572,22 +583,26 @@ ACTOR Future<Void> listBucketStream_impl(Reference<BlobStoreEndpoint> bstore, st
 					result.objects.push_back(std::move(object));
 				}
 			}
 
 			if(doc.has("CommonPrefixes")) {
 				for(auto &jsonObject : doc.at("CommonPrefixes").get_array()) {
 					JSONDoc objectDoc(jsonObject);
 					std::string p;
 					objectDoc.get("Prefix", p);
-					result.commonPrefixes.push_back(p);
-					if(maxDepth > 0) {
+					// If recursing, queue a sub-request, otherwise add the common prefix to the result.
+					if(maxDepth > 0)
 						subLists.push_back(bstore->listBucketStream(bucket, results, p, delimiter, maxDepth - 1));
-					}
+					else
+						result.commonPrefixes.push_back(p);
 				}
 			}
 
-			if(!result.objects.empty())
-				lastFile = result.objects.back().name;
-			if(!result.commonPrefixes.empty() && lastFile < result.commonPrefixes.back())
-				lastFile = result.commonPrefixes.back();
+			if(more) {
+				if(!result.objects.empty())
+					lastFile = result.objects.back().name;
+				if(!result.commonPrefixes.empty() && lastFile < result.commonPrefixes.back())
+					lastFile = result.commonPrefixes.back();
+			}
 
 			results.send(result);
 		} catch(Error &e) {
@@ -53,10 +53,11 @@ public:
 			request_tries,
 			request_timeout,
 			requests_per_second,
-			concurrent_requests,
 			multipart_max_part_size,
 			multipart_min_part_size,
+			concurrent_requests,
 			concurrent_uploads,
+			concurrent_lists,
 			concurrent_reads_per_file,
 			concurrent_writes_per_file,
 			read_block_size,
@@ -74,10 +75,11 @@ public:
 			"request_tries (or rt)                Number of times to try each request until a parseable HTTP response other than 429 is received.",
 			"request_timeout (or rto)             Number of seconds to wait for a request to succeed after a connection is established.",
 			"requests_per_second (or rps)         Max number of requests to start per second.",
-			"concurrent_requests (or cr)          Max number of requests in progress at once.",
 			"multipart_max_part_size (or maxps)   Max part size for multipart uploads.",
 			"multipart_min_part_size (or minps)   Min part size for multipart uploads.",
+			"concurrent_requests (or cr)          Max number of total requests in progress at once, regardless of operation-specific concurrency limits.",
 			"concurrent_uploads (or cu)           Max concurrent uploads (part or whole) that can be in progress at once.",
+			"concurrent_lists (or cl)             Max concurrent list operations that can be in progress at once.",
 			"concurrent_reads_per_file (or crps)  Max concurrent reads in progress for any one file.",
 			"concurrent_writes_per_file (or cwps) Max concurrent uploads in progress for any one file.",
 			"read_block_size (or rbs)             Block size in bytes to be used for reads.",
@@ -95,7 +97,8 @@ public:
 		sendRate(new SpeedLimit(knobs.max_send_bytes_per_second, 1)),
 		recvRate(new SpeedLimit(knobs.max_recv_bytes_per_second, 1)),
 		concurrentRequests(knobs.concurrent_requests),
-		concurrentUploads(knobs.concurrent_uploads) {
+		concurrentUploads(knobs.concurrent_uploads),
+		concurrentLists(knobs.concurrent_lists) {
 
 		if(host.empty())
 			throw connection_string_invalid();
@@ -133,6 +136,7 @@ public:
 	Reference<IRateControl> recvRate;
 	FlowLock concurrentRequests;
 	FlowLock concurrentUploads;
+	FlowLock concurrentLists;
 
 	Future<Void> updateSecret();
@@ -1542,11 +1542,11 @@ static StatusObject faultToleranceStatusFetcher(DatabaseConfiguration configurat
 }
 
 static std::string getIssueDescription(std::string name) {
-	if(name == "unable_to_write_cluster_file") {
-		return "Unable to update cluster file.";
+	if(name == "incorrect_cluster_file_contents") {
+		return "Cluster file contents do not match current cluster connection string. Verify cluster file is writable and has not been overwritten externally.";
 	}
 
-	// FIXME: name and description will be the same unless the message is 'unable_to_write_cluster_file', which is currently the only possible message
+	// FIXME: name and description will be the same unless the message is 'incorrect_cluster_file_contents', which is currently the only possible message
 	return name;
 }
@@ -446,7 +446,7 @@ ACTOR Future<Void> monitorServerDBInfo( Reference<AsyncVar<Optional<ClusterContr
 
 		ClusterConnectionString fileConnectionString;
 		if (connFile && !connFile->fileContentsUpToDate(fileConnectionString)) {
-			req.issues = LiteralStringRef("unable_to_write_cluster_file");
+			req.issues = LiteralStringRef("incorrect_cluster_file_contents");
 			if(connFile->canGetFilename()) {
 				TraceEvent(SevWarnAlways, "IncorrectClusterFileContents").detail("Filename", connFile->getFilename())
 					.detail("ConnectionStringFromFile", fileConnectionString.toString())
@@ -26,14 +26,14 @@
 
 //A workload which tests the correctness of the backup and restore process
 struct AtomicRestoreWorkload : TestWorkload {
-	double startAfter, switch1After;
+	double startAfter, restoreAfter;
 	Standalone<VectorRef<KeyRangeRef>> backupRanges;
 
 	AtomicRestoreWorkload(WorkloadContext const& wcx)
 		: TestWorkload(wcx) {
 
 		startAfter = getOption(options, LiteralStringRef("startAfter"), 10.0);
-		switch1After = getOption(options, LiteralStringRef("switch1After"), 20.0);
+		restoreAfter = getOption(options, LiteralStringRef("restoreAfter"), 20.0);
 		backupRanges.push_back_deep(backupRanges.arena(), normalKeys);
 	}
@@ -60,10 +60,10 @@ struct AtomicRestoreWorkload : TestWorkload {
 
 	ACTOR static Future<Void> _start(Database cx, AtomicRestoreWorkload* self) {
 		state FileBackupAgent backupAgent;
-		state Future<Void> switch1After = delay(self->switch1After);
 		state Future<Void> disabler = disableConnectionFailuresAfter(300, "atomicRestore");
-		Void _ = wait( delay(self->startAfter) );
-		TraceEvent("AR_Start");
+
+		Void _ = wait( delay(self->startAfter * g_random->random01()) );
+		TraceEvent("AtomicRestore_Start");
 
 		state std::string backupContainer = "file://simfdb/backups/";
 		try {
@@ -74,11 +74,11 @@ struct AtomicRestoreWorkload : TestWorkload {
 			throw;
 		}
 
-		TraceEvent("AS_Wait1");
+		TraceEvent("AtomicRestore_Wait");
 		int _ = wait( backupAgent.waitBackup(cx, BackupAgentBase::getDefaultTagName(), false) );
-		TraceEvent("AS_Ready1");
-		Void _ = wait( switch1After );
-		TraceEvent("AS_Switch1");
+		TraceEvent("AtomicRestore_BackupStart");
+		Void _ = wait( delay(self->restoreAfter * g_random->random01()) );
+		TraceEvent("AtomicRestore_RestoreStart");
 
 		loop {
 			std::vector<Future<Version>> restores;
|
||||
g_simulator.backupAgents = ISimulator::NoBackupAgents;
|
||||
}
|
||||
|
||||
TraceEvent("AS_Done");
|
||||
TraceEvent("AtomicRestore_Done");
|
||||
return Void();
|
||||
}
|
||||
};
|
||||
|
@@ -41,7 +41,7 @@ struct IDispatched {
 };
 
 #define REGISTER_DISPATCHED(Type, Instance, Key, Func) struct Type##Instance { Type##Instance() { ASSERT(Type::dispatches().find(Key) == Type::dispatches().end()); Type::dispatches()[Key] = Func; } }; Type##Instance _Type##Instance
-
+#define REGISTER_DISPATCHED_ALIAS(Type, Instance, Target, Alias) struct Type##Instance { Type##Instance() { ASSERT(Type::dispatches().find(Alias) == Type::dispatches().end()); ASSERT(Type::dispatches().find(Target) != Type::dispatches().end()); Type::dispatches()[Alias] = Type::dispatches()[Target]; } }; Type##Instance _Type##Instance;
 #define REGISTER_COMMAND(Type, Instance, Key, Func) REGISTER_DISPATCHED(Type, Instance, Instance::Key, Instance::Func)
 
 /*
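REGISTER_DISPATCHED_ALIAS copies an existing factory under a second key at static-initialization time, which is how the AbortOldBackupTask aliases earlier in this commit make legacy task names resolve to the abort task. A minimal sketch of the mechanism (std::map and std::function stand in for the real dispatch table):

    #include <cassert>
    #include <functional>
    #include <map>
    #include <string>

    // Sketch of the dispatch table behind REGISTER_TASKFUNC / REGISTER_TASKFUNC_ALIAS.
    std::map<std::string, std::function<std::string()>> &dispatches() {
        static std::map<std::string, std::function<std::string()>> d;
        return d;
    }

    int main() {
        // REGISTER_TASKFUNC(AbortOldBackupTask)
        dispatches()["abort_legacy_backup"] = []{ return "AbortOldBackupTask"; };

        // REGISTER_TASKFUNC_ALIAS(AbortOldBackupTask, file_backup_log_range):
        // the alias must not exist yet, the target must.
        assert(dispatches().count("abort_legacy_backup"));
        dispatches()["file_backup_log_range"] = dispatches()["abort_legacy_backup"];

        // An old on-disk task named "file_backup_log_range" now dispatches to the abort task.
        assert(dispatches()["file_backup_log_range"]() == "AbortOldBackupTask");
    }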
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
tests/slow/ApiCorrectnessAtomicRestore.txt (new file, 28 lines)
@@ -0,0 +1,28 @@
+testTitle=ApiCorrectnessTest
+	testName=ApiCorrectness
+	runSetup=true
+	clearAfterTest=true
+	numKeys=5000
+	onlyLowerCase=true
+	shortKeysRatio=0.5
+	minShortKeyLength=1
+	maxShortKeyLength=3
+	minLongKeyLength=1
+	maxLongKeyLength=128
+	minValueLength=1
+	maxValueLength=1000
+	numGets=1000
+	numGetRanges=100
+	numGetRangeSelectors=100
+	numGetKeys=100
+	numClears=100
+	numClearRanges=10
+	maxTransactionBytes=500000
+	randomTestDuration=60
+	timeout=2100
+
+	testName=AtomicRestore
+	startAfter=10.0
+	restoreAfter=50.0
+	clearAfterTest=false
+	simBackupAgents=BackupToFile
File diff suppressed because one or more lines are too long
@@ -8,7 +8,7 @@ testTitle=WriteDuringReadTest
 
 	testName=AtomicRestore
 	startAfter=10.0
-	switch1After=50.0
+	restoreAfter=50.0
 	clearAfterTest=false
 	simBackupAgents=BackupToFile
@@ -100,8 +100,8 @@
 			"description" : "Some clients of this cluster have issues.",
 			"issues" : [
 				{
-					"description" : "Unable to update cluster file.",
-					"name" : "unable_to_write_cluster_file"
+					"description" : "Cluster file contents do not match current cluster connection string. Verify cluster file is writable and has not been overwritten externally.",
+					"name" : "incorrect_cluster_file_contents"
 				}
 			],
 			"name" : "client_issues"
@@ -1,7 +1,7 @@
 <?xml version="1.0"?>
 <Project xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
   <PropertyGroup>
-    <Version>5.1.0</Version>
-    <PackageName>5.1</PackageName>
+    <Version>5.2.0</Version>
+    <PackageName>5.2</PackageName>
   </PropertyGroup>
 </Project>