mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-15 18:32:18 +08:00
Compare commits
4 Commits
e47b56c044
...
9d5a768cda
Author | SHA1 | Date | |
---|---|---|---|
|
9d5a768cda | ||
|
9e08874463 | ||
|
4e5d46cce5 | ||
|
deda04b845 |
@ -352,7 +352,8 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
|
||||
TraceEvent(SevError, "OffsetOutOfBoundary")
|
||||
.detail("TotalBytes", totalBytes)
|
||||
.detail("Offset", offset)
|
||||
.detail("Size", value.size());
|
||||
.detail("Version", version)
|
||||
.detail("ValueSize", value.size());
|
||||
throw restore_missing_data();
|
||||
}
|
||||
|
||||
@ -676,7 +677,7 @@ ACTOR Future<Void> readCommitted(Database cx,
|
||||
|
||||
// iterate on a version range, each key-value pair is (version, part)
|
||||
for (auto& s : rangevalue) {
|
||||
uint64_t groupKey = groupBy(s.key).first;
|
||||
Version groupKey = groupBy(s.key).first; // mutation's commit version
|
||||
// TraceEvent("Log_ReadCommitted")
|
||||
// .detail("GroupKey", groupKey)
|
||||
// .detail("SkipGroup", skipGroup)
|
||||
@ -761,10 +762,8 @@ ACTOR Future<Void> sendCommitTransactionRequest(CommitTransactionRequest req,
|
||||
NotifiedVersion* committedVersion,
|
||||
int* totalBytes,
|
||||
int* mutationSize,
|
||||
PromiseStream<Future<Void>> addActor,
|
||||
FlowLock* commitLock,
|
||||
PublicRequestStream<CommitTransactionRequest> commit,
|
||||
bool tenantMapChanging) {
|
||||
PublicRequestStream<CommitTransactionRequest> commit) {
|
||||
Key applyBegin = uid.withPrefix(applyMutationsBeginRange.begin);
|
||||
Key versionKey = BinaryWriter::toValue(newBeginVersion, Unversioned());
|
||||
Key rangeEnd = getApplyKey(newBeginVersion, uid);
|
||||
@ -787,16 +786,18 @@ ACTOR Future<Void> sendCommitTransactionRequest(CommitTransactionRequest req,
|
||||
*totalBytes += *mutationSize;
|
||||
wait(commitLock->take(TaskPriority::DefaultYield, *mutationSize));
|
||||
Future<Void> commitAndUnlock = commitLock->releaseWhen(success(commit.getReply(req)), *mutationSize);
|
||||
if (tenantMapChanging) {
|
||||
// If tenant map is changing, we need to wait until it's committed before processing next mutations.
|
||||
// Next muations need the updated tenant map for filtering.
|
||||
wait(commitAndUnlock);
|
||||
} else {
|
||||
addActor.send(commitAndUnlock);
|
||||
}
|
||||
// If tenant map is changing, we need to wait until it's committed before processing next mutations.
|
||||
// Next muations need the updated tenant map for filtering.
|
||||
// Because we are bumping applyBegin version, we need to wait for the commit to be done.
|
||||
// Otherwise, an update to the applyEnd key will trigger another applyMutation() which can
|
||||
// have an overlapping range with the current applyMutation() and cause conflicts.
|
||||
wait(commitAndUnlock);
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Decodes the backup mutation log and send the mutations to the CommitProxy.
|
||||
// The mutation logs are grouped by version and passed in as a stream of RCGroup from readCommitted().
|
||||
// The mutations are then decoded and sent to the CommitProxy in a batch.
|
||||
ACTOR Future<int> kvMutationLogToTransactions(Database cx,
|
||||
PromiseStream<RCGroup> results,
|
||||
Reference<FlowLock> lock,
|
||||
@ -807,7 +808,6 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
|
||||
NotifiedVersion* committedVersion,
|
||||
Optional<Version> endVersion,
|
||||
Key rangeBegin,
|
||||
PromiseStream<Future<Void>> addActor,
|
||||
FlowLock* commitLock,
|
||||
Reference<KeyRangeMap<Version>> keyVersion,
|
||||
std::map<int64_t, TenantName>* tenantMap,
|
||||
@ -872,10 +872,8 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
|
||||
committedVersion,
|
||||
&totalBytes,
|
||||
&mutationSize,
|
||||
addActor,
|
||||
commitLock,
|
||||
commit,
|
||||
false));
|
||||
commit));
|
||||
req = CommitTransactionRequest();
|
||||
mutationSize = 0;
|
||||
}
|
||||
@ -908,17 +906,9 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
|
||||
throw;
|
||||
}
|
||||
}
|
||||
wait(sendCommitTransactionRequest(req,
|
||||
uid,
|
||||
newBeginVersion,
|
||||
rangeBegin,
|
||||
committedVersion,
|
||||
&totalBytes,
|
||||
&mutationSize,
|
||||
addActor,
|
||||
commitLock,
|
||||
commit,
|
||||
tenantMapChanging));
|
||||
// TraceEvent("MutationLogRestore").detail("BeginVersion", newBeginVersion);
|
||||
wait(sendCommitTransactionRequest(
|
||||
req, uid, newBeginVersion, rangeBegin, committedVersion, &totalBytes, &mutationSize, commitLock, commit));
|
||||
if (endOfStream) {
|
||||
return totalBytes;
|
||||
}
|
||||
@ -999,6 +989,7 @@ ACTOR Future<Void> applyMutations(Database cx,
|
||||
try {
|
||||
loop {
|
||||
if (beginVersion >= *endVersion) {
|
||||
// Why do we need to take a lock here?
|
||||
wait(commitLock.take(TaskPriority::DefaultYield, CLIENT_KNOBS->BACKUP_LOCK_BYTES));
|
||||
commitLock.release(CLIENT_KNOBS->BACKUP_LOCK_BYTES);
|
||||
if (beginVersion >= *endVersion) {
|
||||
@ -1043,7 +1034,6 @@ ACTOR Future<Void> applyMutations(Database cx,
|
||||
committedVersion,
|
||||
idx == ranges.size() - 1 ? newEndVersion : Optional<Version>(),
|
||||
ranges[idx].begin,
|
||||
addActor,
|
||||
&commitLock,
|
||||
keyVersion,
|
||||
tenantMap,
|
||||
|
@ -5572,6 +5572,7 @@ struct RestoreDispatchPartitionedTaskFunc : RestoreTaskFuncBase {
|
||||
};
|
||||
StringRef RestoreDispatchPartitionedTaskFunc::name = "restore_dispatch_partitioned"_sr;
|
||||
REGISTER_TASKFUNC(RestoreDispatchPartitionedTaskFunc);
|
||||
|
||||
struct RestoreDispatchTaskFunc : RestoreTaskFuncBase {
|
||||
static StringRef name;
|
||||
static constexpr uint32_t version = 1;
|
||||
|
@ -531,12 +531,13 @@ public:
|
||||
|
||||
using RangeResultWithVersion = std::pair<RangeResult, Version>;
|
||||
|
||||
// RCGroup contains the backup mutations for a commit version, i.e., groupKey.
|
||||
struct RCGroup {
|
||||
RangeResult items;
|
||||
Version version; // this is read version for this group
|
||||
uint64_t groupKey; // this is the original version for this group
|
||||
Version groupKey; // this is the original commit version for this group
|
||||
|
||||
RCGroup() : version(-1), groupKey(ULLONG_MAX){};
|
||||
RCGroup() : version(-1), groupKey(ULLONG_MAX) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
@ -580,6 +581,8 @@ ACTOR Future<Void> readCommitted(Database cx,
|
||||
Terminator terminator = Terminator::True,
|
||||
AccessSystemKeys systemAccess = AccessSystemKeys::False,
|
||||
LockAware lockAware = LockAware::False);
|
||||
|
||||
// Applies the mutations between the beginVersion and endVersion to the database during a restore.
|
||||
ACTOR Future<Void> applyMutations(Database cx,
|
||||
Key uid,
|
||||
Key addPrefix,
|
||||
|
@ -1571,7 +1571,7 @@ ACTOR Future<Void> bulkLoadJobWaitUntilTaskCompleteOrError(Reference<DataDistrib
|
||||
}
|
||||
}
|
||||
|
||||
// Given a list of manifestEntry, create a bulkload task and wait until the task is complete or error.
|
||||
// Given a list of manifestEntry, create a bulkload task.
|
||||
// There is an invariant by bulkload engine: if a task metadata is persisted, the task
|
||||
// is guaranteed to be eventually marked as complete or error.
|
||||
ACTOR Future<Void> bulkLoadJobNewTask(Reference<DataDistributor> self,
|
||||
@ -1621,15 +1621,6 @@ ACTOR Future<Void> bulkLoadJobNewTask(Reference<DataDistributor> self,
|
||||
TraceEvent(SevWarnAlways, "DDBulkLoadJobExecutorInjectDDRestart", self->ddId).detail("Context", "New");
|
||||
throw movekeys_conflict(); // improve code coverage
|
||||
}
|
||||
|
||||
// Step 4: Monitor the bulkload completion
|
||||
wait(bulkLoadJobWaitUntilTaskCompleteOrError(self, jobId, bulkLoadTask));
|
||||
TraceEvent(bulkLoadPerfEventSev(), "DDBulkLoadJobExecutorTask", self->ddId)
|
||||
.detail("Phase", "Task complete")
|
||||
.detail("JobID", jobId)
|
||||
.detail("TaskID", bulkLoadTask.getTaskId())
|
||||
.detail("TaskRange", bulkLoadTask.getRange())
|
||||
.detail("Duration", now() - beginTime);
|
||||
self->bulkLoadParallelismLimitor.decrementTaskCounter();
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_actor_cancelled) {
|
||||
|
@ -20,6 +20,7 @@ enable_read_lock_on_range = true
|
||||
# Do not support version vector
|
||||
enable_version_vector = false
|
||||
enable_version_vector_tlog_unicast = false
|
||||
enable_version_vector_reply_recovery = false
|
||||
|
||||
# Set high enough sample rate to test bytes sampling
|
||||
min_byte_sampling_probability = 0.5
|
||||
|
@ -20,6 +20,7 @@ enable_read_lock_on_range = true
|
||||
# Do not support version vector
|
||||
enable_version_vector = false
|
||||
enable_version_vector_tlog_unicast = false
|
||||
enable_version_vector_reply_recovery = false
|
||||
|
||||
# Set high enough sample rate to test bytes sampling
|
||||
min_byte_sampling_probability = 0.5
|
||||
|
@ -6,6 +6,7 @@ encryptModes = ['disabled'] # do not support encryption
|
||||
enable_read_lock_on_range = true
|
||||
enable_version_vector = false
|
||||
enable_version_vector_tlog_unicast = false
|
||||
enable_version_vector_reply_recovery = false
|
||||
|
||||
[[test]]
|
||||
testTitle = 'RangeLockCycle'
|
||||
|
@ -7,6 +7,7 @@ enable_read_lock_on_range = true
|
||||
transaction_lock_rejection_retriable = false
|
||||
enable_version_vector = false
|
||||
enable_version_vector_tlog_unicast = false
|
||||
enable_version_vector_reply_recovery = false
|
||||
|
||||
[[test]]
|
||||
testTitle = 'RangeLocking'
|
||||
|
@ -7,6 +7,7 @@ allowCreatingTenants = false
|
||||
proxy_use_resolver_private_mutations = false
|
||||
enable_version_vector = false
|
||||
enable_version_vector_tlog_unicast = false
|
||||
enable_version_vector_reply_recovery = false
|
||||
|
||||
[[test]]
|
||||
testTitle = 'RawTenantAccessClean'
|
||||
|
@ -1,5 +1,8 @@
|
||||
[[knobs]]
|
||||
enable_version_vector = false
|
||||
enable_version_vector_tlog_unicast = false
|
||||
enable_version_vector_reply_recovery = false
|
||||
|
||||
max_read_transaction_life_versions = 1000000
|
||||
max_write_transaction_life_versions = 1000000
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user