mirror of https://github.com/apple/foundationdb.git synced 2025-05-15 18:32:18 +08:00

Compare commits

4 Commits

Author SHA1 Message Date
Zhe Wang
9d5a768cda
Merge 4e5d46cce510c6d4979669ffea4d5e701e4f33f1 into 9e0887446305c7d96d5f9868ea68b0686974ccfb 2025-03-18 17:01:53 -07:00
Dan Lambright
9e08874463
Disable enable_version_vector_reply_recovery in version vector tests. ()
Co-authored-by: Dan Lambright <hlambright@apple.com>
2025-03-18 19:52:29 -04:00
Zhe Wang
4e5d46cce5 make bulkload scheduling more efficient 2025-03-18 15:00:13 -07:00
Jingyu Zhou
deda04b845
Fix a restore bug due to a race ()
Found by simulation:
seed:  -f tests/slow/ApiCorrectnessAtomicRestore.toml -s 177856328 -b on
Commit: 51ad8428e0fbe1d82bc76cf42b1579f51ecf2773
Compiler: clang++
Env: Rhel9 okteto

applyMutations() had processed versions 801400000-803141392 and then called sendCommitTransactionRequest(),
which was going to advance the apply begin version to 803141392, but it DID NOT wait for that transaction to commit.

Then an update of the apply end version to 845345760 picked up the PREVIOUS apply begin version 801400000
and started another applyMutations() with version range 801400000-845345760. Because the previous
applyMutations() had finished without waiting for its transaction to commit, the starting version was stale.
As a result, this applyMutations() re-processed version range 801400000-803141392.

The test failed during the re-processing, because mutations were missing for the overlapped range.

The fix is to wait for the transaction to commit in sendCommitTransactionRequest() (sketched after the commit list).

This bug probably affects DR as well.

See rdar://146877552

20250317-162835-jzhou-ff4c4d6d7c51bfed
2025-03-17 16:12:33 -07:00
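A minimal sketch of the race described in the commit message above, written as plain C++ rather than Flow (std::future stands in for the commit future; the version numbers come from the commit message, everything else is illustrative and is not FDB code):

#include <cstdint>
#include <future>
#include <iostream>

int main() {
    int64_t applyBegin = 801400000;          // persisted apply-begin version
    const int64_t firstBatchEnd = 803141392; // end of the first applyMutations() batch
    const int64_t applyEnd = 845345760;      // later update to the apply-end version

    // Buggy flow: the transaction that advances applyBegin is issued but never awaited.
    // std::launch::deferred means the lambda only runs when get() is called,
    // which models "the commit has not happened yet".
    std::future<void> pendingCommit = std::async(std::launch::deferred, [&] { applyBegin = firstBatchEnd; });

    // Meanwhile the apply-end update arrives, reads the stale begin version,
    // and starts another applyMutations() over an overlapping range.
    std::cout << "next range (buggy): " << applyBegin << "-" << applyEnd << "\n"; // 801400000-845345760

    // The fix: wait for the commit, so any later reader observes the advanced begin version.
    pendingCommit.get();
    std::cout << "next range (fixed): " << applyBegin << "-" << applyEnd << "\n"; // 803141392-845345760
    return 0;
}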
10 changed files with 33 additions and 40 deletions

@@ -352,7 +352,8 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
TraceEvent(SevError, "OffsetOutOfBoundary")
.detail("TotalBytes", totalBytes)
.detail("Offset", offset)
.detail("Size", value.size());
.detail("Version", version)
.detail("ValueSize", value.size());
throw restore_missing_data();
}
@@ -676,7 +677,7 @@ ACTOR Future<Void> readCommitted(Database cx,
// iterate on a version range, each key-value pair is (version, part)
for (auto& s : rangevalue) {
uint64_t groupKey = groupBy(s.key).first;
Version groupKey = groupBy(s.key).first; // mutation's commit version
// TraceEvent("Log_ReadCommitted")
// .detail("GroupKey", groupKey)
// .detail("SkipGroup", skipGroup)
@@ -761,10 +762,8 @@ ACTOR Future<Void> sendCommitTransactionRequest(CommitTransactionRequest req,
NotifiedVersion* committedVersion,
int* totalBytes,
int* mutationSize,
PromiseStream<Future<Void>> addActor,
FlowLock* commitLock,
PublicRequestStream<CommitTransactionRequest> commit,
bool tenantMapChanging) {
PublicRequestStream<CommitTransactionRequest> commit) {
Key applyBegin = uid.withPrefix(applyMutationsBeginRange.begin);
Key versionKey = BinaryWriter::toValue(newBeginVersion, Unversioned());
Key rangeEnd = getApplyKey(newBeginVersion, uid);
@@ -787,16 +786,18 @@ ACTOR Future<Void> sendCommitTransactionRequest(CommitTransactionRequest req,
*totalBytes += *mutationSize;
wait(commitLock->take(TaskPriority::DefaultYield, *mutationSize));
Future<Void> commitAndUnlock = commitLock->releaseWhen(success(commit.getReply(req)), *mutationSize);
if (tenantMapChanging) {
// If tenant map is changing, we need to wait until it's committed before processing next mutations.
// Next mutations need the updated tenant map for filtering.
wait(commitAndUnlock);
} else {
addActor.send(commitAndUnlock);
}
// If tenant map is changing, we need to wait until it's committed before processing next mutations.
// Next mutations need the updated tenant map for filtering.
// Because we are bumping applyBegin version, we need to wait for the commit to be done.
// Otherwise, an update to the applyEnd key will trigger another applyMutation() which can
// have an overlapping range with the current applyMutation() and cause conflicts.
wait(commitAndUnlock);
return Void();
}
// Decodes the backup mutation log and sends the mutations to the CommitProxy.
// The mutation logs are grouped by version and passed in as a stream of RCGroup from readCommitted().
// The mutations are then decoded and sent to the CommitProxy in a batch.
ACTOR Future<int> kvMutationLogToTransactions(Database cx,
PromiseStream<RCGroup> results,
Reference<FlowLock> lock,
@@ -807,7 +808,6 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
NotifiedVersion* committedVersion,
Optional<Version> endVersion,
Key rangeBegin,
PromiseStream<Future<Void>> addActor,
FlowLock* commitLock,
Reference<KeyRangeMap<Version>> keyVersion,
std::map<int64_t, TenantName>* tenantMap,
@@ -872,10 +872,8 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
committedVersion,
&totalBytes,
&mutationSize,
addActor,
commitLock,
commit,
false));
commit));
req = CommitTransactionRequest();
mutationSize = 0;
}
@@ -908,17 +906,9 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
throw;
}
}
wait(sendCommitTransactionRequest(req,
uid,
newBeginVersion,
rangeBegin,
committedVersion,
&totalBytes,
&mutationSize,
addActor,
commitLock,
commit,
tenantMapChanging));
// TraceEvent("MutationLogRestore").detail("BeginVersion", newBeginVersion);
wait(sendCommitTransactionRequest(
req, uid, newBeginVersion, rangeBegin, committedVersion, &totalBytes, &mutationSize, commitLock, commit));
if (endOfStream) {
return totalBytes;
}
@@ -999,6 +989,7 @@ ACTOR Future<Void> applyMutations(Database cx,
try {
loop {
if (beginVersion >= *endVersion) {
// Why do we need to take a lock here?
wait(commitLock.take(TaskPriority::DefaultYield, CLIENT_KNOBS->BACKUP_LOCK_BYTES));
commitLock.release(CLIENT_KNOBS->BACKUP_LOCK_BYTES);
if (beginVersion >= *endVersion) {
@@ -1043,7 +1034,6 @@ ACTOR Future<Void> applyMutations(Database cx,
committedVersion,
idx == ranges.size() - 1 ? newEndVersion : Optional<Version>(),
ranges[idx].begin,
addActor,
&commitLock,
keyVersion,
tenantMap,

@@ -5572,6 +5572,7 @@ struct RestoreDispatchPartitionedTaskFunc : RestoreTaskFuncBase {
};
StringRef RestoreDispatchPartitionedTaskFunc::name = "restore_dispatch_partitioned"_sr;
REGISTER_TASKFUNC(RestoreDispatchPartitionedTaskFunc);
struct RestoreDispatchTaskFunc : RestoreTaskFuncBase {
static StringRef name;
static constexpr uint32_t version = 1;

@@ -531,12 +531,13 @@ public:
using RangeResultWithVersion = std::pair<RangeResult, Version>;
// RCGroup contains the backup mutations for a commit version, i.e., groupKey.
struct RCGroup {
RangeResult items;
Version version; // this is read version for this group
uint64_t groupKey; // this is the original version for this group
Version groupKey; // this is the original commit version for this group
RCGroup() : version(-1), groupKey(ULLONG_MAX){};
RCGroup() : version(-1), groupKey(ULLONG_MAX) {}
template <class Ar>
void serialize(Ar& ar) {
@@ -580,6 +581,8 @@ ACTOR Future<Void> readCommitted(Database cx,
Terminator terminator = Terminator::True,
AccessSystemKeys systemAccess = AccessSystemKeys::False,
LockAware lockAware = LockAware::False);
// Applies the mutations between the beginVersion and endVersion to the database during a restore.
ACTOR Future<Void> applyMutations(Database cx,
Key uid,
Key addPrefix,

@@ -1571,7 +1571,7 @@ ACTOR Future<Void> bulkLoadJobWaitUntilTaskCompleteOrError(Reference<DataDistrib
}
}
// Given a list of manifestEntry, create a bulkload task and wait until the task is complete or error.
// Given a list of manifestEntry, create a bulkload task.
// There is an invariant of the bulkload engine: if a task's metadata is persisted, the task
// is guaranteed to eventually be marked as complete or error.
ACTOR Future<Void> bulkLoadJobNewTask(Reference<DataDistributor> self,
@@ -1621,15 +1621,6 @@ ACTOR Future<Void> bulkLoadJobNewTask(Reference<DataDistributor> self,
TraceEvent(SevWarnAlways, "DDBulkLoadJobExecutorInjectDDRestart", self->ddId).detail("Context", "New");
throw movekeys_conflict(); // improve code coverage
}
// Step 4: Monitor the bulkload completion
wait(bulkLoadJobWaitUntilTaskCompleteOrError(self, jobId, bulkLoadTask));
TraceEvent(bulkLoadPerfEventSev(), "DDBulkLoadJobExecutorTask", self->ddId)
.detail("Phase", "Task complete")
.detail("JobID", jobId)
.detail("TaskID", bulkLoadTask.getTaskId())
.detail("TaskRange", bulkLoadTask.getRange())
.detail("Duration", now() - beginTime);
self->bulkLoadParallelismLimitor.decrementTaskCounter();
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {

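The hunk above removes the in-line wait for task completion (the old "Step 4") from bulkLoadJobNewTask(). A rough sketch of that scheduling change in plain C++ rather than Flow (persistAndRunTask() is a made-up stand-in, not FDB code; the point is only that task creation no longer blocks on completion, which a separate monitor is left to observe):

#include <future>
#include <iostream>
#include <vector>

// Hypothetical stand-in for the bulkload engine: once a task is "persisted" here,
// it is guaranteed to eventually be marked complete or error.
std::future<void> persistAndRunTask(int taskId) {
    return std::async(std::launch::async, [taskId] { std::cout << "task " << taskId << " complete\n"; });
}

int main() {
    std::vector<std::future<void>> inFlight;
    for (int taskId = 0; taskId < 4; ++taskId) {
        // Old flow: create one task, then block until it completes before creating the next.
        // New flow: only persist the task here and move on; a separate monitor observes
        // completion, so task creation no longer serializes on task execution.
        inFlight.push_back(persistAndRunTask(taskId));
    }
    for (auto& task : inFlight)
        task.get(); // the separate monitor eventually sees each task complete (or error)
    return 0;
}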
@@ -20,6 +20,7 @@ enable_read_lock_on_range = true
# Do not support version vector
enable_version_vector = false
enable_version_vector_tlog_unicast = false
enable_version_vector_reply_recovery = false
# Set high enough sample rate to test bytes sampling
min_byte_sampling_probability = 0.5

@@ -20,6 +20,7 @@ enable_read_lock_on_range = true
# Do not support version vector
enable_version_vector = false
enable_version_vector_tlog_unicast = false
enable_version_vector_reply_recovery = false
# Set high enough sample rate to test bytes sampling
min_byte_sampling_probability = 0.5

@@ -6,6 +6,7 @@ encryptModes = ['disabled'] # do not support encryption
enable_read_lock_on_range = true
enable_version_vector = false
enable_version_vector_tlog_unicast = false
enable_version_vector_reply_recovery = false
[[test]]
testTitle = 'RangeLockCycle'

@@ -7,6 +7,7 @@ enable_read_lock_on_range = true
transaction_lock_rejection_retriable = false
enable_version_vector = false
enable_version_vector_tlog_unicast = false
enable_version_vector_reply_recovery = false
[[test]]
testTitle = 'RangeLocking'

@@ -7,6 +7,7 @@ allowCreatingTenants = false
proxy_use_resolver_private_mutations = false
enable_version_vector = false
enable_version_vector_tlog_unicast = false
enable_version_vector_reply_recovery = false
[[test]]
testTitle = 'RawTenantAccessClean'

@@ -1,5 +1,8 @@
[[knobs]]
enable_version_vector = false
enable_version_vector_tlog_unicast = false
enable_version_vector_reply_recovery = false
max_read_transaction_life_versions = 1000000
max_write_transaction_life_versions = 1000000