mirror of
https://github.com/facebook/rocksdb.git
synced 2025-05-14 17:03:11 +08:00
Fix concurrent full purge and WAL recycling (#5900)
Summary: We were removing the file from `log_recycle_files_` before renaming it with `ReuseWritableFile()`. Since `ReuseWritableFile()` occurs outside the DB mutex, it was possible for a concurrent full purge to sneak in and delete the file before it could be renamed. Consequently, `SwitchMemtable()` would fail and the DB would enter read-only mode. The fix is to hold the old file number in `log_recycle_files_` until after the file has been renamed. Full purge uses that list to decide which files to keep, so it can no longer delete a file pending recycling. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5900 Test Plan: new unit test Differential Revision: D19771719 Pulled By: ajkr fbshipit-source-id: 094346349ca3fb499712e62de03905acc30b5ce8
This commit is contained in:
parent
0f9dcb88b2
commit
c6abe30ee3
@ -1383,6 +1383,7 @@ class DBImpl : public DB {
|
|||||||
Status ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
|
Status ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
|
||||||
WriteBatch* my_batch);
|
WriteBatch* my_batch);
|
||||||
|
|
||||||
|
// REQUIRES: mutex locked and in write thread.
|
||||||
Status ScheduleFlushes(WriteContext* context);
|
Status ScheduleFlushes(WriteContext* context);
|
||||||
|
|
||||||
void MaybeFlushStatsCF(autovector<ColumnFamilyData*>* cfds);
|
void MaybeFlushStatsCF(autovector<ColumnFamilyData*>* cfds);
|
||||||
@ -1454,10 +1455,10 @@ class DBImpl : public DB {
|
|||||||
// REQUIRES: mutex locked and in write thread.
|
// REQUIRES: mutex locked and in write thread.
|
||||||
void AssignAtomicFlushSeq(const autovector<ColumnFamilyData*>& cfds);
|
void AssignAtomicFlushSeq(const autovector<ColumnFamilyData*>& cfds);
|
||||||
|
|
||||||
// REQUIRES: mutex locked
|
// REQUIRES: mutex locked and in write thread.
|
||||||
Status SwitchWAL(WriteContext* write_context);
|
Status SwitchWAL(WriteContext* write_context);
|
||||||
|
|
||||||
// REQUIRES: mutex locked
|
// REQUIRES: mutex locked and in write thread.
|
||||||
Status HandleWriteBufferFull(WriteContext* write_context);
|
Status HandleWriteBufferFull(WriteContext* write_context);
|
||||||
|
|
||||||
// REQUIRES: mutex locked
|
// REQUIRES: mutex locked
|
||||||
|
@ -24,7 +24,9 @@ uint64_t DBImpl::TEST_GetLevel0TotalSize() {
|
|||||||
void DBImpl::TEST_SwitchWAL() {
|
void DBImpl::TEST_SwitchWAL() {
|
||||||
WriteContext write_context;
|
WriteContext write_context;
|
||||||
InstrumentedMutexLock l(&mutex_);
|
InstrumentedMutexLock l(&mutex_);
|
||||||
|
void* writer = TEST_BeginWrite();
|
||||||
SwitchWAL(&write_context);
|
SwitchWAL(&write_context);
|
||||||
|
TEST_EndWrite(writer);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool DBImpl::TEST_WALBufferIsEmpty(bool lock) {
|
bool DBImpl::TEST_WALBufferIsEmpty(bool lock) {
|
||||||
@ -106,15 +108,18 @@ Status DBImpl::TEST_SwitchMemtable(ColumnFamilyData* cfd) {
|
|||||||
cfd = default_cf_handle_->cfd();
|
cfd = default_cf_handle_->cfd();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Status s;
|
||||||
|
void* writer = TEST_BeginWrite();
|
||||||
if (two_write_queues_) {
|
if (two_write_queues_) {
|
||||||
WriteThread::Writer nonmem_w;
|
WriteThread::Writer nonmem_w;
|
||||||
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
|
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
|
||||||
Status s = SwitchMemtable(cfd, &write_context);
|
s = SwitchMemtable(cfd, &write_context);
|
||||||
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
|
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
|
||||||
return s;
|
|
||||||
} else {
|
} else {
|
||||||
return SwitchMemtable(cfd, &write_context);
|
s = SwitchMemtable(cfd, &write_context);
|
||||||
}
|
}
|
||||||
|
TEST_EndWrite(writer);
|
||||||
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
Status DBImpl::TEST_FlushMemTable(bool wait, bool allow_write_stall,
|
Status DBImpl::TEST_FlushMemTable(bool wait, bool allow_write_stall,
|
||||||
|
@ -1329,6 +1329,8 @@ Status DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
|
|||||||
recycle_log_number);
|
recycle_log_number);
|
||||||
std::string old_log_fname =
|
std::string old_log_fname =
|
||||||
LogFileName(immutable_db_options_.wal_dir, recycle_log_number);
|
LogFileName(immutable_db_options_.wal_dir, recycle_log_number);
|
||||||
|
TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile1");
|
||||||
|
TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile2");
|
||||||
s = fs_->ReuseWritableFile(log_fname, old_log_fname, opt_file_options,
|
s = fs_->ReuseWritableFile(log_fname, old_log_fname, opt_file_options,
|
||||||
&lfile, /*dbg=*/nullptr);
|
&lfile, /*dbg=*/nullptr);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1631,7 +1631,6 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
|
|||||||
if (creating_new_log && immutable_db_options_.recycle_log_file_num &&
|
if (creating_new_log && immutable_db_options_.recycle_log_file_num &&
|
||||||
!log_recycle_files_.empty()) {
|
!log_recycle_files_.empty()) {
|
||||||
recycle_log_number = log_recycle_files_.front();
|
recycle_log_number = log_recycle_files_.front();
|
||||||
log_recycle_files_.pop_front();
|
|
||||||
}
|
}
|
||||||
uint64_t new_log_number =
|
uint64_t new_log_number =
|
||||||
creating_new_log ? versions_->NewFileNumber() : logfile_number_;
|
creating_new_log ? versions_->NewFileNumber() : logfile_number_;
|
||||||
@ -1668,6 +1667,14 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
|
|||||||
". Immutable memtables: %d.\n",
|
". Immutable memtables: %d.\n",
|
||||||
cfd->GetName().c_str(), new_log_number, num_imm_unflushed);
|
cfd->GetName().c_str(), new_log_number, num_imm_unflushed);
|
||||||
mutex_.Lock();
|
mutex_.Lock();
|
||||||
|
if (recycle_log_number != 0) {
|
||||||
|
// Since renaming the file is done outside DB mutex, we need to ensure
|
||||||
|
// concurrent full purges don't delete the file while we're recycling it.
|
||||||
|
// To achieve that we hold the old log number in the recyclable list until
|
||||||
|
// after it has been renamed.
|
||||||
|
assert(log_recycle_files_.front() == recycle_log_number);
|
||||||
|
log_recycle_files_.pop_front();
|
||||||
|
}
|
||||||
if (s.ok() && creating_new_log) {
|
if (s.ok() && creating_new_log) {
|
||||||
log_write_mutex_.Lock();
|
log_write_mutex_.Lock();
|
||||||
assert(new_log != nullptr);
|
assert(new_log != nullptr);
|
||||||
|
@ -557,6 +557,46 @@ TEST_F(DBWALTest, FullPurgePreservesRecycledLog) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(DBWALTest, FullPurgePreservesLogPendingReuse) {
|
||||||
|
// Ensures full purge cannot delete a WAL while it's in the process of being
|
||||||
|
// recycled. In particular, we force the full purge after a file has been
|
||||||
|
// chosen for reuse, but before it has been renamed.
|
||||||
|
for (int i = 0; i < 2; ++i) {
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
options.recycle_log_file_num = 1;
|
||||||
|
if (i != 0) {
|
||||||
|
options.wal_dir = alternative_wal_dir_;
|
||||||
|
}
|
||||||
|
DestroyAndReopen(options);
|
||||||
|
|
||||||
|
// The first flush creates a second log so writes can continue before the
|
||||||
|
// flush finishes.
|
||||||
|
ASSERT_OK(Put("foo", "bar"));
|
||||||
|
ASSERT_OK(Flush());
|
||||||
|
|
||||||
|
// The second flush can recycle the first log. Sync points enforce the
|
||||||
|
// full purge happens after choosing the log to recycle and before it is
|
||||||
|
// renamed.
|
||||||
|
rocksdb::SyncPoint::GetInstance()->LoadDependency({
|
||||||
|
{"DBImpl::CreateWAL:BeforeReuseWritableFile1",
|
||||||
|
"DBWALTest::FullPurgePreservesLogPendingReuse:PreFullPurge"},
|
||||||
|
{"DBWALTest::FullPurgePreservesLogPendingReuse:PostFullPurge",
|
||||||
|
"DBImpl::CreateWAL:BeforeReuseWritableFile2"},
|
||||||
|
});
|
||||||
|
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
rocksdb::port::Thread thread([&]() {
|
||||||
|
TEST_SYNC_POINT(
|
||||||
|
"DBWALTest::FullPurgePreservesLogPendingReuse:PreFullPurge");
|
||||||
|
ASSERT_OK(db_->EnableFileDeletions(true));
|
||||||
|
TEST_SYNC_POINT(
|
||||||
|
"DBWALTest::FullPurgePreservesLogPendingReuse:PostFullPurge");
|
||||||
|
});
|
||||||
|
ASSERT_OK(Put("foo", "bar"));
|
||||||
|
ASSERT_OK(Flush());
|
||||||
|
thread.join();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(DBWALTest, GetSortedWalFiles) {
|
TEST_F(DBWALTest, GetSortedWalFiles) {
|
||||||
do {
|
do {
|
||||||
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
|
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
|
||||||
|
Loading…
x
Reference in New Issue
Block a user